Type,
TypeVar,
Union,
+ cast,
)
from appdirs import user_cache_dir
src = src.resolve()
if src in cache and cache[src] == get_cache_info(src):
changed = Changed.CACHED
- if (
- changed is not Changed.CACHED
- and format_file_in_place(
- src, line_length=line_length, fast=fast, write_back=write_back
- )
+ if changed is not Changed.CACHED and format_file_in_place(
+ src, line_length=line_length, fast=fast, write_back=write_back
):
changed = Changed.YES
if write_back == WriteBack.YES and changed is not Changed.NO:
manager = Manager()
lock = manager.Lock()
tasks = {
- src: loop.run_in_executor(
+ loop.run_in_executor(
executor, format_file_in_place, src, line_length, fast, write_back, lock
- )
- for src in sources
+ ): src
+ for src in sorted(sources)
}
- _task_values = list(tasks.values())
+ pending: Iterable[asyncio.Task] = tasks.keys()
try:
- loop.add_signal_handler(signal.SIGINT, cancel, _task_values)
- loop.add_signal_handler(signal.SIGTERM, cancel, _task_values)
+ loop.add_signal_handler(signal.SIGINT, cancel, pending)
+ loop.add_signal_handler(signal.SIGTERM, cancel, pending)
except NotImplementedError:
# There are no good alternatives for these on Windows
pass
- await asyncio.wait(_task_values)
- for src, task in tasks.items():
- if not task.done():
- report.failed(src, "timed out, cancelling")
- task.cancel()
- cancelled.append(task)
- elif task.cancelled():
- cancelled.append(task)
- elif task.exception():
- report.failed(src, str(task.exception()))
- else:
- formatted.append(src)
- report.done(src, Changed.YES if task.result() else Changed.NO)
-
+ while pending:
+ done, _ = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
+ for task in done:
+ src = tasks.pop(task)
+ if task.cancelled():
+ cancelled.append(task)
+ elif task.exception():
+ report.failed(src, str(task.exception()))
+ else:
+ formatted.append(src)
+ report.done(src, Changed.YES if task.result() else Changed.NO)
if cancelled:
await asyncio.gather(*cancelled, loop=loop, return_exceptions=True)
if write_back == WriteBack.YES and formatted:
STRING_PRIORITY = 12
COMPARATOR_PRIORITY = 10
MATH_PRIORITIES = {
- token.VBAR: 8,
- token.CIRCUMFLEX: 7,
- token.AMPER: 6,
- token.LEFTSHIFT: 5,
- token.RIGHTSHIFT: 5,
- token.PLUS: 4,
- token.MINUS: 4,
- token.STAR: 3,
- token.SLASH: 3,
- token.DOUBLESLASH: 3,
- token.PERCENT: 3,
- token.AT: 3,
- token.TILDE: 2,
- token.DOUBLESTAR: 1,
+ token.VBAR: 9,
+ token.CIRCUMFLEX: 8,
+ token.AMPER: 7,
+ token.LEFTSHIFT: 6,
+ token.RIGHTSHIFT: 6,
+ token.PLUS: 5,
+ token.MINUS: 5,
+ token.STAR: 4,
+ token.SLASH: 4,
+ token.DOUBLESLASH: 4,
+ token.PERCENT: 4,
+ token.AT: 4,
+ token.TILDE: 3,
+ token.DOUBLESTAR: 2,
}
+DOT_PRIORITY = 1
@dataclass
"""
return max(v for k, v in self.delimiters.items() if k not in exclude)
+ def delimiter_count_with_priority(self, priority: int = 0) -> int:
+ """Return the number of delimiters with the given `priority`.
+
+ If no `priority` is passed, defaults to max priority on the line.
+ """
+ if not self.delimiters:
+ return 0
+
+ priority = priority or self.max_delimiter_priority()
+ return sum(1 for p in self.delimiters.values() if p == priority)
+
def maybe_increment_for_loop_variable(self, leaf: Leaf) -> bool:
"""In a for loop, or comprehension, the variables are often unpacks.
comments: List[Tuple[Index, Leaf]] = Factory(list)
bracket_tracker: BracketTracker = Factory(BracketTracker)
inside_brackets: bool = False
+ should_explode: bool = False
def append(self, leaf: Leaf, preformatted: bool = False) -> None:
"""Add a new `leaf` to the end of the line.
@property
def is_stub_class(self) -> bool:
"""Is this line a class definition with a body consisting only of "..."?"""
- return (
- self.is_class
- and self.leaves[-3:] == [Leaf(token.DOT, ".") for _ in range(3)]
- )
+ return self.is_class and self.leaves[-3:] == [
+ Leaf(token.DOT, ".") for _ in range(3)
+ ]
@property
def is_def(self) -> bool:
second_leaf: Optional[Leaf] = self.leaves[1]
except IndexError:
second_leaf = None
- return (
- (first_leaf.type == token.NAME and first_leaf.value == "def")
- or (
- first_leaf.type == token.ASYNC
- and second_leaf is not None
- and second_leaf.type == token.NAME
- and second_leaf.value == "def"
- )
+ return (first_leaf.type == token.NAME and first_leaf.value == "def") or (
+ first_leaf.type == token.ASYNC
+ and second_leaf is not None
+ and second_leaf.type == token.NAME
+ and second_leaf.value == "def"
)
@property
and subscript_start.type == syms.subscriptlist
):
subscript_start = child_towards(subscript_start, leaf)
- return (
- subscript_start is not None
- and any(n.type in TEST_DESCENDANTS for n in subscript_start.pre_order())
+ return subscript_start is not None and any(
+ n.type in TEST_DESCENDANTS for n in subscript_start.pre_order()
)
def __str__(self) -> str:
The relevant Python language `keywords` for a given statement will be
NAME leaves within it. This methods puts those on a separate line.
- `parens` holds a set of string leaf values immeditely after which
+ `parens` holds a set of string leaf values immediately after which
invisible parens should be put.
"""
normalize_invisible_parens(node, parens_after=parens)
return DOUBLESPACE
assert p is not None, f"INTERNAL ERROR: hand-made leaf without parent: {leaf!r}"
- if (
- t == token.COLON
- and p.type not in {syms.subscript, syms.subscriptlist, syms.sliceop}
- ):
+ if t == token.COLON and p.type not in {
+ syms.subscript,
+ syms.subscriptlist,
+ syms.sliceop,
+ }:
return NO
prev = leaf.prev_sibling
if prevp.type == token.EQUAL:
if prevp.parent:
if prevp.parent.type in {
- syms.arglist, syms.argument, syms.parameters, syms.varargslist
+ syms.arglist,
+ syms.argument,
+ syms.parameters,
+ syms.varargslist,
}:
return NO
prevp_parent = prevp.parent
assert prevp_parent is not None
- if (
- prevp.type == token.COLON
- and prevp_parent.type in {syms.subscript, syms.sliceop}
- ):
+ if prevp.type == token.COLON and prevp_parent.type in {
+ syms.subscript,
+ syms.sliceop,
+ }:
return NO
elif prevp.type == token.EQUAL and prevp_parent.type == syms.argument:
# Don't treat them as a delimiter.
return 0
+ if (
+ leaf.type == token.DOT
+ and leaf.parent
+ and leaf.parent.type not in {syms.import_from, syms.dotted_name}
+ and (previous is None or previous.type in CLOSING_BRACKETS)
+ ):
+ return DOT_PRIORITY
+
if (
leaf.type in MATH_OPERATORS
and leaf.parent
):
return STRING_PRIORITY
+ if leaf.type != token.NAME:
+ return 0
+
if (
- leaf.type == token.NAME
- and leaf.value == "for"
+ leaf.value == "for"
and leaf.parent
and leaf.parent.type in {syms.comp_for, syms.old_comp_for}
):
return COMPREHENSION_PRIORITY
if (
- leaf.type == token.NAME
- and leaf.value == "if"
+ leaf.value == "if"
and leaf.parent
and leaf.parent.type in {syms.comp_if, syms.old_comp_if}
):
return COMPREHENSION_PRIORITY
+ if leaf.value in {"if", "else"} and leaf.parent and leaf.parent.type == syms.test:
+ return TERNARY_PRIORITY
+
+ if leaf.value == "is":
+ return COMPARATOR_PRIORITY
+
if (
- leaf.type == token.NAME
- and leaf.value in {"if", "else"}
+ leaf.value == "in"
and leaf.parent
- and leaf.parent.type == syms.test
+ and leaf.parent.type in {syms.comp_op, syms.comparison}
+ and not (
+ previous is not None
+ and previous.type == token.NAME
+ and previous.value == "not"
+ )
):
- return TERNARY_PRIORITY
+ return COMPARATOR_PRIORITY
- if leaf.type == token.NAME and leaf.value in LOGIC_OPERATORS and leaf.parent:
+ if (
+ leaf.value == "not"
+ and leaf.parent
+ and leaf.parent.type == syms.comp_op
+ and not (
+ previous is not None
+ and previous.type == token.NAME
+ and previous.value == "is"
+ )
+ ):
+ return COMPARATOR_PRIORITY
+
+ if leaf.value in LOGIC_OPERATORS and leaf.parent:
return LOGIC_PRIORITY
return 0
return
line_str = str(line).strip("\n")
- if is_line_short_enough(line, line_length=line_length, line_str=line_str):
+ if not line.should_explode and is_line_short_enough(
+ line, line_length=line_length, line_str=line_str
+ ):
yield line
return
split_funcs: List[SplitFunc]
if line.is_def:
split_funcs = [left_hand_split]
- elif line.is_import:
- split_funcs = [explode_split]
else:
def rhs(line: Line, py36: bool = False) -> Iterator[Line]:
for omit in generate_trailers_to_omit(line, line_length):
- lines = list(right_hand_split(line, py36, omit=omit))
+ lines = list(right_hand_split(line, line_length, py36, omit=omit))
if is_line_short_enough(lines[0], line_length=line_length):
yield from lines
return
# All splits failed, best effort split with no omits.
+ # This mostly happens to multiline strings that are by definition
+ # reported as not fitting a single line.
yield from right_hand_split(line, py36)
if line.inside_brackets:
def right_hand_split(
- line: Line, py36: bool = False, omit: Collection[LeafID] = ()
+ line: Line, line_length: int, py36: bool = False, omit: Collection[LeafID] = ()
) -> Iterator[Line]:
"""Split line into many lines, starting with the last matching bracket pair.
# Since body is a new indent level, remove spurious leading whitespace.
if body_leaves:
normalize_prefix(body_leaves[0], inside_brackets=True)
- elif not head_leaves:
- # No `head` and no `body` means the split failed. `tail` has all content.
+ if not head_leaves:
+ # No `head` means the split failed. Either `tail` has all content or
+ # the matching `opening_bracket` wasn't available on `line` anymore.
raise CannotSplit("No brackets found")
# Build the new lines.
# the closing bracket is an optional paren
and closing_bracket.type == token.RPAR
and not closing_bracket.value
- # there are no delimiters or standalone comments in the body
- and not body.bracket_tracker.delimiters
+ # there are no standalone comments in the body
and not line.contains_standalone_comments(0)
# and it's not an import (optional parens are the only thing we can split
# on in this case; attempting a split without them is a waste of time)
and not line.is_import
):
omit = {id(closing_bracket), *omit}
- try:
- yield from right_hand_split(line, py36=py36, omit=omit)
- return
- except CannotSplit:
- pass
+ if can_omit_invisible_parens(body, line_length):
+ try:
+ yield from right_hand_split(line, line_length, py36=py36, omit=omit)
+ return
+ except CannotSplit:
+ pass
ensure_visible(opening_bracket)
ensure_visible(closing_bracket)
+ body.should_explode = should_explode(body, opening_bracket)
for result in (head, body, tail):
if result:
yield result
except IndexError:
raise CannotSplit("Line empty")
- delimiters = line.bracket_tracker.delimiters
+ bt = line.bracket_tracker
try:
- delimiter_priority = line.bracket_tracker.max_delimiter_priority(
- exclude={id(last_leaf)}
- )
+ delimiter_priority = bt.max_delimiter_priority(exclude={id(last_leaf)})
except ValueError:
raise CannotSplit("No delimiters found")
+ if delimiter_priority == DOT_PRIORITY:
+ if bt.delimiter_count_with_priority(delimiter_priority) == 1:
+ raise CannotSplit("Splitting a single attribute from its owner looks wrong")
+
current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
lowest_depth = sys.maxsize
trailing_comma_safe = True
yield from append_to_line(comment_after)
lowest_depth = min(lowest_depth, leaf.bracket_depth)
- if (
- leaf.bracket_depth == lowest_depth
- and is_vararg(leaf, within=VARARGS_PARENTS)
+ if leaf.bracket_depth == lowest_depth and is_vararg(
+ leaf, within=VARARGS_PARENTS
):
trailing_comma_safe = trailing_comma_safe and py36
- leaf_priority = delimiters.get(id(leaf))
+ leaf_priority = bt.delimiters.get(id(leaf))
if leaf_priority == delimiter_priority:
yield current_line
yield current_line
-def explode_split(
- line: Line, py36: bool = False, omit: Collection[LeafID] = ()
-) -> Iterator[Line]:
- """Split by rightmost bracket and immediately split contents by a delimiter."""
- new_lines = list(right_hand_split(line, py36, omit))
- if len(new_lines) != 3:
- yield from new_lines
- return
-
- yield new_lines[0]
-
- try:
- yield from delimiter_split(new_lines[1], py36)
-
- except CannotSplit:
- yield new_lines[1]
-
- yield new_lines[2]
-
-
def is_import(leaf: Leaf) -> bool:
"""Return True if the given leaf starts an import statement."""
p = leaf.parent
rpar = Leaf(token.RPAR, ")")
index = child.remove() or 0
node.insert_child(index, Node(syms.atom, [lpar, child, rpar]))
- else:
+ elif not (isinstance(child, Leaf) and is_multiline_string(child)):
# wrap child in invisible parentheses
lpar = Leaf(token.LPAR, "")
rpar = Leaf(token.RPAR, "")
return p.type in within
+def is_multiline_string(leaf: Leaf) -> bool:
+ """Return True if `leaf` is a multiline string that actually spans many lines."""
+ value = leaf.value.lstrip("furbFURB")
+ return value[:3] in {'"""', "'''"} and "\n" in value
+
+
def is_stub_suite(node: Node) -> bool:
"""Return True if `node` is a suite with a stub body."""
if (
leaf.value = ")"
+def should_explode(line: Line, opening_bracket: Leaf) -> bool:
+ """Should `line` immediately be split with `delimiter_split()` after RHS?"""
+ if not (
+ opening_bracket.parent
+ and opening_bracket.parent.type in {syms.atom, syms.import_from}
+ and opening_bracket.value in "[{("
+ ):
+ return False
+
+ try:
+ last_leaf = line.leaves[-1]
+ exclude = {id(last_leaf)} if last_leaf.type == token.COMMA else set()
+ max_priority = line.bracket_tracker.max_delimiter_priority(exclude=exclude)
+ except (IndexError, ValueError):
+ return False
+
+ return max_priority == COMMA_PRIORITY
+
+
def is_python36(node: Node) -> bool:
"""Return True if the current file is using Python 3.6+ features.
closing_bracket = None
optional_brackets: Set[LeafID] = set()
inner_brackets: Set[LeafID] = set()
- for index, leaf in enumerate_reversed(line.leaves):
- length += len(leaf.prefix) + len(leaf.value)
+ for index, leaf, leaf_length in enumerate_with_length(line, reversed=True):
+ length += leaf_length
if length > line_length:
break
- comment: Optional[Leaf]
- for comment in line.comments_after(leaf, index):
- if "\n" in comment.prefix:
- break # Oops, standalone comment!
-
- length += len(comment.value)
- else:
- comment = None
- if comment is not None:
- break # There was a standalone comment, we can't continue.
-
optional_brackets.discard(id(leaf))
if opening_bracket:
if leaf is opening_bracket:
PYTHON_EXTENSIONS = {".py", ".pyi"}
BLACKLISTED_DIRECTORIES = {
- "build", "buck-out", "dist", "_build", ".git", ".hg", ".mypy_cache", ".tox", ".venv"
+ "build",
+ "buck-out",
+ "dist",
+ "_build",
+ ".git",
+ ".hg",
+ ".mypy_cache",
+ ".tox",
+ ".venv",
}
)
-def cancel(tasks: List[asyncio.Task]) -> None:
+def cancel(tasks: Iterable[asyncio.Task]) -> None:
"""asyncio signal handler that cancels all `tasks` and reports to stderr."""
err("Aborted!")
for task in tasks:
index -= 1
+def enumerate_with_length(
+ line: Line, reversed: bool = False
+) -> Iterator[Tuple[Index, Leaf, int]]:
+ """Return an enumeration of leaves with their length.
+
+ Stops prematurely on multiline strings and standalone comments.
+ """
+ op = cast(
+ Callable[[Sequence[Leaf]], Iterator[Tuple[Index, Leaf]]],
+ enumerate_reversed if reversed else enumerate,
+ )
+ for index, leaf in op(line.leaves):
+ length = len(leaf.prefix) + len(leaf.value)
+ if "\n" in leaf.value:
+ return # Multiline strings, we can't continue.
+
+ comment: Optional[Leaf]
+ for comment in line.comments_after(leaf, index):
+ if "\n" in comment.prefix:
+ return # Oops, standalone comment!
+
+ length += len(comment.value)
+
+ yield index, leaf, length
+
+
def is_line_short_enough(line: Line, *, line_length: int, line_str: str = "") -> bool:
"""Return True if `line` is no longer than `line_length`.
)
+def can_omit_invisible_parens(line: Line, line_length: int) -> bool:
+ """Does `line` have a shape safe to reformat without optional parens around it?
+
+ Returns True for only a subset of potentially nice looking formattings but
+ the point is to not return false positives that end up producing lines that
+ are too long.
+ """
+ bt = line.bracket_tracker
+ if not bt.delimiters:
+ # Without delimiters the optional parentheses are useless.
+ return True
+
+ max_priority = bt.max_delimiter_priority()
+ if bt.delimiter_count_with_priority(max_priority) > 1:
+ # With more than one delimiter of a kind the optional parentheses read better.
+ return False
+
+ if max_priority == DOT_PRIORITY:
+ # A single stranded method call doesn't require optional parentheses.
+ return True
+
+ assert len(line.leaves) >= 2, "Stranded delimiter"
+
+ first = line.leaves[0]
+ second = line.leaves[1]
+ penultimate = line.leaves[-2]
+ last = line.leaves[-1]
+
+ # With a single delimiter, omit if the expression starts or ends with
+ # a bracket.
+ if first.type in OPENING_BRACKETS and second.type not in CLOSING_BRACKETS:
+ remainder = False
+ length = 4 * line.depth
+ for _index, leaf, leaf_length in enumerate_with_length(line):
+ if leaf.type in CLOSING_BRACKETS and leaf.opening_bracket is first:
+ remainder = True
+ if remainder:
+ length += leaf_length
+ if length > line_length:
+ break
+
+ if leaf.type in OPENING_BRACKETS:
+ # There are brackets we can further split on.
+ remainder = False
+
+ else:
+ # checked the entire string and line length wasn't exceeded
+ if len(line.leaves) == _index + 1:
+ return True
+
+ # Note: we are not returning False here because a line might have *both*
+ # a leading opening bracket and a trailing closing bracket. If the
+ # opening bracket doesn't match our rule, maybe the closing will.
+
+ if (
+ last.type == token.RPAR
+ or last.type == token.RBRACE
+ or (
+ # don't use indexing for omitting optional parentheses;
+ # it looks weird
+ last.type == token.RSQB
+ and last.parent
+ and last.parent.type != syms.trailer
+ )
+ ):
+ if penultimate.type in OPENING_BRACKETS:
+ # Empty brackets don't help.
+ return False
+
+ if is_multiline_string(first):
+ # Additional wrapping of a multiline string in this situation is
+ # unnecessary.
+ return True
+
+ length = 4 * line.depth
+ seen_other_brackets = False
+ for _index, leaf, leaf_length in enumerate_with_length(line):
+ length += leaf_length
+ if leaf is last.opening_bracket:
+ if seen_other_brackets or length <= line_length:
+ return True
+
+ elif leaf.type in OPENING_BRACKETS:
+ # There are brackets we can further split on.
+ seen_other_brackets = True
+
+ return False
+
+
CACHE_DIR = Path(user_cache_dir("black", version=__version__))