from enum import Enum, Flag
from functools import lru_cache, partial, wraps
import io
-import keyword
+import itertools
import logging
-from multiprocessing import Manager
+from multiprocessing import Manager, freeze_support
import os
from pathlib import Path
import pickle
import re
import signal
import sys
+import tempfile
import tokenize
from typing import (
Any,
__version__ = "18.9b0"
DEFAULT_LINE_LENGTH = 88
DEFAULT_EXCLUDES = (
- r"/(\.git|\.hg|\.mypy_cache|\.nox|\.tox|\.venv|_build|buck-out|build|dist)/"
+ r"/(\.eggs|\.git|\.hg|\.mypy_cache|\.nox|\.tox|\.venv|_build|buck-out|build|dist)/"
)
DEFAULT_INCLUDES = r"\.pyi?$"
CACHE_DIR = Path(user_cache_dir("black", version=__version__))
pyproject_toml = toml.load(value)
config = pyproject_toml.get("tool", {}).get("black", {})
except (toml.TomlDecodeError, OSError) as e:
- raise click.BadOptionUsage(f"Error reading configuration file: {e}", ctx)
+ raise click.FileError(
+ filename=value, hint=f"Error reading configuration file: {e}"
+ )
if not config:
return None
list(v.visit(code))
-KEYWORDS = set(keyword.kwlist)
WHITESPACE = {token.DEDENT, token.INDENT, token.NEWLINE}
-FLOW_CONTROL = {"return", "raise", "break", "continue"}
STATEMENT = {
syms.if_stmt,
syms.while_stmt,
depth: int = 0
leaves: List[Leaf] = Factory(list)
- comments: List[Tuple[Index, Leaf]] = Factory(list)
+ # The LeafID keys of comments must remain ordered by the corresponding leaf's index
+ # in leaves
+ comments: Dict[LeafID, List[Leaf]] = Factory(dict)
bracket_tracker: BracketTracker = Factory(BracketTracker)
inside_brackets: bool = False
should_explode: bool = False
if comment.type != token.COMMENT:
return False
- after = len(self.leaves) - 1
- if after == -1:
+ if not self.leaves:
comment.type = STANDALONE_COMMENT
comment.prefix = ""
return False
else:
- self.comments.append((after, comment))
- return True
-
- def comments_after(self, leaf: Leaf, _index: int = -1) -> Iterator[Leaf]:
- """Generate comments that should appear directly after `leaf`.
-
- Provide a non-negative leaf `_index` to speed up the function.
- """
- if not self.comments:
- return
-
- if _index == -1:
- for _index, _leaf in enumerate(self.leaves):
- if leaf is _leaf:
- break
-
+ leaf_id = id(self.leaves[-1])
+ if leaf_id not in self.comments:
+ self.comments[leaf_id] = [comment]
else:
- return
+ self.comments[leaf_id].append(comment)
+ return True
- for index, comment_after in self.comments:
- if _index == index:
- yield comment_after
+ def comments_after(self, leaf: Leaf) -> List[Leaf]:
+ """Generate comments that should appear directly after `leaf`."""
+ return self.comments.get(id(leaf), [])
def remove_trailing_comma(self) -> None:
"""Remove the trailing comma and moves the comments attached to it."""
- comma_index = len(self.leaves) - 1
- for i in range(len(self.comments)):
- comment_index, comment = self.comments[i]
- if comment_index == comma_index:
- self.comments[i] = (comma_index - 1, comment)
+ # Remember, the LeafID keys of self.comments are ordered by the
+ # corresponding leaf's index in self.leaves
+ # If id(self.leaves[-2]) is in self.comments, the order doesn't change.
+ # Otherwise, we insert it into self.comments, and it becomes the last entry.
+ # However, since we delete id(self.leaves[-1]) from self.comments, the invariant
+ # is maintained
+ self.comments.setdefault(id(self.leaves[-2]), []).extend(
+ self.comments.get(id(self.leaves[-1]), [])
+ )
+ self.comments.pop(id(self.leaves[-1]), None)
self.leaves.pop()
def is_complex_subscript(self, leaf: Leaf) -> bool:
res = f"{first.prefix}{indent}{first.value}"
for leaf in leaves:
res += str(leaf)
- for _, comment in self.comments:
+ for comment in itertools.chain.from_iterable(self.comments.values()):
res += str(comment)
return res + "\n"
ALWAYS_NO_SPACE = CLOSING_BRACKETS | {token.COMMA, STANDALONE_COMMENT}
-def whitespace(leaf: Leaf, *, complex_subscript: bool) -> str: # noqa C901
+def whitespace(leaf: Leaf, *, complex_subscript: bool) -> str: # noqa: C901
"""Return whitespace prefix if needed for the given `leaf`.
`complex_subscript` signals whether the given leaf is part of a subscription
return
line_str = str(line).strip("\n")
- if not line.should_explode and is_line_short_enough(
- line, line_length=line_length, line_str=line_str
+
+ # we don't want to split special comments like type annotations
+ # https://github.com/python/typing/issues/186
+ has_special_comment = False
+ for leaf in line.leaves:
+ for comment in line.comments_after(leaf):
+ if leaf.type == token.COMMA and is_special_comment(comment):
+ has_special_comment = True
+
+ if (
+ not has_special_comment
+ and not line.should_explode
+ and is_line_short_enough(line, line_length=line_length, line_str=line_str)
):
yield line
return
current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
current_line.append(leaf)
- for index, leaf in enumerate(line.leaves):
+ for leaf in line.leaves:
yield from append_to_line(leaf)
- for comment_after in line.comments_after(leaf, index):
+ for comment_after in line.comments_after(leaf):
yield from append_to_line(comment_after)
lowest_depth = min(lowest_depth, leaf.bracket_depth)
current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
current_line.append(leaf)
- for index, leaf in enumerate(line.leaves):
+ for leaf in line.leaves:
yield from append_to_line(leaf)
- for comment_after in line.comments_after(leaf, index):
+ for comment_after in line.comments_after(leaf):
yield from append_to_line(comment_after)
if current_line:
)
+def is_special_comment(leaf: Leaf) -> bool:
+ """Return True if the given leaf is a special comment.
+ Only returns true for type comments for now."""
+ t = leaf.type
+ v = leaf.value
+ return bool(
+ (t == token.COMMENT or t == STANDALONE_COMMENT) and (v.startswith("# type:"))
+ )
+
+
def normalize_prefix(leaf: Leaf, *, inside_brackets: bool) -> None:
"""Leave existing extra newlines if not `inside_brackets`. Remove everything
else.
def should_explode(line: Line, opening_bracket: Leaf) -> bool:
"""Should `line` immediately be split with `delimiter_split()` after RHS?"""
+
if not (
opening_bracket.parent
and opening_bracket.parent.type in {syms.atom, syms.import_from}
return # Multiline strings, we can't continue.
comment: Optional[Leaf]
- for comment in line.comments_after(leaf, index):
+ for comment in line.comments_after(leaf):
length += len(comment.value)
yield index, leaf, length
"""Update the cache file."""
cache_file = get_cache_file(line_length, mode)
try:
- if not CACHE_DIR.exists():
- CACHE_DIR.mkdir(parents=True)
+ CACHE_DIR.mkdir(parents=True, exist_ok=True)
new_cache = {**cache, **{src.resolve(): get_cache_info(src) for src in sources}}
- with cache_file.open("wb") as fobj:
- pickle.dump(new_cache, fobj, protocol=pickle.HIGHEST_PROTOCOL)
+ with tempfile.NamedTemporaryFile(dir=str(cache_file.parent), delete=False) as f:
+ pickle.dump(new_cache, f, protocol=pickle.HIGHEST_PROTOCOL)
+ os.replace(f.name, cache_file)
except OSError:
pass
def patched_main() -> None:
+ freeze_support()
patch_click()
main()