from functools import lru_cache, partial, wraps
import io
import itertools
-import keyword
import logging
from multiprocessing import Manager, freeze_support
import os
import re
import signal
import sys
+import tempfile
import tokenize
from typing import (
Any,
list(v.visit(code))
-KEYWORDS = set(keyword.kwlist)
WHITESPACE = {token.DEDENT, token.INDENT, token.NEWLINE}
-FLOW_CONTROL = {"return", "raise", "break", "continue"}
STATEMENT = {
syms.if_stmt,
syms.while_stmt,
ALWAYS_NO_SPACE = CLOSING_BRACKETS | {token.COMMA, STANDALONE_COMMENT}
-def whitespace(leaf: Leaf, *, complex_subscript: bool) -> str: # noqa C901
+def whitespace(leaf: Leaf, *, complex_subscript: bool) -> str: # noqa: C901
"""Return whitespace prefix if needed for the given `leaf`.
`complex_subscript` signals whether the given leaf is part of a subscription
return
line_str = str(line).strip("\n")
- if not line.should_explode and is_line_short_enough(
- line, line_length=line_length, line_str=line_str
+
+ # we don't want to split special comments like type annotations
+ # https://github.com/python/typing/issues/186
+ has_special_comment = False
+ for leaf in line.leaves:
+ for comment in line.comments_after(leaf):
+ if leaf.type == token.COMMA and is_special_comment(comment):
+ has_special_comment = True
+
+ if (
+ not has_special_comment
+ and not line.should_explode
+ and is_line_short_enough(line, line_length=line_length, line_str=line_str)
):
yield line
return
)
+def is_special_comment(leaf: Leaf) -> bool:
+    """Tell whether `leaf` is a comment that carries special meaning.
+
+    For now the only special comments recognized are type comments: leaves
+    of type `token.COMMENT` or `STANDALONE_COMMENT` whose value starts with
+    "# type:".
+    """
+    kind, text = leaf.type, leaf.value
+    # Anything that is not a comment leaf can never be special.
+    if kind not in (token.COMMENT, STANDALONE_COMMENT):
+        return False
+
+    return text.startswith("# type:")
+
+
def normalize_prefix(leaf: Leaf, *, inside_brackets: bool) -> None:
"""Leave existing extra newlines if not `inside_brackets`. Remove everything
else.
def should_explode(line: Line, opening_bracket: Leaf) -> bool:
"""Should `line` immediately be split with `delimiter_split()` after RHS?"""
+
if not (
opening_bracket.parent
and opening_bracket.parent.type in {syms.atom, syms.import_from}
"""Update the cache file."""
cache_file = get_cache_file(line_length, mode)
try:
- if not CACHE_DIR.exists():
- CACHE_DIR.mkdir(parents=True)
+ CACHE_DIR.mkdir(parents=True, exist_ok=True)
new_cache = {**cache, **{src.resolve(): get_cache_info(src) for src in sources}}
- with cache_file.open("wb") as fobj:
- pickle.dump(new_cache, fobj, protocol=pickle.HIGHEST_PROTOCOL)
+ with tempfile.NamedTemporaryFile(dir=str(cache_file.parent), delete=False) as f:
+ pickle.dump(new_cache, f, protocol=pickle.HIGHEST_PROTOCOL)
+ os.replace(f.name, cache_file)
except OSError:
pass