All patches and comments are welcome. Please squash your changes to logical
commits before using git-format-patch and git-send-email to send them to
patches@git.madduck.net.
If you read over the Git project's submission guidelines and adhere to them,
I'd be especially grateful.
from asyncio.base_events import BaseEventLoop
from concurrent.futures import Executor, ProcessPoolExecutor
from datetime import datetime
from functools import lru_cache, partial, wraps
from multiprocessing import Manager, freeze_support
from pathlib import Path

from appdirs import user_cache_dir
from attr import dataclass, evolve, Factory
from blib2to3.pytree import Node, Leaf, type_repr
from blib2to3 import pygram, pytree
from blib2to3.pgen2 import driver, token
from blib2to3.pgen2.grammar import Grammar
from blib2to3.pgen2.parse import ParseError
__version__ = "18.9b0"
DEFAULT_LINE_LENGTH = 88
DEFAULT_EXCLUDES = (
    r"/(\.eggs|\.git|\.hg|\.mypy_cache|\.nox|\.tox|\.venv|_build|buck-out|build|dist)/"
)
DEFAULT_INCLUDES = r"\.pyi?$"
CACHE_DIR = Path(user_cache_dir("black", version=__version__))
LN = Union[Leaf, Node]
SplitFunc = Callable[["Line", bool], Iterator["Line"]]

CacheInfo = Tuple[Timestamp, FileSize]
Cache = Dict[Path, CacheInfo]
out = partial(click.secho, bold=True, err=True)
err = partial(click.secho, fg="red", err=True)

pygram.initialize(CACHE_DIR)
syms = pygram.python_symbols
class NothingChanged(UserWarning):
    """Raised when reformatted code is the same as source."""


class CannotSplit(Exception):
    """A readable split that fits the allotted line length is impossible."""


class InvalidInput(ValueError):
    """Raised when input source code fails all parse attempts."""
class WriteBack(Enum):
    NO = 0
    YES = 1
    DIFF = 2
    CHECK = 3

    @classmethod
    def from_configuration(cls, *, check: bool, diff: bool) -> "WriteBack":
        if check and not diff:
            return cls.CHECK

        return cls.DIFF if diff else cls.YES
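# Illustration (editor's sketch, not part of the original file) of how the
# command-line flags map onto the enum above:
#   --check (without --diff)  ->  WriteBack.CHECK
#   --diff (with or without --check)  ->  WriteBack.DIFF
#   neither flag  ->  WriteBack.YES (write files back in place)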
class TargetVersion(Enum):
    PYPY35 = 1
    CPY27 = 2
    CPY33 = 3
    CPY34 = 4
    CPY35 = 5
    CPY36 = 6
    CPY37 = 7
    CPY38 = 8

    def is_python2(self) -> bool:
        return self is TargetVersion.CPY27


PY36_VERSIONS = {TargetVersion.CPY36, TargetVersion.CPY37, TargetVersion.CPY38}
class Feature(Enum):
    # All string literals are unicode
    UNICODE_LITERALS = 1
    F_STRINGS = 2
    NUMERIC_UNDERSCORES = 3
    TRAILING_COMMA = 4
VERSION_TO_FEATURES: Dict[TargetVersion, Set[Feature]] = {
    TargetVersion.CPY27: set(),
    TargetVersion.PYPY35: {Feature.UNICODE_LITERALS, Feature.F_STRINGS},
    TargetVersion.CPY33: {Feature.UNICODE_LITERALS},
    TargetVersion.CPY34: {Feature.UNICODE_LITERALS},
    TargetVersion.CPY35: {Feature.UNICODE_LITERALS, Feature.TRAILING_COMMA},
    TargetVersion.CPY36: {
        Feature.UNICODE_LITERALS,
        Feature.F_STRINGS,
        Feature.NUMERIC_UNDERSCORES,
        Feature.TRAILING_COMMA,
    },
    TargetVersion.CPY37: {
        Feature.UNICODE_LITERALS,
        Feature.F_STRINGS,
        Feature.NUMERIC_UNDERSCORES,
        Feature.TRAILING_COMMA,
    },
    TargetVersion.CPY38: {
        Feature.UNICODE_LITERALS,
        Feature.F_STRINGS,
        Feature.NUMERIC_UNDERSCORES,
        Feature.TRAILING_COMMA,
    },
}
@dataclass
class FileMode:
    target_versions: Set[TargetVersion] = Factory(set)
    line_length: int = DEFAULT_LINE_LENGTH
    string_normalization: bool = True
    is_pyi: bool = False

    def get_cache_key(self) -> str:
        if self.target_versions:
            version_str = ",".join(
                str(version.value)
                for version in sorted(self.target_versions, key=lambda v: v.value)
            )
        else:
            version_str = "-"
        parts = [
            version_str,
            str(self.line_length),
            str(int(self.string_normalization)),
            str(int(self.is_pyi)),
        ]
        return ".".join(parts)
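# Illustrative key shape (editor's sketch based on the fields above): a default
# FileMode with no explicit target versions produces something like "-.88.1.0",
# i.e. version_str, line_length, string_normalization, and is_pyi joined with
# dots.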
def supports_feature(target_versions: Set[TargetVersion], feature: Feature) -> bool:
    return all(feature in VERSION_TO_FEATURES[version] for version in target_versions)
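# Example (editor's illustration, not in the original file): a feature must be
# supported by *every* targeted version, so
#   supports_feature({TargetVersion.CPY27, TargetVersion.CPY36}, Feature.TRAILING_COMMA)
# is False (CPY27 maps to an empty feature set above), while
#   supports_feature(PY36_VERSIONS, Feature.TRAILING_COMMA)
# is True.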
def read_pyproject_toml(
    ctx: click.Context, param: click.Parameter, value: Union[str, int, bool, None]
) -> Optional[str]:
    """Inject Black configuration from "pyproject.toml" into defaults in `ctx`.

    Returns the path to a successfully found and read configuration file, None
    otherwise.
    """
    assert not isinstance(value, (int, bool)), "Invalid parameter type passed"
    if not value:
        root = find_project_root(ctx.params.get("src", ()))
        path = root / "pyproject.toml"
        if path.is_file():
            value = str(path)
        else:
            return None

    try:
        pyproject_toml = toml.load(value)
        config = pyproject_toml.get("tool", {}).get("black", {})
    except (toml.TomlDecodeError, OSError) as e:
        raise click.FileError(
            filename=value, hint=f"Error reading configuration file: {e}"
        )

    if not config:
        return None

    if ctx.default_map is None:
        ctx.default_map = {}
    ctx.default_map.update(  # type: ignore  # bad types in .pyi
        {k.replace("--", "").replace("-", "_"): v for k, v in config.items()}
    )
    return value
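# A minimal configuration file picked up by the callback above might look like
# this (editor's sketch; key names mirror the command-line option names, with
# dashes):
#
#   [tool.black]
#   line-length = 88
#   include = '\.pyi?$'
#   skip-string-normalization = false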
@click.command(context_settings=dict(help_option_names=["-h", "--help"]))

    default=DEFAULT_LINE_LENGTH,
    help="How many characters per line to allow.",

    type=click.Choice([v.name.lower() for v in TargetVersion]),
    callback=lambda c, p, v: [TargetVersion[val.upper()] for val in v],

        "Python versions that should be supported by Black's output. [default: "
        "per-file auto-detection]"

        "Format all input files like typing stubs regardless of file extension "
        "(useful when piping source on standard input)."

    "--skip-string-normalization",
    help="Don't normalize string quotes or prefixes.",

        "Don't write the files back, just return the status. Return code 0 "
        "means nothing would change. Return code 1 means some files would be "
        "reformatted. Return code 123 means there was an internal error."

    help="Don't write the files back, just output a diff for each file on stdout.",

    help="If --fast given, skip temporary sanity checks. [default: --safe]",

    default=DEFAULT_INCLUDES,
        "A regular expression that matches files and directories that should be "
        "included on recursive searches. An empty value means all files are "
        "included regardless of the name. Use forward slashes for directories on "
        "all platforms (Windows, too). Exclusions are calculated first, inclusions "
        "later."

    default=DEFAULT_EXCLUDES,
        "A regular expression that matches files and directories that should be "
        "excluded on recursive searches. An empty value means no paths are excluded. "
        "Use forward slashes for directories on all platforms (Windows, too). "
        "Exclusions are calculated first, inclusions later."

        "Don't emit non-error messages to stderr. Errors are still emitted, "
        "silence those with 2>/dev/null."

        "Also emit messages to stderr about files that were not changed or were "
        "ignored due to --exclude=."

@click.version_option(version=__version__)

        exists=True, file_okay=True, dir_okay=True, readable=True, allow_dash=True

        exists=False, file_okay=True, dir_okay=False, readable=True, allow_dash=False

    callback=read_pyproject_toml,
    help="Read configuration from PATH.",
def main(
    ctx: click.Context,
    line_length: int,
    target_version: List[TargetVersion],
    check: bool,
    diff: bool,
    fast: bool,
    pyi: bool,
    skip_string_normalization: bool,
    quiet: bool,
    verbose: bool,
    include: str,
    exclude: str,
    src: Tuple[str],
    config: Optional[str],
) -> None:
    """The uncompromising code formatter."""
    write_back = WriteBack.from_configuration(check=check, diff=diff)
    if target_version:
        versions = set(target_version)
    else:
        # We'll autodetect later.
        versions = set()
    mode = FileMode(
        target_versions=versions,
        line_length=line_length,
        is_pyi=pyi,
        string_normalization=not skip_string_normalization,
    )
    if config and verbose:
        out(f"Using configuration from {config}.", bold=False, fg="blue")
    try:
        include_regex = re_compile_maybe_verbose(include)
    except re.error:
        err(f"Invalid regular expression for include given: {include!r}")
        ctx.exit(2)
    try:
        exclude_regex = re_compile_maybe_verbose(exclude)
    except re.error:
        err(f"Invalid regular expression for exclude given: {exclude!r}")
        ctx.exit(2)
    report = Report(check=check, quiet=quiet, verbose=verbose)
    root = find_project_root(src)
    sources: Set[Path] = set()
    for s in src:
        p = Path(s)
        if p.is_dir():
            sources.update(
                gen_python_files_in_dir(p, root, include_regex, exclude_regex, report)
            )
        elif p.is_file() or s == "-":
            # if a file was explicitly given, we don't care about its extension
            sources.add(p)
        else:
            err(f"invalid path: {s}")
    if len(sources) == 0:
        if verbose or not quiet:
            out("No paths given. Nothing to do 😴")
        ctx.exit(0)

    if len(sources) == 1:
        reformat_one(
            src=sources.pop(),
            fast=fast,
            write_back=write_back,
            mode=mode,
            report=report,
        )
    else:
        loop = asyncio.get_event_loop()
        executor = ProcessPoolExecutor(max_workers=os.cpu_count())
        try:
            loop.run_until_complete(
                schedule_formatting(
                    sources=sources,
                    fast=fast,
                    write_back=write_back,
                    mode=mode,
                    report=report,
                    loop=loop,
                    executor=executor,
                )
            )
        finally:
            shutdown(loop)
    if verbose or not quiet:
        bang = "💥 💔 💥" if report.return_code else "✨ 🍰 ✨"
        out(f"All done! {bang}")
        click.secho(str(report), err=True)
    ctx.exit(report.return_code)
def reformat_one(
    src: Path, fast: bool, write_back: WriteBack, mode: FileMode, report: "Report"
) -> None:
    """Reformat a single file under `src` without spawning child processes.

    `fast`, `write_back`, and `mode` options are passed to
    :func:`format_file_in_place` or :func:`format_stdin_to_stdout`.
    """
    try:
        changed = Changed.NO
        if not src.is_file() and str(src) == "-":
            if format_stdin_to_stdout(fast=fast, write_back=write_back, mode=mode):
                changed = Changed.YES
        else:
            cache: Cache = {}
            if write_back != WriteBack.DIFF:
                cache = read_cache(mode)
                res_src = src.resolve()
                if res_src in cache and cache[res_src] == get_cache_info(res_src):
                    changed = Changed.CACHED
            if changed is not Changed.CACHED and format_file_in_place(
                src, fast=fast, write_back=write_back, mode=mode
            ):
                changed = Changed.YES
            if (write_back is WriteBack.YES and changed is not Changed.CACHED) or (
                write_back is WriteBack.CHECK and changed is Changed.NO
            ):
                write_cache(cache, [src], mode)
        report.done(src, changed)
    except Exception as exc:
        report.failed(src, str(exc))
async def schedule_formatting(
    sources: Set[Path],
    fast: bool,
    write_back: WriteBack,
    mode: FileMode,
    report: "Report",
    loop: BaseEventLoop,
    executor: Executor,
) -> None:
    """Run formatting of `sources` in parallel using the provided `executor`.

    (Use ProcessPoolExecutors for actual parallelism.)

    `write_back`, `fast`, and `mode` options are passed to
    :func:`format_file_in_place`.
    """
    cache: Cache = {}
    if write_back != WriteBack.DIFF:
        cache = read_cache(mode)
        sources, cached = filter_cached(cache, sources)
        for src in sorted(cached):
            report.done(src, Changed.CACHED)
    if not sources:
        return

    cancelled = []
    sources_to_cache = []
    lock = None
    if write_back == WriteBack.DIFF:
        # For diff output, we need locks to ensure we don't interleave output
        # from different processes.
        manager = Manager()
        lock = manager.Lock()
    tasks = {
        loop.run_in_executor(
            executor, format_file_in_place, src, fast, mode, write_back, lock
        ): src
        for src in sorted(sources)
    }
    pending: Iterable[asyncio.Task] = tasks.keys()
    try:
        loop.add_signal_handler(signal.SIGINT, cancel, pending)
        loop.add_signal_handler(signal.SIGTERM, cancel, pending)
    except NotImplementedError:
        # There are no good alternatives for these on Windows.
        pass
    while pending:
        done, _ = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            src = tasks.pop(task)
            if task.cancelled():
                cancelled.append(task)
            elif task.exception():
                report.failed(src, str(task.exception()))
            else:
                changed = Changed.YES if task.result() else Changed.NO
                # If the file was written back or was successfully checked as
                # well-formatted, store this information in the cache.
                if write_back is WriteBack.YES or (
                    write_back is WriteBack.CHECK and changed is Changed.NO
                ):
                    sources_to_cache.append(src)
                report.done(src, changed)
    if cancelled:
        await asyncio.gather(*cancelled, loop=loop, return_exceptions=True)
    if sources_to_cache:
        write_cache(cache, sources_to_cache, mode)
def format_file_in_place(
    src: Path,
    fast: bool,
    mode: FileMode,
    write_back: WriteBack = WriteBack.NO,
    lock: Any = None,  # multiprocessing.Manager().Lock() is some crazy proxy
) -> bool:
    """Format file under `src` path. Return True if changed.

    If `write_back` is DIFF, write a diff to stdout. If it is YES, write
    reformatted code back to the file.
    `fast` and `mode` options are passed to :func:`format_file_contents`.
    """
    if src.suffix == ".pyi":
        mode = evolve(mode, is_pyi=True)

    then = datetime.utcfromtimestamp(src.stat().st_mtime)
    with open(src, "rb") as buf:
        src_contents, encoding, newline = decode_bytes(buf.read())
    try:
        dst_contents = format_file_contents(src_contents, fast=fast, mode=mode)
    except NothingChanged:
        return False

    if write_back == write_back.YES:
        with open(src, "w", encoding=encoding, newline=newline) as f:
            f.write(dst_contents)
    elif write_back == write_back.DIFF:
        now = datetime.utcnow()
        src_name = f"{src}\t{then} +0000"
        dst_name = f"{src}\t{now} +0000"
        diff_contents = diff(src_contents, dst_contents, src_name, dst_name)
        if lock:
            lock.acquire()
        try:
            f = io.TextIOWrapper(
                sys.stdout.buffer,
                encoding=encoding,
                newline=newline,
                write_through=True,
            )
            f.write(diff_contents)
            f.detach()
        finally:
            if lock:
                lock.release()
    return True
def format_stdin_to_stdout(
    fast: bool, *, write_back: WriteBack = WriteBack.NO, mode: FileMode
) -> bool:
    """Format file on stdin. Return True if changed.

    If `write_back` is YES, write reformatted code back to stdout. If it is DIFF,
    write a diff to stdout. The `mode` argument is passed to
    :func:`format_file_contents`.
    """
    then = datetime.utcnow()
    src, encoding, newline = decode_bytes(sys.stdin.buffer.read())
    dst = src
    try:
        dst = format_file_contents(src, fast=fast, mode=mode)
        return True

    except NothingChanged:
        return False

    finally:
        f = io.TextIOWrapper(
            sys.stdout.buffer, encoding=encoding, newline=newline, write_through=True
        )
        if write_back == WriteBack.YES:
            f.write(dst)
        elif write_back == WriteBack.DIFF:
            now = datetime.utcnow()
            src_name = f"STDIN\t{then} +0000"
            dst_name = f"STDOUT\t{now} +0000"
            f.write(diff(src, dst, src_name, dst_name))
        f.detach()
def format_file_contents(
    src_contents: str, *, fast: bool, mode: FileMode
) -> FileContent:
    """Reformat contents of a file and return new contents.

    If `fast` is False, additionally confirm that the reformatted code is
    valid by calling :func:`assert_equivalent` and :func:`assert_stable` on it.
    `mode` is passed to :func:`format_str`.
    """
    if src_contents.strip() == "":
        raise NothingChanged

    dst_contents = format_str(src_contents, mode=mode)
    if src_contents == dst_contents:
        raise NothingChanged

    if not fast:
        assert_equivalent(src_contents, dst_contents)
        assert_stable(src_contents, dst_contents, mode=mode)
    return dst_contents
def format_str(src_contents: str, *, mode: FileMode) -> FileContent:
    """Reformat a string and return new contents.

    `mode` determines formatting options, such as how many characters per line
    are allowed.
    """
    src_node = lib2to3_parse(src_contents.lstrip(), mode.target_versions)
    dst_contents = ""
    future_imports = get_future_imports(src_node)
    if mode.target_versions:
        versions = mode.target_versions
    else:
        versions = detect_target_versions(src_node)
    normalize_fmt_off(src_node)
    lines = LineGenerator(
        remove_u_prefix="unicode_literals" in future_imports
        or supports_feature(versions, Feature.UNICODE_LITERALS),
        is_pyi=mode.is_pyi,
        normalize_strings=mode.string_normalization,
    )
    elt = EmptyLineTracker(is_pyi=mode.is_pyi)
    empty_line = Line()
    after = 0
    for current_line in lines.visit(src_node):
        for _ in range(after):
            dst_contents += str(empty_line)
        before, after = elt.maybe_empty_lines(current_line)
        for _ in range(before):
            dst_contents += str(empty_line)
        for line in split_line(
            current_line,
            line_length=mode.line_length,
            supports_trailing_commas=supports_feature(versions, Feature.TRAILING_COMMA),
        ):
            dst_contents += str(line)
    return dst_contents
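# Illustrative usage (editor's sketch, assuming the default FileMode above):
#
#   format_str("print ( 'hello' )", mode=FileMode())
#
# would return 'print("hello")\n' — the source is parsed into a lib2to3 tree,
# regenerated as Line objects, and re-split to fit mode.line_length.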
def decode_bytes(src: bytes) -> Tuple[FileContent, Encoding, NewLine]:
    """Return a tuple of (decoded_contents, encoding, newline).

    `newline` is either CRLF or LF but `decoded_contents` is decoded with
    universal newlines (i.e. only contains LF).
    """
    srcbuf = io.BytesIO(src)
    encoding, lines = tokenize.detect_encoding(srcbuf.readline)
    if not lines:
        return "", encoding, "\n"

    newline = "\r\n" if b"\r\n" == lines[0][-2:] else "\n"
    srcbuf.seek(0)
    with io.TextIOWrapper(srcbuf, encoding) as tiow:
        return tiow.read(), encoding, newline
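# Illustrative behaviour (editor's example, not from the original file):
#   decode_bytes(b"x = 1\r\n")  ->  ("x = 1\n", "utf-8", "\r\n")
# The CRLF convention is detected from the first line, but the decoded text
# itself is normalized to LF.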
GRAMMARS = [
    pygram.python_grammar_no_print_statement_no_exec_statement,
    pygram.python_grammar_no_print_statement,
    pygram.python_grammar,
]


def get_grammars(target_versions: Set[TargetVersion]) -> List[Grammar]:
    if not target_versions:
        return GRAMMARS

    elif all(not version.is_python2() for version in target_versions):
        # Python 3-only code, so don't bother trying the Python 2 grammar.
        return [
            pygram.python_grammar_no_print_statement_no_exec_statement,
            pygram.python_grammar_no_print_statement,
        ]
    else:
        return [pygram.python_grammar]
def lib2to3_parse(src_txt: str, target_versions: Iterable[TargetVersion] = ()) -> Node:
    """Given a string with source, return the lib2to3 Node."""
    if src_txt[-1:] != "\n":
        src_txt += "\n"

    for grammar in get_grammars(set(target_versions)):
        drv = driver.Driver(grammar, pytree.convert)
        try:
            result = drv.parse_string(src_txt, True)
            break

        except ParseError as pe:
            lineno, column = pe.context[1]
            lines = src_txt.splitlines()
            try:
                faulty_line = lines[lineno - 1]
            except IndexError:
                faulty_line = "<line number missing in source>"
            exc = InvalidInput(f"Cannot parse: {lineno}:{column}: {faulty_line}")
    else:
        raise exc from None

    if isinstance(result, Leaf):
        result = Node(syms.file_input, [result])
    return result
def lib2to3_unparse(node: Node) -> str:
    """Given a lib2to3 node, return its string representation."""
    code = str(node)
    return code
class Visitor(Generic[T]):
    """Basic lib2to3 visitor that yields things of type `T` on `visit()`."""

    def visit(self, node: LN) -> Iterator[T]:
        """Main method to visit `node` and its children.

        It tries to find a `visit_*()` method for the given `node.type`, like
        `visit_simple_stmt` for Node objects or `visit_INDENT` for Leaf objects.
        If no dedicated `visit_*()` method is found, chooses `visit_default()`
        instead.

        Then yields objects of type `T` from the selected visitor.
        """
        if node.type < 256:
            name = token.tok_name[node.type]
        else:
            name = type_repr(node.type)
        yield from getattr(self, f"visit_{name}", self.visit_default)(node)

    def visit_default(self, node: LN) -> Iterator[T]:
        """Default `visit_*()` implementation. Recurses to children of `node`."""
        if isinstance(node, Node):
            for child in node.children:
                yield from self.visit(child)
@dataclass
class DebugVisitor(Visitor[T]):
    tree_depth: int = 0

    def visit_default(self, node: LN) -> Iterator[T]:
        indent = " " * (2 * self.tree_depth)
        if isinstance(node, Node):
            _type = type_repr(node.type)
            out(f"{indent}{_type}", fg="yellow")
            self.tree_depth += 1
            for child in node.children:
                yield from self.visit(child)

            self.tree_depth -= 1
            out(f"{indent}/{_type}", fg="yellow", bold=False)
        else:
            _type = token.tok_name.get(node.type, str(node.type))
            out(f"{indent}{_type}", fg="blue", nl=False)
            if node.prefix:
                # We don't have to handle prefixes for `Node` objects since
                # that delegates to the first child anyway.
                out(f" {node.prefix!r}", fg="green", bold=False, nl=False)
            out(f" {node.value!r}", fg="blue", bold=False)

    @classmethod
    def show(cls, code: Union[str, Leaf, Node]) -> None:
        """Pretty-print the lib2to3 AST of a given string of `code`.

        Convenience method for debugging.
        """
        v: DebugVisitor[None] = DebugVisitor()
        if isinstance(code, str):
            code = lib2to3_parse(code)
        v.visit(code)
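# Illustration (editor's sketch): DebugVisitor.show("x = 1") walks the parsed
# tree and prints an indented outline roughly like:
#
#   file_input
#     simple_stmt
#       expr_stmt
#         NAME 'x'
#         EQUAL ' ' '='
#         NUMBER ' ' '1'
#   /file_input
#
# Node types in yellow, leaf types in blue, prefixes in green (exact output
# depends on the grammar and terminal colors).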
WHITESPACE = {token.DEDENT, token.INDENT, token.NEWLINE}

STANDALONE_COMMENT = 153
token.tok_name[STANDALONE_COMMENT] = "STANDALONE_COMMENT"
LOGIC_OPERATORS = {"and", "or"}

STARS = {token.STAR, token.DOUBLESTAR}
VARARGS_PARENTS = {
    syms.arglist,
    syms.argument,  # double star in arglist
    syms.trailer,  # single argument to call
    syms.typedargslist,
    syms.varargslist,  # lambdas
}
UNPACKING_PARENTS = {
    syms.atom,  # single element of a list or set literal
    syms.expr_stmt,
    syms.for_stmt,
    syms.with_stmt,
    syms.testlist_star_expr,
}

COMPREHENSION_PRIORITY = 20
COMMA_PRIORITY = 18
TERNARY_PRIORITY = 16
LOGIC_PRIORITY = 14
STRING_PRIORITY = 12
COMPARATOR_PRIORITY = 10
MATH_PRIORITIES = {
    token.VBAR: 9,
    token.CIRCUMFLEX: 8,
    token.AMPER: 7,
    token.LEFTSHIFT: 6,
    token.RIGHTSHIFT: 6,
    token.PLUS: 5,
    token.MINUS: 5,
    token.STAR: 4,
    token.SLASH: 4,
    token.DOUBLESLASH: 4,
    token.PERCENT: 4,
    token.AT: 4,
    token.TILDE: 4,
    token.DOUBLESTAR: 1,
}
DOT_PRIORITY = 1
@dataclass
class BracketTracker:
    """Keeps track of brackets on a line."""

    depth: int = 0
    bracket_match: Dict[Tuple[Depth, NodeType], Leaf] = Factory(dict)
    delimiters: Dict[LeafID, Priority] = Factory(dict)
    previous: Optional[Leaf] = None
    _for_loop_depths: List[int] = Factory(list)
    _lambda_argument_depths: List[int] = Factory(list)

    def mark(self, leaf: Leaf) -> None:
        """Mark `leaf` with bracket-related metadata. Keep track of delimiters.

        All leaves receive an int `bracket_depth` field that stores how deep
        within brackets a given leaf is. 0 means there are no enclosing brackets
        that started on this line.

        If a leaf is itself a closing bracket, it receives an `opening_bracket`
        field that it forms a pair with. This is a one-directional link to
        avoid reference cycles.

        If a leaf is a delimiter (a token on which Black can split the line if
        needed) and it's on depth 0, its `id()` is stored in the tracker's
        `delimiters` field.
        """
        if leaf.type == token.COMMENT:
            return

        self.maybe_decrement_after_for_loop_variable(leaf)
        self.maybe_decrement_after_lambda_arguments(leaf)
        if leaf.type in CLOSING_BRACKETS:
            self.depth -= 1
            opening_bracket = self.bracket_match.pop((self.depth, leaf.type))
            leaf.opening_bracket = opening_bracket
        leaf.bracket_depth = self.depth
        if self.depth == 0:
            delim = is_split_before_delimiter(leaf, self.previous)
            if delim and self.previous is not None:
                self.delimiters[id(self.previous)] = delim

            delim = is_split_after_delimiter(leaf, self.previous)
            if delim:
                self.delimiters[id(leaf)] = delim
        if leaf.type in OPENING_BRACKETS:
            self.bracket_match[self.depth, BRACKET[leaf.type]] = leaf
            self.depth += 1
        self.previous = leaf
        self.maybe_increment_lambda_arguments(leaf)
        self.maybe_increment_for_loop_variable(leaf)

    def any_open_brackets(self) -> bool:
        """Return True if there is a yet-unmatched open bracket on the line."""
        return bool(self.bracket_match)

    def max_delimiter_priority(self, exclude: Iterable[LeafID] = ()) -> int:
        """Return the highest priority of a delimiter found on the line.

        Values are consistent with what `is_split_*_delimiter()` return.
        Raises ValueError on no delimiters.
        """
        return max(v for k, v in self.delimiters.items() if k not in exclude)

    def delimiter_count_with_priority(self, priority: int = 0) -> int:
        """Return the number of delimiters with the given `priority`.

        If no `priority` is passed, defaults to max priority on the line.
        """
        if not self.delimiters:
            return 0

        priority = priority or self.max_delimiter_priority()
        return sum(1 for p in self.delimiters.values() if p == priority)

    def maybe_increment_for_loop_variable(self, leaf: Leaf) -> bool:
        """In a for loop, or comprehension, the variables are often unpacks.

        To avoid splitting on the comma in this situation, increase the depth of
        tokens between `for` and `in`.
        """
        if leaf.type == token.NAME and leaf.value == "for":
            self.depth += 1
            self._for_loop_depths.append(self.depth)
            return True

        return False

    def maybe_decrement_after_for_loop_variable(self, leaf: Leaf) -> bool:
        """See `maybe_increment_for_loop_variable` above for explanation."""
        if (
            self._for_loop_depths
            and self._for_loop_depths[-1] == self.depth
            and leaf.type == token.NAME
            and leaf.value == "in"
        ):
            self.depth -= 1
            self._for_loop_depths.pop()
            return True

        return False

    def maybe_increment_lambda_arguments(self, leaf: Leaf) -> bool:
        """In a lambda expression, there might be more than one argument.

        To avoid splitting on the comma in this situation, increase the depth of
        tokens between `lambda` and `:`.
        """
        if leaf.type == token.NAME and leaf.value == "lambda":
            self.depth += 1
            self._lambda_argument_depths.append(self.depth)
            return True

        return False

    def maybe_decrement_after_lambda_arguments(self, leaf: Leaf) -> bool:
        """See `maybe_increment_lambda_arguments` above for explanation."""
        if (
            self._lambda_argument_depths
            and self._lambda_argument_depths[-1] == self.depth
            and leaf.type == token.COLON
        ):
            self.depth -= 1
            self._lambda_argument_depths.pop()
            return True

        return False

    def get_open_lsqb(self) -> Optional[Leaf]:
        """Return the most recent opening square bracket (if any)."""
        return self.bracket_match.get((self.depth - 1, token.RSQB))
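# Sketch of intended use (editor's note, not from the original file):
# Line.append() below calls tracker.mark(leaf) for every leaf it receives.
# Afterwards the split functions can ask e.g. tracker.max_delimiter_priority()
# to decide where a long line may be broken, and tracker.any_open_brackets()
# to tell whether the line is still inside an unclosed bracket pair.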
1062 """Holds leaves and comments. Can be printed with `str(line)`."""
1065 leaves: List[Leaf] = Factory(list)
1066 # The LeafID keys of comments must remain ordered by the corresponding leaf's index
1068 comments: Dict[LeafID, List[Leaf]] = Factory(dict)
1069 bracket_tracker: BracketTracker = Factory(BracketTracker)
1070 inside_brackets: bool = False
1071 should_explode: bool = False
1073 def append(self, leaf: Leaf, preformatted: bool = False) -> None:
1074 """Add a new `leaf` to the end of the line.
1076 Unless `preformatted` is True, the `leaf` will receive a new consistent
1077 whitespace prefix and metadata applied by :class:`BracketTracker`.
1078 Trailing commas are maybe removed, unpacked for loop variables are
1079 demoted from being delimiters.
1081 Inline comments are put aside.
1083 has_value = leaf.type in BRACKETS or bool(leaf.value.strip())
1087 if token.COLON == leaf.type and self.is_class_paren_empty:
1088 del self.leaves[-2:]
1089 if self.leaves and not preformatted:
1090 # Note: at this point leaf.prefix should be empty except for
1091 # imports, for which we only preserve newlines.
1092 leaf.prefix += whitespace(
1093 leaf, complex_subscript=self.is_complex_subscript(leaf)
1095 if self.inside_brackets or not preformatted:
1096 self.bracket_tracker.mark(leaf)
1097 self.maybe_remove_trailing_comma(leaf)
1098 if not self.append_comment(leaf):
1099 self.leaves.append(leaf)
1101 def append_safe(self, leaf: Leaf, preformatted: bool = False) -> None:
1102 """Like :func:`append()` but disallow invalid standalone comment structure.
1104 Raises ValueError when any `leaf` is appended after a standalone comment
1105 or when a standalone comment is not the first leaf on the line.
1107 if self.bracket_tracker.depth == 0:
1109 raise ValueError("cannot append to standalone comments")
1111 if self.leaves and leaf.type == STANDALONE_COMMENT:
1113 "cannot append standalone comments to a populated line"
1116 self.append(leaf, preformatted=preformatted)
1119 def is_comment(self) -> bool:
1120 """Is this line a standalone comment?"""
1121 return len(self.leaves) == 1 and self.leaves[0].type == STANDALONE_COMMENT
1124 def is_decorator(self) -> bool:
1125 """Is this line a decorator?"""
1126 return bool(self) and self.leaves[0].type == token.AT
1129 def is_import(self) -> bool:
1130 """Is this an import line?"""
1131 return bool(self) and is_import(self.leaves[0])
1134 def is_class(self) -> bool:
1135 """Is this line a class definition?"""
1138 and self.leaves[0].type == token.NAME
1139 and self.leaves[0].value == "class"
1143 def is_stub_class(self) -> bool:
1144 """Is this line a class definition with a body consisting only of "..."?"""
1145 return self.is_class and self.leaves[-3:] == [
1146 Leaf(token.DOT, ".") for _ in range(3)
1150 def is_def(self) -> bool:
1151 """Is this a function definition? (Also returns True for async defs.)"""
1153 first_leaf = self.leaves[0]
1158 second_leaf: Optional[Leaf] = self.leaves[1]
1161 return (first_leaf.type == token.NAME and first_leaf.value == "def") or (
1162 first_leaf.type == token.ASYNC
1163 and second_leaf is not None
1164 and second_leaf.type == token.NAME
1165 and second_leaf.value == "def"
1169 def is_class_paren_empty(self) -> bool:
1170 """Is this a class with no base classes but using parentheses?
1172 Those are unnecessary and should be removed.
1176 and len(self.leaves) == 4
1178 and self.leaves[2].type == token.LPAR
1179 and self.leaves[2].value == "("
1180 and self.leaves[3].type == token.RPAR
1181 and self.leaves[3].value == ")"
1185 def is_triple_quoted_string(self) -> bool:
1186 """Is the line a triple quoted string?"""
1189 and self.leaves[0].type == token.STRING
1190 and self.leaves[0].value.startswith(('"""', "'''"))
1193 def contains_standalone_comments(self, depth_limit: int = sys.maxsize) -> bool:
1194 """If so, needs to be split before emitting."""
1195 for leaf in self.leaves:
1196 if leaf.type == STANDALONE_COMMENT:
1197 if leaf.bracket_depth <= depth_limit:
1202 def contains_multiline_strings(self) -> bool:
1203 for leaf in self.leaves:
1204 if is_multiline_string(leaf):
1209 def maybe_remove_trailing_comma(self, closing: Leaf) -> bool:
1210 """Remove trailing comma if there is one and it's safe."""
1213 and self.leaves[-1].type == token.COMMA
1214 and closing.type in CLOSING_BRACKETS
1218 if closing.type == token.RBRACE:
1219 self.remove_trailing_comma()
1222 if closing.type == token.RSQB:
1223 comma = self.leaves[-1]
1224 if comma.parent and comma.parent.type == syms.listmaker:
1225 self.remove_trailing_comma()
1228 # For parens let's check if it's safe to remove the comma.
1229 # Imports are always safe.
1231 self.remove_trailing_comma()
1234 # Otherwise, if the trailing one is the only one, we might mistakenly
1235 # change a tuple into a different type by removing the comma.
1236 depth = closing.bracket_depth + 1
1238 opening = closing.opening_bracket
1239 for _opening_index, leaf in enumerate(self.leaves):
1246 for leaf in self.leaves[_opening_index + 1 :]:
1250 bracket_depth = leaf.bracket_depth
1251 if bracket_depth == depth and leaf.type == token.COMMA:
1253 if leaf.parent and leaf.parent.type == syms.arglist:
1258 self.remove_trailing_comma()
1263 def append_comment(self, comment: Leaf) -> bool:
1264 """Add an inline or standalone comment to the line."""
1266 comment.type == STANDALONE_COMMENT
1267 and self.bracket_tracker.any_open_brackets()
1272 if comment.type != token.COMMENT:
1276 comment.type = STANDALONE_COMMENT
1281 leaf_id = id(self.leaves[-1])
1282 if leaf_id not in self.comments:
1283 self.comments[leaf_id] = [comment]
1285 self.comments[leaf_id].append(comment)
1288 def comments_after(self, leaf: Leaf) -> List[Leaf]:
1289 """Generate comments that should appear directly after `leaf`."""
1290 return self.comments.get(id(leaf), [])
1292 def remove_trailing_comma(self) -> None:
1293 """Remove the trailing comma and moves the comments attached to it."""
1294 # Remember, the LeafID keys of self.comments are ordered by the
1295 # corresponding leaf's index in self.leaves
1296 # If id(self.leaves[-2]) is in self.comments, the order doesn't change.
1297 # Otherwise, we insert it into self.comments, and it becomes the last entry.
1298 # However, since we delete id(self.leaves[-1]) from self.comments, the invariant
1300 self.comments.setdefault(id(self.leaves[-2]), []).extend(
1301 self.comments.get(id(self.leaves[-1]), [])
1303 self.comments.pop(id(self.leaves[-1]), None)
1306 def is_complex_subscript(self, leaf: Leaf) -> bool:
1307 """Return True iff `leaf` is part of a slice with non-trivial exprs."""
1308 open_lsqb = self.bracket_tracker.get_open_lsqb()
1309 if open_lsqb is None:
1312 subscript_start = open_lsqb.next_sibling
1314 if isinstance(subscript_start, Node):
1315 if subscript_start.type == syms.listmaker:
1318 if subscript_start.type == syms.subscriptlist:
1319 subscript_start = child_towards(subscript_start, leaf)
1320 return subscript_start is not None and any(
1321 n.type in TEST_DESCENDANTS for n in subscript_start.pre_order()
1324 def __str__(self) -> str:
1325 """Render the line."""
1329 indent = " " * self.depth
1330 leaves = iter(self.leaves)
1331 first = next(leaves)
1332 res = f"{first.prefix}{indent}{first.value}"
1335 for comment in itertools.chain.from_iterable(self.comments.values()):
1339 def __bool__(self) -> bool:
1340 """Return True if the line has leaves or comments."""
1341 return bool(self.leaves or self.comments)
@dataclass
class EmptyLineTracker:
    """Provides a stateful method that returns the number of potential extra
    empty lines needed before and after the currently processed line.

    Note: this tracker works on lines that haven't been split yet. It assumes
    the prefix of the first leaf consists of optional newlines. Those newlines
    are consumed by `maybe_empty_lines()` and included in the computation.
    """

    is_pyi: bool = False
    previous_line: Optional[Line] = None
    previous_after: int = 0
    previous_defs: List[int] = Factory(list)

    def maybe_empty_lines(self, current_line: Line) -> Tuple[int, int]:
        """Return the number of extra empty lines before and after the `current_line`.

        This is for separating `def`, `async def` and `class` with extra empty
        lines (two on module-level).
        """
        before, after = self._maybe_empty_lines(current_line)
        before -= self.previous_after
        self.previous_after = after
        self.previous_line = current_line
        return before, after

    def _maybe_empty_lines(self, current_line: Line) -> Tuple[int, int]:
        max_allowed = 1
        if current_line.depth == 0:
            max_allowed = 1 if self.is_pyi else 2
        if current_line.leaves:
            # Consume the first leaf's extra newlines.
            first_leaf = current_line.leaves[0]
            before = first_leaf.prefix.count("\n")
            before = min(before, max_allowed)
            first_leaf.prefix = ""
        else:
            before = 0
        depth = current_line.depth
        while self.previous_defs and self.previous_defs[-1] >= depth:
            self.previous_defs.pop()
            if self.is_pyi:
                before = 0 if depth else 1
            else:
                before = 1 if depth else 2
        if current_line.is_decorator or current_line.is_def or current_line.is_class:
            return self._maybe_empty_lines_for_class_or_def(current_line, before)

        if (
            self.previous_line
            and self.previous_line.is_import
            and not current_line.is_import
            and depth == self.previous_line.depth
        ):
            return (before or 1), 0

        if (
            self.previous_line
            and self.previous_line.is_class
            and current_line.is_triple_quoted_string
        ):
            return before, 1

        return before, 0

    def _maybe_empty_lines_for_class_or_def(
        self, current_line: Line, before: int
    ) -> Tuple[int, int]:
        if not current_line.is_decorator:
            self.previous_defs.append(current_line.depth)
        if self.previous_line is None:
            # Don't insert empty lines before the first line in the file.
            return 0, 0

        if self.previous_line.is_decorator:
            return 0, 0

        if self.previous_line.depth < current_line.depth and (
            self.previous_line.is_class or self.previous_line.is_def
        ):
            return 0, 0

        if (
            self.previous_line.is_comment
            and self.previous_line.depth == current_line.depth
            and before == 0
        ):
            return 0, 0

        if self.is_pyi:
            if self.previous_line.depth > current_line.depth:
                newlines = 1
            elif current_line.is_class or self.previous_line.is_class:
                if current_line.is_stub_class and self.previous_line.is_stub_class:
                    # No blank line between classes with an empty body
                    newlines = 0
                else:
                    newlines = 1
            elif current_line.is_def and not self.previous_line.is_def:
                # Blank line between a block of functions and a block of non-functions
                newlines = 1
            else:
                newlines = 0
        else:
            newlines = 2
        if current_line.depth and newlines:
            newlines -= 1
        return 0, newlines
@dataclass
class LineGenerator(Visitor[Line]):
    """Generates reformatted Line objects. Empty lines are not emitted.

    Note: destroys the tree it's visiting by mutating prefixes of its leaves
    in ways that will no longer stringify to valid Python code on the tree.
    """

    is_pyi: bool = False
    normalize_strings: bool = True
    current_line: Line = Factory(Line)
    remove_u_prefix: bool = False

    def line(self, indent: int = 0) -> Iterator[Line]:
        """Generate a line.

        If the line is empty, only emit if it makes sense.
        If the line is too long, split it first and then generate.

        If any lines were generated, set up a new current_line.
        """
        if not self.current_line:
            self.current_line.depth += indent
            return  # Line is empty, don't emit. Creating a new one unnecessary.

        complete_line = self.current_line
        self.current_line = Line(depth=complete_line.depth + indent)
        yield complete_line

    def visit_default(self, node: LN) -> Iterator[Line]:
        """Default `visit_*()` implementation. Recurses to children of `node`."""
        if isinstance(node, Leaf):
            any_open_brackets = self.current_line.bracket_tracker.any_open_brackets()
            for comment in generate_comments(node):
                if any_open_brackets:
                    # any comment within brackets is subject to splitting
                    self.current_line.append(comment)
                elif comment.type == token.COMMENT:
                    # regular trailing comment
                    self.current_line.append(comment)
                    yield from self.line()

                else:
                    # regular standalone comment
                    yield from self.line()

                    self.current_line.append(comment)
                    yield from self.line()

            normalize_prefix(node, inside_brackets=any_open_brackets)
            if self.normalize_strings and node.type == token.STRING:
                normalize_string_prefix(node, remove_u_prefix=self.remove_u_prefix)
                normalize_string_quotes(node)
            if node.type == token.NUMBER:
                normalize_numeric_literal(node)
            if node.type not in WHITESPACE:
                self.current_line.append(node)
        yield from super().visit_default(node)

    def visit_INDENT(self, node: Node) -> Iterator[Line]:
        """Increase indentation level, maybe yield a line."""
        # In blib2to3 INDENT never holds comments.
        yield from self.line(+1)
        yield from self.visit_default(node)

    def visit_DEDENT(self, node: Node) -> Iterator[Line]:
        """Decrease indentation level, maybe yield a line."""
        # The current line might still wait for trailing comments. At DEDENT time
        # there won't be any (they would be prefixes on the preceding NEWLINE).
        # Emit the line then.
        yield from self.line()

        # While DEDENT has no value, its prefix may contain standalone comments
        # that belong to the current indentation level. Get 'em.
        yield from self.visit_default(node)

        # Finally, emit the dedent.
        yield from self.line(-1)

    def visit_stmt(
        self, node: Node, keywords: Set[str], parens: Set[str]
    ) -> Iterator[Line]:
        """Visit a statement.

        This implementation is shared for `if`, `while`, `for`, `try`, `except`,
        `def`, `with`, `class`, `assert`, and assignments.

        The relevant Python language `keywords` for a given statement will be
        NAME leaves within it. This method puts those on a separate line.

        `parens` holds a set of string leaf values immediately after which
        invisible parens should be put.
        """
        normalize_invisible_parens(node, parens_after=parens)
        for child in node.children:
            if child.type == token.NAME and child.value in keywords:  # type: ignore
                yield from self.line()

            yield from self.visit(child)

    def visit_suite(self, node: Node) -> Iterator[Line]:
        """Visit a suite."""
        if self.is_pyi and is_stub_suite(node):
            yield from self.visit(node.children[2])
        else:
            yield from self.visit_default(node)

    def visit_simple_stmt(self, node: Node) -> Iterator[Line]:
        """Visit a statement without nested statements."""
        is_suite_like = node.parent and node.parent.type in STATEMENT
        if is_suite_like:
            if self.is_pyi and is_stub_body(node):
                yield from self.visit_default(node)
            else:
                yield from self.line(+1)
                yield from self.visit_default(node)
                yield from self.line(-1)

        else:
            if not self.is_pyi or not node.parent or not is_stub_suite(node.parent):
                yield from self.line()
            yield from self.visit_default(node)

    def visit_async_stmt(self, node: Node) -> Iterator[Line]:
        """Visit `async def`, `async for`, `async with`."""
        yield from self.line()

        children = iter(node.children)
        for child in children:
            yield from self.visit(child)

            if child.type == token.ASYNC:
                break

        internal_stmt = next(children)
        for child in internal_stmt.children:
            yield from self.visit(child)

    def visit_decorators(self, node: Node) -> Iterator[Line]:
        """Visit decorators."""
        for child in node.children:
            yield from self.line()
            yield from self.visit(child)

    def visit_SEMI(self, leaf: Leaf) -> Iterator[Line]:
        """Remove a semicolon and put the other statement on a separate line."""
        yield from self.line()

    def visit_ENDMARKER(self, leaf: Leaf) -> Iterator[Line]:
        """End of file. Process outstanding comments and end with a newline."""
        yield from self.visit_default(leaf)
        yield from self.line()

    def visit_STANDALONE_COMMENT(self, leaf: Leaf) -> Iterator[Line]:
        if not self.current_line.bracket_tracker.any_open_brackets():
            yield from self.line()
        yield from self.visit_default(leaf)

    def __attrs_post_init__(self) -> None:
        """You are in a twisty little maze of passages."""
        v = self.visit_stmt
        Ø: Set[str] = set()
        self.visit_assert_stmt = partial(v, keywords={"assert"}, parens={"assert", ","})
        self.visit_if_stmt = partial(
            v, keywords={"if", "else", "elif"}, parens={"if", "elif"}
        )
        self.visit_while_stmt = partial(v, keywords={"while", "else"}, parens={"while"})
        self.visit_for_stmt = partial(v, keywords={"for", "else"}, parens={"for", "in"})
        self.visit_try_stmt = partial(
            v, keywords={"try", "except", "else", "finally"}, parens=Ø
        )
        self.visit_except_clause = partial(v, keywords={"except"}, parens=Ø)
        self.visit_with_stmt = partial(v, keywords={"with"}, parens=Ø)
        self.visit_funcdef = partial(v, keywords={"def"}, parens=Ø)
        self.visit_classdef = partial(v, keywords={"class"}, parens=Ø)
        self.visit_expr_stmt = partial(v, keywords=Ø, parens=ASSIGNMENTS)
        self.visit_return_stmt = partial(v, keywords={"return"}, parens={"return"})
        self.visit_import_from = partial(v, keywords=Ø, parens={"import"})
        self.visit_del_stmt = partial(v, keywords=Ø, parens={"del"})
        self.visit_async_funcdef = self.visit_async_stmt
        self.visit_decorated = self.visit_decorators
IMPLICIT_TUPLE = {syms.testlist, syms.testlist_star_expr, syms.exprlist}
BRACKET = {token.LPAR: token.RPAR, token.LSQB: token.RSQB, token.LBRACE: token.RBRACE}
OPENING_BRACKETS = set(BRACKET.keys())
CLOSING_BRACKETS = set(BRACKET.values())
BRACKETS = OPENING_BRACKETS | CLOSING_BRACKETS
ALWAYS_NO_SPACE = CLOSING_BRACKETS | {token.COMMA, STANDALONE_COMMENT}
def whitespace(leaf: Leaf, *, complex_subscript: bool) -> str:  # noqa: C901
    """Return whitespace prefix if needed for the given `leaf`.

    `complex_subscript` signals whether the given leaf is part of a subscription
    which has non-trivial arguments, like arithmetic expressions or function calls.
    """
    NO = ""
    SPACE = " "
    DOUBLESPACE = "  "
    t = leaf.type
    p = leaf.parent
    v = leaf.value
    if t in ALWAYS_NO_SPACE:
        return NO

    if t == token.COMMENT:
        return DOUBLESPACE

    assert p is not None, f"INTERNAL ERROR: hand-made leaf without parent: {leaf!r}"
    if t == token.COLON and p.type not in {
        syms.subscript,
        syms.subscriptlist,
        syms.sliceop,
    }:
        return NO

    prev = leaf.prev_sibling
    if not prev:
        prevp = preceding_leaf(p)
        if not prevp or prevp.type in OPENING_BRACKETS:
            return NO

        if t == token.COLON:
            if prevp.type == token.COLON:
                return NO

            elif prevp.type != token.COMMA and not complex_subscript:
                return NO

            return SPACE

        if prevp.type == token.EQUAL:
            if prevp.parent:
                if prevp.parent.type in {
                    syms.arglist,
                    syms.argument,
                    syms.parameters,
                    syms.varargslist,
                }:
                    return NO

                elif prevp.parent.type == syms.typedargslist:
                    # A bit hacky: if the equal sign has whitespace, it means we
                    # previously found it's a typed argument. So, we're using
                    # that, too.
                    return prevp.prefix

        elif prevp.type in STARS:
            if is_vararg(prevp, within=VARARGS_PARENTS | UNPACKING_PARENTS):
                return NO

        elif prevp.type == token.COLON:
            if prevp.parent and prevp.parent.type in {syms.subscript, syms.sliceop}:
                return SPACE if complex_subscript else NO

        elif (
            prevp.parent
            and prevp.parent.type == syms.factor
            and prevp.type in MATH_OPERATORS
        ):
            return NO

        elif (
            prevp.type == token.RIGHTSHIFT
            and prevp.parent
            and prevp.parent.type == syms.shift_expr
            and prevp.prev_sibling
            and prevp.prev_sibling.type == token.NAME
            and prevp.prev_sibling.value == "print"  # type: ignore
        ):
            # Python 2 print chevron
            return NO

    elif prev.type in OPENING_BRACKETS:
        return NO

    if p.type in {syms.parameters, syms.arglist}:
        # untyped function signatures or calls
        if not prev or prev.type != token.COMMA:
            return NO

    elif p.type == syms.varargslist:
        # lambdas
        if prev and prev.type != token.COMMA:
            return NO

    elif p.type == syms.typedargslist:
        # typed function signatures
        if not prev:
            return NO

        if t == token.EQUAL:
            if prev.type != syms.tname:
                return NO

        elif prev.type == token.EQUAL:
            # A bit hacky: if the equal sign has whitespace, it means we
            # previously found it's a typed argument. So, we're using that, too.
            return prev.prefix

        elif prev.type != token.COMMA:
            return NO

    elif p.type == syms.tname:
        # type names
        if not prev:
            prevp = preceding_leaf(p)
            if not prevp or prevp.type != token.COMMA:
                return NO

    elif p.type == syms.trailer:
        # attributes and calls
        if t == token.LPAR or t == token.RPAR:
            return NO

        if not prev:
            if t == token.DOT:
                prevp = preceding_leaf(p)
                if not prevp or prevp.type != token.NUMBER:
                    return NO

            elif t == token.LSQB:
                return NO

        elif prev.type != token.COMMA:
            return NO

    elif p.type == syms.argument:
        # single argument
        if t == token.EQUAL:
            return NO

        if not prev:
            prevp = preceding_leaf(p)
            if not prevp or prevp.type == token.LPAR:
                return NO

        elif prev.type in {token.EQUAL} | STARS:
            return NO

    elif p.type == syms.decorator:
        # decorators
        return NO

    elif p.type == syms.dotted_name:
        if prev:
            return NO

        prevp = preceding_leaf(p)
        if not prevp or prevp.type == token.AT or prevp.type == token.DOT:
            return NO

    elif p.type == syms.classdef:
        if t == token.LPAR:
            return NO

        if prev and prev.type == token.LPAR:
            return NO

    elif p.type in {syms.subscript, syms.sliceop}:
        # indexing
        if not prev:
            assert p.parent is not None, "subscripts are always parented"
            if p.parent.type == syms.subscriptlist:
                return SPACE

            return NO

        elif not complex_subscript:
            return NO

    elif p.type == syms.atom:
        if prev and t == token.DOT:
            # dots, but not the first one.
            return NO

    elif p.type == syms.dictsetmaker:
        if prev and prev.type == token.DOUBLESTAR:
            return NO

    elif p.type in {syms.factor, syms.star_expr}:
        # unary ops
        if not prev:
            prevp = preceding_leaf(p)
            if not prevp or prevp.type in OPENING_BRACKETS:
                return NO

            prevp_parent = prevp.parent
            assert prevp_parent is not None
            if prevp.type == token.COLON and prevp_parent.type in {
                syms.subscript,
                syms.sliceop,
            }:
                return NO

            elif prevp.type == token.EQUAL and prevp_parent.type == syms.argument:
                return NO

        elif t in {token.NAME, token.NUMBER, token.STRING}:
            return NO

    elif p.type == syms.import_from:
        if t == token.DOT:
            if prev and prev.type == token.DOT:
                return NO

        elif t == token.NAME:
            if v == "import":
                return SPACE

            if prev and prev.type == token.DOT:
                return NO

    elif p.type == syms.sliceop:
        return NO

    return SPACE
def preceding_leaf(node: Optional[LN]) -> Optional[Leaf]:
    """Return the first leaf that precedes `node`, if any."""
    while node:
        res = node.prev_sibling
        if res:
            if isinstance(res, Leaf):
                return res

            try:
                return list(res.leaves())[-1]

            except IndexError:
                return None

        node = node.parent
    return None


def child_towards(ancestor: Node, descendant: LN) -> Optional[LN]:
    """Return the child of `ancestor` that contains `descendant`."""
    node: Optional[LN] = descendant
    while node and node.parent != ancestor:
        node = node.parent
    return node


def container_of(leaf: Leaf) -> LN:
    """Return `leaf` or one of its ancestors that is the topmost container of it.

    By "container" we mean a node where `leaf` is the very first child.
    """
    same_prefix = leaf.prefix
    container: LN = leaf
    while container:
        parent = container.parent
        if parent is None:
            break

        if parent.children[0].prefix != same_prefix:
            break

        if parent.type == syms.file_input:
            break

        if parent.prev_sibling is not None and parent.prev_sibling.type in BRACKETS:
            break

        container = parent
    return container
def is_split_after_delimiter(leaf: Leaf, previous: Optional[Leaf] = None) -> int:
    """Return the priority of the `leaf` delimiter, given a line break after it.

    The delimiter priorities returned here are from those delimiters that would
    cause a line break after themselves.

    Higher numbers are higher priority.
    """
    if leaf.type == token.COMMA:
        return COMMA_PRIORITY

    return 0


def is_split_before_delimiter(leaf: Leaf, previous: Optional[Leaf] = None) -> int:
    """Return the priority of the `leaf` delimiter, given a line break before it.

    The delimiter priorities returned here are from those delimiters that would
    cause a line break before themselves.

    Higher numbers are higher priority.
    """
    if is_vararg(leaf, within=VARARGS_PARENTS | UNPACKING_PARENTS):
        # * and ** might also be MATH_OPERATORS but in this case they are not.
        # Don't treat them as a delimiter.
        return 0

    if (
        leaf.type == token.DOT
        and leaf.parent
        and leaf.parent.type not in {syms.import_from, syms.dotted_name}
        and (previous is None or previous.type in CLOSING_BRACKETS)
    ):
        return DOT_PRIORITY

    if (
        leaf.type in MATH_OPERATORS
        and leaf.parent
        and leaf.parent.type not in {syms.factor, syms.star_expr}
    ):
        return MATH_PRIORITIES[leaf.type]

    if leaf.type in COMPARATORS:
        return COMPARATOR_PRIORITY

    if (
        leaf.type == token.STRING
        and previous is not None
        and previous.type == token.STRING
    ):
        return STRING_PRIORITY

    if leaf.type not in {token.NAME, token.ASYNC}:
        return 0

    if (
        leaf.value == "for"
        and leaf.parent
        and leaf.parent.type in {syms.comp_for, syms.old_comp_for}
        or leaf.type == token.ASYNC
    ):
        if (
            not isinstance(leaf.prev_sibling, Leaf)
            or leaf.prev_sibling.value != "async"
        ):
            return COMPREHENSION_PRIORITY

    if (
        leaf.value == "if"
        and leaf.parent
        and leaf.parent.type in {syms.comp_if, syms.old_comp_if}
    ):
        return COMPREHENSION_PRIORITY

    if leaf.value in {"if", "else"} and leaf.parent and leaf.parent.type == syms.test:
        return TERNARY_PRIORITY

    if leaf.value == "is":
        return COMPARATOR_PRIORITY

    if (
        leaf.value == "in"
        and leaf.parent
        and leaf.parent.type in {syms.comp_op, syms.comparison}
        and not (
            previous is not None
            and previous.type == token.NAME
            and previous.value == "not"
        )
    ):
        return COMPARATOR_PRIORITY

    if (
        leaf.value == "not"
        and leaf.parent
        and leaf.parent.type == syms.comp_op
        and not (
            previous is not None
            and previous.type == token.NAME
            and previous.value == "is"
        )
    ):
        return COMPARATOR_PRIORITY

    if leaf.value in LOGIC_OPERATORS and leaf.parent:
        return LOGIC_PRIORITY

    return 0
FMT_OFF = {"# fmt: off", "# fmt:off", "# yapf: disable"}
FMT_ON = {"# fmt: on", "# fmt:on", "# yapf: enable"}


def generate_comments(leaf: LN) -> Iterator[Leaf]:
    """Clean the prefix of the `leaf` and generate comments from it, if any.

    Comments in lib2to3 are shoved into the whitespace prefix. This happens
    in `pgen2/driver.py:Driver.parse_tokens()`. This was a brilliant implementation
    move because it does away with modifying the grammar to include all the
    possible places in which comments can be placed.

    The sad consequence for us though is that comments don't "belong" anywhere.
    This is why this function generates simple parentless Leaf objects for
    comments. We simply don't know what the correct parent should be.

    No matter though, we can live without this. We really only need to
    differentiate between inline and standalone comments. The latter don't
    share the line with any code.

    Inline comments are emitted as regular token.COMMENT leaves. Standalone
    comments are emitted with a fake STANDALONE_COMMENT token identifier.
    """
    for pc in list_comments(leaf.prefix, is_endmarker=leaf.type == token.ENDMARKER):
        yield Leaf(pc.type, pc.value, prefix="\n" * pc.newlines)
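# Illustrative example (editor's note, not from the original file): for a leaf
# whose prefix is "\n\n# TODO: frobnicate\n", list_comments() below yields one
# ProtoComment with newlines=2, and generate_comments() turns it into a
# STANDALONE_COMMENT leaf prefixed by two newlines.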
2066 """Describes a piece of syntax that is a comment.
2068 It's not a :class:`blib2to3.pytree.Leaf` so that:
2070 * it can be cached (`Leaf` objects should not be reused more than once as
2071 they store their lineno, column, prefix, and parent information);
2072 * `newlines` and `consumed` fields are kept separate from the `value`. This
2073 simplifies handling of special marker comments like ``# fmt: off/on``.
2076 type: int # token.COMMENT or STANDALONE_COMMENT
2077 value: str # content of the comment
2078 newlines: int # how many newlines before the comment
2079 consumed: int # how many characters of the original leaf's prefix did we consume
@lru_cache(maxsize=4096)
def list_comments(prefix: str, *, is_endmarker: bool) -> List[ProtoComment]:
    """Return a list of :class:`ProtoComment` objects parsed from the given `prefix`."""
    result: List[ProtoComment] = []
    if not prefix or "#" not in prefix:
        return result

    consumed = 0
    nlines = 0
    for index, line in enumerate(prefix.split("\n")):
        consumed += len(line) + 1  # adding the length of the split '\n'
        line = line.lstrip()
        if not line:
            nlines += 1
        if not line.startswith("#"):
            continue

        if index == 0 and not is_endmarker:
            comment_type = token.COMMENT  # simple trailing comment
        else:
            comment_type = STANDALONE_COMMENT
        comment = make_comment(line)
        result.append(
            ProtoComment(
                type=comment_type, value=comment, newlines=nlines, consumed=consumed
            )
        )
        nlines = 0
    return result
def make_comment(content: str) -> str:
    """Return a consistently formatted comment from the given `content` string.

    All comments (except for "##", "#!", "#:", "#'", "#%%") should have a single
    space between the hash sign and the content.

    If `content` didn't start with a hash sign, one is provided.
    """
    content = content.rstrip()
    if not content:
        return "#"

    if content[0] == "#":
        content = content[1:]
    if content and content[0] not in " !:#'%":
        content = " " + content
    return "#" + content
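# Examples (editor's illustration):
#   make_comment("#comment")   -> "# comment"   (single space inserted)
#   make_comment("comment")    -> "# comment"   (hash sign provided)
#   make_comment("#!shebang")  -> "#!shebang"   (listed exceptions left alone)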
def split_line(
    line: Line,
    line_length: int,
    inner: bool = False,
    supports_trailing_commas: bool = False,
) -> Iterator[Line]:
    """Split a `line` into potentially many lines.

    They should fit in the allotted `line_length` but might not be able to.
    `inner` signifies that there were a pair of brackets somewhere around the
    current `line`, possibly transitively. This means we can fallback to splitting
    by delimiters if the LHS/RHS don't yield any results.

    If `supports_trailing_commas` is True, splitting may use the TRAILING_COMMA
    feature.
    """
    line_str = str(line).strip("\n")

    # we don't want to split special comments like type annotations
    # https://github.com/python/typing/issues/186
    has_special_comment = False
    for leaf in line.leaves:
        for comment in line.comments_after(leaf):
            if leaf.type == token.COMMA and is_special_comment(comment):
                has_special_comment = True

    if (
        not has_special_comment
        and not line.should_explode
        and is_line_short_enough(line, line_length=line_length, line_str=line_str)
    ):
        yield line
        return

    split_funcs: List[SplitFunc]
    if line.is_def:
        split_funcs = [left_hand_split]
    else:

        def rhs(line: Line, supports_trailing_commas: bool = False) -> Iterator[Line]:
            for omit in generate_trailers_to_omit(line, line_length):
                lines = list(
                    right_hand_split(
                        line, line_length, supports_trailing_commas, omit=omit
                    )
                )
                if is_line_short_enough(lines[0], line_length=line_length):
                    yield from lines
                    return

            # All splits failed, best effort split with no omits.
            # This mostly happens to multiline strings that are by definition
            # reported as not fitting a single line.
            yield from right_hand_split(line, supports_trailing_commas)

        if line.inside_brackets:
            split_funcs = [delimiter_split, standalone_comment_split, rhs]
        else:
            split_funcs = [rhs]
    for split_func in split_funcs:
        # We are accumulating lines in `result` because we might want to abort
        # mission and return the original line in the end, or attempt a different
        # split altogether.
        result: List[Line] = []
        try:
            for l in split_func(line, supports_trailing_commas):
                if str(l).strip("\n") == line_str:
                    raise CannotSplit("Split function returned an unchanged result")

                result.extend(
                    split_line(
                        l,
                        line_length=line_length,
                        inner=True,
                        supports_trailing_commas=supports_trailing_commas,
                    )
                )
        except CannotSplit:
            continue

        else:
            yield from result
            return

    else:
        yield line
def left_hand_split(
    line: Line, supports_trailing_commas: bool = False
) -> Iterator[Line]:
    """Split line into many lines, starting with the first matching bracket pair.

    Note: this usually looks weird, only use this for function definitions.
    Prefer RHS otherwise. This is why this function is not symmetrical with
    :func:`right_hand_split` which also handles optional parentheses.
    """
    tail_leaves: List[Leaf] = []
    body_leaves: List[Leaf] = []
    head_leaves: List[Leaf] = []
    current_leaves = head_leaves
    matching_bracket = None
    for leaf in line.leaves:
        if (
            current_leaves is body_leaves
            and leaf.type in CLOSING_BRACKETS
            and leaf.opening_bracket is matching_bracket
        ):
            current_leaves = tail_leaves if body_leaves else head_leaves
        current_leaves.append(leaf)
        if current_leaves is head_leaves:
            if leaf.type in OPENING_BRACKETS:
                matching_bracket = leaf
                current_leaves = body_leaves
    if not matching_bracket:
        raise CannotSplit("No brackets found")

    head = bracket_split_build_line(head_leaves, line, matching_bracket)
    body = bracket_split_build_line(body_leaves, line, matching_bracket, is_body=True)
    tail = bracket_split_build_line(tail_leaves, line, matching_bracket)
    bracket_split_succeeded_or_raise(head, body, tail)
    for result in (head, body, tail):
        if result:
            yield result
2261 def right_hand_split(
2264 supports_trailing_commas: bool = False,
2265 omit: Collection[LeafID] = (),
2266 ) -> Iterator[Line]:
2267 """Split line into many lines, starting with the last matching bracket pair.
2269 If the split was by optional parentheses, attempt splitting without them, too.
2270 `omit` is a collection of closing bracket IDs that shouldn't be considered for
2273 Note: running this function modifies `bracket_depth` on the leaves of `line`.
2275 tail_leaves: List[Leaf] = []
2276 body_leaves: List[Leaf] = []
2277 head_leaves: List[Leaf] = []
2278 current_leaves = tail_leaves
2279 opening_bracket = None
2280 closing_bracket = None
2281 for leaf in reversed(line.leaves):
2282 if current_leaves is body_leaves:
2283 if leaf is opening_bracket:
2284 current_leaves = head_leaves if body_leaves else tail_leaves
2285 current_leaves.append(leaf)
2286 if current_leaves is tail_leaves:
2287 if leaf.type in CLOSING_BRACKETS and id(leaf) not in omit:
2288 opening_bracket = leaf.opening_bracket
2289 closing_bracket = leaf
2290 current_leaves = body_leaves
2291 if not (opening_bracket and closing_bracket and head_leaves):
2292 # If there is no opening or closing_bracket that means the split failed and
2293 # all content is in the tail. Otherwise, if `head_leaves` are empty, it means
2294 # the matching `opening_bracket` wasn't available on `line` anymore.
2295 raise CannotSplit("No brackets found")
2297 tail_leaves.reverse()
2298 body_leaves.reverse()
2299 head_leaves.reverse()
2300 head = bracket_split_build_line(head_leaves, line, opening_bracket)
2301 body = bracket_split_build_line(body_leaves, line, opening_bracket, is_body=True)
2302 tail = bracket_split_build_line(tail_leaves, line, opening_bracket)
2303 bracket_split_succeeded_or_raise(head, body, tail)
2305 # the body shouldn't be exploded
2306 not body.should_explode
2307 # the opening bracket is an optional paren
2308 and opening_bracket.type == token.LPAR
2309 and not opening_bracket.value
2310 # the closing bracket is an optional paren
2311 and closing_bracket.type == token.RPAR
2312 and not closing_bracket.value
2313 # it's not an import (optional parens are the only thing we can split on
2314 # in this case; attempting a split without them is a waste of time)
2315 and not line.is_import
2316 # there are no standalone comments in the body
2317 and not body.contains_standalone_comments(0)
2318 # and we can actually remove the parens
2319 and can_omit_invisible_parens(body, line_length)
2320 ):
2321 omit = {id(closing_bracket), *omit}
2322 try:
2323 yield from right_hand_split(
2324 line,
2325 line_length,
2326 supports_trailing_commas=supports_trailing_commas,
2327 omit=omit,
2328 )
2329 return
2331 except CannotSplit:
2332 if not (
2333 can_be_split(body)
2334 or is_line_short_enough(body, line_length=line_length)
2335 ):
2336 raise CannotSplit(
2337 "Splitting failed, body is still too long and can't be split."
2338 )
2340 elif head.contains_multiline_strings() or tail.contains_multiline_strings():
2342 "The current optional pair of parentheses is bound to fail to "
2343 "satisfy the splitting algorithm because the head or the tail "
2344 "contains multiline strings which by definition never fit one "
2348 ensure_visible(opening_bracket)
2349 ensure_visible(closing_bracket)
2350 for result in (head, body, tail):
2351 if result:
2352 yield result
2355 def bracket_split_succeeded_or_raise(head: Line, body: Line, tail: Line) -> None:
2356 """Raise :exc:`CannotSplit` if the last left- or right-hand split failed.
2358 Do nothing otherwise.
2360 A left- or right-hand split is based on a pair of brackets. Content before
2361 (and including) the opening bracket is left on one line, content inside the
2362 brackets is put on a separate line, and finally content starting with and
2363 following the closing bracket is put on a separate line.
2365 Those are called `head`, `body`, and `tail`, respectively. If the split
2366 produced the same line (all content in `head`) or ended up with an empty `body`
2367 and the `tail` is just the closing bracket, then it's considered failed.
2369 tail_len = len(str(tail).strip())
2370 if not body:
2371 if tail_len == 0:
2372 raise CannotSplit("Splitting brackets produced the same line")
2374 if tail_len < 3:
2375 raise CannotSplit(
2376 f"Splitting brackets on an empty body to save "
2377 f"{tail_len} characters is not worth it"
2378 )
2381 def bracket_split_build_line(
2382 leaves: List[Leaf], original: Line, opening_bracket: Leaf, *, is_body: bool = False
2383 ) -> Line:
2384 """Return a new line with given `leaves` and respective comments from `original`.
2386 If `is_body` is True, the result line is one-indented inside brackets and as such
2387 has its first leaf's prefix normalized and a trailing comma added when expected.
2389 result = Line(depth=original.depth)
2390 if is_body:
2391 result.inside_brackets = True
2392 result.depth += 1
2393 if leaves:
2394 # Since body is a new indent level, remove spurious leading whitespace.
2395 normalize_prefix(leaves[0], inside_brackets=True)
2396 # Ensure a trailing comma when expected.
2397 if original.is_import:
2398 if leaves[-1].type != token.COMMA:
2399 leaves.append(Leaf(token.COMMA, ","))
2401 for leaf in leaves:
2402 result.append(leaf, preformatted=True)
2403 for comment_after in original.comments_after(leaf):
2404 result.append(comment_after, preformatted=True)
2405 if is_body:
2406 result.should_explode = should_explode(result, opening_bracket)
2407 return result
2410 def dont_increase_indentation(split_func: SplitFunc) -> SplitFunc:
2411 """Normalize prefix of the first leaf in every line returned by `split_func`.
2413 This is a decorator over relevant split functions.
2416 @wraps(split_func)
2417 def split_wrapper(
2418 line: Line, supports_trailing_commas: bool = False
2419 ) -> Iterator[Line]:
2420 for l in split_func(line, supports_trailing_commas):
2421 normalize_prefix(l.leaves[0], inside_brackets=True)
2422 yield l
2424 return split_wrapper
2427 @dont_increase_indentation
2428 def delimiter_split(
2429 line: Line, supports_trailing_commas: bool = False
2430 ) -> Iterator[Line]:
2431 """Split according to delimiters of the highest priority.
2433 If `supports_trailing_commas` is True, the split will add trailing commas
2434 also in function signatures that contain `*` and `**`.
2436 try:
2437 last_leaf = line.leaves[-1]
2438 except IndexError:
2439 raise CannotSplit("Line empty")
2441 bt = line.bracket_tracker
2442 try:
2443 delimiter_priority = bt.max_delimiter_priority(exclude={id(last_leaf)})
2444 except ValueError:
2445 raise CannotSplit("No delimiters found")
2447 if delimiter_priority == DOT_PRIORITY:
2448 if bt.delimiter_count_with_priority(delimiter_priority) == 1:
2449 raise CannotSplit("Splitting a single attribute from its owner looks wrong")
2451 current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
2452 lowest_depth = sys.maxsize
2453 trailing_comma_safe = True
2455 def append_to_line(leaf: Leaf) -> Iterator[Line]:
2456 """Append `leaf` to current line or to new line if appending impossible."""
2457 nonlocal current_line
2458 try:
2459 current_line.append_safe(leaf, preformatted=True)
2460 except ValueError: # leaf does not fit on line
2461 yield current_line
2463 current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
2464 current_line.append(leaf)
2466 for leaf in line.leaves:
2467 yield from append_to_line(leaf)
2469 for comment_after in line.comments_after(leaf):
2470 yield from append_to_line(comment_after)
2472 lowest_depth = min(lowest_depth, leaf.bracket_depth)
2473 if leaf.bracket_depth == lowest_depth and is_vararg(
2474 leaf, within=VARARGS_PARENTS
2475 ):
2476 trailing_comma_safe = trailing_comma_safe and supports_trailing_commas
2477 leaf_priority = bt.delimiters.get(id(leaf))
2478 if leaf_priority == delimiter_priority:
2479 yield current_line
2481 current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
2482 if current_line:
2483 if (
2484 trailing_comma_safe
2485 and delimiter_priority == COMMA_PRIORITY
2486 and current_line.leaves[-1].type != token.COMMA
2487 and current_line.leaves[-1].type != STANDALONE_COMMENT
2488 ):
2489 current_line.append(Leaf(token.COMMA, ","))
2490 yield current_line
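As a rough illustration of delimiter-priority splitting, a hypothetical simplification that explodes a comma-separated argument list one element per line, adding the trailing comma the way delimiter_split does when it is safe:

def toy_delimiter_split(args: str, indent: str = "    ") -> str:
    # Split on the highest-priority delimiter (here: the comma) and emit one
    # element per line, each followed by a trailing comma.
    parts = [part.strip() for part in args.split(",") if part.strip()]
    return "\n".join(f"{indent}{part}," for part in parts)

print(toy_delimiter_split("a, b, c"))
# prints:
#     a,
#     b,
#     c,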
2493 @dont_increase_indentation
2494 def standalone_comment_split(
2495 line: Line, supports_trailing_commas: bool = False
2496 ) -> Iterator[Line]:
2497 """Split standalone comments from the rest of the line."""
2498 if not line.contains_standalone_comments(0):
2499 raise CannotSplit("Line does not have any standalone comments")
2501 current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
2503 def append_to_line(leaf: Leaf) -> Iterator[Line]:
2504 """Append `leaf` to current line or to new line if appending impossible."""
2505 nonlocal current_line
2506 try:
2507 current_line.append_safe(leaf, preformatted=True)
2508 except ValueError: # leaf does not fit on line
2509 yield current_line
2511 current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
2512 current_line.append(leaf)
2514 for leaf in line.leaves:
2515 yield from append_to_line(leaf)
2517 for comment_after in line.comments_after(leaf):
2518 yield from append_to_line(comment_after)
2520 if current_line:
2521 yield current_line
2524 def is_import(leaf: Leaf) -> bool:
2525 """Return True if the given leaf starts an import statement."""
2532 (v == "import" and p and p.type == syms.import_name)
2533 or (v == "from" and p and p.type == syms.import_from)
2538 def is_special_comment(leaf: Leaf) -> bool:
2539 """Return True if the given leaf is a special comment.
2540 Only returns true for type comments for now."""
2541 t = leaf.type
2542 v = leaf.value
2543 return bool(
2544 (t == token.COMMENT or t == STANDALONE_COMMENT) and (v.startswith("# type:"))
2545 )
2548 def normalize_prefix(leaf: Leaf, *, inside_brackets: bool) -> None:
2549 """Leave existing extra newlines if not `inside_brackets`. Remove everything
2552 Note: don't use backslashes for formatting or you'll lose your voting rights.
2554 if not inside_brackets:
2555 spl = leaf.prefix.split("#")
2556 if "\\" not in spl[0]:
2557 nl_count = spl[-1].count("\n")
2560 leaf.prefix = "\n" * nl_count
2566 def normalize_string_prefix(leaf: Leaf, remove_u_prefix: bool = False) -> None:
2567 """Make all string prefixes lowercase.
2569 If remove_u_prefix is given, also removes any u prefix from the string.
2571 Note: Mutates its argument.
2573 match = re.match(r"^([furbFURB]*)(.*)$", leaf.value, re.DOTALL)
2574 assert match is not None, f"failed to match string {leaf.value!r}"
2575 orig_prefix = match.group(1)
2576 new_prefix = orig_prefix.lower()
2577 if remove_u_prefix:
2578 new_prefix = new_prefix.replace("u", "")
2579 leaf.value = f"{new_prefix}{match.group(2)}"
2582 def normalize_string_quotes(leaf: Leaf) -> None:
2583 """Prefer double quotes but only if it doesn't cause more escaping.
2585 Adds or removes backslashes as appropriate. Doesn't parse and fix
2586 strings nested in f-strings (yet).
2588 Note: Mutates its argument.
2590 value = leaf.value.lstrip("furbFURB")
2591 if value[:3] == '"""':
2592 return
2594 elif value[:3] == "'''":
2595 orig_quote = "'''"
2596 new_quote = '"""'
2597 elif value[0] == '"':
2598 orig_quote = '"'
2599 new_quote = "'"
2600 else:
2601 orig_quote = "'"
2602 new_quote = '"'
2603 first_quote_pos = leaf.value.find(orig_quote)
2604 if first_quote_pos == -1:
2605 return # There's an internal error
2607 prefix = leaf.value[:first_quote_pos]
2608 unescaped_new_quote = re.compile(rf"(([^\\]|^)(\\\\)*){new_quote}")
2609 escaped_new_quote = re.compile(rf"([^\\]|^)\\((?:\\\\)*){new_quote}")
2610 escaped_orig_quote = re.compile(rf"([^\\]|^)\\((?:\\\\)*){orig_quote}")
2611 body = leaf.value[first_quote_pos + len(orig_quote) : -len(orig_quote)]
2612 if "r" in prefix.casefold():
2613 if unescaped_new_quote.search(body):
2614 # There's at least one unescaped new_quote in this raw string
2615 # so converting is impossible
2616 return
2618 # Do not introduce or remove backslashes in raw strings
2619 new_body = body
2620 else:
2621 # remove unnecessary escapes
2622 new_body = sub_twice(escaped_new_quote, rf"\1\2{new_quote}", body)
2623 if body != new_body:
2624 # Consider the string without unnecessary escapes as the original
2625 body = new_body
2626 leaf.value = f"{prefix}{orig_quote}{body}{orig_quote}"
2627 new_body = sub_twice(escaped_orig_quote, rf"\1\2{orig_quote}", new_body)
2628 new_body = sub_twice(unescaped_new_quote, rf"\1\\{new_quote}", new_body)
2629 if "f" in prefix.casefold():
2630 matches = re.findall(r"[^{]\{(.*?)\}[^}]", new_body)
2631 for m in matches:
2632 if "\\" in str(m):
2633 # Do not introduce backslashes in interpolated expressions
2634 return
2635 if new_quote == '"""' and new_body[-1:] == '"':
2636 # edge case:
2637 new_body = new_body[:-1] + '\\"'
2638 orig_escape_count = body.count("\\")
2639 new_escape_count = new_body.count("\\")
2640 if new_escape_count > orig_escape_count:
2641 return # Do not introduce more escaping
2643 if new_escape_count == orig_escape_count and orig_quote == '"':
2644 return # Prefer double quotes
2646 leaf.value = f"{prefix}{new_quote}{new_body}{new_quote}"
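The quote-preference rule above in miniature (a hypothetical helper, not Black's implementation): switch to double quotes unless doing so would require more backslash escapes.

def preferred_quote(body: str) -> str:
    # Count the characters each quote style would have to escape in the body.
    needed_if_double = body.count('"')
    needed_if_single = body.count("'")
    # Prefer double quotes; keep single quotes only when they escape less.
    return "'" if needed_if_single < needed_if_double else '"'

print(preferred_quote("it's fine"))  # " -> "it's fine"
print(preferred_quote('say "hi"'))   # ' -> 'say "hi"'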
2649 def normalize_numeric_literal(leaf: Leaf) -> None:
2650 """Normalizes numeric (float, int, and complex) literals.
2652 All letters used in the representation are normalized to lowercase (except
2653 in Python 2 long literals).
2655 text = leaf.value.lower()
2656 if text.startswith(("0o", "0b")):
2657 # Leave octal and binary literals alone.
2658 pass
2659 elif text.startswith("0x"):
2660 # Change hex literals to upper case.
2661 before, after = text[:2], text[2:]
2662 text = f"{before}{after.upper()}"
2663 elif "e" in text:
2664 before, after = text.split("e")
2665 sign = ""
2666 if after.startswith("-"):
2667 after = after[1:]
2668 sign = "-"
2669 elif after.startswith("+"):
2670 after = after[1:]
2671 before = format_float_or_int_string(before)
2672 text = f"{before}e{sign}{after}"
2673 elif text.endswith(("j", "l")):
2674 number = text[:-1]
2675 suffix = text[-1]
2676 # Capitalize in "2L" because "l" looks too similar to "1".
2677 if suffix == "l":
2678 suffix = "L"
2679 text = f"{format_float_or_int_string(number)}{suffix}"
2680 else:
2681 text = format_float_or_int_string(text)
2682 leaf.value = text
2685 def format_float_or_int_string(text: str) -> str:
2686 """Formats a float string like "1.0"."""
2690 before, after = text.split(".")
2691 return f"{before or 0}.{after or 0}"
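Applied to a few literals, the padding rule fills in the missing zero on either side of the dot (restated as a standalone snippet so it runs on its own):

def toy_format(text: str) -> str:
    if "." not in text:
        return text
    before, after = text.split(".")
    # An empty side falls back to 0, so "1." -> "1.0" and ".5" -> "0.5".
    return f"{before or 0}.{after or 0}"

assert toy_format("1.") == "1.0"
assert toy_format(".5") == "0.5"
assert toy_format("10.25") == "10.25"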
2694 def normalize_invisible_parens(node: Node, parens_after: Set[str]) -> None:
2695 """Make existing optional parentheses invisible or create new ones.
2697 `parens_after` is a set of string leaf values immediately after which parens
2698 should be put.
2700 Standardizes on visible parentheses for single-element tuples, and keeps
2701 existing visible parentheses for other tuples and generator expressions.
2703 for pc in list_comments(node.prefix, is_endmarker=False):
2704 if pc.value in FMT_OFF:
2705 # This `node` has a prefix with `# fmt: off`, don't mess with parens.
2706 return
2708 check_lpar = False
2709 for index, child in enumerate(list(node.children)):
2710 if check_lpar:
2711 if child.type == syms.atom:
2712 if maybe_make_parens_invisible_in_atom(child):
2713 lpar = Leaf(token.LPAR, "")
2714 rpar = Leaf(token.RPAR, "")
2715 index = child.remove() or 0
2716 node.insert_child(index, Node(syms.atom, [lpar, child, rpar]))
2717 elif is_one_tuple(child):
2718 # wrap child in visible parentheses
2719 lpar = Leaf(token.LPAR, "(")
2720 rpar = Leaf(token.RPAR, ")")
2721 index = child.remove() or 0
2722 node.insert_child(index, Node(syms.atom, [lpar, child, rpar]))
2723 elif node.type == syms.import_from:
2724 # "import from" nodes store parentheses directly as part of
2726 if child.type == token.LPAR:
2727 # make parentheses invisible
2728 child.value = "" # type: ignore
2729 node.children[-1].value = "" # type: ignore
2730 elif child.type != token.STAR:
2731 # insert invisible parentheses
2732 node.insert_child(index, Leaf(token.LPAR, ""))
2733 node.append_child(Leaf(token.RPAR, ""))
2734 break
2736 elif not (isinstance(child, Leaf) and is_multiline_string(child)):
2737 # wrap child in invisible parentheses
2738 lpar = Leaf(token.LPAR, "")
2739 rpar = Leaf(token.RPAR, "")
2740 index = child.remove() or 0
2741 node.insert_child(index, Node(syms.atom, [lpar, child, rpar]))
2743 check_lpar = isinstance(child, Leaf) and child.value in parens_after
2746 def normalize_fmt_off(node: Node) -> None:
2747 """Convert content between `# fmt: off`/`# fmt: on` into standalone comments."""
2748 try_again = True
2749 while try_again:
2750 try_again = convert_one_fmt_off_pair(node)
2753 def convert_one_fmt_off_pair(node: Node) -> bool:
2754 """Convert content of a single `# fmt: off`/`# fmt: on` into a standalone comment.
2756 Returns True if a pair was converted.
2758 for leaf in node.leaves():
2759 previous_consumed = 0
2760 for comment in list_comments(leaf.prefix, is_endmarker=False):
2761 if comment.value in FMT_OFF:
2762 # We only want standalone comments. If there's no previous leaf or
2763 # the previous leaf is indentation, it's a standalone comment in
2764 # disguise.
2765 if comment.type != STANDALONE_COMMENT:
2766 prev = preceding_leaf(leaf)
2767 if prev and prev.type not in WHITESPACE:
2768 continue
2770 ignored_nodes = list(generate_ignored_nodes(leaf))
2771 if not ignored_nodes:
2772 continue
2774 first = ignored_nodes[0] # Can be a container node with the `leaf`.
2775 parent = first.parent
2776 prefix = first.prefix
2777 first.prefix = prefix[comment.consumed :]
2778 hidden_value = (
2779 comment.value + "\n" + "".join(str(n) for n in ignored_nodes)
2780 )
2781 if hidden_value.endswith("\n"):
2782 # That happens when one of the `ignored_nodes` ended with a NEWLINE
2783 # leaf (possibly followed by a DEDENT).
2784 hidden_value = hidden_value[:-1]
2785 first_idx = None
2786 for ignored in ignored_nodes:
2787 index = ignored.remove()
2788 if first_idx is None:
2789 first_idx = index
2790 assert parent is not None, "INTERNAL ERROR: fmt: on/off handling (1)"
2791 assert first_idx is not None, "INTERNAL ERROR: fmt: on/off handling (2)"
2792 parent.insert_child(
2793 first_idx,
2794 Leaf(
2795 STANDALONE_COMMENT,
2796 hidden_value,
2797 prefix=prefix[:previous_consumed] + "\n" * comment.newlines,
2798 ),
2799 )
2800 return True
2802 previous_consumed = comment.consumed
2804 return False
2807 def generate_ignored_nodes(leaf: Leaf) -> Iterator[LN]:
2808 """Starting from the container of `leaf`, generate all leaves until `# fmt: on`.
2810 Stops at the end of the block.
2812 container: Optional[LN] = container_of(leaf)
2813 while container is not None and container.type != token.ENDMARKER:
2814 for comment in list_comments(container.prefix, is_endmarker=False):
2815 if comment.value in FMT_ON:
2816 return
2818 yield container
2820 container = container.next_sibling
2823 def maybe_make_parens_invisible_in_atom(node: LN) -> bool:
2824 """If it's safe, make the parens in the atom `node` invisible, recursively.
2826 Returns whether the node should itself be wrapped in invisible parentheses.
2829 if (
2830 node.type != syms.atom
2831 or is_empty_tuple(node)
2832 or is_one_tuple(node)
2833 or is_yield(node)
2834 or max_delimiter_priority_in_atom(node) >= COMMA_PRIORITY
2835 ):
2836 return False
2838 first = node.children[0]
2839 last = node.children[-1]
2840 if first.type == token.LPAR and last.type == token.RPAR:
2841 # make parentheses invisible
2842 first.value = "" # type: ignore
2843 last.value = "" # type: ignore
2844 if len(node.children) > 1:
2845 maybe_make_parens_invisible_in_atom(node.children[1])
2846 return False
2848 return True
2851 def is_empty_tuple(node: LN) -> bool:
2852 """Return True if `node` holds an empty tuple."""
2854 node.type == syms.atom
2855 and len(node.children) == 2
2856 and node.children[0].type == token.LPAR
2857 and node.children[1].type == token.RPAR
2861 def is_one_tuple(node: LN) -> bool:
2862 """Return True if `node` holds a tuple with one element, with or without parens."""
2863 if node.type == syms.atom:
2864 if len(node.children) != 3:
2865 return False
2867 lpar, gexp, rpar = node.children
2868 if not (
2869 lpar.type == token.LPAR
2870 and gexp.type == syms.testlist_gexp
2871 and rpar.type == token.RPAR
2872 ):
2873 return False
2875 return len(gexp.children) == 2 and gexp.children[1].type == token.COMMA
2877 return (
2878 node.type in IMPLICIT_TUPLE
2879 and len(node.children) == 2
2880 and node.children[1].type == token.COMMA
2881 )
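The two shapes the predicate accepts, written out as ordinary Python (a quick aside, not part of the module):

parenthesized = (1,)  # atom: LPAR testlist_gexp RPAR, second child a COMMA
bare = 1,             # implicit tuple: element followed by a COMMA
assert parenthesized == bare == (1,)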
2884 def is_yield(node: LN) -> bool:
2885 """Return True if `node` holds a `yield` or `yield from` expression."""
2886 if node.type == syms.yield_expr:
2887 return True
2889 if node.type == token.NAME and node.value == "yield": # type: ignore
2890 return True
2892 if node.type != syms.atom:
2893 return False
2895 if len(node.children) != 3:
2896 return False
2898 lpar, expr, rpar = node.children
2899 if lpar.type == token.LPAR and rpar.type == token.RPAR:
2900 return is_yield(expr)
2902 return False
2905 def is_vararg(leaf: Leaf, within: Set[NodeType]) -> bool:
2906 """Return True if `leaf` is a star or double star in a vararg or kwarg.
2908 If `within` includes VARARGS_PARENTS, this applies to function signatures.
2909 If `within` includes UNPACKING_PARENTS, it applies to right hand-side
2910 extended iterable unpacking (PEP 3132) and additional unpacking
2911 generalizations (PEP 448).
2913 if leaf.type not in STARS or not leaf.parent:
2914 return False
2916 p = leaf.parent
2917 if p.type == syms.star_expr:
2918 # Star expressions are also used as assignment targets in extended
2919 # iterable unpacking (PEP 3132). See what its parent is instead.
2920 if not p.parent:
2921 return False
2923 p = p.parent
2925 return p.type in within
2928 def is_multiline_string(leaf: Leaf) -> bool:
2929 """Return True if `leaf` is a multiline string that actually spans many lines."""
2930 value = leaf.value.lstrip("furbFURB")
2931 return value[:3] in {'"""', "'''"} and "\n" in value
2934 def is_stub_suite(node: Node) -> bool:
2935 """Return True if `node` is a suite with a stub body."""
2937 len(node.children) != 4
2938 or node.children[0].type != token.NEWLINE
2939 or node.children[1].type != token.INDENT
2940 or node.children[3].type != token.DEDENT
2944 return is_stub_body(node.children[2])
2947 def is_stub_body(node: LN) -> bool:
2948 """Return True if `node` is a simple statement containing an ellipsis."""
2949 if not isinstance(node, Node) or node.type != syms.simple_stmt:
2950 return False
2952 if len(node.children) != 2:
2953 return False
2955 child = node.children[0]
2956 return (
2957 child.type == syms.atom
2958 and len(child.children) == 3
2959 and all(leaf == Leaf(token.DOT, ".") for leaf in child.children)
2960 )
2963 def max_delimiter_priority_in_atom(node: LN) -> int:
2964 """Return maximum delimiter priority inside `node`.
2966 This is specific to atoms with contents contained in a pair of parentheses.
2967 If `node` isn't an atom or there are no enclosing parentheses, returns 0.
2969 if node.type != syms.atom:
2970 return 0
2972 first = node.children[0]
2973 last = node.children[-1]
2974 if not (first.type == token.LPAR and last.type == token.RPAR):
2975 return 0
2977 bt = BracketTracker()
2978 for c in node.children[1:-1]:
2979 if isinstance(c, Leaf):
2980 bt.mark(c)
2981 else:
2982 for leaf in c.leaves():
2983 bt.mark(leaf)
2984 try:
2985 return bt.max_delimiter_priority()
2987 except ValueError:
2988 return 0
2991 def ensure_visible(leaf: Leaf) -> None:
2992 """Make sure parentheses are visible.
2994 They could be invisible as part of some statements (see
2995 :func:`normalize_invisible_parens` and :func:`visit_import_from`).
2997 if leaf.type == token.LPAR:
2998 leaf.value = "("
2999 elif leaf.type == token.RPAR:
3000 leaf.value = ")"
3003 def should_explode(line: Line, opening_bracket: Leaf) -> bool:
3004 """Should `line` immediately be split with `delimiter_split()` after RHS?"""
3006 if not (
3007 opening_bracket.parent
3008 and opening_bracket.parent.type in {syms.atom, syms.import_from}
3009 and opening_bracket.value in "[{("
3010 ):
3011 return False
3013 try:
3014 last_leaf = line.leaves[-1]
3015 exclude = {id(last_leaf)} if last_leaf.type == token.COMMA else set()
3016 max_priority = line.bracket_tracker.max_delimiter_priority(exclude=exclude)
3017 except (IndexError, ValueError):
3018 return False
3020 return max_priority == COMMA_PRIORITY
3023 def get_features_used(node: Node) -> Set[Feature]:
3024 """Return a set of (relatively) new Python features used in this file.
3026 Currently looking for:
3027 - f-strings;
3028 - underscores in numeric literals; and
3029 - trailing commas after * or ** in function signatures and calls.
3031 features: Set[Feature] = set()
3032 for n in node.pre_order():
3033 if n.type == token.STRING:
3034 value_head = n.value[:2] # type: ignore
3035 if value_head in {'f"', 'F"', "f'", "F'", "rf", "fr", "RF", "FR"}:
3036 features.add(Feature.F_STRINGS)
3038 elif n.type == token.NUMBER:
3039 if "_" in n.value: # type: ignore
3040 features.add(Feature.NUMERIC_UNDERSCORES)
3042 elif (
3043 n.type in {syms.typedargslist, syms.arglist}
3044 and n.children
3045 and n.children[-1].type == token.COMMA
3046 ):
3047 for ch in n.children:
3048 if ch.type in STARS:
3049 features.add(Feature.TRAILING_COMMA)
3051 if ch.type == syms.argument:
3052 for argch in ch.children:
3053 if argch.type in STARS:
3054 features.add(Feature.TRAILING_COMMA)
3056 return features
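A rough stdlib-only analogue of this scan (a hypothetical helper, not Black's implementation): the tokenize module keeps the original spelling of literals, so underscores and f-string prefixes remain visible in the token stream.

import io
import tokenize

def detect_new_syntax(source: str) -> set:
    found = set()
    for tok in tokenize.generate_tokens(io.StringIO(source).readline):
        if tok.type == tokenize.NUMBER and "_" in tok.string:
            found.add("numeric underscores")
        elif tok.type == tokenize.STRING and tok.string.lstrip("rRbBuU").lower().startswith("f"):
            found.add("f-strings")
    return found

print(detect_new_syntax("x = 1_000\ny = f'{x}'\n"))
# {'numeric underscores', 'f-strings'}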
3059 def detect_target_versions(node: Node) -> Set[TargetVersion]:
3060 """Detect the version to target based on the nodes used."""
3061 features = get_features_used(node)
3062 return {
3063 version for version in TargetVersion if features <= VERSION_TO_FEATURES[version]
3064 }
3067 def generate_trailers_to_omit(line: Line, line_length: int) -> Iterator[Set[LeafID]]:
3068 """Generate sets of closing bracket IDs that should be omitted in a RHS.
3070 Brackets can be omitted if the entire trailer up to and including
3071 a preceding closing bracket fits in one line.
3073 Yielded sets are cumulative (contain results of previous yields, too). First
3074 in line is the biggest set of optional parentheses to omit.
3077 omit: Set[LeafID] = set()
3078 yield omit
3080 length = 4 * line.depth
3081 opening_bracket = None
3082 closing_bracket = None
3083 inner_brackets: Set[LeafID] = set()
3084 for index, leaf, leaf_length in enumerate_with_length(line, reversed=True):
3085 length += leaf_length
3086 if length > line_length:
3087 break
3089 has_inline_comment = leaf_length > len(leaf.value) + len(leaf.prefix)
3090 if leaf.type == STANDALONE_COMMENT or has_inline_comment:
3091 break
3093 if opening_bracket:
3094 if leaf is opening_bracket:
3095 opening_bracket = None
3096 elif leaf.type in CLOSING_BRACKETS:
3097 inner_brackets.add(id(leaf))
3098 elif leaf.type in CLOSING_BRACKETS:
3099 if index > 0 and line.leaves[index - 1].type in OPENING_BRACKETS:
3100 # Empty brackets would fail a split so treat them as "inner"
3101 # brackets (e.g. only add them to the `omit` set if another
3102 # pair of brackets was good enough).
3103 inner_brackets.add(id(leaf))
3104 continue
3106 if closing_bracket:
3107 omit.add(id(closing_bracket))
3108 omit.update(inner_brackets)
3109 inner_brackets.clear()
3110 yield omit
3112 if leaf.value:
3113 opening_bracket = leaf.opening_bracket
3114 closing_bracket = leaf
3117 def get_future_imports(node: Node) -> Set[str]:
3118 """Return a set of __future__ imports in the file."""
3119 imports: Set[str] = set()
3121 def get_imports_from_children(children: List[LN]) -> Generator[str, None, None]:
3122 for child in children:
3123 if isinstance(child, Leaf):
3124 if child.type == token.NAME:
3125 yield child.value
3126 elif child.type == syms.import_as_name:
3127 orig_name = child.children[0]
3128 assert isinstance(orig_name, Leaf), "Invalid syntax parsing imports"
3129 assert orig_name.type == token.NAME, "Invalid syntax parsing imports"
3130 yield orig_name.value
3131 elif child.type == syms.import_as_names:
3132 yield from get_imports_from_children(child.children)
3133 else:
3134 assert False, "Invalid syntax parsing imports"
3136 for child in node.children:
3137 if child.type != syms.simple_stmt:
3138 break
3139 first_child = child.children[0]
3140 if isinstance(first_child, Leaf):
3141 # Continue looking if we see a docstring; otherwise stop.
3142 if (
3143 len(child.children) == 2
3144 and first_child.type == token.STRING
3145 and child.children[1].type == token.NEWLINE
3146 ):
3147 continue
3148 else:
3149 break
3150 elif first_child.type == syms.import_from:
3151 module_name = first_child.children[1]
3152 if not isinstance(module_name, Leaf) or module_name.value != "__future__":
3153 break
3154 imports |= set(get_imports_from_children(first_child.children[3:]))
3155 else:
3156 break
3157 return imports
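For comparison, the same scan is compact with the ast module (a hypothetical alternative; the version above walks the lib2to3 tree instead and stops after the docstring/import prologue):

import ast

def future_imports(source: str) -> set:
    names = set()
    for node in ast.parse(source).body:
        if isinstance(node, ast.ImportFrom) and node.module == "__future__":
            names.update(alias.name for alias in node.names)
    return names

print(future_imports("from __future__ import annotations\nx = 1\n"))
# {'annotations'}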
3160 def gen_python_files_in_dir(
3161 path: Path,
3162 root: Path,
3163 include: Pattern[str],
3164 exclude: Pattern[str],
3165 report: "Report",
3166 ) -> Iterator[Path]:
3167 """Generate all files under `path` whose paths are not excluded by the
3168 `exclude` regex, but are included by the `include` regex.
3170 Symbolic links pointing outside of the `root` directory are ignored.
3172 `report` is where output about exclusions goes.
3174 assert root.is_absolute(), f"INTERNAL ERROR: `root` must be absolute but is {root}"
3175 for child in path.iterdir():
3176 try:
3177 normalized_path = "/" + child.resolve().relative_to(root).as_posix()
3178 except ValueError:
3179 if child.is_symlink():
3180 report.path_ignored(
3181 child, f"is a symbolic link that points outside {root}"
3182 )
3183 continue
3185 raise
3187 if child.is_dir():
3188 normalized_path += "/"
3189 exclude_match = exclude.search(normalized_path)
3190 if exclude_match and exclude_match.group(0):
3191 report.path_ignored(child, f"matches the --exclude regular expression")
3192 continue
3194 if child.is_dir():
3195 yield from gen_python_files_in_dir(child, root, include, exclude, report)
3197 elif child.is_file():
3198 include_match = include.search(normalized_path)
3199 if include_match:
3200 yield child
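A simplified walk with the same include/exclude contract (a hypothetical sketch with no symlink or error handling): paths are matched in root-relative POSIX form, and directories get a trailing slash so patterns like /build/ anchor cleanly.

import re
from pathlib import Path
from typing import Iterator, Pattern

def walk(path: Path, root: Path, include: Pattern[str], exclude: Pattern[str]) -> Iterator[Path]:
    for child in path.iterdir():
        normalized = "/" + child.resolve().relative_to(root).as_posix()
        if child.is_dir():
            normalized += "/"
        if exclude.search(normalized):
            continue  # e.g. skips "/.git/" or "/build/" entirely
        if child.is_dir():
            yield from walk(child, root, include, exclude)
        elif child.is_file() and include.search(normalized):
            yield child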
3204 def find_project_root(srcs: Iterable[str]) -> Path:
3205 """Return a directory containing .git, .hg, or pyproject.toml.
3207 That directory can be one of the directories passed in `srcs` or their
3208 common parent.
3210 If no directory in the tree contains a marker that would specify it's the
3211 project root, the root of the file system is returned.
3213 if not srcs:
3214 return Path("/").resolve()
3216 common_base = min(Path(src).resolve() for src in srcs)
3217 if common_base.is_dir():
3218 # Append a fake file so `parents` below returns `common_base_dir`, too.
3219 common_base /= "fake-file"
3220 for directory in common_base.parents:
3221 if (directory / ".git").is_dir():
3222 return directory
3224 if (directory / ".hg").is_dir():
3225 return directory
3227 if (directory / "pyproject.toml").is_file():
3228 return directory
3230 return directory
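The marker search reduced to its core (a hypothetical helper): walk upward until a VCS directory or pyproject.toml appears, falling back to the filesystem root.

from pathlib import Path

def project_root(start: Path) -> Path:
    directory = start.resolve()
    for directory in [directory, *directory.parents]:
        if (directory / ".git").is_dir() or (directory / ".hg").is_dir():
            break
        if (directory / "pyproject.toml").is_file():
            break
    return directory  # filesystem root if nothing matched

print(project_root(Path.cwd()))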
3235 """Provides a reformatting counter. Can be rendered with `str(report)`."""
3237 check: bool = False
3238 quiet: bool = False
3239 verbose: bool = False
3240 change_count: int = 0
3241 same_count: int = 0
3242 failure_count: int = 0
3244 def done(self, src: Path, changed: Changed) -> None:
3245 """Increment the counter for successful reformatting. Write out a message."""
3246 if changed is Changed.YES:
3247 reformatted = "would reformat" if self.check else "reformatted"
3248 if self.verbose or not self.quiet:
3249 out(f"{reformatted} {src}")
3250 self.change_count += 1
3251 else:
3252 if self.verbose:
3253 if changed is Changed.NO:
3254 msg = f"{src} already well formatted, good job."
3255 else:
3256 msg = f"{src} wasn't modified on disk since last run."
3257 out(msg, bold=False)
3258 self.same_count += 1
3260 def failed(self, src: Path, message: str) -> None:
3261 """Increment the counter for failed reformatting. Write out a message."""
3262 err(f"error: cannot format {src}: {message}")
3263 self.failure_count += 1
3265 def path_ignored(self, path: Path, message: str) -> None:
3266 if self.verbose:
3267 out(f"{path} ignored: {message}", bold=False)
3269 @property
3270 def return_code(self) -> int:
3271 """Return the exit code that the app should use.
3273 This considers the current state of changed files and failures:
3274 - if there were any failures, return 123;
3275 - if any files were changed and --check is being used, return 1;
3276 - otherwise return 0.
3278 # According to http://tldp.org/LDP/abs/html/exitcodes.html starting with
3279 # 126 we have special return codes reserved by the shell.
3280 if self.failure_count:
3281 return 123
3283 elif self.change_count and self.check:
3284 return 1
3286 return 0
3288 def __str__(self) -> str:
3289 """Render a color report of the current state.
3291 Use `click.unstyle` to remove colors.
3293 if self.check:
3294 reformatted = "would be reformatted"
3295 unchanged = "would be left unchanged"
3296 failed = "would fail to reformat"
3297 else:
3298 reformatted = "reformatted"
3299 unchanged = "left unchanged"
3300 failed = "failed to reformat"
3301 report = []
3302 if self.change_count:
3303 s = "s" if self.change_count > 1 else ""
3304 report.append(
3305 click.style(f"{self.change_count} file{s} {reformatted}", bold=True)
3306 )
3307 if self.same_count:
3308 s = "s" if self.same_count > 1 else ""
3309 report.append(f"{self.same_count} file{s} {unchanged}")
3310 if self.failure_count:
3311 s = "s" if self.failure_count > 1 else ""
3312 report.append(
3313 click.style(f"{self.failure_count} file{s} {failed}", fg="red")
3314 )
3315 return ", ".join(report) + "."
3318 def assert_equivalent(src: str, dst: str) -> None:
3319 """Raise AssertionError if `src` and `dst` aren't equivalent."""
3324 def _v(node: ast.AST, depth: int = 0) -> Iterator[str]:
3325 """Simple visitor generating strings to compare ASTs by content."""
3326 yield f"{' ' * depth}{node.__class__.__name__}("
3328 for field in sorted(node._fields):
3329 try:
3330 value = getattr(node, field)
3331 except AttributeError:
3332 continue
3334 yield f"{' ' * (depth+1)}{field}="
3336 if isinstance(value, list):
3337 for item in value:
3338 # Ignore nested tuples within del statements, because we may insert
3339 # parentheses and they change the AST.
3340 if (
3341 field == "targets"
3342 and isinstance(node, ast.Delete)
3343 and isinstance(item, ast.Tuple)
3344 ):
3345 for item in item.elts:
3346 yield from _v(item, depth + 2)
3347 elif isinstance(item, ast.AST):
3348 yield from _v(item, depth + 2)
3350 elif isinstance(value, ast.AST):
3351 yield from _v(value, depth + 2)
3353 else:
3354 yield f"{' ' * (depth+2)}{value!r}, # {value.__class__.__name__}"
3356 yield f"{' ' * depth}) # /{node.__class__.__name__}"
3358 try:
3359 src_ast = ast.parse(src)
3360 except Exception as exc:
3361 major, minor = sys.version_info[:2]
3362 raise AssertionError(
3363 f"cannot use --safe with this file; failed to parse source file "
3364 f"with Python {major}.{minor}'s builtin AST. Re-run with --fast "
3365 f"or stop using deprecated Python 2 syntax. AST error message: {exc}"
3369 dst_ast = ast.parse(dst)
3370 except Exception as exc:
3371 log = dump_to_file("".join(traceback.format_tb(exc.__traceback__)), dst)
3372 raise AssertionError(
3373 f"INTERNAL ERROR: Black produced invalid code: {exc}. "
3374 f"Please report a bug on https://github.com/ambv/black/issues. "
3375 f"This invalid output might be helpful: {log}"
3378 src_ast_str = "\n".join(_v(src_ast))
3379 dst_ast_str = "\n".join(_v(dst_ast))
3380 if src_ast_str != dst_ast_str:
3381 log = dump_to_file(diff(src_ast_str, dst_ast_str, "src", "dst"))
3382 raise AssertionError(
3383 f"INTERNAL ERROR: Black produced code that is not equivalent to "
3385 f"Please report a bug on https://github.com/ambv/black/issues. "
3386 f"This diff might be helpful: {log}"
3390 def assert_stable(src: str, dst: str, mode: FileMode) -> None:
3391 """Raise AssertionError if `dst` reformats differently the second time."""
3392 newdst = format_str(dst, mode=mode)
3393 if dst != newdst:
3394 log = dump_to_file(
3395 diff(src, dst, "source", "first pass"),
3396 diff(dst, newdst, "first pass", "second pass"),
3397 )
3398 raise AssertionError(
3399 f"INTERNAL ERROR: Black produced different code on the second pass "
3400 f"of the formatter. "
3401 f"Please report a bug on https://github.com/ambv/black/issues. "
3402 f"This diff might be helpful: {log}"
3406 def dump_to_file(*output: str) -> str:
3407 """Dump `output` to a temporary file. Return path to the file."""
3410 with tempfile.NamedTemporaryFile(
3411 mode="w", prefix="blk_", suffix=".log", delete=False, encoding="utf8"
3412 ) as f:
3413 for lines in output:
3414 f.write(lines)
3415 if lines and lines[-1] != "\n":
3416 f.write("\n")
3417 return f.name
3420 def diff(a: str, b: str, a_name: str, b_name: str) -> str:
3421 """Return a unified diff string between strings `a` and `b`."""
3424 a_lines = [line + "\n" for line in a.split("\n")]
3425 b_lines = [line + "\n" for line in b.split("\n")]
3427 difflib.unified_diff(a_lines, b_lines, fromfile=a_name, tofile=b_name, n=5)
3431 def cancel(tasks: Iterable[asyncio.Task]) -> None:
3432 """asyncio signal handler that cancels all `tasks` and reports to stderr."""
3438 def shutdown(loop: BaseEventLoop) -> None:
3439 """Cancel all pending tasks on `loop`, wait for them, and close the loop."""
3441 # This part is borrowed from asyncio/runners.py in Python 3.7b2.
3442 to_cancel = [task for task in asyncio.Task.all_tasks(loop) if not task.done()]
3446 for task in to_cancel:
3448 loop.run_until_complete(
3449 asyncio.gather(*to_cancel, loop=loop, return_exceptions=True)
3452 # `concurrent.futures.Future` objects cannot be cancelled once they
3453 # are already running. There might be some when the `shutdown()` happened.
3454 # Silence their logger's spew about the event loop being closed.
3455 cf_logger = logging.getLogger("concurrent.futures")
3456 cf_logger.setLevel(logging.CRITICAL)
3460 def sub_twice(regex: Pattern[str], replacement: str, original: str) -> str:
3461 """Replace `regex` with `replacement` twice on `original`.
3463 This is used by string normalization to perform replaces on
3464 overlapping matches.
3466 return regex.sub(replacement, regex.sub(replacement, original))
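Why two passes are needed: re.sub never rescans text it has already consumed, so adjacent occurrences can survive a single pass. A toy demonstration:

import re

pattern = re.compile(r"aa")
once = pattern.sub("a", "aaaa")  # both matches consumed, result "aa"
twice = pattern.sub("a", once)   # second pass collapses the leftovers: "a"
print(once, twice)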
3469 def re_compile_maybe_verbose(regex: str) -> Pattern[str]:
3470 """Compile a regular expression string in `regex`.
3472 If it contains newlines, use verbose mode.
3474 if "\n" in regex:
3475 regex = "(?x)" + regex
3476 return re.compile(regex)
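The effect of the implied verbose flag, restated so the snippet runs standalone: a multi-line pattern may contain whitespace and comments that are ignored rather than matched.

import re

def re_compile_maybe_verbose(regex: str):
    if "\n" in regex:
        regex = "(?x)" + regex  # verbose mode: whitespace and # comments ignored
    return re.compile(regex)

pat = re_compile_maybe_verbose(r"""
\.pyi?  # .py or .pyi files
$
""")
assert pat.search("module.pyi") and not pat.search("module.txt")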
3479 def enumerate_reversed(sequence: Sequence[T]) -> Iterator[Tuple[Index, T]]:
3480 """Like `reversed(enumerate(sequence))` if that were possible."""
3481 index = len(sequence) - 1
3482 for element in reversed(sequence):
3483 yield (index, element)
3484 index -= 1
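A quick standalone check that this generator matches the naive spelling while touching each element once:

def enumerate_reversed(sequence):
    index = len(sequence) - 1
    for element in reversed(sequence):
        yield (index, element)
        index -= 1

seq = ["a", "b", "c"]
assert list(enumerate_reversed(seq)) == list(reversed(list(enumerate(seq))))
# [(2, 'c'), (1, 'b'), (0, 'a')]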
3487 def enumerate_with_length(
3488 line: Line, reversed: bool = False
3489 ) -> Iterator[Tuple[Index, Leaf, int]]:
3490 """Return an enumeration of leaves with their length.
3492 Stops prematurely on multiline strings and standalone comments.
3494 op = cast(
3495 Callable[[Sequence[Leaf]], Iterator[Tuple[Index, Leaf]]],
3496 enumerate_reversed if reversed else enumerate,
3497 )
3498 for index, leaf in op(line.leaves):
3499 length = len(leaf.prefix) + len(leaf.value)
3500 if "\n" in leaf.value:
3501 return # Multiline strings, we can't continue.
3503 comment: Optional[Leaf]
3504 for comment in line.comments_after(leaf):
3505 length += len(comment.value)
3507 yield index, leaf, length
3510 def is_line_short_enough(line: Line, *, line_length: int, line_str: str = "") -> bool:
3511 """Return True if `line` is no longer than `line_length`.
3513 Uses the provided `line_str` rendering, if any, otherwise computes a new one.
3515 if not line_str:
3516 line_str = str(line).strip("\n")
3517 return (
3518 len(line_str) <= line_length
3519 and "\n" not in line_str # multiline strings
3520 and not line.contains_standalone_comments()
3521 )
3524 def can_be_split(line: Line) -> bool:
3525 """Return False if the line cannot be split *for sure*.
3527 This is not an exhaustive search but a cheap heuristic that we can use to
3528 avoid some unfortunate formattings (mostly around wrapping unsplittable code
3529 in unnecessary parentheses).
3531 leaves = line.leaves
3532 if len(leaves) < 2:
3533 return False
3535 if leaves[0].type == token.STRING and leaves[1].type == token.DOT:
3536 call_count = 0
3537 dot_count = 0
3538 next = leaves[-1]
3539 for leaf in leaves[-2::-1]:
3540 if leaf.type in OPENING_BRACKETS:
3541 if next.type not in CLOSING_BRACKETS:
3542 return False
3544 call_count += 1
3545 elif leaf.type == token.DOT:
3546 dot_count += 1
3547 elif leaf.type == token.NAME:
3548 if not (next.type == token.DOT or next.type in OPENING_BRACKETS):
3549 return False
3551 elif leaf.type not in CLOSING_BRACKETS:
3552 return False
3554 if dot_count > 1 and call_count > 1:
3555 return False
3556 next = leaf
3557 return True
3560 def can_omit_invisible_parens(line: Line, line_length: int) -> bool:
3561 """Does `line` have a shape safe to reformat without optional parens around it?
3563 Returns True for only a subset of potentially nice looking formattings but
3564 the point is to not return false positives that end up producing lines that
3567 bt = line.bracket_tracker
3568 if not bt.delimiters:
3569 # Without delimiters the optional parentheses are useless.
3572 max_priority = bt.max_delimiter_priority()
3573 if bt.delimiter_count_with_priority(max_priority) > 1:
3574 # With more than one delimiter of a kind the optional parentheses read better.
3577 if max_priority == DOT_PRIORITY:
3578 # A single stranded method call doesn't require optional parentheses.
3581 assert len(line.leaves) >= 2, "Stranded delimiter"
3583 first = line.leaves[0]
3584 second = line.leaves[1]
3585 penultimate = line.leaves[-2]
3586 last = line.leaves[-1]
3588 # With a single delimiter, omit if the expression starts or ends with
3589 # a bracket.
3590 if first.type in OPENING_BRACKETS and second.type not in CLOSING_BRACKETS:
3591 remainder = False
3592 length = 4 * line.depth
3593 for _index, leaf, leaf_length in enumerate_with_length(line):
3594 if leaf.type in CLOSING_BRACKETS and leaf.opening_bracket is first:
3595 remainder = True
3596 if remainder:
3597 length += leaf_length
3598 if length > line_length:
3599 break
3601 if leaf.type in OPENING_BRACKETS:
3602 # There are brackets we can further split on.
3603 remainder = False
3605 if remainder:
3606 # checked the entire string and line length wasn't exceeded
3607 if len(line.leaves) == _index + 1:
3608 return True
3610 # Note: we are not returning False here because a line might have *both*
3611 # a leading opening bracket and a trailing closing bracket. If the
3612 # opening bracket doesn't match our rule, maybe the closing will.
3614 if (
3615 last.type == token.RPAR
3616 or last.type == token.RBRACE
3617 or (
3618 # don't use indexing for omitting optional parentheses;
3619 # it looks weird
3620 last.type == token.RSQB
3621 and last.parent
3622 and last.parent.type != syms.trailer
3623 )
3624 ):
3625 if penultimate.type in OPENING_BRACKETS:
3626 # Empty brackets don't help.
3627 return False
3629 if is_multiline_string(first):
3630 # Additional wrapping of a multiline string in this situation is
3631 # unnecessary.
3632 return True
3634 length = 4 * line.depth
3635 seen_other_brackets = False
3636 for _index, leaf, leaf_length in enumerate_with_length(line):
3637 length += leaf_length
3638 if leaf is last.opening_bracket:
3639 if seen_other_brackets or length <= line_length:
3640 return True
3642 elif leaf.type in OPENING_BRACKETS:
3643 # There are brackets we can further split on.
3644 seen_other_brackets = True
3646 return False
3649 def get_cache_file(mode: FileMode) -> Path:
3650 return CACHE_DIR / f"cache.{mode.get_cache_key()}.pickle"
3653 def read_cache(mode: FileMode) -> Cache:
3654 """Read the cache if it exists and is well formed.
3656 If it is not well formed, the call to write_cache later should resolve the issue.
3658 cache_file = get_cache_file(mode)
3659 if not cache_file.exists():
3660 return {}
3662 with cache_file.open("rb") as fobj:
3663 try:
3664 cache: Cache = pickle.load(fobj)
3665 except pickle.UnpicklingError:
3666 return {}
3668 return cache
3671 def get_cache_info(path: Path) -> CacheInfo:
3672 """Return the information used to check if a file is already formatted or not."""
3674 return stat.st_mtime, stat.st_size
3677 def filter_cached(cache: Cache, sources: Iterable[Path]) -> Tuple[Set[Path], Set[Path]]:
3678 """Split an iterable of paths in `sources` into two sets.
3680 The first contains paths of files that were modified on disk or are not in the
3681 cache. The other contains paths to non-modified files.
3683 todo, done = set(), set()
3684 for src in sources:
3685 src = src.resolve()
3686 if cache.get(src) != get_cache_info(src):
3687 todo.add(src)
3688 else:
3689 done.add(src)
3690 return todo, done
3693 def write_cache(cache: Cache, sources: Iterable[Path], mode: FileMode) -> None:
3694 """Update the cache file."""
3695 cache_file = get_cache_file(mode)
3696 try:
3697 CACHE_DIR.mkdir(parents=True, exist_ok=True)
3698 new_cache = {**cache, **{src.resolve(): get_cache_info(src) for src in sources}}
3699 with tempfile.NamedTemporaryFile(dir=str(cache_file.parent), delete=False) as f:
3700 pickle.dump(new_cache, f, protocol=pickle.HIGHEST_PROTOCOL)
3701 os.replace(f.name, cache_file)
3702 except OSError:
3703 pass
3706 def patch_click() -> None:
3707 """Make Click not crash.
3709 On certain misconfigured environments, Python 3 selects the ASCII encoding as the
3710 default which restricts paths that it can access during the lifetime of the
3711 application. Click refuses to work in this scenario by raising a RuntimeError.
3713 In case of Black the likelihood that non-ASCII characters are going to be used in
3714 file paths is minimal since it's Python source code. Moreover, this crash was
3715 spurious on Python 3.7 thanks to PEP 538 and PEP 540.
3717 try:
3718 from click import core
3719 from click import _unicodefun # type: ignore
3720 except ModuleNotFoundError:
3721 return
3723 for module in (core, _unicodefun):
3724 if hasattr(module, "_verify_python3_env"):
3725 module._verify_python3_env = lambda: None
3728 def patched_main() -> None:
3729 freeze_support()
3730 patch_click()
3731 main()
3734 if __name__ == "__main__":
3735 patched_main()