All patches and comments are welcome. Please squash your changes into logical
commits before using git-format-patch and git-send-email to send them to
patches@git.madduck.net.
If you'd read over the Git project's submission guidelines and adhered to them,
I'd be especially grateful.
3 from asyncio.base_events import BaseEventLoop
4 from concurrent.futures import Executor, ProcessPoolExecutor
6 from functools import partial, wraps
9 from multiprocessing import Manager
11 from pathlib import Path
35 from appdirs import user_cache_dir
36 from attr import dataclass, Factory
40 from blib2to3.pytree import Node, Leaf, type_repr
41 from blib2to3 import pygram, pytree
42 from blib2to3.pgen2 import driver, token
43 from blib2to3.pgen2.parse import ParseError
46 __version__ = "18.4a6"
47 DEFAULT_LINE_LENGTH = 88
50 syms = pygram.python_symbols
58 LN = Union[Leaf, Node]
59 SplitFunc = Callable[["Line", bool], Iterator["Line"]]
62 CacheInfo = Tuple[Timestamp, FileSize]
63 Cache = Dict[Path, CacheInfo]
64 out = partial(click.secho, bold=True, err=True)
65 err = partial(click.secho, fg="red", err=True)
68 class NothingChanged(UserWarning):
69 """Raised by :func:`format_file` when reformatted code is the same as source."""
72 class CannotSplit(Exception):
73 """A readable split that fits the allotted line length is impossible.
75 Raised by :func:`left_hand_split`, :func:`right_hand_split`, and
76 :func:`delimiter_split`.
80 class FormatError(Exception):
81 """Base exception for `# fmt: on` and `# fmt: off` handling.
83 It holds the number of bytes of the prefix consumed before the format
84 control comment appeared.
87 def __init__(self, consumed: int) -> None:
88 super().__init__(consumed)
89 self.consumed = consumed
91 def trim_prefix(self, leaf: Leaf) -> None:
92 leaf.prefix = leaf.prefix[self.consumed :]
94 def leaf_from_consumed(self, leaf: Leaf) -> Leaf:
95 """Returns a new Leaf from the consumed part of the prefix."""
96 unformatted_prefix = leaf.prefix[: self.consumed]
97 return Leaf(token.NEWLINE, unformatted_prefix)
100 class FormatOn(FormatError):
101 """Found a comment like `# fmt: on` in the file."""
104 class FormatOff(FormatError):
105 """Found a comment like `# fmt: off` in the file."""
108 class WriteBack(Enum):
125 default=DEFAULT_LINE_LENGTH,
126 help="How many character per line to allow.",
133 "Don't write the files back, just return the status. Return code 0 "
134 "means nothing would change. Return code 1 means some files would be "
135 "reformatted. Return code 123 means there was an internal error."
141 help="Don't write the files back, just output a diff for each file on stdout.",
146 help="If --fast given, skip temporary sanity checks. [default: --safe]",
153 "Don't emit non-error messages to stderr. Errors are still emitted, "
154 "silence those with 2>/dev/null."
157 @click.version_option(version=__version__)
162 exists=True, file_okay=True, dir_okay=True, readable=True, allow_dash=True
175 """The uncompromising code formatter."""
176 sources: List[Path] = []
180 sources.extend(gen_python_files_in_dir(p))
182 # if a file was explicitly given, we don't care about its extension
185 sources.append(Path("-"))
187 err(f"invalid path: {s}")
189 if check and not diff:
190 write_back = WriteBack.NO
192 write_back = WriteBack.DIFF
194 write_back = WriteBack.YES
195 report = Report(check=check, quiet=quiet)
196 if len(sources) == 0:
197 out("No paths given. Nothing to do 😴")
201 elif len(sources) == 1:
202 reformat_one(sources[0], line_length, fast, write_back, report)
204 loop = asyncio.get_event_loop()
205 executor = ProcessPoolExecutor(max_workers=os.cpu_count())
207 loop.run_until_complete(
209 sources, line_length, fast, write_back, report, loop, executor
215 out("All done! ✨ 🍰 ✨")
216 click.echo(str(report))
217 ctx.exit(report.return_code)
221 src: Path, line_length: int, fast: bool, write_back: WriteBack, report: "Report"
223 """Reformat a single file under `src` without spawning child processes.
225 If `quiet` is True, non-error messages are not output. `line_length`,
226 `write_back`, and `fast` options are passed to :func:`format_file_in_place`.
230 if not src.is_file() and str(src) == "-":
231 if format_stdin_to_stdout(
232 line_length=line_length, fast=fast, write_back=write_back
234 changed = Changed.YES
237 if write_back != WriteBack.DIFF:
238 cache = read_cache(line_length)
240 if src in cache and cache[src] == get_cache_info(src):
241 changed = Changed.CACHED
242 if changed is not Changed.CACHED and format_file_in_place(
243 src, line_length=line_length, fast=fast, write_back=write_back
245 changed = Changed.YES
246 if write_back == WriteBack.YES and changed is not Changed.NO:
247 write_cache(cache, [src], line_length)
248 report.done(src, changed)
249 except Exception as exc:
250 report.failed(src, str(exc))
253 async def schedule_formatting(
257 write_back: WriteBack,
262 """Run formatting of `sources` in parallel using the provided `executor`.
264 (Use ProcessPoolExecutors for actual parallelism.)
266 `line_length`, `write_back`, and `fast` options are passed to
267 :func:`format_file_in_place`.
270 if write_back != WriteBack.DIFF:
271 cache = read_cache(line_length)
272 sources, cached = filter_cached(cache, sources)
274 report.done(src, Changed.CACHED)
279 if write_back == WriteBack.DIFF:
280 # For diff output, we need locks to ensure we don't interleave output
281 # from different processes.
283 lock = manager.Lock()
285 loop.run_in_executor(
286 executor, format_file_in_place, src, line_length, fast, write_back, lock
288 for src in sorted(sources)
290 pending: Iterable[asyncio.Task] = tasks.keys()
292 loop.add_signal_handler(signal.SIGINT, cancel, pending)
293 loop.add_signal_handler(signal.SIGTERM, cancel, pending)
294 except NotImplementedError:
295 # There are no good alternatives for these on Windows
298 done, _ = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
300 src = tasks.pop(task)
302 cancelled.append(task)
303 elif task.exception():
304 report.failed(src, str(task.exception()))
306 formatted.append(src)
307 report.done(src, Changed.YES if task.result() else Changed.NO)
309 await asyncio.gather(*cancelled, loop=loop, return_exceptions=True)
310 if write_back == WriteBack.YES and formatted:
311 write_cache(cache, formatted, line_length)
314 def format_file_in_place(
318 write_back: WriteBack = WriteBack.NO,
319 lock: Any = None, # multiprocessing.Manager().Lock() is some crazy proxy
321 """Format file under `src` path. Return True if changed.
323 If `write_back` is YES, write reformatted code back to the file; if DIFF, write a diff to stdout.
324 `line_length` and `fast` options are passed to :func:`format_file_contents`.
326 is_pyi = src.suffix == ".pyi"
328 with tokenize.open(src) as src_buffer:
329 src_contents = src_buffer.read()
331 dst_contents = format_file_contents(
332 src_contents, line_length=line_length, fast=fast, is_pyi=is_pyi
334 except NothingChanged:
337 if write_back == write_back.YES:
338 with open(src, "w", encoding=src_buffer.encoding) as f:
339 f.write(dst_contents)
340 elif write_back == write_back.DIFF:
341 src_name = f"{src} (original)"
342 dst_name = f"{src} (formatted)"
343 diff_contents = diff(src_contents, dst_contents, src_name, dst_name)
347 sys.stdout.write(diff_contents)
354 def format_stdin_to_stdout(
355 line_length: int, fast: bool, write_back: WriteBack = WriteBack.NO
357 """Format file on stdin. Return True if changed.
359 If `write_back` is YES, write reformatted code back to stdout; if DIFF, write a diff to stdout instead.
360 `line_length` and `fast` arguments are passed to :func:`format_file_contents`.
362 src = sys.stdin.read()
365 dst = format_file_contents(src, line_length=line_length, fast=fast)
368 except NothingChanged:
372 if write_back == WriteBack.YES:
373 sys.stdout.write(dst)
374 elif write_back == WriteBack.DIFF:
375 src_name = "<stdin> (original)"
376 dst_name = "<stdin> (formatted)"
377 sys.stdout.write(diff(src, dst, src_name, dst_name))
380 def format_file_contents(
381 src_contents: str, *, line_length: int, fast: bool, is_pyi: bool = False
383 """Reformat contents of a file and return new contents.
385 If `fast` is False, additionally confirm that the reformatted code is
386 valid by calling :func:`assert_equivalent` and :func:`assert_stable` on it.
387 `line_length` is passed to :func:`format_str`.
389 if src_contents.strip() == "":
392 dst_contents = format_str(src_contents, line_length=line_length, is_pyi=is_pyi)
393 if src_contents == dst_contents:
397 assert_equivalent(src_contents, dst_contents)
399 src_contents, dst_contents, line_length=line_length, is_pyi=is_pyi
405 src_contents: str, line_length: int, *, is_pyi: bool = False
407 """Reformat a string and return new contents.
409 `line_length` determines how many characters per line are allowed.
411 src_node = lib2to3_parse(src_contents)
413 future_imports = get_future_imports(src_node)
414 elt = EmptyLineTracker(is_pyi=is_pyi)
415 py36 = is_python36(src_node)
416 lines = LineGenerator(
417 remove_u_prefix=py36 or "unicode_literals" in future_imports, is_pyi=is_pyi
421 for current_line in lines.visit(src_node):
422 for _ in range(after):
423 dst_contents += str(empty_line)
424 before, after = elt.maybe_empty_lines(current_line)
425 for _ in range(before):
426 dst_contents += str(empty_line)
427 for line in split_line(current_line, line_length=line_length, py36=py36):
428 dst_contents += str(line)
433 pygram.python_grammar_no_print_statement_no_exec_statement,
434 pygram.python_grammar_no_print_statement,
435 pygram.python_grammar,
439 def lib2to3_parse(src_txt: str) -> Node:
440 """Given a string with source, return the lib2to3 Node."""
441 grammar = pygram.python_grammar_no_print_statement
442 if src_txt[-1] != "\n":
443 nl = "\r\n" if "\r\n" in src_txt[:1024] else "\n"
445 for grammar in GRAMMARS:
446 drv = driver.Driver(grammar, pytree.convert)
448 result = drv.parse_string(src_txt, True)
451 except ParseError as pe:
452 lineno, column = pe.context[1]
453 lines = src_txt.splitlines()
455 faulty_line = lines[lineno - 1]
457 faulty_line = "<line number missing in source>"
458 exc = ValueError(f"Cannot parse: {lineno}:{column}: {faulty_line}")
462 if isinstance(result, Leaf):
463 result = Node(syms.file_input, [result])
467 def lib2to3_unparse(node: Node) -> str:
468 """Given a lib2to3 node, return its string representation."""
476 class Visitor(Generic[T]):
477 """Basic lib2to3 visitor that yields things of type `T` on `visit()`."""
479 def visit(self, node: LN) -> Iterator[T]:
480 """Main method to visit `node` and its children.
482 It tries to find a `visit_*()` method for the given `node.type`, like
483 `visit_simple_stmt` for Node objects or `visit_INDENT` for Leaf objects.
484 If no dedicated `visit_*()` method is found, chooses `visit_default()`
487 Then yields objects of type `T` from the selected visitor.
490 name = token.tok_name[node.type]
492 name = type_repr(node.type)
493 yield from getattr(self, f"visit_{name}", self.visit_default)(node)
495 def visit_default(self, node: LN) -> Iterator[T]:
496 """Default `visit_*()` implementation. Recurses to children of `node`."""
497 if isinstance(node, Node):
498 for child in node.children:
499 yield from self.visit(child)
503 class DebugVisitor(Visitor[T]):
506 def visit_default(self, node: LN) -> Iterator[T]:
507 indent = " " * (2 * self.tree_depth)
508 if isinstance(node, Node):
509 _type = type_repr(node.type)
510 out(f"{indent}{_type}", fg="yellow")
512 for child in node.children:
513 yield from self.visit(child)
516 out(f"{indent}/{_type}", fg="yellow", bold=False)
518 _type = token.tok_name.get(node.type, str(node.type))
519 out(f"{indent}{_type}", fg="blue", nl=False)
521 # We don't have to handle prefixes for `Node` objects since
522 # that delegates to the first child anyway.
523 out(f" {node.prefix!r}", fg="green", bold=False, nl=False)
524 out(f" {node.value!r}", fg="blue", bold=False)
527 def show(cls, code: str) -> None:
528 """Pretty-print the lib2to3 AST of a given string of `code`.
530 Convenience method for debugging.
532 v: DebugVisitor[None] = DebugVisitor()
533 list(v.visit(lib2to3_parse(code)))
536 KEYWORDS = set(keyword.kwlist)
537 WHITESPACE = {token.DEDENT, token.INDENT, token.NEWLINE}
538 FLOW_CONTROL = {"return", "raise", "break", "continue"}
549 STANDALONE_COMMENT = 153
550 LOGIC_OPERATORS = {"and", "or"}
575 STARS = {token.STAR, token.DOUBLESTAR}
578 syms.argument, # double star in arglist
579 syms.trailer, # single argument to call
581 syms.varargslist, # lambdas
583 UNPACKING_PARENTS = {
584 syms.atom, # single element of a list or set literal
622 COMPREHENSION_PRIORITY = 20
624 TERNARY_PRIORITY = 16
627 COMPARATOR_PRIORITY = 10
638 token.DOUBLESLASH: 4,
648 class BracketTracker:
649 """Keeps track of brackets on a line."""
652 bracket_match: Dict[Tuple[Depth, NodeType], Leaf] = Factory(dict)
653 delimiters: Dict[LeafID, Priority] = Factory(dict)
654 previous: Optional[Leaf] = None
655 _for_loop_variable: int = 0
656 _lambda_arguments: int = 0
658 def mark(self, leaf: Leaf) -> None:
659 """Mark `leaf` with bracket-related metadata. Keep track of delimiters.
661 All leaves receive an int `bracket_depth` field that stores how deep
662 within brackets a given leaf is. 0 means there are no enclosing brackets
663 that started on this line.
665 If a leaf is itself a closing bracket, it receives an `opening_bracket`
666 field that it forms a pair with. This is a one-directional link to
667 avoid reference cycles.
669 If a leaf is a delimiter (a token on which Black can split the line if
670 needed) and it's on depth 0, its `id()` is stored in the tracker's
673 if leaf.type == token.COMMENT:
676 self.maybe_decrement_after_for_loop_variable(leaf)
677 self.maybe_decrement_after_lambda_arguments(leaf)
678 if leaf.type in CLOSING_BRACKETS:
680 opening_bracket = self.bracket_match.pop((self.depth, leaf.type))
681 leaf.opening_bracket = opening_bracket
682 leaf.bracket_depth = self.depth
684 delim = is_split_before_delimiter(leaf, self.previous)
685 if delim and self.previous is not None:
686 self.delimiters[id(self.previous)] = delim
688 delim = is_split_after_delimiter(leaf, self.previous)
690 self.delimiters[id(leaf)] = delim
691 if leaf.type in OPENING_BRACKETS:
692 self.bracket_match[self.depth, BRACKET[leaf.type]] = leaf
695 self.maybe_increment_lambda_arguments(leaf)
696 self.maybe_increment_for_loop_variable(leaf)
698 def any_open_brackets(self) -> bool:
699 """Return True if there is a yet unmatched open bracket on the line."""
700 return bool(self.bracket_match)
702 def max_delimiter_priority(self, exclude: Iterable[LeafID] = ()) -> int:
703 """Return the highest priority of a delimiter found on the line.
705 Values are consistent with what `is_split_*_delimiter()` return.
706 Raises ValueError on no delimiters.
708 return max(v for k, v in self.delimiters.items() if k not in exclude)
710 def delimiter_count_with_priority(self, priority: int = 0) -> int:
711 """Return the number of delimiters with the given `priority`.
713 If no `priority` is passed, defaults to max priority on the line.
715 if not self.delimiters:
718 priority = priority or self.max_delimiter_priority()
719 return sum(1 for p in self.delimiters.values() if p == priority)
721 def maybe_increment_for_loop_variable(self, leaf: Leaf) -> bool:
722 """In a for loop, or comprehension, the variables are often unpacks.
724 To avoid splitting on the comma in this situation, increase the depth of
725 tokens between `for` and `in`.
727 if leaf.type == token.NAME and leaf.value == "for":
729 self._for_loop_variable += 1
734 def maybe_decrement_after_for_loop_variable(self, leaf: Leaf) -> bool:
735 """See `maybe_increment_for_loop_variable` above for explanation."""
736 if self._for_loop_variable and leaf.type == token.NAME and leaf.value == "in":
738 self._for_loop_variable -= 1
743 def maybe_increment_lambda_arguments(self, leaf: Leaf) -> bool:
744 """In a lambda expression, there might be more than one argument.
746 To avoid splitting on the comma in this situation, increase the depth of
747 tokens between `lambda` and `:`.
749 if leaf.type == token.NAME and leaf.value == "lambda":
751 self._lambda_arguments += 1
756 def maybe_decrement_after_lambda_arguments(self, leaf: Leaf) -> bool:
757 """See `maybe_increment_lambda_arguments` above for explanation."""
758 if self._lambda_arguments and leaf.type == token.COLON:
760 self._lambda_arguments -= 1
765 def get_open_lsqb(self) -> Optional[Leaf]:
766 """Return the most recent opening square bracket (if any)."""
767 return self.bracket_match.get((self.depth - 1, token.RSQB))
772 """Holds leaves and comments. Can be printed with `str(line)`."""
775 leaves: List[Leaf] = Factory(list)
776 comments: List[Tuple[Index, Leaf]] = Factory(list)
777 bracket_tracker: BracketTracker = Factory(BracketTracker)
778 inside_brackets: bool = False
779 should_explode: bool = False
781 def append(self, leaf: Leaf, preformatted: bool = False) -> None:
782 """Add a new `leaf` to the end of the line.
784 Unless `preformatted` is True, the `leaf` will receive a new consistent
785 whitespace prefix and metadata applied by :class:`BracketTracker`.
786 Trailing commas are maybe removed, unpacked for loop variables are
787 demoted from being delimiters.
789 Inline comments are put aside.
791 has_value = leaf.type in BRACKETS or bool(leaf.value.strip())
795 if token.COLON == leaf.type and self.is_class_paren_empty:
797 if self.leaves and not preformatted:
798 # Note: at this point leaf.prefix should be empty except for
799 # imports, for which we only preserve newlines.
800 leaf.prefix += whitespace(
801 leaf, complex_subscript=self.is_complex_subscript(leaf)
803 if self.inside_brackets or not preformatted:
804 self.bracket_tracker.mark(leaf)
805 self.maybe_remove_trailing_comma(leaf)
806 if not self.append_comment(leaf):
807 self.leaves.append(leaf)
809 def append_safe(self, leaf: Leaf, preformatted: bool = False) -> None:
810 """Like :func:`append()` but disallow invalid standalone comment structure.
812 Raises ValueError when any `leaf` is appended after a standalone comment
813 or when a standalone comment is not the first leaf on the line.
815 if self.bracket_tracker.depth == 0:
817 raise ValueError("cannot append to standalone comments")
819 if self.leaves and leaf.type == STANDALONE_COMMENT:
821 "cannot append standalone comments to a populated line"
824 self.append(leaf, preformatted=preformatted)
827 def is_comment(self) -> bool:
828 """Is this line a standalone comment?"""
829 return len(self.leaves) == 1 and self.leaves[0].type == STANDALONE_COMMENT
832 def is_decorator(self) -> bool:
833 """Is this line a decorator?"""
834 return bool(self) and self.leaves[0].type == token.AT
837 def is_import(self) -> bool:
838 """Is this an import line?"""
839 return bool(self) and is_import(self.leaves[0])
842 def is_class(self) -> bool:
843 """Is this line a class definition?"""
846 and self.leaves[0].type == token.NAME
847 and self.leaves[0].value == "class"
851 def is_stub_class(self) -> bool:
852 """Is this line a class definition with a body consisting only of "..."?"""
853 return self.is_class and self.leaves[-3:] == [
854 Leaf(token.DOT, ".") for _ in range(3)
858 def is_def(self) -> bool:
859 """Is this a function definition? (Also returns True for async defs.)"""
861 first_leaf = self.leaves[0]
866 second_leaf: Optional[Leaf] = self.leaves[1]
869 return (first_leaf.type == token.NAME and first_leaf.value == "def") or (
870 first_leaf.type == token.ASYNC
871 and second_leaf is not None
872 and second_leaf.type == token.NAME
873 and second_leaf.value == "def"
877 def is_flow_control(self) -> bool:
878 """Is this line a flow control statement?
880 Those are `return`, `raise`, `break`, and `continue`.
884 and self.leaves[0].type == token.NAME
885 and self.leaves[0].value in FLOW_CONTROL
889 def is_yield(self) -> bool:
890 """Is this line a yield statement?"""
893 and self.leaves[0].type == token.NAME
894 and self.leaves[0].value == "yield"
898 def is_class_paren_empty(self) -> bool:
899 """Is this a class with no base classes but using parentheses?
901 Those are unnecessary and should be removed.
905 and len(self.leaves) == 4
907 and self.leaves[2].type == token.LPAR
908 and self.leaves[2].value == "("
909 and self.leaves[3].type == token.RPAR
910 and self.leaves[3].value == ")"
913 def contains_standalone_comments(self, depth_limit: int = sys.maxsize) -> bool:
914 """If so, needs to be split before emitting."""
915 for leaf in self.leaves:
916 if leaf.type == STANDALONE_COMMENT:
917 if leaf.bracket_depth <= depth_limit:
922 def maybe_remove_trailing_comma(self, closing: Leaf) -> bool:
923 """Remove trailing comma if there is one and it's safe."""
926 and self.leaves[-1].type == token.COMMA
927 and closing.type in CLOSING_BRACKETS
931 if closing.type == token.RBRACE:
932 self.remove_trailing_comma()
935 if closing.type == token.RSQB:
936 comma = self.leaves[-1]
937 if comma.parent and comma.parent.type == syms.listmaker:
938 self.remove_trailing_comma()
941 # For parens let's check if it's safe to remove the comma.
942 # Imports are always safe.
944 self.remove_trailing_comma()
947 # Otherwise, if the trailing one is the only one, we might mistakenly
948 # change a tuple into a different type by removing the comma.
949 depth = closing.bracket_depth + 1
951 opening = closing.opening_bracket
952 for _opening_index, leaf in enumerate(self.leaves):
959 for leaf in self.leaves[_opening_index + 1 :]:
963 bracket_depth = leaf.bracket_depth
964 if bracket_depth == depth and leaf.type == token.COMMA:
966 if leaf.parent and leaf.parent.type == syms.arglist:
971 self.remove_trailing_comma()
976 def append_comment(self, comment: Leaf) -> bool:
977 """Add an inline or standalone comment to the line."""
979 comment.type == STANDALONE_COMMENT
980 and self.bracket_tracker.any_open_brackets()
985 if comment.type != token.COMMENT:
988 after = len(self.leaves) - 1
990 comment.type = STANDALONE_COMMENT
995 self.comments.append((after, comment))
998 def comments_after(self, leaf: Leaf, _index: int = -1) -> Iterator[Leaf]:
999 """Generate comments that should appear directly after `leaf`.
1001 Provide a non-negative leaf `_index` to speed up the function.
1004 for _index, _leaf in enumerate(self.leaves):
1011 for index, comment_after in self.comments:
1015 def remove_trailing_comma(self) -> None:
1016 """Remove the trailing comma and move the comments attached to it."""
1017 comma_index = len(self.leaves) - 1
1018 for i in range(len(self.comments)):
1019 comment_index, comment = self.comments[i]
1020 if comment_index == comma_index:
1021 self.comments[i] = (comma_index - 1, comment)
1024 def is_complex_subscript(self, leaf: Leaf) -> bool:
1025 """Return True iff `leaf` is part of a slice with non-trivial exprs."""
1027 leaf if leaf.type == token.LSQB else self.bracket_tracker.get_open_lsqb()
1029 if open_lsqb is None:
1032 subscript_start = open_lsqb.next_sibling
1034 isinstance(subscript_start, Node)
1035 and subscript_start.type == syms.subscriptlist
1037 subscript_start = child_towards(subscript_start, leaf)
1038 return subscript_start is not None and any(
1039 n.type in TEST_DESCENDANTS for n in subscript_start.pre_order()
1042 def __str__(self) -> str:
1043 """Render the line."""
1047 indent = " " * self.depth
1048 leaves = iter(self.leaves)
1049 first = next(leaves)
1050 res = f"{first.prefix}{indent}{first.value}"
1053 for _, comment in self.comments:
1057 def __bool__(self) -> bool:
1058 """Return True if the line has leaves or comments."""
1059 return bool(self.leaves or self.comments)
1062 class UnformattedLines(Line):
1063 """Just like :class:`Line` but stores lines which aren't reformatted."""
1065 def append(self, leaf: Leaf, preformatted: bool = True) -> None:
1066 """Just add a new `leaf` to the end of the lines.
1068 The `preformatted` argument is ignored.
1070 Keeps track of indentation `depth`, which is useful when the user
1071 says `# fmt: on`. Otherwise, doesn't do anything with the `leaf`.
1074 list(generate_comments(leaf))
1075 except FormatOn as f_on:
1076 self.leaves.append(f_on.leaf_from_consumed(leaf))
1079 self.leaves.append(leaf)
1080 if leaf.type == token.INDENT:
1082 elif leaf.type == token.DEDENT:
1085 def __str__(self) -> str:
1086 """Render unformatted lines from leaves which were added with `append()`.
1088 `depth` is not used for indentation in this case.
1094 for leaf in self.leaves:
1098 def append_comment(self, comment: Leaf) -> bool:
1099 """Not implemented in this class. Raises `NotImplementedError`."""
1100 raise NotImplementedError("Unformatted lines don't store comments separately.")
1102 def maybe_remove_trailing_comma(self, closing: Leaf) -> bool:
1103 """Does nothing and returns False."""
1106 def maybe_increment_for_loop_variable(self, leaf: Leaf) -> bool:
1107 """Does nothing and returns False."""
1112 class EmptyLineTracker:
1113 """Provides a stateful method that returns the number of potential extra
1114 empty lines needed before and after the currently processed line.
1116 Note: this tracker works on lines that haven't been split yet. It assumes
1117 the prefix of the first leaf consists of optional newlines. Those newlines
1118 are consumed by `maybe_empty_lines()` and included in the computation.
1120 is_pyi: bool = False
1121 previous_line: Optional[Line] = None
1122 previous_after: int = 0
1123 previous_defs: List[int] = Factory(list)
1125 def maybe_empty_lines(self, current_line: Line) -> Tuple[int, int]:
1126 """Return the number of extra empty lines before and after the `current_line`.
1128 This is for separating `def`, `async def` and `class` with extra empty
1129 lines (two on module-level), as well as providing an extra empty line
1130 after flow control keywords to make them more prominent.
1132 if isinstance(current_line, UnformattedLines):
1135 before, after = self._maybe_empty_lines(current_line)
1136 before -= self.previous_after
1137 self.previous_after = after
1138 self.previous_line = current_line
1139 return before, after
1141 def _maybe_empty_lines(self, current_line: Line) -> Tuple[int, int]:
1143 if current_line.depth == 0:
1144 max_allowed = 1 if self.is_pyi else 2
1145 if current_line.leaves:
1146 # Consume the first leaf's extra newlines.
1147 first_leaf = current_line.leaves[0]
1148 before = first_leaf.prefix.count("\n")
1149 before = min(before, max_allowed)
1150 first_leaf.prefix = ""
1153 depth = current_line.depth
1154 while self.previous_defs and self.previous_defs[-1] >= depth:
1155 self.previous_defs.pop()
1157 before = 0 if depth else 1
1159 before = 1 if depth else 2
1160 is_decorator = current_line.is_decorator
1161 if is_decorator or current_line.is_def or current_line.is_class:
1162 if not is_decorator:
1163 self.previous_defs.append(depth)
1164 if self.previous_line is None:
1165 # Don't insert empty lines before the first line in the file.
1168 if self.previous_line.is_decorator:
1172 self.previous_line.is_comment
1173 and self.previous_line.depth == current_line.depth
1179 if self.previous_line.depth > current_line.depth:
1181 elif current_line.is_class or self.previous_line.is_class:
1182 if current_line.is_stub_class and self.previous_line.is_stub_class:
1190 if current_line.depth and newlines:
1196 and self.previous_line.is_import
1197 and not current_line.is_import
1198 and depth == self.previous_line.depth
1200 return (before or 1), 0
1206 class LineGenerator(Visitor[Line]):
1207 """Generates reformatted Line objects. Empty lines are not emitted.
1209 Note: destroys the tree it's visiting by mutating prefixes of its leaves
1210 in ways that will no longer stringify to valid Python code on the tree.
1212 is_pyi: bool = False
1213 current_line: Line = Factory(Line)
1214 remove_u_prefix: bool = False
1216 def line(self, indent: int = 0, type: Type[Line] = Line) -> Iterator[Line]:
1219 If the line is empty, only emit if it makes sense.
1220 If the line is too long, split it first and then generate.
1222 If any lines were generated, set up a new current_line.
1224 if not self.current_line:
1225 if self.current_line.__class__ == type:
1226 self.current_line.depth += indent
1228 self.current_line = type(depth=self.current_line.depth + indent)
1229 return # Line is empty, don't emit. Creating a new one unnecessary.
1231 complete_line = self.current_line
1232 self.current_line = type(depth=complete_line.depth + indent)
1235 def visit(self, node: LN) -> Iterator[Line]:
1236 """Main method to visit `node` and its children.
1238 Yields :class:`Line` objects.
1240 if isinstance(self.current_line, UnformattedLines):
1241 # File contained `# fmt: off`
1242 yield from self.visit_unformatted(node)
1245 yield from super().visit(node)
1247 def visit_default(self, node: LN) -> Iterator[Line]:
1248 """Default `visit_*()` implementation. Recurses to children of `node`."""
1249 if isinstance(node, Leaf):
1250 any_open_brackets = self.current_line.bracket_tracker.any_open_brackets()
1252 for comment in generate_comments(node):
1253 if any_open_brackets:
1254 # any comment within brackets is subject to splitting
1255 self.current_line.append(comment)
1256 elif comment.type == token.COMMENT:
1257 # regular trailing comment
1258 self.current_line.append(comment)
1259 yield from self.line()
1262 # regular standalone comment
1263 yield from self.line()
1265 self.current_line.append(comment)
1266 yield from self.line()
1268 except FormatOff as f_off:
1269 f_off.trim_prefix(node)
1270 yield from self.line(type=UnformattedLines)
1271 yield from self.visit(node)
1273 except FormatOn as f_on:
1274 # This only happens here if somebody says "fmt: on" multiple
1276 f_on.trim_prefix(node)
1277 yield from self.visit_default(node)
1280 normalize_prefix(node, inside_brackets=any_open_brackets)
1281 if node.type == token.STRING:
1282 normalize_string_prefix(node, remove_u_prefix=self.remove_u_prefix)
1283 normalize_string_quotes(node)
1284 if node.type not in WHITESPACE:
1285 self.current_line.append(node)
1286 yield from super().visit_default(node)
1288 def visit_INDENT(self, node: Node) -> Iterator[Line]:
1289 """Increase indentation level, maybe yield a line."""
1290 # In blib2to3 INDENT never holds comments.
1291 yield from self.line(+1)
1292 yield from self.visit_default(node)
1294 def visit_DEDENT(self, node: Node) -> Iterator[Line]:
1295 """Decrease indentation level, maybe yield a line."""
1296 # The current line might still wait for trailing comments. At DEDENT time
1297 # there won't be any (they would be prefixes on the preceding NEWLINE).
1298 # Emit the line then.
1299 yield from self.line()
1301 # While DEDENT has no value, its prefix may contain standalone comments
1302 # that belong to the current indentation level. Get 'em.
1303 yield from self.visit_default(node)
1305 # Finally, emit the dedent.
1306 yield from self.line(-1)
1309 self, node: Node, keywords: Set[str], parens: Set[str]
1310 ) -> Iterator[Line]:
1311 """Visit a statement.
1313 This implementation is shared for `if`, `while`, `for`, `try`, `except`,
1314 `def`, `with`, `class`, `assert` and assignments.
1316 The relevant Python language `keywords` for a given statement will be
1317 NAME leaves within it. This method puts those on a separate line.
1319 `parens` holds a set of string leaf values immediately after which
1320 invisible parens should be put.
1322 normalize_invisible_parens(node, parens_after=parens)
1323 for child in node.children:
1324 if child.type == token.NAME and child.value in keywords: # type: ignore
1325 yield from self.line()
1327 yield from self.visit(child)
1329 def visit_suite(self, node: Node) -> Iterator[Line]:
1330 """Visit a suite."""
1331 if self.is_pyi and is_stub_suite(node):
1332 yield from self.visit(node.children[2])
1334 yield from self.visit_default(node)
def visit_simple_stmt(self, node: Node) -> Iterator[Line]:
    """Visit a statement that contains no nested statements."""
    parent = node.parent
    if parent and parent.type in STATEMENT:
        # The statement hangs directly off a compound statement, so it acts
        # as that statement's suite.
        if self.is_pyi and is_stub_body(node):
            yield from self.visit_default(node)
        else:
            yield from self.line(+1)
            yield from self.visit_default(node)
            yield from self.line(-1)
        return

    # Finish the pending line first, except for the body of a stub suite in
    # stub mode.
    if not (self.is_pyi and parent and is_stub_suite(parent)):
        yield from self.line()
    yield from self.visit_default(node)
def visit_async_stmt(self, node: Node) -> Iterator[Line]:
    """Visit `async def`, `async for` and `async with`."""
    yield from self.line()

    remaining = iter(node.children)
    for child in remaining:
        yield from self.visit(child)

        if child.type == token.ASYNC:
            break

    # The child right after the ASYNC token is not visited as a whole; its
    # own children are visited one by one instead.
    wrapped_stmt = next(remaining)
    for inner in wrapped_stmt.children:
        yield from self.visit(inner)
def visit_decorators(self, node: Node) -> Iterator[Line]:
    """Visit decorators, finishing the pending line before each child."""
    for child in node.children:
        for emitted in self.line():
            yield emitted
        for emitted in self.visit(child):
            yield emitted
def visit_import_from(self, node: Node) -> Iterator[Line]:
    """Visit import_from, making its parentheses invisible or adding invisible ones.

    Kept apart from `visit_stmt` because import statements don't accept
    arbitrary atoms, so parentheses need custom handling here.
    """
    # Locate the child that directly follows the first `import` NAME leaf.
    after_import = None
    for index, child in enumerate(node.children):
        if child.type == token.NAME and child.value == "import":  # type: ignore
            after_import = index + 1
            break

    if after_import is not None and after_import < len(node.children):
        target = node.children[after_import]
        if target.type == token.LPAR:
            # make parentheses invisible
            target.value = ""  # type: ignore
            node.children[-1].value = ""  # type: ignore
        else:
            # insert invisible parentheses
            node.insert_child(after_import, Leaf(token.LPAR, ""))
            node.append_child(Leaf(token.RPAR, ""))

    for child in node.children:
        yield from self.visit(child)
def visit_SEMI(self, leaf: Leaf) -> Iterator[Line]:
    """Drop the semicolon and finish the line so the next statement starts a new one."""
    for emitted in self.line():
        yield emitted
def visit_ENDMARKER(self, leaf: Leaf) -> Iterator[Line]:
    """Handle end of file: process outstanding comments, then finish the last line."""
    for emitted in self.visit_default(leaf):
        yield emitted
    for emitted in self.line():
        yield emitted
def visit_unformatted(self, node: LN) -> Iterator[Line]:
    """Visitor used while the file is inside a `# fmt: off` region."""
    if isinstance(node, Node):
        for child in node.children:
            yield from self.visit(child)
        return

    try:
        self.current_line.append(node)
    except FormatOn as f_on:
        # Formatting was switched back on inside this leaf's prefix: drop the
        # consumed part, flush the line, and visit the leaf again.
        f_on.trim_prefix(node)
        yield from self.line()
        yield from self.visit(node)

    if node.type == token.ENDMARKER:
        # somebody decided not to put a final `# fmt: on`
        yield from self.line()
def __attrs_post_init__(self) -> None:
    """Bind the statement-specific `visit_*` attributes.

    Most of them are `self.visit_stmt` with `keywords` and `parens`
    pre-filled via `partial`; the last two are plain aliases.
    """
    visit_stmt = self.visit_stmt
    empty: Set[str] = set()
    # (grammar rule, keywords that start a new line, leaf values followed by
    # invisible parentheses)
    presets = (
        ("assert_stmt", {"assert"}, {"assert", ","}),
        ("if_stmt", {"if", "else", "elif"}, {"if", "elif"}),
        ("while_stmt", {"while", "else"}, {"while"}),
        ("for_stmt", {"for", "else"}, {"for", "in"}),
        ("try_stmt", {"try", "except", "else", "finally"}, empty),
        ("except_clause", {"except"}, empty),
        ("with_stmt", {"with"}, empty),
        ("funcdef", {"def"}, empty),
        ("classdef", {"class"}, empty),
        ("expr_stmt", empty, ASSIGNMENTS),
        ("return_stmt", {"return"}, {"return"}),
    )
    for rule, keywords, parens in presets:
        setattr(
            self,
            f"visit_{rule}",
            partial(visit_stmt, keywords=keywords, parens=parens),
        )
    self.visit_async_funcdef = self.visit_async_stmt
    self.visit_decorated = self.visit_decorators
# Node types checked by `is_one_tuple` for tuples written without parentheses.
IMPLICIT_TUPLE = {syms.testlist, syms.testlist_star_expr, syms.exprlist}
# Opening bracket token type -> matching closing bracket token type.
BRACKET = {
    token.LPAR: token.RPAR,
    token.LSQB: token.RSQB,
    token.LBRACE: token.RBRACE,
}
OPENING_BRACKETS = set(BRACKET)
CLOSING_BRACKETS = set(BRACKET.values())
BRACKETS = OPENING_BRACKETS.union(CLOSING_BRACKETS)
# Token types for which `whitespace()` bails out first thing.
ALWAYS_NO_SPACE = CLOSING_BRACKETS.union({token.COMMA, STANDALONE_COMMENT})
def whitespace(leaf: Leaf, *, complex_subscript: bool) -> str:  # noqa C901
    """Return whitespace prefix if needed for the given `leaf`.

    `complex_subscript` signals whether the given leaf is part of a subscription
    which has non-trivial arguments, like arithmetic expressions or function calls.

    The result is one of "" (no space), " " (single space), "  " (before an
    inline comment), or, for defaults of typed arguments, the prefix already
    present on the preceding `=` leaf.
    """
    NO = ""
    SPACE = " "
    DOUBLESPACE = "  "
    t = leaf.type
    p = leaf.parent
    v = leaf.value
    if t in ALWAYS_NO_SPACE:
        return NO

    if t == token.COMMENT:
        return DOUBLESPACE

    assert p is not None, f"INTERNAL ERROR: hand-made leaf without parent: {leaf!r}"
    if t == token.COLON and p.type not in {
        syms.subscript,
        syms.subscriptlist,
        syms.sliceop,
    }:
        return NO

    prev = leaf.prev_sibling
    if not prev:
        # First child of its parent: decide based on the leaf that precedes
        # the parent.
        prevp = preceding_leaf(p)
        if not prevp or prevp.type in OPENING_BRACKETS:
            return NO

        if t == token.COLON:
            if prevp.type == token.COLON:
                return NO

            elif prevp.type != token.COMMA and not complex_subscript:
                return NO

            return SPACE

        if prevp.type == token.EQUAL:
            if prevp.parent:
                if prevp.parent.type in {
                    syms.arglist,
                    syms.argument,
                    syms.parameters,
                    syms.varargslist,
                }:
                    return NO

                elif prevp.parent.type == syms.typedargslist:
                    # A bit hacky: if the equal sign has whitespace, it means we
                    # previously found it's a typed argument.  So, we're using
                    # that, too.
                    return prevp.prefix

        elif prevp.type in STARS:
            if is_vararg(prevp, within=VARARGS_PARENTS | UNPACKING_PARENTS):
                return NO

        elif prevp.type == token.COLON:
            if prevp.parent and prevp.parent.type in {syms.subscript, syms.sliceop}:
                return SPACE if complex_subscript else NO

        elif (
            prevp.parent
            and prevp.parent.type == syms.factor
            and prevp.type in MATH_OPERATORS
        ):
            return NO

        elif (
            prevp.type == token.RIGHTSHIFT
            and prevp.parent
            and prevp.parent.type == syms.shift_expr
            and prevp.prev_sibling
            and prevp.prev_sibling.type == token.NAME
            and prevp.prev_sibling.value == "print"  # type: ignore
        ):
            # Python 2 print chevron
            return NO

    elif prev.type in OPENING_BRACKETS:
        return NO

    if p.type in {syms.parameters, syms.arglist}:
        # untyped function signatures or calls
        if not prev or prev.type != token.COMMA:
            return NO

    elif p.type == syms.varargslist:
        # lambdas
        if prev and prev.type != token.COMMA:
            return NO

    elif p.type == syms.typedargslist:
        # typed function signatures
        if not prev:
            return NO

        if t == token.EQUAL:
            if prev.type != syms.tname:
                return NO

        elif prev.type == token.EQUAL:
            # A bit hacky: if the equal sign has whitespace, it means we
            # previously found it's a typed argument.  So, we're using that, too.
            return prev.prefix

        elif prev.type != token.COMMA:
            return NO

    elif p.type == syms.tname:
        # type names
        if not prev:
            prevp = preceding_leaf(p)
            if not prevp or prevp.type != token.COMMA:
                return NO

    elif p.type == syms.trailer:
        # attributes and calls
        if t == token.LPAR or t == token.RPAR:
            return NO

        if not prev:
            if t == token.DOT:
                prevp = preceding_leaf(p)
                if not prevp or prevp.type != token.NUMBER:
                    return NO

            elif t == token.LSQB:
                return NO

        elif prev.type != token.COMMA:
            return NO

    elif p.type == syms.argument:
        # single argument
        if t == token.EQUAL:
            return NO

        if not prev:
            prevp = preceding_leaf(p)
            if not prevp or prevp.type == token.LPAR:
                return NO

        elif prev.type in {token.EQUAL} | STARS:
            return NO

    elif p.type == syms.decorator:
        # decorators
        return NO

    elif p.type == syms.dotted_name:
        if prev:
            return NO

        prevp = preceding_leaf(p)
        if not prevp or prevp.type == token.AT or prevp.type == token.DOT:
            return NO

    elif p.type == syms.classdef:
        if t == token.LPAR:
            return NO

        if prev and prev.type == token.LPAR:
            return NO

    elif p.type in {syms.subscript, syms.sliceop}:
        # indexing
        if not prev:
            assert p.parent is not None, "subscripts are always parented"
            if p.parent.type == syms.subscriptlist:
                return SPACE

            return NO

        elif not complex_subscript:
            return NO

    elif p.type == syms.atom:
        if prev and t == token.DOT:
            # dots, but not the first one.
            return NO

    elif p.type == syms.dictsetmaker:
        # dict unpacking
        if prev and prev.type == token.DOUBLESTAR:
            return NO

    elif p.type in {syms.factor, syms.star_expr}:
        # unary ops
        if not prev:
            prevp = preceding_leaf(p)
            if not prevp or prevp.type in OPENING_BRACKETS:
                return NO

            prevp_parent = prevp.parent
            assert prevp_parent is not None
            if prevp.type == token.COLON and prevp_parent.type in {
                syms.subscript,
                syms.sliceop,
            }:
                return NO

            elif prevp.type == token.EQUAL and prevp_parent.type == syms.argument:
                return NO

        elif t == token.NAME or t == token.NUMBER:
            return NO

    elif p.type == syms.import_from:
        if t == token.DOT:
            if prev and prev.type == token.DOT:
                return NO

        elif t == token.NAME:
            if v == "import":
                return SPACE

            if prev and prev.type == token.DOT:
                return NO

    # A trailing `elif p.type == syms.sliceop` branch used to live here.  It
    # could never run because `syms.sliceop` is already handled by the
    # "indexing" branch of this same chain, so it was removed.

    return SPACE
def preceding_leaf(node: Optional[LN]) -> Optional[Leaf]:
    """Return the first leaf that precedes `node`, if any.

    Walks up through the parents until one of them has a previous sibling and
    returns that sibling (when it is a leaf) or the last leaf below it.
    Returns None when no such sibling exists or it has no leaves.
    """
    current = node
    while current:
        sibling = current.prev_sibling
        if sibling:
            if isinstance(sibling, Leaf):
                return sibling

            # Keep only the final leaf of the sibling; stays None if it has none.
            last_leaf = None
            for last_leaf in sibling.leaves():
                pass
            return last_leaf

        current = current.parent
    return None
def child_towards(ancestor: Node, descendant: LN) -> Optional[LN]:
    """Return the child of `ancestor` that contains `descendant`.

    Climbs from `descendant` through `.parent` links until it reaches a node
    whose parent is `ancestor`.  If the chain runs out first, the falsy end
    of the chain (normally None) is returned.
    """
    current: Optional[LN] = descendant
    while current:
        if not (current.parent != ancestor):
            break
        current = current.parent
    return current
def is_split_after_delimiter(leaf: Leaf, previous: Leaf = None) -> int:
    """Return the priority of the `leaf` delimiter, given a line break after it.

    Only delimiters that cause a line break *after* themselves are covered,
    which here means commas; every other leaf gets 0.

    Higher numbers are higher priority.
    """
    return COMMA_PRIORITY if leaf.type == token.COMMA else 0
def is_split_before_delimiter(leaf: Leaf, previous: Leaf = None) -> int:
    """Return the priority of the `leaf` delimiter, given a line break before it.

    Only delimiters that cause a line break *before* themselves are covered.
    `previous` is the leaf that comes right before `leaf`, when known.

    Higher numbers are higher priority; 0 means "not a delimiter".
    """
    if is_vararg(leaf, within=VARARGS_PARENTS | UNPACKING_PARENTS):
        # * and ** might also be MATH_OPERATORS but in this case they are not.
        # Don't treat them as a delimiter.
        return 0

    kind = leaf.type
    parent = leaf.parent

    if (
        kind == token.DOT
        and parent
        and parent.type not in {syms.import_from, syms.dotted_name}
        and (previous is None or previous.type in CLOSING_BRACKETS)
    ):
        return DOT_PRIORITY

    if (
        kind in MATH_OPERATORS
        and parent
        and parent.type not in {syms.factor, syms.star_expr}
    ):
        return MATH_PRIORITIES[kind]

    if kind in COMPARATORS:
        return COMPARATOR_PRIORITY

    if (
        kind == token.STRING
        and previous is not None
        and previous.type == token.STRING
    ):
        # Second of two adjacent string literals.
        return STRING_PRIORITY

    if kind != token.NAME:
        return 0

    # From here on `leaf` is a NAME, i.e. a keyword-like delimiter.
    word = leaf.value
    previous_word = None
    if previous is not None and previous.type == token.NAME:
        previous_word = previous.value

    if (
        word == "for"
        and parent
        and parent.type in {syms.comp_for, syms.old_comp_for}
    ):
        return COMPREHENSION_PRIORITY

    if (
        word == "if"
        and parent
        and parent.type in {syms.comp_if, syms.old_comp_if}
    ):
        return COMPREHENSION_PRIORITY

    if word in {"if", "else"} and parent and parent.type == syms.test:
        return TERNARY_PRIORITY

    if word == "is":
        return COMPARATOR_PRIORITY

    # `in` directly after `not` is the tail of `not in`: no break before it.
    if (
        word == "in"
        and parent
        and parent.type in {syms.comp_op, syms.comparison}
        and previous_word != "not"
    ):
        return COMPARATOR_PRIORITY

    # `not` directly after `is` is the tail of `is not`: no break before it.
    if (
        word == "not"
        and parent
        and parent.type == syms.comp_op
        and previous_word != "is"
    ):
        return COMPARATOR_PRIORITY

    if word in LOGIC_OPERATORS and parent:
        return LOGIC_PRIORITY

    return 0
def generate_comments(leaf: Leaf) -> Iterator[Leaf]:
    """Generate comment leaves out of the prefix of `leaf`, if it holds any.

    lib2to3 shoves comments into the whitespace prefix of the following token
    (see `pgen2/driver.py:Driver.parse_tokens()`).  That saves the grammar from
    describing every place a comment may appear, but it also means comments
    don't "belong" to any node.  The leaves generated here are therefore
    parentless; all we need to tell apart is inline versus standalone
    comments, the latter being the ones that share their line with no code.

    Inline comments are emitted as regular token.COMMENT leaves, standalone
    ones with the fake STANDALONE_COMMENT token type.

    Raises FormatOn / FormatOff, after yielding the comment itself, when a
    format control comment is found.  The exception carries the number of
    prefix characters consumed up to and including that comment's line.
    """
    prefix = leaf.prefix
    if not prefix or "#" not in prefix:
        return

    consumed = 0
    blank_lines = 0
    for index, raw_line in enumerate(prefix.split("\n")):
        consumed += len(raw_line) + 1  # adding the length of the split '\n'
        text = raw_line.lstrip()
        if not text:
            blank_lines += 1
            continue

        if not text.startswith("#"):
            continue

        # Only a comment on the very first prefix line trails actual code, and
        # never so when the prefix belongs to ENDMARKER.
        is_trailing = index == 0 and leaf.type != token.ENDMARKER
        comment_type = token.COMMENT if is_trailing else STANDALONE_COMMENT
        comment = make_comment(text)
        yield Leaf(comment_type, comment, prefix="\n" * blank_lines)

        if comment in {"# fmt: on", "# yapf: enable"}:
            raise FormatOn(consumed)

        if comment in {"# fmt: off", "# yapf: disable"}:
            if comment_type == STANDALONE_COMMENT:
                raise FormatOff(consumed)

            prev = preceding_leaf(leaf)
            if not prev or prev.type in WHITESPACE:  # standalone comment in disguise
                raise FormatOff(consumed)

        blank_lines = 0
def make_comment(content: str) -> str:
    """Return `content` as a consistently formatted comment.

    Trailing whitespace is dropped and a hash sign is added when `content`
    does not start with one.  A single space is inserted between the hash and
    the text, unless the text already begins with a space or with one of the
    special markers "!", ":" or "#" (so "#!", "#:" and "##" stay intact).
    Empty content yields a bare "#".
    """
    text = content.rstrip()
    if not text:
        return "#"

    if text.startswith("#"):
        text = text[1:]
    lead = text[:1]
    if lead and lead not in (" ", "!", ":", "#"):
        text = " " + text
    return "#" + text
def split_line(
    line: Line, line_length: int, inner: bool = False, py36: bool = False
) -> Iterator[Line]:
    """Split a `line` into potentially many lines.

    The results should fit in the allotted `line_length` but might not be
    able to.  `inner` signifies that there were a pair of brackets somewhere
    around the current `line`, possibly transitively.

    If `py36` is True, splitting may generate syntax that is only compatible
    with Python 3.6 and later.
    """
    if isinstance(line, UnformattedLines) or line.is_comment:
        yield line
        return

    line_str = str(line).strip("\n")
    fits = is_line_short_enough(line, line_length=line_length, line_str=line_str)
    if not line.should_explode and fits:
        yield line
        return

    split_funcs: List[SplitFunc]
    if line.is_def:
        split_funcs = [left_hand_split]
    else:

        def rhs(line: Line, py36: bool = False) -> Iterator[Line]:
            # Try right-hand splits with progressively more trailers omitted
            # and take the first one whose first line fits.
            for omit in generate_trailers_to_omit(line, line_length):
                lines = list(right_hand_split(line, py36, omit=omit))
                if is_line_short_enough(lines[0], line_length=line_length):
                    yield from lines
                    return

            # All splits failed, best effort split with no omits.
            yield from right_hand_split(line, py36)

        if line.inside_brackets:
            split_funcs = [delimiter_split, standalone_comment_split, rhs]
        else:
            split_funcs = [rhs]

    for split_func in split_funcs:
        # Lines are accumulated in `result` because we might want to abort
        # mission and return the original line in the end, or attempt a
        # different split altogether.
        result: List[Line] = []
        try:
            for piece in split_func(line, py36):
                if str(piece).strip("\n") == line_str:
                    raise CannotSplit("Split function returned an unchanged result")

                result.extend(
                    split_line(piece, line_length=line_length, inner=True, py36=py36)
                )
        except CannotSplit:
            continue

        yield from result
        return

    # No split function succeeded: emit the line unchanged.
    yield line
def left_hand_split(line: Line, py36: bool = False) -> Iterator[Line]:
    """Split line into many lines, starting with the first matching bracket pair.

    Note: this usually looks weird, only use this for function definitions.
    Prefer RHS otherwise.  This is why this function is not symmetrical with
    :func:`right_hand_split` which also handles optional parentheses.

    Raises :exc:`CannotSplit` (via `bracket_split_succeeded_or_raise`) when the
    split is considered failed.
    """
    head = Line(depth=line.depth)
    body = Line(depth=line.depth + 1, inside_brackets=True)
    tail = Line(depth=line.depth)
    head_leaves: List[Leaf] = []
    body_leaves: List[Leaf] = []
    tail_leaves: List[Leaf] = []
    bucket = head_leaves
    matching_bracket = None
    for leaf in line.leaves:
        closes_body = (
            bucket is body_leaves
            and leaf.type in CLOSING_BRACKETS
            and leaf.opening_bracket is matching_bracket
        )
        if closes_body:
            # An empty body sends the closing bracket back to the head.
            bucket = tail_leaves if body_leaves else head_leaves
        bucket.append(leaf)
        if bucket is head_leaves and leaf.type in OPENING_BRACKETS:
            matching_bracket = leaf
            bucket = body_leaves
    # Since body is a new indent level, remove spurious leading whitespace.
    if body_leaves:
        normalize_prefix(body_leaves[0], inside_brackets=True)
    # Build the new lines.
    parts = ((head, head_leaves), (body, body_leaves), (tail, tail_leaves))
    for result, leaves in parts:
        for leaf in leaves:
            result.append(leaf, preformatted=True)
            for comment_after in line.comments_after(leaf):
                result.append(comment_after, preformatted=True)
    bracket_split_succeeded_or_raise(head, body, tail)
    for result, _ in parts:
        if result:
            yield result
def right_hand_split(
    line: Line, py36: bool = False, omit: Collection[LeafID] = ()
) -> Iterator[Line]:
    """Split line into many lines, starting with the last matching bracket pair.

    If the split was by optional parentheses, attempt splitting without them, too.
    `omit` is a collection of closing bracket IDs that shouldn't be considered for
    this split.

    Raises :exc:`CannotSplit` when no usable bracket pair is found or the
    resulting split is considered failed.

    Note: running this function modifies `bracket_depth` on the leaves of `line`.
    """
    head = Line(depth=line.depth)
    body = Line(depth=line.depth + 1, inside_brackets=True)
    tail = Line(depth=line.depth)
    head_leaves: List[Leaf] = []
    body_leaves: List[Leaf] = []
    tail_leaves: List[Leaf] = []
    bucket = tail_leaves
    opening_bracket = None
    closing_bracket = None
    # Scan from the right: tail first, then body, then head.
    for leaf in reversed(line.leaves):
        if bucket is body_leaves and leaf is opening_bracket:
            bucket = head_leaves if body_leaves else tail_leaves
        bucket.append(leaf)
        if (
            bucket is tail_leaves
            and leaf.type in CLOSING_BRACKETS
            and id(leaf) not in omit
        ):
            opening_bracket = leaf.opening_bracket
            closing_bracket = leaf
            bucket = body_leaves
    # The buckets were filled back to front.
    for leaves in (tail_leaves, body_leaves, head_leaves):
        leaves.reverse()
    # Since body is a new indent level, remove spurious leading whitespace.
    if body_leaves:
        normalize_prefix(body_leaves[0], inside_brackets=True)
    if not head_leaves:
        # No `head` means the split failed.  Either `tail` has all content or
        # the matching `opening_bracket` wasn't available on `line` anymore.
        raise CannotSplit("No brackets found")

    # Build the new lines.
    parts = ((head, head_leaves), (body, body_leaves), (tail, tail_leaves))
    for result, leaves in parts:
        for leaf in leaves:
            result.append(leaf, preformatted=True)
            for comment_after in line.comments_after(leaf):
                result.append(comment_after, preformatted=True)
    bracket_split_succeeded_or_raise(head, body, tail)
    assert opening_bracket and closing_bracket
    split_on_optional_parens = (
        # the opening bracket is an optional paren
        opening_bracket.type == token.LPAR
        and not opening_bracket.value
        # the closing bracket is an optional paren
        and closing_bracket.type == token.RPAR
        and not closing_bracket.value
        # there are no standalone comments in the body
        and not line.contains_standalone_comments(0)
        # and it's not an import (optional parens are the only thing we can split
        # on in this case; attempting a split without them is a waste of time)
        and not line.is_import
    )
    if split_on_optional_parens:
        omit = {id(closing_bracket), *omit}
        delimiter_count = body.bracket_tracker.delimiter_count_with_priority()
        if delimiter_count == 0 or (
            delimiter_count == 1
            and (
                body.leaves[0].type in OPENING_BRACKETS
                or body.leaves[-1].type in CLOSING_BRACKETS
            )
        ):
            # Retry with the optional parentheses left out; fall back to the
            # split computed above if that attempt cannot split.
            try:
                yield from right_hand_split(line, py36=py36, omit=omit)
                return
            except CannotSplit:
                pass

    ensure_visible(opening_bracket)
    ensure_visible(closing_bracket)
    body.should_explode = should_explode(body, opening_bracket)
    for result, _ in parts:
        if result:
            yield result
def bracket_split_succeeded_or_raise(head: Line, body: Line, tail: Line) -> None:
    """Raise :exc:`CannotSplit` if the last left- or right-hand split failed.

    Do nothing otherwise.

    Such a split works on a pair of brackets: `head` is the content up to and
    including the opening bracket, `body` is what sits between the brackets,
    and `tail` starts with the closing bracket.  The split counts as failed
    when `body` is empty and `tail`, stripped of surrounding whitespace, is
    either empty or shorter than three characters.
    """
    tail_len = len(str(tail).strip())
    if body:
        return

    if tail_len == 0:
        raise CannotSplit("Splitting brackets produced the same line")

    if tail_len < 3:
        raise CannotSplit(
            f"Splitting brackets on an empty body to save "
            f"{tail_len} characters is not worth it"
        )
def dont_increase_indentation(split_func: SplitFunc) -> SplitFunc:
    """Decorate `split_func` so the first leaf of every line it yields has its prefix normalized.

    The prefix is normalized with `inside_brackets=True`.  Meant to be applied
    to the relevant split functions.
    """

    @wraps(split_func)
    def split_wrapper(line: Line, py36: bool = False) -> Iterator[Line]:
        for produced in split_func(line, py36):
            first_leaf = produced.leaves[0]
            normalize_prefix(first_leaf, inside_brackets=True)
            yield produced

    return split_wrapper
@dont_increase_indentation
def delimiter_split(line: Line, py36: bool = False) -> Iterator[Line]:
    """Split according to delimiters of the highest priority.

    If `py36` is True, the split will add trailing commas also in function
    signatures that contain `*` and `**`.

    Raises :exc:`CannotSplit` when the line is empty, has no delimiters, or
    the split would only detach a single attribute access.
    """
    try:
        last_leaf = line.leaves[-1]
    except IndexError:
        raise CannotSplit("Line empty")

    tracker = line.bracket_tracker
    try:
        delimiter_priority = tracker.max_delimiter_priority(exclude={id(last_leaf)})
    except ValueError:
        raise CannotSplit("No delimiters found")

    if (
        delimiter_priority == DOT_PRIORITY
        and tracker.delimiter_count_with_priority(delimiter_priority) == 1
    ):
        raise CannotSplit("Splitting a single attribute from its owner looks wrong")

    def new_line() -> Line:
        return Line(depth=line.depth, inside_brackets=line.inside_brackets)

    current_line = new_line()
    lowest_depth = sys.maxsize
    trailing_comma_safe = True

    def append_to_line(leaf: Leaf) -> Iterator[Line]:
        """Append `leaf` to current line or to new line if appending impossible."""
        nonlocal current_line
        try:
            current_line.append_safe(leaf, preformatted=True)
        except ValueError:
            yield current_line

            current_line = new_line()
            current_line.append(leaf)

    for index, leaf in enumerate(line.leaves):
        yield from append_to_line(leaf)

        for comment_after in line.comments_after(leaf, index):
            yield from append_to_line(comment_after)

        lowest_depth = min(lowest_depth, leaf.bracket_depth)
        if leaf.bracket_depth == lowest_depth and is_vararg(
            leaf, within=VARARGS_PARENTS
        ):
            # A vararg at the outermost depth makes a trailing comma safe
            # only when `py36` is set.
            trailing_comma_safe = trailing_comma_safe and py36
        if tracker.delimiters.get(id(leaf)) == delimiter_priority:
            yield current_line

            current_line = new_line()

    if current_line:
        last = current_line.leaves[-1]
        if (
            trailing_comma_safe
            and delimiter_priority == COMMA_PRIORITY
            and last.type != token.COMMA
            and last.type != STANDALONE_COMMENT
        ):
            current_line.append(Leaf(token.COMMA, ","))
        yield current_line
@dont_increase_indentation
def standalone_comment_split(line: Line, py36: bool = False) -> Iterator[Line]:
    """Split standalone comments from the rest of the line.

    Raises :exc:`CannotSplit` when `line` has no standalone comments.
    """
    if not line.contains_standalone_comments(0):
        raise CannotSplit("Line does not have any standalone comments")

    def new_line() -> Line:
        return Line(depth=line.depth, inside_brackets=line.inside_brackets)

    current_line = new_line()

    def append_to_line(leaf: Leaf) -> Iterator[Line]:
        """Append `leaf` to current line or to new line if appending impossible."""
        nonlocal current_line
        try:
            current_line.append_safe(leaf, preformatted=True)
        except ValueError:
            yield current_line

            current_line = new_line()
            current_line.append(leaf)

    for index, leaf in enumerate(line.leaves):
        yield from append_to_line(leaf)

        for comment_after in line.comments_after(leaf, index):
            yield from append_to_line(comment_after)

    if current_line:
        yield current_line
def is_import(leaf: Leaf) -> bool:
    """Return True if the given leaf starts an import statement.

    That is, `leaf` is the NAME `import` under an `import_name` node or the
    NAME `from` under an `import_from` node.
    """
    if leaf.type != token.NAME:
        return False

    parent = leaf.parent
    keyword = leaf.value
    return bool(
        (keyword == "import" and parent and parent.type == syms.import_name)
        or (keyword == "from" and parent and parent.type == syms.import_from)
    )
def normalize_prefix(leaf: Leaf, *, inside_brackets: bool) -> None:
    """Reduce the prefix of `leaf` to newlines only, or to nothing.

    Outside of brackets the newlines found after the last `#` of the prefix
    are kept (minus the one that ends that comment line).  Inside brackets, or
    when a backslash appears before the first `#`, the prefix is emptied.

    Note: don't use backslashes for formatting or you'll lose your voting rights.
    """
    if not inside_brackets:
        prefix = leaf.prefix
        before_first_hash = prefix.partition("#")[0]
        if "\\" not in before_first_hash:
            _, hash_found, after_last_hash = prefix.rpartition("#")
            newline_count = after_last_hash.count("\n")
            if hash_found:
                newline_count -= 1
            leaf.prefix = "\n" * newline_count
            return

    leaf.prefix = ""
def normalize_string_prefix(leaf: Leaf, remove_u_prefix: bool = False) -> None:
    """Lowercase the string prefix of `leaf.value`.

    When `remove_u_prefix` is true, any `u` is dropped from the prefix as well.

    Note: Mutates its argument.
    """
    match = re.match(r"^([furbFURB]*)(.*)$", leaf.value, re.DOTALL)
    assert match is not None, f"failed to match string {leaf.value!r}"
    raw_prefix, rest = match.groups()
    prefix = raw_prefix.lower()
    if remove_u_prefix:
        prefix = prefix.replace("u", "")
    leaf.value = prefix + rest
def normalize_string_quotes(leaf: Leaf) -> None:
    """Prefer double quotes but only if it doesn't cause more escaping.

    Adds or removes backslashes as appropriate. Doesn't parse and fix
    strings nested in f-strings (yet).

    Strings already delimited by triple double quotes are left alone.  Raw
    strings are only converted when that needs no change to their body.

    Note: Mutates its argument.
    """
    value = leaf.value.lstrip("furbFURB")
    if value[:3] == '"""':
        return

    elif value[:3] == "'''":
        orig_quote = "'''"
        new_quote = '"""'
    elif value[0] == '"':
        orig_quote = '"'
        new_quote = "'"
    else:
        orig_quote = "'"
        new_quote = '"'
    first_quote_pos = leaf.value.find(orig_quote)
    if first_quote_pos == -1:
        return  # There's an internal error

    prefix = leaf.value[:first_quote_pos]
    unescaped_new_quote = re.compile(rf"(([^\\]|^)(\\\\)*){new_quote}")
    escaped_new_quote = re.compile(rf"([^\\]|^)\\(\\\\)*{new_quote}")
    escaped_orig_quote = re.compile(rf"([^\\]|^)\\(\\\\)*{orig_quote}")
    body = leaf.value[first_quote_pos + len(orig_quote) : -len(orig_quote)]
    if "r" in prefix.casefold():
        if unescaped_new_quote.search(body):
            # There's at least one unescaped new_quote in this raw string
            # so converting is impossible
            return

        # Do not introduce or remove backslashes in raw strings
        new_body = body
    else:
        # remove unnecessary quotes
        new_body = sub_twice(escaped_new_quote, rf"\1\2{new_quote}", body)
        if body != new_body:
            # Consider the string without unnecessary quotes as the original
            body = new_body
            leaf.value = f"{prefix}{orig_quote}{body}{orig_quote}"
        new_body = sub_twice(escaped_orig_quote, rf"\1\2{orig_quote}", new_body)
        new_body = sub_twice(unescaped_new_quote, rf"\1\\{new_quote}", new_body)
    # Slice instead of indexing: the body is empty for strings like `''''''`,
    # where `new_body[-1]` would raise IndexError.
    if new_quote == '"""' and new_body[-1:] == '"':
        # edge case: a body ending in `"` would merge with the closing `"""`
        new_body = new_body[:-1] + '\\"'
    orig_escape_count = body.count("\\")
    new_escape_count = new_body.count("\\")
    if new_escape_count > orig_escape_count:
        return  # Do not introduce more escaping

    if new_escape_count == orig_escape_count and orig_quote == '"':
        return  # Prefer double quotes

    leaf.value = f"{prefix}{new_quote}{new_body}{new_quote}"
def normalize_invisible_parens(node: Node, parens_after: Set[str]) -> None:
    """Make existing optional parentheses invisible or create new ones.

    `parens_after` is a set of string leaf values immediately after which parens
    should be put.

    Standardizes on visible parentheses for single-element tuples, and keeps
    existing visible parentheses for other tuples and generator expressions.
    Multiline strings are not wrapped at all.
    """
    check_lpar = False
    # Iterate over a copy: children get replaced while looping.
    for child in list(node.children):
        if check_lpar:
            if child.type == syms.atom:
                maybe_make_parens_invisible_in_atom(child)
            else:
                if is_one_tuple(child):
                    # wrap child in visible parentheses
                    paren_values = ("(", ")")
                elif not (isinstance(child, Leaf) and is_multiline_string(child)):
                    # wrap child in invisible parentheses
                    paren_values = ("", "")
                else:
                    paren_values = None

                if paren_values is not None:
                    lpar = Leaf(token.LPAR, paren_values[0])
                    rpar = Leaf(token.RPAR, paren_values[1])
                    index = child.remove() or 0
                    node.insert_child(index, Node(syms.atom, [lpar, child, rpar]))

        check_lpar = isinstance(child, Leaf) and child.value in parens_after
def maybe_make_parens_invisible_in_atom(node: LN) -> bool:
    """If it's safe, make the parens in the atom `node` invisible, recursively.

    Returns True when the parentheses of `node` itself were made invisible.
    Non-atoms, empty tuples, one-tuples, yield expressions and atoms holding a
    delimiter of at least comma priority are left untouched.
    """
    must_keep_parens = (
        node.type != syms.atom
        or is_empty_tuple(node)
        or is_one_tuple(node)
        or is_yield(node)
        or max_delimiter_priority_in_atom(node) >= COMMA_PRIORITY
    )
    if must_keep_parens:
        return False

    opener = node.children[0]
    closer = node.children[-1]
    if not (opener.type == token.LPAR and closer.type == token.RPAR):
        return False

    # make parentheses invisible
    opener.value = ""  # type: ignore
    closer.value = ""  # type: ignore
    if len(node.children) > 1:
        maybe_make_parens_invisible_in_atom(node.children[1])
    return True
def is_empty_tuple(node: LN) -> bool:
    """Return True if `node` holds an empty tuple, i.e. an atom made of just `(` and `)`."""
    if node.type != syms.atom or len(node.children) != 2:
        return False

    opener, closer = node.children
    return opener.type == token.LPAR and closer.type == token.RPAR
def is_one_tuple(node: LN) -> bool:
    """Return True if `node` holds a tuple with one element, with or without parens."""
    if node.type != syms.atom:
        # Without parentheses: an implicit tuple of one element plus a comma.
        return (
            node.type in IMPLICIT_TUPLE
            and len(node.children) == 2
            and node.children[1].type == token.COMMA
        )

    if len(node.children) != 3:
        return False

    lpar, gexp, rpar = node.children
    parenthesized = (
        lpar.type == token.LPAR
        and gexp.type == syms.testlist_gexp
        and rpar.type == token.RPAR
    )
    if not parenthesized:
        return False

    return len(gexp.children) == 2 and gexp.children[1].type == token.COMMA
def is_yield(node: LN) -> bool:
    """Return True if `node` holds a `yield` or `yield from` expression.

    Any number of enclosing parenthesized atoms is looked through.
    """
    current = node
    while True:
        if current.type == syms.yield_expr:
            return True

        if current.type == token.NAME and current.value == "yield":  # type: ignore
            return True

        if current.type != syms.atom or len(current.children) != 3:
            return False

        lpar, expr, rpar = current.children
        if not (lpar.type == token.LPAR and rpar.type == token.RPAR):
            return False

        # Peel one pair of parentheses and look again.
        current = expr
def is_vararg(leaf: Leaf, within: Set[NodeType]) -> bool:
    """Return True if `leaf` is a star or double star in a vararg or kwarg.

    If `within` includes VARARGS_PARENTS, this applies to function signatures.
    If `within` includes UNPACKING_PARENTS, it applies to right hand-side
    extended iterable unpacking (PEP 3132) and additional unpacking
    generalizations (PEP 448).
    """
    owner = leaf.parent
    if leaf.type not in STARS or not owner:
        return False

    if owner.type == syms.star_expr:
        # Star expressions are also used as assignment targets in extended
        # iterable unpacking (PEP 3132).  See what its parent is instead.
        owner = owner.parent
        if not owner:
            return False

    return owner.type in within
def is_multiline_string(leaf: Leaf) -> bool:
    """Return True if `leaf` is a triple-quoted string that actually contains a newline."""
    unprefixed = leaf.value.lstrip("furbFURB")
    opening = unprefixed[:3]
    if opening != '"""' and opening != "'''":
        return False

    return "\n" in unprefixed
def is_stub_suite(node: Node) -> bool:
    """Return True if `node` is a suite with a stub body.

    The suite must consist of exactly NEWLINE, INDENT, one statement and
    DEDENT, and that statement must satisfy `is_stub_body`.
    """
    children = node.children
    has_suite_shape = (
        len(children) == 4
        and children[0].type == token.NEWLINE
        and children[1].type == token.INDENT
        and children[3].type == token.DEDENT
    )
    if not has_suite_shape:
        return False

    return is_stub_body(children[2])
def is_stub_body(node: LN) -> bool:
    """Return True if `node` is a simple statement containing an ellipsis.

    The statement must have exactly two children, the first being an atom
    made of three `.` leaves.
    """
    if not isinstance(node, Node) or node.type != syms.simple_stmt:
        return False

    if len(node.children) != 2:
        return False

    candidate = node.children[0]
    if candidate.type != syms.atom or len(candidate.children) != 3:
        return False

    for part in candidate.children:
        if not (part == Leaf(token.DOT, ".")):
            return False
    return True
def max_delimiter_priority_in_atom(node: LN) -> int:
    """Return maximum delimiter priority inside `node`.

    This is specific to atoms with contents contained in a pair of parentheses.
    If `node` isn't an atom, there are no enclosing parentheses, or the
    tracker reports no delimiters (ValueError), returns 0.
    """
    if node.type != syms.atom:
        return 0

    opener = node.children[0]
    closer = node.children[-1]
    if opener.type != token.LPAR or closer.type != token.RPAR:
        return 0

    tracker = BracketTracker()
    # Feed every leaf between the parentheses to a fresh tracker.
    for child in node.children[1:-1]:
        inner_leaves = [child] if isinstance(child, Leaf) else child.leaves()
        for leaf in inner_leaves:
            tracker.mark(leaf)
    try:
        return tracker.max_delimiter_priority()

    except ValueError:
        return 0
def ensure_visible(leaf: Leaf) -> None:
    """Make sure parentheses are visible.

    They could be invisible as part of some statements (see
    :func:`normalize_invisible_parens` and :func:`visit_import_from`).
    Leaves of any other type are left untouched.
    """
    visible_value = {token.LPAR: "(", token.RPAR: ")"}.get(leaf.type)
    if visible_value is not None:
        leaf.value = visible_value
def should_explode(line: Line, opening_bracket: Leaf) -> bool:
    """Should `line` immediately be split with `delimiter_split()` after RHS?

    True when `opening_bracket` belongs to an atom or an import_from node and
    the highest-priority delimiter tracked on `line` is a comma.
    """
    parent = opening_bracket.parent
    if not parent or parent.type not in {syms.atom, syms.import_from}:
        return False

    # Substring test: an empty value (invisible paren) passes as well.
    if opening_bracket.value not in "[{(":
        return False

    tracker = line.bracket_tracker
    return bool(
        tracker.delimiters and tracker.max_delimiter_priority() == COMMA_PRIORITY
    )
def is_python36(node: Node) -> bool:
    """Return True if the current file is using Python 3.6+ features.

    Currently looking for:
    - f-strings; and
    - trailing commas after * or ** in function signatures and calls.
    """
    for n in node.pre_order():
        if n.type == token.STRING:
            # Only the first two characters are needed to recognise an
            # f-string prefix, optionally combined with `r`.
            value_head = n.value[:2]  # type: ignore
            if value_head in {'f"', 'F"', "f'", "F'", "rf", "fr", "RF", "FR"}:
                return True

        elif (
            n.type in {syms.typedargslist, syms.arglist}
            and n.children
            and n.children[-1].type == token.COMMA
        ):
            # The argument list ends with a comma; look for a star argument
            # either directly among the children or inside an `argument` node.
            for ch in n.children:
                if ch.type in STARS:
                    return True

                if ch.type == syms.argument:
                    for argch in ch.children:
                        if argch.type in STARS:
                            return True

    return False
def generate_trailers_to_omit(line: Line, line_length: int) -> Iterator[Set[LeafID]]:
    """Generate sets of closing bracket IDs that should be omitted in a RHS.

    Brackets can be omitted if the entire trailer up to and including
    a preceding closing bracket fits in one line.

    Yielded sets are cumulative (contain results of previous yields, too).  First
    set is empty.

    Note that the very same set object is yielded every time and is mutated
    between yields.
    """

    omit: Set[LeafID] = set()
    yield omit

    # Running width of the tail of the line, starting with its indentation.
    length = 4 * line.depth
    opening_bracket = None
    closing_bracket = None
    optional_brackets: Set[LeafID] = set()
    inner_brackets: Set[LeafID] = set()
    # Walk the leaves from the end of the line towards its beginning.
    for index, leaf in enumerate_reversed(line.leaves):
        length += len(leaf.prefix) + len(leaf.value)
        if length > line_length:
            break

        comment: Optional[Leaf]
        for comment in line.comments_after(leaf, index):
            if "\n" in comment.prefix:
                break  # Oops, standalone comment!

            length += len(comment.value)
        else:
            # The loop finished without hitting a standalone comment.
            comment = None
        if comment is not None:
            break  # There was a standalone comment, we can't continue.

        optional_brackets.discard(id(leaf))
        if opening_bracket:
            # Still inside the bracket pair whose closing bracket was seen earlier.
            if leaf is opening_bracket:
                opening_bracket = None
            elif leaf.type in CLOSING_BRACKETS:
                inner_brackets.add(id(leaf))
        elif leaf.type in CLOSING_BRACKETS:
            if not leaf.value:
                # NOTE(review): `opening_bracket` is falsy in this branch, so
                # this records id() of that falsy value -- confirm the intent.
                optional_brackets.add(id(opening_bracket))
                continue

            if index > 0 and line.leaves[index - 1].type in OPENING_BRACKETS:
                # Empty brackets would fail a split so treat them as "inner"
                # brackets (e.g. only add them to the `omit` set if another
                # pair of brackets was good enough.
                inner_brackets.add(id(leaf))
                continue

            opening_bracket = leaf.opening_bracket
            if closing_bracket:
                omit.add(id(closing_bracket))
                omit.update(inner_brackets)
                inner_brackets.clear()
                yield omit
            closing_bracket = leaf
def get_future_imports(node: Node) -> Set[str]:
    """Return a set of __future__ imports in the file.

    Only the leading statements of `node` are examined: scanning stops at the
    first child that is neither a docstring-like string statement nor a
    ``from __future__ import ...`` statement.
    """
    imports = set()
    for child in node.children:
        if child.type != syms.simple_stmt:
            break
        first_child = child.children[0]
        if isinstance(first_child, Leaf):
            # Continue looking if we see a docstring; otherwise stop.
            if (
                len(child.children) == 2
                and first_child.type == token.STRING
                and child.children[1].type == token.NEWLINE
            ):
                continue
            else:
                break
        elif first_child.type == syms.import_from:
            module_name = first_child.children[1]
            if not isinstance(module_name, Leaf) or module_name.value != "__future__":
                break
            # Children from index 3 onwards hold the imported names.
            for import_from_child in first_child.children[3:]:
                if isinstance(import_from_child, Leaf):
                    if import_from_child.type == token.NAME:
                        imports.add(import_from_child.value)
                else:
                    assert import_from_child.type == syms.import_as_names
                    for leaf in import_from_child.children:
                        if isinstance(leaf, Leaf) and leaf.type == token.NAME:
                            imports.add(leaf.value)
        else:
            break
    return imports
2675 PYTHON_EXTENSIONS = {".py", ".pyi"}
2676 BLACKLISTED_DIRECTORIES = {
def gen_python_files_in_dir(path: Path) -> Iterator[Path]:
    """Generate all files under `path` which aren't under BLACKLISTED_DIRECTORIES
    and have one of the PYTHON_EXTENSIONS.

    Directories are walked recursively; a directory whose name is blacklisted
    is skipped together with everything below it.
    """
    for child in path.iterdir():
        if child.is_dir():
            if child.name in BLACKLISTED_DIRECTORIES:
                continue

            yield from gen_python_files_in_dir(child)

        elif child.is_file() and child.suffix in PYTHON_EXTENSIONS:
            yield child
@dataclass
class Report:
    """Provides a reformatting counter. Can be rendered with `str(report)`."""
    # When true, messages and the exit code use "would ..." (dry-run) wording.
    check: bool = False
    # When true, per-file success messages are not written out.
    quiet: bool = False
    # Number of files reported through `done()` as changed.
    change_count: int = 0
    # Number of files reported through `done()` as not changed.
    same_count: int = 0
    # Number of files reported through `failed()`.
    failure_count: int = 0

    def done(self, src: Path, changed: Changed) -> None:
        """Increment the counter for successful reformatting. Write out a message."""
        if changed is Changed.YES:
            reformatted = "would reformat" if self.check else "reformatted"
            if not self.quiet:
                out(f"{reformatted} {src}")
            self.change_count += 1
        else:
            if not self.quiet:
                if changed is Changed.NO:
                    msg = f"{src} already well formatted, good job."
                else:
                    msg = f"{src} wasn't modified on disk since last run."
                out(msg, bold=False)
            self.same_count += 1

    def failed(self, src: Path, message: str) -> None:
        """Increment the counter for failed reformatting. Write out a message."""
        err(f"error: cannot format {src}: {message}")
        self.failure_count += 1

    @property
    def return_code(self) -> int:
        """Return the exit code that the app should use.

        This considers the current state of changed files and failures:
        - if there were any failures, return 123;
        - if any files were changed and --check is being used, return 1;
        - otherwise return 0.
        """
        # According to http://tldp.org/LDP/abs/html/exitcodes.html starting with
        # 126 we have special returncodes reserved by the shell.
        if self.failure_count:
            return 123

        elif self.change_count and self.check:
            return 1

        return 0

    def __str__(self) -> str:
        """Render a color report of the current state.

        Use `click.unstyle` to remove colors.
        """
        if self.check:
            reformatted = "would be reformatted"
            unchanged = "would be left unchanged"
            failed = "would fail to reformat"
        else:
            reformatted = "reformatted"
            unchanged = "left unchanged"
            failed = "failed to reformat"
        report = []
        # Each non-zero counter contributes one comma-separated fragment.
        if self.change_count:
            s = "s" if self.change_count > 1 else ""
            report.append(
                click.style(f"{self.change_count} file{s} {reformatted}", bold=True)
            )
        if self.same_count:
            s = "s" if self.same_count > 1 else ""
            report.append(f"{self.same_count} file{s} {unchanged}")
        if self.failure_count:
            s = "s" if self.failure_count > 1 else ""
            report.append(
                click.style(f"{self.failure_count} file{s} {failed}", fg="red")
            )
        return ", ".join(report) + "."
def assert_equivalent(src: str, dst: str) -> None:
    """Raise AssertionError if `src` and `dst` aren't equivalent.

    Both strings are parsed with the builtin `ast` module and the two trees are
    compared through a textual rendering. An AssertionError is also raised when
    either string cannot be parsed at all.
    """

    import ast
    import traceback

    def _v(node: ast.AST, depth: int = 0) -> Iterator[str]:
        """Simple visitor generating strings to compare ASTs by content."""
        yield f"{' ' * depth}{node.__class__.__name__}("

        # Sorted field order keeps the rendering deterministic.
        for field in sorted(node._fields):
            try:
                value = getattr(node, field)
            except AttributeError:
                continue

            yield f"{' ' * (depth+1)}{field}="

            if isinstance(value, list):
                for item in value:
                    if isinstance(item, ast.AST):
                        yield from _v(item, depth + 2)

            elif isinstance(value, ast.AST):
                yield from _v(value, depth + 2)

            else:
                yield f"{' ' * (depth+2)}{value!r}, # {value.__class__.__name__}"

        yield f"{' ' * depth}) # /{node.__class__.__name__}"

    try:
        src_ast = ast.parse(src)
    except Exception as exc:
        major, minor = sys.version_info[:2]
        raise AssertionError(
            f"cannot use --safe with this file; failed to parse source file "
            f"with Python {major}.{minor}'s builtin AST. Re-run with --fast "
            f"or stop using deprecated Python 2 syntax. AST error message: {exc}"
        )

    try:
        dst_ast = ast.parse(dst)
    except Exception as exc:
        # Keep the traceback and the offending output in a log file for the report.
        log = dump_to_file("".join(traceback.format_tb(exc.__traceback__)), dst)
        raise AssertionError(
            f"INTERNAL ERROR: Black produced invalid code: {exc}. "
            f"Please report a bug on https://github.com/ambv/black/issues. "
            f"This invalid output might be helpful: {log}"
        ) from None

    src_ast_str = "\n".join(_v(src_ast))
    dst_ast_str = "\n".join(_v(dst_ast))
    if src_ast_str != dst_ast_str:
        log = dump_to_file(diff(src_ast_str, dst_ast_str, "src", "dst"))
        raise AssertionError(
            f"INTERNAL ERROR: Black produced code that is not equivalent to "
            f"the source. "
            f"Please report a bug on https://github.com/ambv/black/issues. "
            f"This diff might be helpful: {log}"
        ) from None
def assert_stable(src: str, dst: str, line_length: int, is_pyi: bool = False) -> None:
    """Raise AssertionError if `dst` reformats differently the second time.

    `dst` is formatted again with the same `line_length` and `is_pyi` settings;
    when the result differs, both diffs (source to first pass, first pass to
    second pass) are dumped to a log file that the error message points to.
    """
    newdst = format_str(dst, line_length=line_length, is_pyi=is_pyi)
    if dst != newdst:
        log = dump_to_file(
            diff(src, dst, "source", "first pass"),
            diff(dst, newdst, "first pass", "second pass"),
        )
        raise AssertionError(
            f"INTERNAL ERROR: Black produced different code on the second pass "
            f"of the formatter. "
            f"Please report a bug on https://github.com/ambv/black/issues. "
            f"This diff might be helpful: {log}"
        ) from None
def dump_to_file(*output: str) -> str:
    """Dump `output` to a temporary file. Return path to the file.

    Every non-empty chunk that does not already end with a newline gets one
    appended. The file is created with `delete=False`, so it stays on disk
    after this function returns.
    """
    import tempfile

    with tempfile.NamedTemporaryFile(
        mode="w", prefix="blk_", suffix=".log", delete=False, encoding="utf8"
    ) as f:
        for lines in output:
            f.write(lines)
            if lines and lines[-1] != "\n":
                f.write("\n")
    return f.name
def diff(a: str, b: str, a_name: str, b_name: str) -> str:
    """Return a unified diff string between strings `a` and `b`.

    `a_name` and `b_name` are used as the "from" and "to" file names in the diff
    header. Five lines of context are included around every change. An empty
    string is returned when `a` and `b` are equal.
    """
    import difflib

    # Re-attach the newline to every line so the joined diff is line-terminated.
    a_lines = [line + "\n" for line in a.split("\n")]
    b_lines = [line + "\n" for line in b.split("\n")]
    return "".join(
        difflib.unified_diff(a_lines, b_lines, fromfile=a_name, tofile=b_name, n=5)
    )
def cancel(tasks: Iterable[asyncio.Task]) -> None:
    """asyncio signal handler that cancels all `tasks` and reports to stderr."""
    err("Aborted!")
    for task in tasks:
        task.cancel()
def shutdown(loop: BaseEventLoop) -> None:
    """Cancel all pending tasks on `loop`, wait for them, and close the loop.

    The loop is closed and the ``concurrent.futures`` logger is raised to
    CRITICAL even when there is nothing to cancel or when waiting fails.
    """
    try:
        # `asyncio.Task.all_tasks` was removed in Python 3.9; prefer the
        # module-level function and only fall back on interpreters without it.
        all_tasks = getattr(asyncio, "all_tasks", None)
        if all_tasks is None:
            all_tasks = asyncio.Task.all_tasks
        # This part is borrowed from asyncio/runners.py in Python 3.7b2.
        to_cancel = [task for task in all_tasks(loop) if not task.done()]
        if not to_cancel:
            return

        for task in to_cancel:
            task.cancel()
        # No `loop=` argument: it was removed from `gather()` in Python 3.10 and
        # the loop is taken from the (non-empty) list of tasks anyway.
        loop.run_until_complete(asyncio.gather(*to_cancel, return_exceptions=True))
    finally:
        # `concurrent.futures.Future` objects cannot be cancelled once they
        # are already running. There might be some when the `shutdown()` happened.
        # Silence their logger's spew about the event loop being closed.
        cf_logger = logging.getLogger("concurrent.futures")
        cf_logger.setLevel(logging.CRITICAL)
        loop.close()
def sub_twice(regex: Pattern[str], replacement: str, original: str) -> str:
    """Apply the `regex` -> `replacement` substitution to `original` two times.

    String normalization relies on this to handle overlapping matches, which a
    single substitution pass cannot replace.
    """
    first_pass = regex.sub(replacement, original)
    second_pass = regex.sub(replacement, first_pass)
    return second_pass
def enumerate_reversed(sequence: Sequence[T]) -> Iterator[Tuple[Index, T]]:
    """Like `reversed(enumerate(sequence))` if that were possible.

    Yields ``(index, element)`` pairs starting from the last element, where
    `index` is the element's position in the original sequence.
    """
    index = len(sequence) - 1
    for element in reversed(sequence):
        yield (index, element)
        # Without this step every pair would carry the last index.
        index -= 1
def is_line_short_enough(line: Line, *, line_length: int, line_str: str = "") -> bool:
    """Return True if `line` is no longer than `line_length`.

    Uses the provided `line_str` rendering, if any, otherwise computes a new one.
    A rendering that contains a newline, or a line that contains standalone
    comments, is never considered short enough.
    """
    if not line_str:
        line_str = str(line).strip("\n")
    return (
        len(line_str) <= line_length
        and "\n" not in line_str  # multiline strings
        and not line.contains_standalone_comments()
    )
# Per-version cache directory, as returned by appdirs' `user_cache_dir`.
CACHE_DIR = Path(user_cache_dir("black", version=__version__))


def get_cache_file(line_length: int) -> Path:
    """Return the path of the pickle cache used for the given `line_length`."""
    file_name = f"cache.{line_length}.pickle"
    return CACHE_DIR / file_name
def read_cache(line_length: int) -> Cache:
    """Read the cache if it exists and is well formed.

    If it is not well formed, the call to write_cache later should resolve the issue.

    An empty cache is returned when the cache file is missing or when its
    content cannot be unpickled (for example an empty or truncated file).
    """
    cache_file = get_cache_file(line_length)
    if not cache_file.exists():
        return {}

    with cache_file.open("rb") as fobj:
        try:
            # NOTE: unpickling is only safe here because the file is expected
            # to have been written by `write_cache()`.
            cache: Cache = pickle.load(fobj)
        except (
            pickle.UnpicklingError,
            AttributeError,
            EOFError,
            ImportError,
            IndexError,
            ValueError,
        ):
            # `pickle.load` reports malformed data with more than just
            # UnpicklingError; an empty file, for instance, raises EOFError.
            return {}

    return cache
def get_cache_info(path: Path) -> CacheInfo:
    """Return the information used to check if a file is already formatted or not.

    The result is the pair ``(st_mtime, st_size)`` taken from `path.stat()`.
    """
    stat = path.stat()
    return stat.st_mtime, stat.st_size
def filter_cached(
    cache: Cache, sources: Iterable[Path]
) -> Tuple[List[Path], List[Path]]:
    """Split a list of paths into two.

    The first list contains paths of files that were modified on disk or are not
    in the cache. The other list contains paths to non-modified files.

    Every path is resolved before it is looked up, and the resolved path is what
    ends up in the returned lists.
    """
    todo, done = [], []
    for src in sources:
        src = src.resolve()
        if cache.get(src) != get_cache_info(src):
            todo.append(src)
        else:
            done.append(src)
    return todo, done
def write_cache(cache: Cache, sources: List[Path], line_length: int) -> None:
    """Update the cache file.

    The entries of `cache` are merged with fresh cache info for `sources` (keyed
    by resolved path) and pickled to the cache file for `line_length`. Writing is
    best effort: any OSError is silently ignored.
    """
    cache_file = get_cache_file(line_length)
    try:
        # `exist_ok=True` avoids the window between an `exists()` check and
        # `mkdir()` in which another process could create the directory.
        CACHE_DIR.mkdir(parents=True, exist_ok=True)
        new_cache = {**cache, **{src.resolve(): get_cache_info(src) for src in sources}}
        with cache_file.open("wb") as fobj:
            pickle.dump(new_cache, fobj, protocol=pickle.HIGHEST_PROTOCOL)
    except OSError:
        pass
3009 if __name__ == "__main__":