import pickle
from asyncio.base_events import BaseEventLoop
from concurrent.futures import Executor, ProcessPoolExecutor
-from enum import Enum
+from enum import Enum, Flag
from functools import partial, wraps
import keyword
import logging
List,
Optional,
Pattern,
+ Sequence,
Set,
Tuple,
Type,
TypeVar,
Union,
+ cast,
)
from appdirs import user_cache_dir
from blib2to3.pgen2 import driver, token
from blib2to3.pgen2.parse import ParseError
-__version__ = "18.4a6"
+
+__version__ = "18.5b1"
DEFAULT_LINE_LENGTH = 88
+CACHE_DIR = Path(user_cache_dir("black", version=__version__))
+
# types
-syms = pygram.python_symbols
FileContent = str
Encoding = str
Depth = int
out = partial(click.secho, bold=True, err=True)
err = partial(click.secho, fg="red", err=True)
+pygram.initialize(CACHE_DIR)
+syms = pygram.python_symbols
+
class NothingChanged(UserWarning):
"""Raised by :func:`format_file` when reformatted code is the same as source."""
YES = 2
+class FileMode(Flag):
+ AUTO_DETECT = 0
+ PYTHON36 = 1
+ PYI = 2
+
+
@click.command()
@click.option(
"-l",
"silence those with 2>/dev/null."
),
)
+@click.option(
+ "--pyi",
+ is_flag=True,
+ help=(
+ "Consider all input files typing stubs regardless of file extension "
+ "(useful when piping source on standard input)."
+ ),
+)
+@click.option(
+ "--py36",
+ is_flag=True,
+ help=(
+ "Allow using Python 3.6-only syntax on all input files. This will put "
+ "trailing commas in function signatures and calls also after *args and "
+ "**kwargs. [default: per-file auto-detection]"
+ ),
+)
@click.version_option(version=__version__)
@click.argument(
"src",
check: bool,
diff: bool,
fast: bool,
+ pyi: bool,
+ py36: bool,
quiet: bool,
src: List[str],
) -> None:
write_back = WriteBack.DIFF
else:
write_back = WriteBack.YES
+ mode = FileMode.AUTO_DETECT
+ if py36:
+ mode |= FileMode.PYTHON36
+ if pyi:
+ mode |= FileMode.PYI
report = Report(check=check, quiet=quiet)
if len(sources) == 0:
out("No paths given. Nothing to do 😴")
return
elif len(sources) == 1:
- reformat_one(sources[0], line_length, fast, write_back, report)
+ reformat_one(
+ src=sources[0],
+ line_length=line_length,
+ fast=fast,
+ write_back=write_back,
+ mode=mode,
+ report=report,
+ )
else:
loop = asyncio.get_event_loop()
executor = ProcessPoolExecutor(max_workers=os.cpu_count())
try:
loop.run_until_complete(
schedule_formatting(
- sources, line_length, fast, write_back, report, loop, executor
+ sources=sources,
+ line_length=line_length,
+ fast=fast,
+ write_back=write_back,
+ mode=mode,
+ report=report,
+ loop=loop,
+ executor=executor,
)
)
finally:
def reformat_one(
- src: Path, line_length: int, fast: bool, write_back: WriteBack, report: "Report"
+ src: Path,
+ line_length: int,
+ fast: bool,
+ write_back: WriteBack,
+ mode: FileMode,
+ report: "Report",
) -> None:
"""Reformat a single file under `src` without spawning child processes.
If `quiet` is True, non-error messages are not output. `line_length`,
- `write_back`, and `fast` options are passed to :func:`format_file_in_place`.
+ `write_back`, `fast` and `pyi` options are passed to
+ :func:`format_file_in_place` or :func:`format_stdin_to_stdout`.
"""
try:
changed = Changed.NO
if not src.is_file() and str(src) == "-":
if format_stdin_to_stdout(
- line_length=line_length, fast=fast, write_back=write_back
+ line_length=line_length, fast=fast, write_back=write_back, mode=mode
):
changed = Changed.YES
else:
cache: Cache = {}
if write_back != WriteBack.DIFF:
- cache = read_cache(line_length)
+ cache = read_cache(line_length, mode)
src = src.resolve()
if src in cache and cache[src] == get_cache_info(src):
changed = Changed.CACHED
- if (
- changed is not Changed.CACHED
- and format_file_in_place(
- src, line_length=line_length, fast=fast, write_back=write_back
- )
+ if changed is not Changed.CACHED and format_file_in_place(
+ src,
+ line_length=line_length,
+ fast=fast,
+ write_back=write_back,
+ mode=mode,
):
changed = Changed.YES
if write_back == WriteBack.YES and changed is not Changed.NO:
- write_cache(cache, [src], line_length)
+ write_cache(cache, [src], line_length, mode)
report.done(src, changed)
except Exception as exc:
report.failed(src, str(exc))
line_length: int,
fast: bool,
write_back: WriteBack,
+ mode: FileMode,
report: "Report",
loop: BaseEventLoop,
executor: Executor,
(Use ProcessPoolExecutors for actual parallelism.)
- `line_length`, `write_back`, and `fast` options are passed to
+ `line_length`, `write_back`, `fast`, and `pyi` options are passed to
:func:`format_file_in_place`.
"""
cache: Cache = {}
if write_back != WriteBack.DIFF:
- cache = read_cache(line_length)
+ cache = read_cache(line_length, mode)
sources, cached = filter_cached(cache, sources)
for src in cached:
report.done(src, Changed.CACHED)
manager = Manager()
lock = manager.Lock()
tasks = {
- src: loop.run_in_executor(
- executor, format_file_in_place, src, line_length, fast, write_back, lock
- )
- for src in sources
+ loop.run_in_executor(
+ executor,
+ format_file_in_place,
+ src,
+ line_length,
+ fast,
+ write_back,
+ mode,
+ lock,
+ ): src
+ for src in sorted(sources)
}
- _task_values = list(tasks.values())
+ pending: Iterable[asyncio.Task] = tasks.keys()
try:
- loop.add_signal_handler(signal.SIGINT, cancel, _task_values)
- loop.add_signal_handler(signal.SIGTERM, cancel, _task_values)
+ loop.add_signal_handler(signal.SIGINT, cancel, pending)
+ loop.add_signal_handler(signal.SIGTERM, cancel, pending)
except NotImplementedError:
# There are no good alternatives for these on Windows
pass
- await asyncio.wait(_task_values)
- for src, task in tasks.items():
- if not task.done():
- report.failed(src, "timed out, cancelling")
- task.cancel()
- cancelled.append(task)
- elif task.cancelled():
- cancelled.append(task)
- elif task.exception():
- report.failed(src, str(task.exception()))
- else:
- formatted.append(src)
- report.done(src, Changed.YES if task.result() else Changed.NO)
-
+ while pending:
+ done, _ = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
+ for task in done:
+ src = tasks.pop(task)
+ if task.cancelled():
+ cancelled.append(task)
+ elif task.exception():
+ report.failed(src, str(task.exception()))
+ else:
+ formatted.append(src)
+ report.done(src, Changed.YES if task.result() else Changed.NO)
if cancelled:
await asyncio.gather(*cancelled, loop=loop, return_exceptions=True)
if write_back == WriteBack.YES and formatted:
- write_cache(cache, formatted, line_length)
+ write_cache(cache, formatted, line_length, mode)
def format_file_in_place(
line_length: int,
fast: bool,
write_back: WriteBack = WriteBack.NO,
+ mode: FileMode = FileMode.AUTO_DETECT,
lock: Any = None, # multiprocessing.Manager().Lock() is some crazy proxy
) -> bool:
"""Format file under `src` path. Return True if changed.
If `write_back` is True, write reformatted code back to stdout.
`line_length` and `fast` options are passed to :func:`format_file_contents`.
"""
-
+ if src.suffix == ".pyi":
+ mode |= FileMode.PYI
with tokenize.open(src) as src_buffer:
src_contents = src_buffer.read()
try:
dst_contents = format_file_contents(
- src_contents, line_length=line_length, fast=fast
+ src_contents, line_length=line_length, fast=fast, mode=mode
)
except NothingChanged:
return False
def format_stdin_to_stdout(
- line_length: int, fast: bool, write_back: WriteBack = WriteBack.NO
+ line_length: int,
+ fast: bool,
+ write_back: WriteBack = WriteBack.NO,
+ mode: FileMode = FileMode.AUTO_DETECT,
) -> bool:
"""Format file on stdin. Return True if changed.
If `write_back` is True, write reformatted code back to stdout.
- `line_length` and `fast` arguments are passed to :func:`format_file_contents`.
+ `line_length`, `fast`, `is_pyi`, and `force_py36` arguments are passed to
+ :func:`format_file_contents`.
"""
src = sys.stdin.read()
dst = src
try:
- dst = format_file_contents(src, line_length=line_length, fast=fast)
+ dst = format_file_contents(src, line_length=line_length, fast=fast, mode=mode)
return True
except NothingChanged:
def format_file_contents(
- src_contents: str, line_length: int, fast: bool
+ src_contents: str,
+ *,
+ line_length: int,
+ fast: bool,
+ mode: FileMode = FileMode.AUTO_DETECT,
) -> FileContent:
"""Reformat contents a file and return new contents.
if src_contents.strip() == "":
raise NothingChanged
- dst_contents = format_str(src_contents, line_length=line_length)
+ dst_contents = format_str(src_contents, line_length=line_length, mode=mode)
if src_contents == dst_contents:
raise NothingChanged
if not fast:
assert_equivalent(src_contents, dst_contents)
- assert_stable(src_contents, dst_contents, line_length=line_length)
+ assert_stable(src_contents, dst_contents, line_length=line_length, mode=mode)
return dst_contents
-def format_str(src_contents: str, line_length: int) -> FileContent:
+def format_str(
+ src_contents: str, line_length: int, *, mode: FileMode = FileMode.AUTO_DETECT
+) -> FileContent:
"""Reformat a string and return new contents.
`line_length` determines how many characters per line are allowed.
src_node = lib2to3_parse(src_contents)
dst_contents = ""
future_imports = get_future_imports(src_node)
- py36 = is_python36(src_node)
- lines = LineGenerator(remove_u_prefix=py36 or "unicode_literals" in future_imports)
- elt = EmptyLineTracker()
+ is_pyi = bool(mode & FileMode.PYI)
+ py36 = bool(mode & FileMode.PYTHON36) or is_python36(src_node)
+ lines = LineGenerator(
+ remove_u_prefix=py36 or "unicode_literals" in future_imports, is_pyi=is_pyi
+ )
+ elt = EmptyLineTracker(is_pyi=is_pyi)
empty_line = Line()
after = 0
for current_line in lines.visit(src_node):
STRING_PRIORITY = 12
COMPARATOR_PRIORITY = 10
MATH_PRIORITIES = {
- token.VBAR: 8,
- token.CIRCUMFLEX: 7,
- token.AMPER: 6,
- token.LEFTSHIFT: 5,
- token.RIGHTSHIFT: 5,
- token.PLUS: 4,
- token.MINUS: 4,
- token.STAR: 3,
- token.SLASH: 3,
- token.DOUBLESLASH: 3,
- token.PERCENT: 3,
- token.AT: 3,
- token.TILDE: 2,
- token.DOUBLESTAR: 1,
+ token.VBAR: 9,
+ token.CIRCUMFLEX: 8,
+ token.AMPER: 7,
+ token.LEFTSHIFT: 6,
+ token.RIGHTSHIFT: 6,
+ token.PLUS: 5,
+ token.MINUS: 5,
+ token.STAR: 4,
+ token.SLASH: 4,
+ token.DOUBLESLASH: 4,
+ token.PERCENT: 4,
+ token.AT: 4,
+ token.TILDE: 3,
+ token.DOUBLESTAR: 2,
}
+DOT_PRIORITY = 1
@dataclass
"""
return max(v for k, v in self.delimiters.items() if k not in exclude)
+ def delimiter_count_with_priority(self, priority: int = 0) -> int:
+ """Return the number of delimiters with the given `priority`.
+
+ If no `priority` is passed, defaults to max priority on the line.
+ """
+ if not self.delimiters:
+ return 0
+
+ priority = priority or self.max_delimiter_priority()
+ return sum(1 for p in self.delimiters.values() if p == priority)
+
def maybe_increment_for_loop_variable(self, leaf: Leaf) -> bool:
"""In a for loop, or comprehension, the variables are often unpacks.
comments: List[Tuple[Index, Leaf]] = Factory(list)
bracket_tracker: BracketTracker = Factory(BracketTracker)
inside_brackets: bool = False
+ should_explode: bool = False
def append(self, leaf: Leaf, preformatted: bool = False) -> None:
"""Add a new `leaf` to the end of the line.
and self.leaves[0].value == "class"
)
+ @property
+ def is_stub_class(self) -> bool:
+ """Is this line a class definition with a body consisting only of "..."?"""
+ return self.is_class and self.leaves[-3:] == [
+ Leaf(token.DOT, ".") for _ in range(3)
+ ]
+
@property
def is_def(self) -> bool:
"""Is this a function definition? (Also returns True for async defs.)"""
second_leaf: Optional[Leaf] = self.leaves[1]
except IndexError:
second_leaf = None
- return (
- (first_leaf.type == token.NAME and first_leaf.value == "def")
- or (
- first_leaf.type == token.ASYNC
- and second_leaf is not None
- and second_leaf.type == token.NAME
- and second_leaf.value == "def"
- )
- )
-
- @property
- def is_flow_control(self) -> bool:
- """Is this line a flow control statement?
-
- Those are `return`, `raise`, `break`, and `continue`.
- """
- return (
- bool(self)
- and self.leaves[0].type == token.NAME
- and self.leaves[0].value in FLOW_CONTROL
- )
-
- @property
- def is_yield(self) -> bool:
- """Is this line a yield statement?"""
- return (
- bool(self)
- and self.leaves[0].type == token.NAME
- and self.leaves[0].value == "yield"
+ return (first_leaf.type == token.NAME and first_leaf.value == "def") or (
+ first_leaf.type == token.ASYNC
+ and second_leaf is not None
+ and second_leaf.type == token.NAME
+ and second_leaf.value == "def"
)
@property
and self.leaves[3].value == ")"
)
+ @property
+ def is_triple_quoted_string(self) -> bool:
+ """Is the line a triple quoted string?"""
+ return (
+ bool(self)
+ and self.leaves[0].type == token.STRING
+ and self.leaves[0].value.startswith(('"""', "'''"))
+ )
+
def contains_standalone_comments(self, depth_limit: int = sys.maxsize) -> bool:
"""If so, needs to be split before emitting."""
for leaf in self.leaves:
self.comments.append((after, comment))
return True
- def comments_after(self, leaf: Leaf) -> Iterator[Leaf]:
- """Generate comments that should appear directly after `leaf`."""
- for _leaf_index, _leaf in enumerate(self.leaves):
- if leaf is _leaf:
- break
+ def comments_after(self, leaf: Leaf, _index: int = -1) -> Iterator[Leaf]:
+ """Generate comments that should appear directly after `leaf`.
- else:
- return
+ Provide a non-negative leaf `_index` to speed up the function.
+ """
+ if _index == -1:
+ for _index, _leaf in enumerate(self.leaves):
+ if leaf is _leaf:
+ break
+
+ else:
+ return
for index, comment_after in self.comments:
- if _leaf_index == index:
+ if _index == index:
yield comment_after
def remove_trailing_comma(self) -> None:
and subscript_start.type == syms.subscriptlist
):
subscript_start = child_towards(subscript_start, leaf)
- return (
- subscript_start is not None
- and any(n.type in TEST_DESCENDANTS for n in subscript_start.pre_order())
+ return subscript_start is not None and any(
+ n.type in TEST_DESCENDANTS for n in subscript_start.pre_order()
)
def __str__(self) -> str:
the prefix of the first leaf consists of optional newlines. Those newlines
are consumed by `maybe_empty_lines()` and included in the computation.
"""
+
+ is_pyi: bool = False
previous_line: Optional[Line] = None
previous_after: int = 0
previous_defs: List[int] = Factory(list)
"""Return the number of extra empty lines before and after the `current_line`.
This is for separating `def`, `async def` and `class` with extra empty
- lines (two on module-level), as well as providing an extra empty line
- after flow control keywords to make them more prominent.
+ lines (two on module-level).
"""
if isinstance(current_line, UnformattedLines):
return 0, 0
def _maybe_empty_lines(self, current_line: Line) -> Tuple[int, int]:
max_allowed = 1
if current_line.depth == 0:
- max_allowed = 2
+ max_allowed = 1 if self.is_pyi else 2
if current_line.leaves:
# Consume the first leaf's extra newlines.
first_leaf = current_line.leaves[0]
depth = current_line.depth
while self.previous_defs and self.previous_defs[-1] >= depth:
self.previous_defs.pop()
- before = 1 if depth else 2
+ if self.is_pyi:
+ before = 0 if depth else 1
+ else:
+ before = 1 if depth else 2
is_decorator = current_line.is_decorator
if is_decorator or current_line.is_def or current_line.is_class:
if not is_decorator:
if self.previous_line.is_decorator:
return 0, 0
+ if self.previous_line.depth < current_line.depth and (
+ self.previous_line.is_class or self.previous_line.is_def
+ ):
+ return 0, 0
+
if (
self.previous_line.is_comment
and self.previous_line.depth == current_line.depth
):
return 0, 0
- newlines = 2
- if current_line.depth:
+ if self.is_pyi:
+ if self.previous_line.depth > current_line.depth:
+ newlines = 1
+ elif current_line.is_class or self.previous_line.is_class:
+ if current_line.is_stub_class and self.previous_line.is_stub_class:
+ newlines = 0
+ else:
+ newlines = 1
+ else:
+ newlines = 0
+ else:
+ newlines = 2
+ if current_line.depth and newlines:
newlines -= 1
return newlines, 0
):
return (before or 1), 0
+ if (
+ self.previous_line
+ and self.previous_line.is_class
+ and current_line.is_triple_quoted_string
+ ):
+ return before, 1
+
return before, 0
Note: destroys the tree it's visiting by mutating prefixes of its leaves
in ways that will no longer stringify to valid Python code on the tree.
"""
+
+ is_pyi: bool = False
current_line: Line = Factory(Line)
remove_u_prefix: bool = False
The relevant Python language `keywords` for a given statement will be
NAME leaves within it. This methods puts those on a separate line.
- `parens` holds a set of string leaf values immeditely after which
+ `parens` holds a set of string leaf values immediately after which
invisible parens should be put.
"""
normalize_invisible_parens(node, parens_after=parens)
yield from self.visit(child)
+ def visit_suite(self, node: Node) -> Iterator[Line]:
+ """Visit a suite."""
+ if self.is_pyi and is_stub_suite(node):
+ yield from self.visit(node.children[2])
+ else:
+ yield from self.visit_default(node)
+
def visit_simple_stmt(self, node: Node) -> Iterator[Line]:
"""Visit a statement without nested statements."""
is_suite_like = node.parent and node.parent.type in STATEMENT
if is_suite_like:
- yield from self.line(+1)
- yield from self.visit_default(node)
- yield from self.line(-1)
+ if self.is_pyi and is_stub_body(node):
+ yield from self.visit_default(node)
+ else:
+ yield from self.line(+1)
+ yield from self.visit_default(node)
+ yield from self.line(-1)
else:
- yield from self.line()
+ if not self.is_pyi or not node.parent or not is_stub_suite(node.parent):
+ yield from self.line()
yield from self.visit_default(node)
def visit_async_stmt(self, node: Node) -> Iterator[Line]:
yield from self.line()
yield from self.visit(child)
- def visit_import_from(self, node: Node) -> Iterator[Line]:
- """Visit import_from and maybe put invisible parentheses.
-
- This is separate from `visit_stmt` because import statements don't
- support arbitrary atoms and thus handling of parentheses is custom.
- """
- check_lpar = False
- for index, child in enumerate(node.children):
- if check_lpar:
- if child.type == token.LPAR:
- # make parentheses invisible
- child.value = "" # type: ignore
- node.children[-1].value = "" # type: ignore
- else:
- # insert invisible parentheses
- node.insert_child(index, Leaf(token.LPAR, ""))
- node.append_child(Leaf(token.RPAR, ""))
- break
-
- check_lpar = (
- child.type == token.NAME and child.value == "import" # type: ignore
- )
-
- for child in node.children:
- yield from self.visit(child)
-
def visit_SEMI(self, leaf: Leaf) -> Iterator[Line]:
"""Remove a semicolon and put the other statement on a separate line."""
yield from self.line()
self.visit_classdef = partial(v, keywords={"class"}, parens=Ø)
self.visit_expr_stmt = partial(v, keywords=Ø, parens=ASSIGNMENTS)
self.visit_return_stmt = partial(v, keywords={"return"}, parens={"return"})
+ self.visit_import_from = partial(v, keywords=Ø, parens={"import"})
self.visit_async_funcdef = self.visit_async_stmt
self.visit_decorated = self.visit_decorators
return DOUBLESPACE
assert p is not None, f"INTERNAL ERROR: hand-made leaf without parent: {leaf!r}"
- if (
- t == token.COLON
- and p.type not in {syms.subscript, syms.subscriptlist, syms.sliceop}
- ):
+ if t == token.COLON and p.type not in {
+ syms.subscript,
+ syms.subscriptlist,
+ syms.sliceop,
+ }:
return NO
prev = leaf.prev_sibling
if prevp.type == token.EQUAL:
if prevp.parent:
if prevp.parent.type in {
- syms.arglist, syms.argument, syms.parameters, syms.varargslist
+ syms.arglist,
+ syms.argument,
+ syms.parameters,
+ syms.varargslist,
}:
return NO
prevp_parent = prevp.parent
assert prevp_parent is not None
- if (
- prevp.type == token.COLON
- and prevp_parent.type in {syms.subscript, syms.sliceop}
- ):
+ if prevp.type == token.COLON and prevp_parent.type in {
+ syms.subscript,
+ syms.sliceop,
+ }:
return NO
elif prevp.type == token.EQUAL and prevp_parent.type == syms.argument:
# Don't treat them as a delimiter.
return 0
+ if (
+ leaf.type == token.DOT
+ and leaf.parent
+ and leaf.parent.type not in {syms.import_from, syms.dotted_name}
+ and (previous is None or previous.type in CLOSING_BRACKETS)
+ ):
+ return DOT_PRIORITY
+
if (
leaf.type in MATH_OPERATORS
and leaf.parent
):
return STRING_PRIORITY
+ if leaf.type != token.NAME:
+ return 0
+
if (
- leaf.type == token.NAME
- and leaf.value == "for"
+ leaf.value == "for"
and leaf.parent
and leaf.parent.type in {syms.comp_for, syms.old_comp_for}
):
return COMPREHENSION_PRIORITY
if (
- leaf.type == token.NAME
- and leaf.value == "if"
+ leaf.value == "if"
and leaf.parent
and leaf.parent.type in {syms.comp_if, syms.old_comp_if}
):
return COMPREHENSION_PRIORITY
+ if leaf.value in {"if", "else"} and leaf.parent and leaf.parent.type == syms.test:
+ return TERNARY_PRIORITY
+
+ if leaf.value == "is":
+ return COMPARATOR_PRIORITY
+
+ if (
+ leaf.value == "in"
+ and leaf.parent
+ and leaf.parent.type in {syms.comp_op, syms.comparison}
+ and not (
+ previous is not None
+ and previous.type == token.NAME
+ and previous.value == "not"
+ )
+ ):
+ return COMPARATOR_PRIORITY
+
if (
- leaf.type == token.NAME
- and leaf.value in {"if", "else"}
+ leaf.value == "not"
and leaf.parent
- and leaf.parent.type == syms.test
+ and leaf.parent.type == syms.comp_op
+ and not (
+ previous is not None
+ and previous.type == token.NAME
+ and previous.value == "is"
+ )
):
- return TERNARY_PRIORITY
+ return COMPARATOR_PRIORITY
- if leaf.type == token.NAME and leaf.value in LOGIC_OPERATORS and leaf.parent:
+ if leaf.value in LOGIC_OPERATORS and leaf.parent:
return LOGIC_PRIORITY
return 0
-def generate_comments(leaf: Leaf) -> Iterator[Leaf]:
+def generate_comments(leaf: LN) -> Iterator[Leaf]:
"""Clean the prefix of the `leaf` and generate comments from it, if any.
Comments in lib2to3 are shoved into the whitespace prefix. This happens
return
line_str = str(line).strip("\n")
- if (
- len(line_str) <= line_length
- and "\n" not in line_str # multiline strings
- and not line.contains_standalone_comments()
+ if not line.should_explode and is_line_short_enough(
+ line, line_length=line_length, line_str=line_str
):
yield line
return
split_funcs: List[SplitFunc]
if line.is_def:
split_funcs = [left_hand_split]
- elif line.is_import:
- split_funcs = [explode_split]
- elif line.inside_brackets:
- split_funcs = [delimiter_split, standalone_comment_split, right_hand_split]
else:
- split_funcs = [right_hand_split]
+
+ def rhs(line: Line, py36: bool = False) -> Iterator[Line]:
+ for omit in generate_trailers_to_omit(line, line_length):
+ lines = list(right_hand_split(line, line_length, py36, omit=omit))
+ if is_line_short_enough(lines[0], line_length=line_length):
+ yield from lines
+ return
+
+ # All splits failed, best effort split with no omits.
+ # This mostly happens to multiline strings that are by definition
+ # reported as not fitting a single line.
+ yield from right_hand_split(line, py36)
+
+ if line.inside_brackets:
+ split_funcs = [delimiter_split, standalone_comment_split, rhs]
+ else:
+ split_funcs = [rhs]
for split_func in split_funcs:
# We are accumulating lines in `result` because we might want to abort
# mission and return the original line in the end, or attempt a different
def right_hand_split(
- line: Line, py36: bool = False, omit: Collection[LeafID] = ()
+ line: Line, line_length: int, py36: bool = False, omit: Collection[LeafID] = ()
) -> Iterator[Line]:
"""Split line into many lines, starting with the last matching bracket pair.
If the split was by optional parentheses, attempt splitting without them, too.
+ `omit` is a collection of closing bracket IDs that shouldn't be considered for
+ this split.
+
+ Note: running this function modifies `bracket_depth` on the leaves of `line`.
"""
head = Line(depth=line.depth)
body = Line(depth=line.depth + 1, inside_brackets=True)
# Since body is a new indent level, remove spurious leading whitespace.
if body_leaves:
normalize_prefix(body_leaves[0], inside_brackets=True)
- elif not head_leaves:
- # No `head` and no `body` means the split failed. `tail` has all content.
+ if not head_leaves:
+ # No `head` means the split failed. Either `tail` has all content or
+ # the matching `opening_bracket` wasn't available on `line` anymore.
raise CannotSplit("No brackets found")
# Build the new lines.
# the closing bracket is an optional paren
and closing_bracket.type == token.RPAR
and not closing_bracket.value
- # there are no delimiters or standalone comments in the body
- and not body.bracket_tracker.delimiters
+ # there are no standalone comments in the body
and not line.contains_standalone_comments(0)
# and it's not an import (optional parens are the only thing we can split
# on in this case; attempting a split without them is a waste of time)
and not line.is_import
):
omit = {id(closing_bracket), *omit}
- try:
- yield from right_hand_split(line, py36=py36, omit=omit)
- return
- except CannotSplit:
- pass
+ if can_omit_invisible_parens(body, line_length):
+ try:
+ yield from right_hand_split(line, line_length, py36=py36, omit=omit)
+ return
+ except CannotSplit:
+ pass
ensure_visible(opening_bracket)
ensure_visible(closing_bracket)
+ body.should_explode = should_explode(body, opening_bracket)
for result in (head, body, tail):
if result:
yield result
except IndexError:
raise CannotSplit("Line empty")
- delimiters = line.bracket_tracker.delimiters
+ bt = line.bracket_tracker
try:
- delimiter_priority = line.bracket_tracker.max_delimiter_priority(
- exclude={id(last_leaf)}
- )
+ delimiter_priority = bt.max_delimiter_priority(exclude={id(last_leaf)})
except ValueError:
raise CannotSplit("No delimiters found")
+ if delimiter_priority == DOT_PRIORITY:
+ if bt.delimiter_count_with_priority(delimiter_priority) == 1:
+ raise CannotSplit("Splitting a single attribute from its owner looks wrong")
+
current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
lowest_depth = sys.maxsize
trailing_comma_safe = True
current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
current_line.append(leaf)
- for leaf in line.leaves:
+ for index, leaf in enumerate(line.leaves):
yield from append_to_line(leaf)
- for comment_after in line.comments_after(leaf):
+ for comment_after in line.comments_after(leaf, index):
yield from append_to_line(comment_after)
lowest_depth = min(lowest_depth, leaf.bracket_depth)
- if (
- leaf.bracket_depth == lowest_depth
- and is_vararg(leaf, within=VARARGS_PARENTS)
+ if leaf.bracket_depth == lowest_depth and is_vararg(
+ leaf, within=VARARGS_PARENTS
):
trailing_comma_safe = trailing_comma_safe and py36
- leaf_priority = delimiters.get(id(leaf))
+ leaf_priority = bt.delimiters.get(id(leaf))
if leaf_priority == delimiter_priority:
yield current_line
current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
current_line.append(leaf)
- for leaf in line.leaves:
+ for index, leaf in enumerate(line.leaves):
yield from append_to_line(leaf)
- for comment_after in line.comments_after(leaf):
+ for comment_after in line.comments_after(leaf, index):
yield from append_to_line(comment_after)
if current_line:
yield current_line
-def explode_split(
- line: Line, py36: bool = False, omit: Collection[LeafID] = ()
-) -> Iterator[Line]:
- """Split by rightmost bracket and immediately split contents by a delimiter."""
- new_lines = list(right_hand_split(line, py36, omit))
- if len(new_lines) != 3:
- yield from new_lines
- return
-
- yield new_lines[0]
-
- try:
- yield from delimiter_split(new_lines[1], py36)
-
- except CannotSplit:
- yield new_lines[1]
-
- yield new_lines[2]
-
-
def is_import(leaf: Leaf) -> bool:
"""Return True if the given leaf starts an import statement."""
p = leaf.parent
Standardizes on visible parentheses for single-element tuples, and keeps
existing visible parentheses for other tuples and generator expressions.
"""
+ try:
+ list(generate_comments(node))
+ except FormatOff:
+ return # This `node` has a prefix with `# fmt: off`, don't mess with parens.
+
check_lpar = False
- for child in list(node.children):
+ for index, child in enumerate(list(node.children)):
if check_lpar:
if child.type == syms.atom:
maybe_make_parens_invisible_in_atom(child)
# wrap child in visible parentheses
lpar = Leaf(token.LPAR, "(")
rpar = Leaf(token.RPAR, ")")
- index = child.remove() or 0
+ child.remove()
node.insert_child(index, Node(syms.atom, [lpar, child, rpar]))
- else:
+ elif node.type == syms.import_from:
+ # "import from" nodes store parentheses directly as part of
+ # the statement
+ if child.type == token.LPAR:
+ # make parentheses invisible
+ child.value = "" # type: ignore
+ node.children[-1].value = "" # type: ignore
+ elif child.type != token.STAR:
+ # insert invisible parentheses
+ node.insert_child(index, Leaf(token.LPAR, ""))
+ node.append_child(Leaf(token.RPAR, ""))
+ break
+
+ elif not (isinstance(child, Leaf) and is_multiline_string(child)):
# wrap child in invisible parentheses
lpar = Leaf(token.LPAR, "")
rpar = Leaf(token.RPAR, "")
return p.type in within
+def is_multiline_string(leaf: Leaf) -> bool:
+ """Return True if `leaf` is a multiline string that actually spans many lines."""
+ value = leaf.value.lstrip("furbFURB")
+ return value[:3] in {'"""', "'''"} and "\n" in value
+
+
+def is_stub_suite(node: Node) -> bool:
+ """Return True if `node` is a suite with a stub body."""
+ if (
+ len(node.children) != 4
+ or node.children[0].type != token.NEWLINE
+ or node.children[1].type != token.INDENT
+ or node.children[3].type != token.DEDENT
+ ):
+ return False
+
+ return is_stub_body(node.children[2])
+
+
+def is_stub_body(node: LN) -> bool:
+ """Return True if `node` is a simple statement containing an ellipsis."""
+ if not isinstance(node, Node) or node.type != syms.simple_stmt:
+ return False
+
+ if len(node.children) != 2:
+ return False
+
+ child = node.children[0]
+ return (
+ child.type == syms.atom
+ and len(child.children) == 3
+ and all(leaf == Leaf(token.DOT, ".") for leaf in child.children)
+ )
+
+
def max_delimiter_priority_in_atom(node: LN) -> int:
"""Return maximum delimiter priority inside `node`.
leaf.value = ")"
+def should_explode(line: Line, opening_bracket: Leaf) -> bool:
+ """Should `line` immediately be split with `delimiter_split()` after RHS?"""
+ if not (
+ opening_bracket.parent
+ and opening_bracket.parent.type in {syms.atom, syms.import_from}
+ and opening_bracket.value in "[{("
+ ):
+ return False
+
+ try:
+ last_leaf = line.leaves[-1]
+ exclude = {id(last_leaf)} if last_leaf.type == token.COMMA else set()
+ max_priority = line.bracket_tracker.max_delimiter_priority(exclude=exclude)
+ except (IndexError, ValueError):
+ return False
+
+ return max_priority == COMMA_PRIORITY
+
+
def is_python36(node: Node) -> bool:
"""Return True if the current file is using Python 3.6+ features.
return False
+def generate_trailers_to_omit(line: Line, line_length: int) -> Iterator[Set[LeafID]]:
+ """Generate sets of closing bracket IDs that should be omitted in a RHS.
+
+ Brackets can be omitted if the entire trailer up to and including
+ a preceding closing bracket fits in one line.
+
+ Yielded sets are cumulative (contain results of previous yields, too). First
+ set is empty.
+ """
+
+ omit: Set[LeafID] = set()
+ yield omit
+
+ length = 4 * line.depth
+ opening_bracket = None
+ closing_bracket = None
+ optional_brackets: Set[LeafID] = set()
+ inner_brackets: Set[LeafID] = set()
+ for index, leaf, leaf_length in enumerate_with_length(line, reversed=True):
+ length += leaf_length
+ if length > line_length:
+ break
+
+ has_inline_comment = leaf_length > len(leaf.value) + len(leaf.prefix)
+ if leaf.type == STANDALONE_COMMENT or has_inline_comment:
+ break
+
+ optional_brackets.discard(id(leaf))
+ if opening_bracket:
+ if leaf is opening_bracket:
+ opening_bracket = None
+ elif leaf.type in CLOSING_BRACKETS:
+ inner_brackets.add(id(leaf))
+ elif leaf.type in CLOSING_BRACKETS:
+ if not leaf.value:
+ optional_brackets.add(id(opening_bracket))
+ continue
+
+ if index > 0 and line.leaves[index - 1].type in OPENING_BRACKETS:
+ # Empty brackets would fail a split so treat them as "inner"
+ # brackets (e.g. only add them to the `omit` set if another
+ # pair of brackets was good enough.
+ inner_brackets.add(id(leaf))
+ continue
+
+ opening_bracket = leaf.opening_bracket
+ if closing_bracket:
+ omit.add(id(closing_bracket))
+ omit.update(inner_brackets)
+ inner_brackets.clear()
+ yield omit
+ closing_bracket = leaf
+
+
def get_future_imports(node: Node) -> Set[str]:
"""Return a set of __future__ imports in the file."""
imports = set()
return imports
-PYTHON_EXTENSIONS = {".py"}
+PYTHON_EXTENSIONS = {".py", ".pyi"}
BLACKLISTED_DIRECTORIES = {
- "build", "buck-out", "dist", "_build", ".git", ".hg", ".mypy_cache", ".tox", ".venv"
+ "build",
+ "buck-out",
+ "dist",
+ "_build",
+ ".git",
+ ".hg",
+ ".mypy_cache",
+ ".tox",
+ ".venv",
}
@dataclass
class Report:
"""Provides a reformatting counter. Can be rendered with `str(report)`."""
+
check: bool = False
quiet: bool = False
change_count: int = 0
) from None
-def assert_stable(src: str, dst: str, line_length: int) -> None:
+def assert_stable(
+ src: str, dst: str, line_length: int, mode: FileMode = FileMode.AUTO_DETECT
+) -> None:
"""Raise AssertionError if `dst` reformats differently the second time."""
- newdst = format_str(dst, line_length=line_length)
+ newdst = format_str(dst, line_length=line_length, mode=mode)
if dst != newdst:
log = dump_to_file(
diff(src, dst, "source", "first pass"),
)
-def cancel(tasks: List[asyncio.Task]) -> None:
+def cancel(tasks: Iterable[asyncio.Task]) -> None:
"""asyncio signal handler that cancels all `tasks` and reports to stderr."""
err("Aborted!")
for task in tasks:
return regex.sub(replacement, regex.sub(replacement, original))
-CACHE_DIR = Path(user_cache_dir("black", version=__version__))
+def enumerate_reversed(sequence: Sequence[T]) -> Iterator[Tuple[Index, T]]:
+ """Like `reversed(enumerate(sequence))` if that were possible."""
+ index = len(sequence) - 1
+ for element in reversed(sequence):
+ yield (index, element)
+ index -= 1
+
+
+def enumerate_with_length(
+ line: Line, reversed: bool = False
+) -> Iterator[Tuple[Index, Leaf, int]]:
+ """Return an enumeration of leaves with their length.
+
+ Stops prematurely on multiline strings and standalone comments.
+ """
+ op = cast(
+ Callable[[Sequence[Leaf]], Iterator[Tuple[Index, Leaf]]],
+ enumerate_reversed if reversed else enumerate,
+ )
+ for index, leaf in op(line.leaves):
+ length = len(leaf.prefix) + len(leaf.value)
+ if "\n" in leaf.value:
+ return # Multiline strings, we can't continue.
+
+ comment: Optional[Leaf]
+ for comment in line.comments_after(leaf, index):
+ length += len(comment.value)
+ yield index, leaf, length
-def get_cache_file(line_length: int) -> Path:
- return CACHE_DIR / f"cache.{line_length}.pickle"
+
+def is_line_short_enough(line: Line, *, line_length: int, line_str: str = "") -> bool:
+ """Return True if `line` is no longer than `line_length`.
+
+ Uses the provided `line_str` rendering, if any, otherwise computes a new one.
+ """
+ if not line_str:
+ line_str = str(line).strip("\n")
+ return (
+ len(line_str) <= line_length
+ and "\n" not in line_str # multiline strings
+ and not line.contains_standalone_comments()
+ )
-def read_cache(line_length: int) -> Cache:
+def can_omit_invisible_parens(line: Line, line_length: int) -> bool:
+ """Does `line` have a shape safe to reformat without optional parens around it?
+
+ Returns True for only a subset of potentially nice looking formattings but
+ the point is to not return false positives that end up producing lines that
+ are too long.
+ """
+ bt = line.bracket_tracker
+ if not bt.delimiters:
+ # Without delimiters the optional parentheses are useless.
+ return True
+
+ max_priority = bt.max_delimiter_priority()
+ if bt.delimiter_count_with_priority(max_priority) > 1:
+ # With more than one delimiter of a kind the optional parentheses read better.
+ return False
+
+ if max_priority == DOT_PRIORITY:
+ # A single stranded method call doesn't require optional parentheses.
+ return True
+
+ assert len(line.leaves) >= 2, "Stranded delimiter"
+
+ first = line.leaves[0]
+ second = line.leaves[1]
+ penultimate = line.leaves[-2]
+ last = line.leaves[-1]
+
+ # With a single delimiter, omit if the expression starts or ends with
+ # a bracket.
+ if first.type in OPENING_BRACKETS and second.type not in CLOSING_BRACKETS:
+ remainder = False
+ length = 4 * line.depth
+ for _index, leaf, leaf_length in enumerate_with_length(line):
+ if leaf.type in CLOSING_BRACKETS and leaf.opening_bracket is first:
+ remainder = True
+ if remainder:
+ length += leaf_length
+ if length > line_length:
+ break
+
+ if leaf.type in OPENING_BRACKETS:
+ # There are brackets we can further split on.
+ remainder = False
+
+ else:
+ # checked the entire string and line length wasn't exceeded
+ if len(line.leaves) == _index + 1:
+ return True
+
+ # Note: we are not returning False here because a line might have *both*
+ # a leading opening bracket and a trailing closing bracket. If the
+ # opening bracket doesn't match our rule, maybe the closing will.
+
+ if (
+ last.type == token.RPAR
+ or last.type == token.RBRACE
+ or (
+ # don't use indexing for omitting optional parentheses;
+ # it looks weird
+ last.type == token.RSQB
+ and last.parent
+ and last.parent.type != syms.trailer
+ )
+ ):
+ if penultimate.type in OPENING_BRACKETS:
+ # Empty brackets don't help.
+ return False
+
+ if is_multiline_string(first):
+ # Additional wrapping of a multiline string in this situation is
+ # unnecessary.
+ return True
+
+ length = 4 * line.depth
+ seen_other_brackets = False
+ for _index, leaf, leaf_length in enumerate_with_length(line):
+ length += leaf_length
+ if leaf is last.opening_bracket:
+ if seen_other_brackets or length <= line_length:
+ return True
+
+ elif leaf.type in OPENING_BRACKETS:
+ # There are brackets we can further split on.
+ seen_other_brackets = True
+
+ return False
+
+
+def get_cache_file(line_length: int, mode: FileMode) -> Path:
+ pyi = bool(mode & FileMode.PYI)
+ py36 = bool(mode & FileMode.PYTHON36)
+ return (
+ CACHE_DIR
+ / f"cache.{line_length}{'.pyi' if pyi else ''}{'.py36' if py36 else ''}.pickle"
+ )
+
+
+def read_cache(line_length: int, mode: FileMode) -> Cache:
"""Read the cache if it exists and is well formed.
If it is not well formed, the call to write_cache later should resolve the issue.
"""
- cache_file = get_cache_file(line_length)
+ cache_file = get_cache_file(line_length, mode)
if not cache_file.exists():
return {}
return todo, done
-def write_cache(cache: Cache, sources: List[Path], line_length: int) -> None:
+def write_cache(
+ cache: Cache, sources: List[Path], line_length: int, mode: FileMode
+) -> None:
"""Update the cache file."""
- cache_file = get_cache_file(line_length)
+ cache_file = get_cache_file(line_length, mode)
try:
if not CACHE_DIR.exists():
CACHE_DIR.mkdir(parents=True)