All patches and comments are welcome. Please squash your changes into logical
commits before using git-format-patch and git-send-email to send them to
patches@git.madduck.net.
If you'd read over the Git project's submission guidelines and adhere to them,
I'd be especially grateful.
from concurrent.futures import Executor, ProcessPoolExecutor
from datetime import datetime
from functools import lru_cache, partial, wraps
from multiprocessing import Manager, freeze_support
from pathlib import Path

from appdirs import user_cache_dir
from attr import dataclass, evolve, Factory
from typed_ast import ast3, ast27

from blib2to3.pytree import Node, Leaf, type_repr
from blib2to3 import pygram, pytree
from blib2to3.pgen2 import driver, token
from blib2to3.pgen2.grammar import Grammar
from blib2to3.pgen2.parse import ParseError
__version__ = "19.3b0"
DEFAULT_LINE_LENGTH = 88
DEFAULT_EXCLUDES = (
    r"/(\.eggs|\.git|\.hg|\.mypy_cache|\.nox|\.tox|\.venv|_build|buck-out|build|dist)/"
)
DEFAULT_INCLUDES = r"\.pyi?$"
CACHE_DIR = Path(user_cache_dir("black", version=__version__))
LN = Union[Leaf, Node]
SplitFunc = Callable[["Line", Collection["Feature"]], Iterator["Line"]]
CacheInfo = Tuple[Timestamp, FileSize]
Cache = Dict[Path, CacheInfo]
out = partial(click.secho, bold=True, err=True)
err = partial(click.secho, fg="red", err=True)

pygram.initialize(CACHE_DIR)
syms = pygram.python_symbols
class NothingChanged(UserWarning):
    """Raised when reformatted code is the same as source."""


class CannotSplit(Exception):
    """A readable split that fits the allotted line length is impossible."""


class InvalidInput(ValueError):
    """Raised when input source code fails all parse attempts."""
class WriteBack(Enum):
    NO = 0
    YES = 1
    DIFF = 2
    CHECK = 3

    @classmethod
    def from_configuration(cls, *, check: bool, diff: bool) -> "WriteBack":
        if check and not diff:
            return cls.CHECK

        return cls.DIFF if diff else cls.YES
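# Editor's note: a minimal usage sketch, not part of the original file.  It
# shows how the CLI flags map onto WriteBack values:
#
#     >>> WriteBack.from_configuration(check=True, diff=False) is WriteBack.CHECK
#     True
#     >>> WriteBack.from_configuration(check=True, diff=True) is WriteBack.DIFF
#     True
#     >>> WriteBack.from_configuration(check=False, diff=False) is WriteBack.YES
#     True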
class TargetVersion(Enum):
    PY27 = 2
    PY33 = 3
    PY34 = 4
    PY35 = 5
    PY36 = 6
    PY37 = 7
    PY38 = 8

    def is_python2(self) -> bool:
        return self is TargetVersion.PY27


PY36_VERSIONS = {TargetVersion.PY36, TargetVersion.PY37, TargetVersion.PY38}
class Feature(Enum):
    # All string literals are unicode
    UNICODE_LITERALS = 1
    F_STRINGS = 2
    NUMERIC_UNDERSCORES = 3
    TRAILING_COMMA_IN_CALL = 4
    TRAILING_COMMA_IN_DEF = 5
    # The following two feature flags are mutually exclusive, and exactly one
    # should be set for every version of Python.
    ASYNC_IDENTIFIERS = 6
    ASYNC_KEYWORDS = 7
VERSION_TO_FEATURES: Dict[TargetVersion, Set[Feature]] = {
    TargetVersion.PY27: {Feature.ASYNC_IDENTIFIERS},
    TargetVersion.PY33: {Feature.UNICODE_LITERALS, Feature.ASYNC_IDENTIFIERS},
    TargetVersion.PY34: {Feature.UNICODE_LITERALS, Feature.ASYNC_IDENTIFIERS},
    TargetVersion.PY35: {
        Feature.UNICODE_LITERALS,
        Feature.TRAILING_COMMA_IN_CALL,
        Feature.ASYNC_IDENTIFIERS,
    },
    TargetVersion.PY36: {
        Feature.UNICODE_LITERALS,
        Feature.F_STRINGS,
        Feature.NUMERIC_UNDERSCORES,
        Feature.TRAILING_COMMA_IN_CALL,
        Feature.TRAILING_COMMA_IN_DEF,
        Feature.ASYNC_IDENTIFIERS,
    },
    TargetVersion.PY37: {
        Feature.UNICODE_LITERALS,
        Feature.F_STRINGS,
        Feature.NUMERIC_UNDERSCORES,
        Feature.TRAILING_COMMA_IN_CALL,
        Feature.TRAILING_COMMA_IN_DEF,
        Feature.ASYNC_KEYWORDS,
    },
    TargetVersion.PY38: {
        Feature.UNICODE_LITERALS,
        Feature.F_STRINGS,
        Feature.NUMERIC_UNDERSCORES,
        Feature.TRAILING_COMMA_IN_CALL,
        Feature.TRAILING_COMMA_IN_DEF,
        Feature.ASYNC_KEYWORDS,
    },
}
@dataclass
class FileMode:
    target_versions: Set[TargetVersion] = Factory(set)
    line_length: int = DEFAULT_LINE_LENGTH
    string_normalization: bool = True
    is_pyi: bool = False

    def get_cache_key(self) -> str:
        if self.target_versions:
            version_str = ",".join(
                str(version.value)
                for version in sorted(self.target_versions, key=lambda v: v.value)
            )
        else:
            version_str = "-"
        parts = [
            version_str,
            str(self.line_length),
            str(int(self.string_normalization)),
            str(int(self.is_pyi)),
        ]
        return ".".join(parts)
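    # Editor's note: an illustrative sketch, not part of the original file.
    # With all defaults, the cache key is simply the joined parts:
    #
    #     >>> FileMode().get_cache_key()
    #     '-.88.1.0'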
def supports_feature(target_versions: Set[TargetVersion], feature: Feature) -> bool:
    return all(feature in VERSION_TO_FEATURES[version] for version in target_versions)
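# Editor's note: an illustrative sketch, not part of the original file.  Note
# the vacuous truth: with no target versions selected, `all()` ranges over an
# empty generator, so every feature counts as supported:
#
#     >>> supports_feature(set(), Feature.NUMERIC_UNDERSCORES)
#     True
#     >>> supports_feature({TargetVersion.PY27}, Feature.NUMERIC_UNDERSCORES)
#     False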
def read_pyproject_toml(
    ctx: click.Context, param: click.Parameter, value: Union[str, int, bool, None]
) -> Optional[str]:
    """Inject Black configuration from "pyproject.toml" into defaults in `ctx`.

    Returns the path to a successfully found and read configuration file, None
    otherwise.
    """
    assert not isinstance(value, (int, bool)), "Invalid parameter type passed"
    if not value:
        root = find_project_root(ctx.params.get("src", ()))
        path = root / "pyproject.toml"
        if path.is_file():
            value = str(path)
        else:
            return None

    try:
        pyproject_toml = toml.load(value)
        config = pyproject_toml.get("tool", {}).get("black", {})
    except (toml.TomlDecodeError, OSError) as e:
        raise click.FileError(
            filename=value, hint=f"Error reading configuration file: {e}"
        )

    if ctx.default_map is None:
        ctx.default_map = {}
    ctx.default_map.update(  # type: ignore  # bad types in .pyi
        {k.replace("--", "").replace("-", "_"): v for k, v in config.items()}
    )
    return value
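# Editor's note: an illustrative sketch, not part of the original file.  Given
# a pyproject.toml containing:
#
#     [tool.black]
#     line-length = 100
#     target-version = ["py37"]
#
# the keys land in `ctx.default_map` with dashes turned into underscores, i.e.
# {"line_length": 100, "target_version": ["py37"]}.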
@click.command(context_settings=dict(help_option_names=["-h", "--help"]))
@click.option("-c", "--code", type=str, help="Format the code passed in as a string.")
@click.option(
    "-l", "--line-length", type=int,
    default=DEFAULT_LINE_LENGTH,
    help="How many characters per line to allow.",
    show_default=True,
)
@click.option(
    "-t", "--target-version", multiple=True,
    type=click.Choice([v.name.lower() for v in TargetVersion]),
    callback=lambda c, p, v: [TargetVersion[val.upper()] for val in v],
    help="Python versions that should be supported by Black's output. [default: "
    "per-file auto-detection]",
)
@click.option(
    "--py36", is_flag=True,
    help="Allow using Python 3.6-only syntax on all input files. This will put "
    "trailing commas in function signatures and calls also after *args and "
    "**kwargs. Deprecated; use --target-version instead. "
    "[default: per-file auto-detection]",
)
@click.option(
    "--pyi", is_flag=True,
    help="Format all input files like typing stubs regardless of file extension "
    "(useful when piping source on standard input).",
)
@click.option(
    "-S", "--skip-string-normalization", is_flag=True,
    help="Don't normalize string quotes or prefixes.",
)
@click.option(
    "--check", is_flag=True,
    help="Don't write the files back, just return the status. Return code 0 "
    "means nothing would change. Return code 1 means some files would be "
    "reformatted. Return code 123 means there was an internal error.",
)
@click.option(
    "--diff", is_flag=True,
    help="Don't write the files back, just output a diff for each file on stdout.",
)
@click.option(
    "--fast/--safe", default=False,
    help="If --fast given, skip temporary sanity checks. [default: --safe]",
)
@click.option(
    "--include", type=str, default=DEFAULT_INCLUDES, show_default=True,
    help="A regular expression that matches files and directories that should be "
    "included on recursive searches. An empty value means all files are "
    "included regardless of the name. Use forward slashes for directories on "
    "all platforms (Windows, too). Exclusions are calculated first, inclusions "
    "later.",
)
@click.option(
    "--exclude", type=str, default=DEFAULT_EXCLUDES, show_default=True,
    help="A regular expression that matches files and directories that should be "
    "excluded on recursive searches. An empty value means no paths are excluded. "
    "Use forward slashes for directories on all platforms (Windows, too). "
    "Exclusions are calculated first, inclusions later.",
)
@click.option(
    "-q", "--quiet", is_flag=True,
    help="Don't emit non-error messages to stderr. Errors are still emitted; "
    "silence those with 2>/dev/null.",
)
@click.option(
    "-v", "--verbose", is_flag=True,
    help="Also emit messages to stderr about files that were not changed or were "
    "ignored due to --exclude=.",
)
@click.version_option(version=__version__)
@click.argument(
    "src", nargs=-1, is_eager=True,
    type=click.Path(
        exists=True, file_okay=True, dir_okay=True, readable=True, allow_dash=True
    ),
)
@click.option(
    "--config", is_eager=True, callback=read_pyproject_toml,
    type=click.Path(
        exists=False, file_okay=True, dir_okay=False, readable=True, allow_dash=False
    ),
    help="Read configuration from PATH.",
)
@click.pass_context
def main(
    ctx: click.Context, code: Optional[str], line_length: int,
    target_version: List[TargetVersion], check: bool, diff: bool, fast: bool,
    pyi: bool, py36: bool, skip_string_normalization: bool, quiet: bool,
    verbose: bool, include: str, exclude: str, src: Tuple[str],
    config: Optional[str],
) -> None:
    """The uncompromising code formatter."""
    write_back = WriteBack.from_configuration(check=check, diff=diff)
    if target_version:
        if py36:
            err("Cannot use both --target-version and --py36")
            ctx.exit(2)
        else:
            versions = set(target_version)
    elif py36:
        err(
            "--py36 is deprecated and will be removed in a future version. "
            "Use --target-version py36 instead."
        )
        versions = PY36_VERSIONS
    else:
        # We'll autodetect later.
        versions = set()
    mode = FileMode(
        target_versions=versions,
        line_length=line_length,
        is_pyi=pyi,
        string_normalization=not skip_string_normalization,
    )
    if config and verbose:
        out(f"Using configuration from {config}.", bold=False, fg="blue")
    if code is not None:
        print(format_str(code, mode=mode))
        ctx.exit(0)
    try:
        include_regex = re_compile_maybe_verbose(include)
    except re.error:
        err(f"Invalid regular expression for include given: {include!r}")
        ctx.exit(2)
    try:
        exclude_regex = re_compile_maybe_verbose(exclude)
    except re.error:
        err(f"Invalid regular expression for exclude given: {exclude!r}")
        ctx.exit(2)
    report = Report(check=check, quiet=quiet, verbose=verbose)
    root = find_project_root(src)
    sources: Set[Path] = set()
    for s in src:
        p = Path(s)
        if p.is_dir():
            sources.update(
                gen_python_files_in_dir(p, root, include_regex, exclude_regex, report)
            )
        elif p.is_file() or s == "-":
            # if a file was explicitly given, we don't care about its extension
            sources.add(p)
        else:
            err(f"invalid path: {s}")
    if len(sources) == 0:
        if verbose or not quiet:
            out("No paths given. Nothing to do 😴")
        ctx.exit(0)
    if len(sources) == 1:
        reformat_one(
            src=sources.pop(), fast=fast, write_back=write_back, mode=mode,
            report=report,
        )
    else:
        reformat_many(
            sources=sources, fast=fast, write_back=write_back, mode=mode, report=report
        )
    if verbose or not quiet:
        out("Oh no! 💥 💔 💥" if report.return_code else "All done! ✨ 🍰 ✨")
        click.secho(str(report), err=True)
    ctx.exit(report.return_code)
def reformat_one(
    src: Path, fast: bool, write_back: WriteBack, mode: FileMode, report: "Report"
) -> None:
    """Reformat a single file under `src` without spawning child processes.

    `fast`, `write_back`, and `mode` options are passed to
    :func:`format_file_in_place` or :func:`format_stdin_to_stdout`.
    """
    try:
        changed = Changed.NO
        if not src.is_file() and str(src) == "-":
            if format_stdin_to_stdout(fast=fast, write_back=write_back, mode=mode):
                changed = Changed.YES
        else:
            cache: Cache = {}
            if write_back != WriteBack.DIFF:
                cache = read_cache(mode)
                res_src = src.resolve()
                if res_src in cache and cache[res_src] == get_cache_info(res_src):
                    changed = Changed.CACHED
            if changed is not Changed.CACHED and format_file_in_place(
                src, fast=fast, write_back=write_back, mode=mode
            ):
                changed = Changed.YES
            if (write_back is WriteBack.YES and changed is not Changed.CACHED) or (
                write_back is WriteBack.CHECK and changed is Changed.NO
            ):
                write_cache(cache, [src], mode)
        report.done(src, changed)
    except Exception as exc:
        report.failed(src, str(exc))
def reformat_many(
    sources: Set[Path], fast: bool, write_back: WriteBack, mode: FileMode,
    report: "Report",
) -> None:
    """Reformat multiple files using a ProcessPoolExecutor."""
    loop = asyncio.get_event_loop()
    worker_count = os.cpu_count()
    if sys.platform == "win32":
        # Work around https://bugs.python.org/issue26903
        worker_count = min(worker_count, 61)
    executor = ProcessPoolExecutor(max_workers=worker_count)
    try:
        loop.run_until_complete(
            schedule_formatting(
                sources=sources, fast=fast, write_back=write_back, mode=mode,
                report=report, loop=loop, executor=executor,
            )
        )
    finally:
        shutdown(loop)
async def schedule_formatting(
    sources: Set[Path],
    fast: bool,
    write_back: WriteBack,
    mode: FileMode,
    report: "Report",
    loop: asyncio.AbstractEventLoop,
    executor: Executor,
) -> None:
    """Run formatting of `sources` in parallel using the provided `executor`.

    (Use ProcessPoolExecutors for actual parallelism.)

    `write_back`, `fast`, and `mode` options are passed to
    :func:`format_file_in_place`.
    """
    cache: Cache = {}
    if write_back != WriteBack.DIFF:
        cache = read_cache(mode)
        sources, cached = filter_cached(cache, sources)
        for src in sorted(cached):
            report.done(src, Changed.CACHED)
    if not sources:
        return

    cancelled = []
    sources_to_cache = []
    lock = None
    if write_back == WriteBack.DIFF:
        # For diff output, we need locks to ensure we don't interleave output
        # from different processes.
        manager = Manager()
        lock = manager.Lock()
    tasks = {
        asyncio.ensure_future(
            loop.run_in_executor(
                executor, format_file_in_place, src, fast, mode, write_back, lock
            )
        ): src
        for src in sorted(sources)
    }
    pending: Iterable[asyncio.Future] = tasks.keys()
    try:
        loop.add_signal_handler(signal.SIGINT, cancel, pending)
        loop.add_signal_handler(signal.SIGTERM, cancel, pending)
    except NotImplementedError:
        # There are no good alternatives for these on Windows.
        pass
    while pending:
        done, _ = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            src = tasks.pop(task)
            if task.cancelled():
                cancelled.append(task)
            elif task.exception():
                report.failed(src, str(task.exception()))
            else:
                changed = Changed.YES if task.result() else Changed.NO
                # If the file was written back or was successfully checked as
                # well-formatted, store this information in the cache.
                if write_back is WriteBack.YES or (
                    write_back is WriteBack.CHECK and changed is Changed.NO
                ):
                    sources_to_cache.append(src)
                report.done(src, changed)
    if cancelled:
        await asyncio.gather(*cancelled, loop=loop, return_exceptions=True)
    if sources_to_cache:
        write_cache(cache, sources_to_cache, mode)
def format_file_in_place(
    src: Path,
    fast: bool,
    mode: FileMode,
    write_back: WriteBack = WriteBack.NO,
    lock: Any = None,  # multiprocessing.Manager().Lock() is some crazy proxy
) -> bool:
    """Format file under `src` path. Return True if changed.

    If `write_back` is DIFF, write a diff to stdout. If it is YES, write reformatted
    code to the file.
    `mode` and `fast` options are passed to :func:`format_file_contents`.
    """
    if src.suffix == ".pyi":
        mode = evolve(mode, is_pyi=True)

    then = datetime.utcfromtimestamp(src.stat().st_mtime)
    with open(src, "rb") as buf:
        src_contents, encoding, newline = decode_bytes(buf.read())
    try:
        dst_contents = format_file_contents(src_contents, fast=fast, mode=mode)
    except NothingChanged:
        return False

    if write_back == write_back.YES:
        with open(src, "w", encoding=encoding, newline=newline) as f:
            f.write(dst_contents)
    elif write_back == write_back.DIFF:
        now = datetime.utcnow()
        src_name = f"{src}\t{then} +0000"
        dst_name = f"{src}\t{now} +0000"
        diff_contents = diff(src_contents, dst_contents, src_name, dst_name)
        if lock:
            lock.acquire()
        try:
            f = io.TextIOWrapper(
                sys.stdout.buffer, encoding=encoding, newline=newline,
                write_through=True,
            )
            f.write(diff_contents)
            f.detach()
        finally:
            if lock:
                lock.release()
    return True
def format_stdin_to_stdout(
    fast: bool, *, write_back: WriteBack = WriteBack.NO, mode: FileMode
) -> bool:
    """Format file on stdin. Return True if changed.

    If `write_back` is YES, write reformatted code back to stdout. If it is DIFF,
    write a diff to stdout. The `mode` argument is passed to
    :func:`format_file_contents`.
    """
    then = datetime.utcnow()
    src, encoding, newline = decode_bytes(sys.stdin.buffer.read())
    dst = src
    try:
        dst = format_file_contents(src, fast=fast, mode=mode)
        return True

    except NothingChanged:
        return False

    finally:
        f = io.TextIOWrapper(
            sys.stdout.buffer, encoding=encoding, newline=newline, write_through=True
        )
        if write_back == WriteBack.YES:
            f.write(dst)
        elif write_back == WriteBack.DIFF:
            now = datetime.utcnow()
            src_name = f"STDIN\t{then} +0000"
            dst_name = f"STDOUT\t{now} +0000"
            f.write(diff(src, dst, src_name, dst_name))
        f.detach()
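# Editor's note: an illustrative shell session, not part of the original file:
#
#     $ echo "print ( 'hi' )" | black -
#     print("hi")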
def format_file_contents(
    src_contents: str, *, fast: bool, mode: FileMode
) -> FileContent:
    """Reformat the contents of a file and return the new contents.

    If `fast` is False, additionally confirm that the reformatted code is
    valid by calling :func:`assert_equivalent` and :func:`assert_stable` on it.
    `mode` is passed to :func:`format_str`.
    """
    if src_contents.strip() == "":
        raise NothingChanged

    dst_contents = format_str(src_contents, mode=mode)
    if src_contents == dst_contents:
        raise NothingChanged

    if not fast:
        assert_equivalent(src_contents, dst_contents)
        assert_stable(src_contents, dst_contents, mode=mode)
    return dst_contents
def format_str(src_contents: str, *, mode: FileMode) -> FileContent:
    """Reformat a string and return new contents.

    `mode` determines formatting options, such as how many characters per line are
    allowed.
    """
    src_node = lib2to3_parse(src_contents.lstrip(), mode.target_versions)
    dst_contents = []
    future_imports = get_future_imports(src_node)
    if mode.target_versions:
        versions = mode.target_versions
    else:
        versions = detect_target_versions(src_node)
    normalize_fmt_off(src_node)
    lines = LineGenerator(
        remove_u_prefix="unicode_literals" in future_imports
        or supports_feature(versions, Feature.UNICODE_LITERALS),
        is_pyi=mode.is_pyi,
        normalize_strings=mode.string_normalization,
    )
    elt = EmptyLineTracker(is_pyi=mode.is_pyi)
    empty_line = Line()
    after = 0
    split_line_features = {
        feature
        for feature in {Feature.TRAILING_COMMA_IN_CALL, Feature.TRAILING_COMMA_IN_DEF}
        if supports_feature(versions, feature)
    }
    for current_line in lines.visit(src_node):
        for _ in range(after):
            dst_contents.append(str(empty_line))
        before, after = elt.maybe_empty_lines(current_line)
        for _ in range(before):
            dst_contents.append(str(empty_line))
        for line in split_line(
            current_line, line_length=mode.line_length, features=split_line_features
        ):
            dst_contents.append(str(line))
    return "".join(dst_contents)
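# Editor's note: a minimal illustrative sketch, not part of the original file:
#
#     >>> format_str("print('hello')", mode=FileMode())
#     'print("hello")\n'
#
# (quotes are normalized to double quotes because
# FileMode.string_normalization defaults to True)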
def decode_bytes(src: bytes) -> Tuple[FileContent, Encoding, NewLine]:
    """Return a tuple of (decoded_contents, encoding, newline).

    `newline` is either CRLF or LF but `decoded_contents` is decoded with
    universal newlines (i.e. only contains LF).
    """
    srcbuf = io.BytesIO(src)
    encoding, lines = tokenize.detect_encoding(srcbuf.readline)
    if not lines:
        return "", encoding, "\n"

    newline = "\r\n" if b"\r\n" == lines[0][-2:] else "\n"
    srcbuf.seek(0)
    with io.TextIOWrapper(srcbuf, encoding) as tiow:
        return tiow.read(), encoding, newline
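# Editor's note: an illustrative sketch, not part of the original file.  CRLF
# input is decoded with universal newlines while the original newline style is
# reported back:
#
#     >>> decode_bytes(b"x = 1\r\n")
#     ('x = 1\n', 'utf-8', '\r\n')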
def get_grammars(target_versions: Set[TargetVersion]) -> List[Grammar]:
    if not target_versions:
        # No target_version specified, so try all grammars.
        return [
            # Python 3.7+
            pygram.python_grammar_no_print_statement_no_exec_statement_async_keywords,
            # Python 3.0-3.6
            pygram.python_grammar_no_print_statement_no_exec_statement,
            # Python 2.7 with future print_function import
            pygram.python_grammar_no_print_statement,
            # Python 2.7
            pygram.python_grammar,
        ]
    elif all(version.is_python2() for version in target_versions):
        # Python 2-only code, so try Python 2 grammars.
        return [
            # Python 2.7 with future print_function import
            pygram.python_grammar_no_print_statement,
            # Python 2.7
            pygram.python_grammar,
        ]
    else:
        # Python 3-compatible code, so only try Python 3 grammar.
        grammars = []
        # If we have to parse both, try to parse async as a keyword first
        if not supports_feature(target_versions, Feature.ASYNC_IDENTIFIERS):
            # Python 3.7+
            grammars.append(
                pygram.python_grammar_no_print_statement_no_exec_statement_async_keywords  # noqa: B950
            )
        if not supports_feature(target_versions, Feature.ASYNC_KEYWORDS):
            # Python 3.0-3.6
            grammars.append(pygram.python_grammar_no_print_statement_no_exec_statement)
        # At least one of the above branches must have been taken, because every
        # Python version has exactly one of the two 'ASYNC_*' flags.
        return grammars
def lib2to3_parse(src_txt: str, target_versions: Iterable[TargetVersion] = ()) -> Node:
    """Given a string with source, return the lib2to3 Node."""
    if src_txt[-1:] != "\n":
        src_txt += "\n"

    for grammar in get_grammars(set(target_versions)):
        drv = driver.Driver(grammar, pytree.convert)
        try:
            result = drv.parse_string(src_txt, True)
            break

        except ParseError as pe:
            lineno, column = pe.context[1]
            lines = src_txt.splitlines()
            try:
                faulty_line = lines[lineno - 1]
            except IndexError:
                faulty_line = "<line number missing in source>"
            exc = InvalidInput(f"Cannot parse: {lineno}:{column}: {faulty_line}")
    else:
        raise exc from None

    if isinstance(result, Leaf):
        result = Node(syms.file_input, [result])
    return result
def lib2to3_unparse(node: Node) -> str:
    """Given a lib2to3 node, return its string representation."""
    code = str(node)
    return code
class Visitor(Generic[T]):
    """Basic lib2to3 visitor that yields things of type `T` on `visit()`."""

    def visit(self, node: LN) -> Iterator[T]:
        """Main method to visit `node` and its children.

        It tries to find a `visit_*()` method for the given `node.type`, like
        `visit_simple_stmt` for Node objects or `visit_INDENT` for Leaf objects.
        If no dedicated `visit_*()` method is found, chooses `visit_default()`
        instead.

        Then yields objects of type `T` from the selected visitor.
        """
        if node.type < 256:
            name = token.tok_name[node.type]
        else:
            name = type_repr(node.type)
        yield from getattr(self, f"visit_{name}", self.visit_default)(node)

    def visit_default(self, node: LN) -> Iterator[T]:
        """Default `visit_*()` implementation. Recurses to children of `node`."""
        if isinstance(node, Node):
            for child in node.children:
                yield from self.visit(child)
@dataclass
class DebugVisitor(Visitor[T]):
    tree_depth: int = 0

    def visit_default(self, node: LN) -> Iterator[T]:
        indent = " " * (2 * self.tree_depth)
        if isinstance(node, Node):
            _type = type_repr(node.type)
            out(f"{indent}{_type}", fg="yellow")
            self.tree_depth += 1
            for child in node.children:
                yield from self.visit(child)

            self.tree_depth -= 1
            out(f"{indent}/{_type}", fg="yellow", bold=False)
        else:
            _type = token.tok_name.get(node.type, str(node.type))
            out(f"{indent}{_type}", fg="blue", nl=False)
            if node.prefix:
                # We don't have to handle prefixes for `Node` objects since
                # that delegates to the first child anyway.
                out(f" {node.prefix!r}", fg="green", bold=False, nl=False)
            out(f" {node.value!r}", fg="blue", bold=False)

    @classmethod
    def show(cls, code: Union[str, Leaf, Node]) -> None:
        """Pretty-print the lib2to3 AST of a given string of `code`.

        Convenience method for debugging.
        """
        v: DebugVisitor[None] = DebugVisitor()
        if isinstance(code, str):
            code = lib2to3_parse(code)
        v.visit(code)
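# Editor's note: an illustrative usage sketch, not part of the original file.
# `DebugVisitor.show()` is handy in a REPL; it prints a colorized lib2to3 tree
# via `out()` (i.e. to stderr):
#
#     >>> DebugVisitor.show("x = 1")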
WHITESPACE = {token.DEDENT, token.INDENT, token.NEWLINE}

STANDALONE_COMMENT = 153
token.tok_name[STANDALONE_COMMENT] = "STANDALONE_COMMENT"

LOGIC_OPERATORS = {"and", "or"}

STARS = {token.STAR, token.DOUBLESTAR}
VARARGS_PARENTS = {
    syms.arglist,
    syms.argument,  # double star in arglist
    syms.trailer,  # single argument to call
    syms.typedargslist,
    syms.varargslist,  # lambdas
}
UNPACKING_PARENTS = {
    syms.atom,  # single element of a list or set literal
    syms.dictsetmaker,
    syms.expr_stmt,
    syms.exprlist,
    syms.testlist_star_expr,
}

COMPREHENSION_PRIORITY = 20
COMMA_PRIORITY = 18
TERNARY_PRIORITY = 16
LOGIC_PRIORITY = 14
STRING_PRIORITY = 12
COMPARATOR_PRIORITY = 10
MATH_PRIORITIES = {
    token.DOUBLESLASH: 4,
    token.DOUBLESTAR: 2,
}
DOT_PRIORITY = 1
@dataclass
class BracketTracker:
    """Keeps track of brackets on a line."""

    depth: int = 0
    bracket_match: Dict[Tuple[Depth, NodeType], Leaf] = Factory(dict)
    delimiters: Dict[LeafID, Priority] = Factory(dict)
    previous: Optional[Leaf] = None
    _for_loop_depths: List[int] = Factory(list)
    _lambda_argument_depths: List[int] = Factory(list)
    def mark(self, leaf: Leaf) -> None:
        """Mark `leaf` with bracket-related metadata. Keep track of delimiters.

        All leaves receive an int `bracket_depth` field that stores how deep
        within brackets a given leaf is. 0 means there are no enclosing brackets
        that started on this line.

        If a leaf is itself a closing bracket, it receives an `opening_bracket`
        field that it forms a pair with. This is a one-directional link to
        avoid reference cycles.

        If a leaf is a delimiter (a token on which Black can split the line if
        needed) and it's on depth 0, its `id()` is stored in the tracker's
        `delimiters` field.
        """
        if leaf.type == token.COMMENT:
            return

        self.maybe_decrement_after_for_loop_variable(leaf)
        self.maybe_decrement_after_lambda_arguments(leaf)
        if leaf.type in CLOSING_BRACKETS:
            self.depth -= 1
            opening_bracket = self.bracket_match.pop((self.depth, leaf.type))
            leaf.opening_bracket = opening_bracket
        leaf.bracket_depth = self.depth
        if self.depth == 0:
            delim = is_split_before_delimiter(leaf, self.previous)
            if delim and self.previous is not None:
                self.delimiters[id(self.previous)] = delim

            delim = is_split_after_delimiter(leaf, self.previous)
            if delim:
                self.delimiters[id(leaf)] = delim
        if leaf.type in OPENING_BRACKETS:
            self.bracket_match[self.depth, BRACKET[leaf.type]] = leaf
            self.depth += 1
        self.previous = leaf
        self.maybe_increment_lambda_arguments(leaf)
        self.maybe_increment_for_loop_variable(leaf)
    def any_open_brackets(self) -> bool:
        """Return True if there is a yet unmatched open bracket on the line."""
        return bool(self.bracket_match)

    def max_delimiter_priority(self, exclude: Iterable[LeafID] = ()) -> Priority:
        """Return the highest priority of a delimiter found on the line.

        Values are consistent with what `is_split_*_delimiter()` return.
        Raises ValueError on no delimiters.
        """
        return max(v for k, v in self.delimiters.items() if k not in exclude)

    def delimiter_count_with_priority(self, priority: Priority = 0) -> int:
        """Return the number of delimiters with the given `priority`.

        If no `priority` is passed, defaults to max priority on the line.
        """
        if not self.delimiters:
            return 0

        priority = priority or self.max_delimiter_priority()
        return sum(1 for p in self.delimiters.values() if p == priority)
    def maybe_increment_for_loop_variable(self, leaf: Leaf) -> bool:
        """In a for loop, or comprehension, the variables are often unpacks.

        To avoid splitting on the comma in this situation, increase the depth of
        tokens between `for` and `in`.
        """
        if leaf.type == token.NAME and leaf.value == "for":
            self.depth += 1
            self._for_loop_depths.append(self.depth)
            return True

        return False

    def maybe_decrement_after_for_loop_variable(self, leaf: Leaf) -> bool:
        """See `maybe_increment_for_loop_variable` above for explanation."""
        if (
            self._for_loop_depths
            and self._for_loop_depths[-1] == self.depth
            and leaf.type == token.NAME
            and leaf.value == "in"
        ):
            self.depth -= 1
            self._for_loop_depths.pop()
            return True

        return False

    def maybe_increment_lambda_arguments(self, leaf: Leaf) -> bool:
        """In a lambda expression, there might be more than one argument.

        To avoid splitting on the comma in this situation, increase the depth of
        tokens between `lambda` and `:`.
        """
        if leaf.type == token.NAME and leaf.value == "lambda":
            self.depth += 1
            self._lambda_argument_depths.append(self.depth)
            return True

        return False

    def maybe_decrement_after_lambda_arguments(self, leaf: Leaf) -> bool:
        """See `maybe_increment_lambda_arguments` above for explanation."""
        if (
            self._lambda_argument_depths
            and self._lambda_argument_depths[-1] == self.depth
            and leaf.type == token.COLON
        ):
            self.depth -= 1
            self._lambda_argument_depths.pop()
            return True

        return False

    def get_open_lsqb(self) -> Optional[Leaf]:
        """Return the most recent opening square bracket (if any)."""
        return self.bracket_match.get((self.depth - 1, token.RSQB))
1142 """Holds leaves and comments. Can be printed with `str(line)`."""
1145 leaves: List[Leaf] = Factory(list)
1146 comments: Dict[LeafID, List[Leaf]] = Factory(dict) # keys ordered like `leaves`
1147 bracket_tracker: BracketTracker = Factory(BracketTracker)
1148 inside_brackets: bool = False
1149 should_explode: bool = False
    def append(self, leaf: Leaf, preformatted: bool = False) -> None:
        """Add a new `leaf` to the end of the line.

        Unless `preformatted` is True, the `leaf` will receive a new consistent
        whitespace prefix and metadata applied by :class:`BracketTracker`.
        Trailing commas are maybe removed, unpacked for loop variables are
        demoted from being delimiters.

        Inline comments are put aside.
        """
        has_value = leaf.type in BRACKETS or bool(leaf.value.strip())
        if not has_value:
            return

        if token.COLON == leaf.type and self.is_class_paren_empty:
            del self.leaves[-2:]
        if self.leaves and not preformatted:
            # Note: at this point leaf.prefix should be empty except for
            # imports, for which we only preserve newlines.
            leaf.prefix += whitespace(
                leaf, complex_subscript=self.is_complex_subscript(leaf)
            )
        if self.inside_brackets or not preformatted:
            self.bracket_tracker.mark(leaf)
            self.maybe_remove_trailing_comma(leaf)
        if not self.append_comment(leaf):
            self.leaves.append(leaf)
    def append_safe(self, leaf: Leaf, preformatted: bool = False) -> None:
        """Like :func:`append()` but disallow invalid standalone comment structure.

        Raises ValueError when any `leaf` is appended after a standalone comment
        or when a standalone comment is not the first leaf on the line.
        """
        if self.bracket_tracker.depth == 0:
            if self.is_comment:
                raise ValueError("cannot append to standalone comments")

            if self.leaves and leaf.type == STANDALONE_COMMENT:
                raise ValueError(
                    "cannot append standalone comments to a populated line"
                )

        self.append(leaf, preformatted=preformatted)
    @property
    def is_comment(self) -> bool:
        """Is this line a standalone comment?"""
        return len(self.leaves) == 1 and self.leaves[0].type == STANDALONE_COMMENT

    @property
    def is_decorator(self) -> bool:
        """Is this line a decorator?"""
        return bool(self) and self.leaves[0].type == token.AT

    @property
    def is_import(self) -> bool:
        """Is this an import line?"""
        return bool(self) and is_import(self.leaves[0])

    @property
    def is_class(self) -> bool:
        """Is this line a class definition?"""
        return (
            bool(self)
            and self.leaves[0].type == token.NAME
            and self.leaves[0].value == "class"
        )

    @property
    def is_stub_class(self) -> bool:
        """Is this line a class definition with a body consisting only of "..."?"""
        return self.is_class and self.leaves[-3:] == [
            Leaf(token.DOT, ".") for _ in range(3)
        ]

    @property
    def is_def(self) -> bool:
        """Is this a function definition? (Also returns True for async defs.)"""
        try:
            first_leaf = self.leaves[0]
        except IndexError:
            return False

        try:
            second_leaf: Optional[Leaf] = self.leaves[1]
        except IndexError:
            second_leaf = None
        return (first_leaf.type == token.NAME and first_leaf.value == "def") or (
            first_leaf.type == token.ASYNC
            and second_leaf is not None
            and second_leaf.type == token.NAME
            and second_leaf.value == "def"
        )

    @property
    def is_class_paren_empty(self) -> bool:
        """Is this a class with no base classes but using parentheses?

        Those are unnecessary and should be removed.
        """
        return (
            bool(self)
            and len(self.leaves) == 4
            and self.is_class
            and self.leaves[2].type == token.LPAR
            and self.leaves[2].value == "("
            and self.leaves[3].type == token.RPAR
            and self.leaves[3].value == ")"
        )

    @property
    def is_triple_quoted_string(self) -> bool:
        """Is the line a triple quoted string?"""
        return (
            bool(self)
            and self.leaves[0].type == token.STRING
            and self.leaves[0].value.startswith(('"""', "'''"))
        )
    def contains_standalone_comments(self, depth_limit: int = sys.maxsize) -> bool:
        """If so, needs to be split before emitting."""
        for leaf in self.leaves:
            if leaf.type == STANDALONE_COMMENT:
                if leaf.bracket_depth <= depth_limit:
                    return True

        return False

    def contains_inner_type_comments(self) -> bool:
        ignored_ids = set()
        try:
            last_leaf = self.leaves[-1]
            ignored_ids.add(id(last_leaf))
            if last_leaf.type == token.COMMA or (
                last_leaf.type == token.RPAR and not last_leaf.value
            ):
                # When trailing commas or optional parens are inserted by Black for
                # consistency, comments after the previous last element are not moved
                # (they don't have to, rendering will still be correct). So we ignore
                # trailing commas and invisible parens.
                last_leaf = self.leaves[-2]
                ignored_ids.add(id(last_leaf))
        except IndexError:
            return False

        for leaf_id, comments in self.comments.items():
            if leaf_id in ignored_ids:
                continue

            for comment in comments:
                if is_type_comment(comment):
                    return True

        return False

    def contains_multiline_strings(self) -> bool:
        for leaf in self.leaves:
            if is_multiline_string(leaf):
                return True

        return False
    def maybe_remove_trailing_comma(self, closing: Leaf) -> bool:
        """Remove trailing comma if there is one and it's safe."""
        if not (
            self.leaves
            and self.leaves[-1].type == token.COMMA
            and closing.type in CLOSING_BRACKETS
        ):
            return False

        if closing.type == token.RBRACE:
            self.remove_trailing_comma()
            return True

        if closing.type == token.RSQB:
            comma = self.leaves[-1]
            if comma.parent and comma.parent.type == syms.listmaker:
                self.remove_trailing_comma()
                return True

        # For parens let's check if it's safe to remove the comma.
        # Imports are always safe.
        if self.is_import:
            self.remove_trailing_comma()
            return True

        # Otherwise, if the trailing one is the only one, we might mistakenly
        # change a tuple into a different type by removing the comma.
        depth = closing.bracket_depth + 1
        commas = 0
        opening = closing.opening_bracket
        for _opening_index, leaf in enumerate(self.leaves):
            if leaf is opening:
                break
        else:
            return False

        for leaf in self.leaves[_opening_index + 1 :]:
            if leaf is closing:
                break

            bracket_depth = leaf.bracket_depth
            if bracket_depth == depth and leaf.type == token.COMMA:
                commas += 1
                if leaf.parent and leaf.parent.type in {
                    syms.arglist,
                    syms.typedargslist,
                }:
                    commas += 1
                    break

        if commas > 1:
            self.remove_trailing_comma()
            return True

        return False
    def append_comment(self, comment: Leaf) -> bool:
        """Add an inline or standalone comment to the line."""
        if (
            comment.type == STANDALONE_COMMENT
            and self.bracket_tracker.any_open_brackets()
        ):
            comment.prefix = ""
            return False

        if comment.type != token.COMMENT:
            return False

        if not self.leaves:
            comment.type = STANDALONE_COMMENT
            comment.prefix = ""
            return False

        last_leaf = self.leaves[-1]
        if (
            last_leaf.type == token.RPAR
            and not last_leaf.value
            and last_leaf.parent
            and len(list(last_leaf.parent.leaves())) <= 3
            and not is_type_comment(comment)
        ):
            # Comments on an optional parens wrapping a single leaf should belong to
            # the wrapped node except if it's a type comment. Pinning the comment like
            # this avoids unstable formatting caused by comment migration.
            if len(self.leaves) < 2:
                comment.type = STANDALONE_COMMENT
                comment.prefix = ""
                return False
            last_leaf = self.leaves[-2]
        self.comments.setdefault(id(last_leaf), []).append(comment)
        return True
    def comments_after(self, leaf: Leaf) -> List[Leaf]:
        """Generate comments that should appear directly after `leaf`."""
        return self.comments.get(id(leaf), [])

    def remove_trailing_comma(self) -> None:
        """Remove the trailing comma and move the comments attached to it."""
        trailing_comma = self.leaves.pop()
        trailing_comma_comments = self.comments.pop(id(trailing_comma), [])
        self.comments.setdefault(id(self.leaves[-1]), []).extend(
            trailing_comma_comments
        )
    def is_complex_subscript(self, leaf: Leaf) -> bool:
        """Return True iff `leaf` is part of a slice with non-trivial exprs."""
        open_lsqb = self.bracket_tracker.get_open_lsqb()
        if open_lsqb is None:
            return False

        subscript_start = open_lsqb.next_sibling
        if isinstance(subscript_start, Node):
            if subscript_start.type == syms.listmaker:
                return False

            if subscript_start.type == syms.subscriptlist:
                subscript_start = child_towards(subscript_start, leaf)
        return subscript_start is not None and any(
            n.type in TEST_DESCENDANTS for n in subscript_start.pre_order()
        )
    def __str__(self) -> str:
        """Render the line."""
        if not self:
            return "\n"

        indent = "    " * self.depth
        leaves = iter(self.leaves)
        first = next(leaves)
        res = f"{first.prefix}{indent}{first.value}"
        for leaf in leaves:
            res += str(leaf)
        for comment in itertools.chain.from_iterable(self.comments.values()):
            res += str(comment)
        return res + "\n"

    def __bool__(self) -> bool:
        """Return True if the line has leaves or comments."""
        return bool(self.leaves or self.comments)
@dataclass
class EmptyLineTracker:
    """Provides a stateful method that returns the number of potential extra
    empty lines needed before and after the currently processed line.

    Note: this tracker works on lines that haven't been split yet. It assumes
    the prefix of the first leaf consists of optional newlines. Those newlines
    are consumed by `maybe_empty_lines()` and included in the computation.
    """

    is_pyi: bool = False
    previous_line: Optional[Line] = None
    previous_after: int = 0
    previous_defs: List[int] = Factory(list)

    def maybe_empty_lines(self, current_line: Line) -> Tuple[int, int]:
        """Return the number of extra empty lines before and after the `current_line`.

        This is for separating `def`, `async def` and `class` with extra empty
        lines (two on module-level).
        """
        before, after = self._maybe_empty_lines(current_line)
        before -= self.previous_after
        self.previous_after = after
        self.previous_line = current_line
        return before, after
    def _maybe_empty_lines(self, current_line: Line) -> Tuple[int, int]:
        max_allowed = 1
        if current_line.depth == 0:
            max_allowed = 1 if self.is_pyi else 2
        if current_line.leaves:
            # Consume the first leaf's extra newlines.
            first_leaf = current_line.leaves[0]
            before = first_leaf.prefix.count("\n")
            before = min(before, max_allowed)
            first_leaf.prefix = ""
        else:
            before = 0
        depth = current_line.depth
        while self.previous_defs and self.previous_defs[-1] >= depth:
            self.previous_defs.pop()
            if self.is_pyi:
                before = 0 if depth else 1
            else:
                before = 1 if depth else 2
        if current_line.is_decorator or current_line.is_def or current_line.is_class:
            return self._maybe_empty_lines_for_class_or_def(current_line, before)

        if (
            self.previous_line
            and self.previous_line.is_import
            and not current_line.is_import
            and depth == self.previous_line.depth
        ):
            return (before or 1), 0

        if (
            self.previous_line
            and self.previous_line.is_class
            and current_line.is_triple_quoted_string
        ):
            return before, 1

        return before, 0
    def _maybe_empty_lines_for_class_or_def(
        self, current_line: Line, before: int
    ) -> Tuple[int, int]:
        if not current_line.is_decorator:
            self.previous_defs.append(current_line.depth)
        if self.previous_line is None:
            # Don't insert empty lines before the first line in the file.
            return 0, 0

        if self.previous_line.is_decorator:
            return 0, 0

        if self.previous_line.depth < current_line.depth and (
            self.previous_line.is_class or self.previous_line.is_def
        ):
            return 0, 0

        if (
            self.previous_line.is_comment
            and self.previous_line.depth == current_line.depth
            and before == 0
        ):
            return 0, 0

        if self.is_pyi:
            if self.previous_line.depth > current_line.depth:
                newlines = 1
            elif current_line.is_class or self.previous_line.is_class:
                if current_line.is_stub_class and self.previous_line.is_stub_class:
                    # No blank line between classes with an empty body
                    newlines = 0
                else:
                    newlines = 1
            elif current_line.is_def and not self.previous_line.is_def:
                # Blank line between a block of functions and a block of non-functions
                newlines = 1
            else:
                newlines = 0
        else:
            newlines = 2
        if current_line.depth and newlines:
            newlines -= 1
        return newlines, 0
@dataclass
class LineGenerator(Visitor[Line]):
    """Generates reformatted Line objects. Empty lines are not emitted.

    Note: destroys the tree it's visiting by mutating prefixes of its leaves
    in ways that will no longer stringify to valid Python code on the tree.
    """

    is_pyi: bool = False
    normalize_strings: bool = True
    current_line: Line = Factory(Line)
    remove_u_prefix: bool = False
    def line(self, indent: int = 0) -> Iterator[Line]:
        """Generate a line.

        If the line is empty, only emit if it makes sense.
        If the line is too long, split it first and then generate.

        If any lines were generated, set up a new current_line.
        """
        if not self.current_line:
            self.current_line.depth += indent
            return  # Line is empty, don't emit. Creating a new one unnecessary.

        complete_line = self.current_line
        self.current_line = Line(depth=complete_line.depth + indent)
        yield complete_line
    def visit_default(self, node: LN) -> Iterator[Line]:
        """Default `visit_*()` implementation. Recurses to children of `node`."""
        if isinstance(node, Leaf):
            any_open_brackets = self.current_line.bracket_tracker.any_open_brackets()
            for comment in generate_comments(node):
                if any_open_brackets:
                    # any comment within brackets is subject to splitting
                    self.current_line.append(comment)
                elif comment.type == token.COMMENT:
                    # regular trailing comment
                    self.current_line.append(comment)
                    yield from self.line()
                else:
                    # regular standalone comment
                    yield from self.line()

                    self.current_line.append(comment)
                    yield from self.line()

            normalize_prefix(node, inside_brackets=any_open_brackets)
            if self.normalize_strings and node.type == token.STRING:
                normalize_string_prefix(node, remove_u_prefix=self.remove_u_prefix)
                normalize_string_quotes(node)
            if node.type == token.NUMBER:
                normalize_numeric_literal(node)
            if node.type not in WHITESPACE:
                self.current_line.append(node)
        yield from super().visit_default(node)
    def visit_atom(self, node: Node) -> Iterator[Line]:
        # Always make parentheses invisible around a single node, because it should
        # not be needed (except in the case of yield, where removing the parentheses
        # produces a SyntaxError).
        if (
            len(node.children) == 3
            and isinstance(node.children[0], Leaf)
            and node.children[0].type == token.LPAR
            and isinstance(node.children[2], Leaf)
            and node.children[2].type == token.RPAR
            and isinstance(node.children[1], Leaf)
            and not (
                node.children[1].type == token.NAME
                and node.children[1].value == "yield"
            )
        ):
            node.children[0].value = ""
            node.children[2].value = ""
        yield from super().visit_default(node)
    def visit_factor(self, node: Node) -> Iterator[Line]:
        """Force parentheses between a unary op and a binary power:

        -2 ** 8 -> -(2 ** 8)
        """
        child = node.children[1]
        if child.type == syms.power and len(child.children) == 3:
            lpar = Leaf(token.LPAR, "(")
            rpar = Leaf(token.RPAR, ")")
            index = child.remove() or 0
            node.insert_child(index, Node(syms.atom, [lpar, child, rpar]))
        yield from self.visit_default(node)
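    # Editor's note (not in the original): per the docstring above, an input
    # like "-2**8" presumably comes out as "-(2 ** 8)", making the unary minus
    # visibly apply to the whole power expression.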
    def visit_INDENT(self, node: Node) -> Iterator[Line]:
        """Increase indentation level, maybe yield a line."""
        # In blib2to3 INDENT never holds comments.
        yield from self.line(+1)
        yield from self.visit_default(node)

    def visit_DEDENT(self, node: Node) -> Iterator[Line]:
        """Decrease indentation level, maybe yield a line."""
        # The current line might still wait for trailing comments. At DEDENT time
        # there won't be any (they would be prefixes on the preceding NEWLINE).
        # Emit the line then.
        yield from self.line()

        # While DEDENT has no value, its prefix may contain standalone comments
        # that belong to the current indentation level. Get 'em.
        yield from self.visit_default(node)

        # Finally, emit the dedent.
        yield from self.line(-1)
    def visit_stmt(
        self, node: Node, keywords: Set[str], parens: Set[str]
    ) -> Iterator[Line]:
        """Visit a statement.

        This implementation is shared for `if`, `while`, `for`, `try`, `except`,
        `def`, `with`, `class`, `assert` and assignments.

        The relevant Python language `keywords` for a given statement will be
        NAME leaves within it. This method puts those on a separate line.

        `parens` holds a set of string leaf values immediately after which
        invisible parens should be put.
        """
        normalize_invisible_parens(node, parens_after=parens)
        for child in node.children:
            if child.type == token.NAME and child.value in keywords:  # type: ignore
                yield from self.line()

            yield from self.visit(child)
    def visit_suite(self, node: Node) -> Iterator[Line]:
        """Visit a suite."""
        if self.is_pyi and is_stub_suite(node):
            yield from self.visit(node.children[2])
        else:
            yield from self.visit_default(node)

    def visit_simple_stmt(self, node: Node) -> Iterator[Line]:
        """Visit a statement without nested statements."""
        is_suite_like = node.parent and node.parent.type in STATEMENT
        if is_suite_like:
            if self.is_pyi and is_stub_body(node):
                yield from self.visit_default(node)
            else:
                yield from self.line(+1)
                yield from self.visit_default(node)
                yield from self.line(-1)

        else:
            if not self.is_pyi or not node.parent or not is_stub_suite(node.parent):
                yield from self.line()
            yield from self.visit_default(node)
    def visit_async_stmt(self, node: Node) -> Iterator[Line]:
        """Visit `async def`, `async for`, `async with`."""
        yield from self.line()

        children = iter(node.children)
        for child in children:
            yield from self.visit(child)

            if child.type == token.ASYNC:
                break

        internal_stmt = next(children)
        for child in internal_stmt.children:
            yield from self.visit(child)
    def visit_decorators(self, node: Node) -> Iterator[Line]:
        """Visit decorators."""
        for child in node.children:
            yield from self.line()
            yield from self.visit(child)

    def visit_SEMI(self, leaf: Leaf) -> Iterator[Line]:
        """Remove a semicolon and put the other statement on a separate line."""
        yield from self.line()

    def visit_ENDMARKER(self, leaf: Leaf) -> Iterator[Line]:
        """End of file. Process outstanding comments and end with a newline."""
        yield from self.visit_default(leaf)
        yield from self.line()

    def visit_STANDALONE_COMMENT(self, leaf: Leaf) -> Iterator[Line]:
        if not self.current_line.bracket_tracker.any_open_brackets():
            yield from self.line()
        yield from self.visit_default(leaf)
    def __attrs_post_init__(self) -> None:
        """You are in a twisty little maze of passages."""
        v = self.visit_stmt
        Ø: Set[str] = set()
        self.visit_assert_stmt = partial(v, keywords={"assert"}, parens={"assert", ","})
        self.visit_if_stmt = partial(
            v, keywords={"if", "else", "elif"}, parens={"if", "elif"}
        )
        self.visit_while_stmt = partial(v, keywords={"while", "else"}, parens={"while"})
        self.visit_for_stmt = partial(v, keywords={"for", "else"}, parens={"for", "in"})
        self.visit_try_stmt = partial(
            v, keywords={"try", "except", "else", "finally"}, parens=Ø
        )
        self.visit_except_clause = partial(v, keywords={"except"}, parens=Ø)
        self.visit_with_stmt = partial(v, keywords={"with"}, parens=Ø)
        self.visit_funcdef = partial(v, keywords={"def"}, parens=Ø)
        self.visit_classdef = partial(v, keywords={"class"}, parens=Ø)
        self.visit_expr_stmt = partial(v, keywords=Ø, parens=ASSIGNMENTS)
        self.visit_return_stmt = partial(v, keywords={"return"}, parens={"return"})
        self.visit_import_from = partial(v, keywords=Ø, parens={"import"})
        self.visit_del_stmt = partial(v, keywords=Ø, parens={"del"})
        self.visit_async_funcdef = self.visit_async_stmt
        self.visit_decorated = self.visit_decorators
IMPLICIT_TUPLE = {syms.testlist, syms.testlist_star_expr, syms.exprlist}
BRACKET = {token.LPAR: token.RPAR, token.LSQB: token.RSQB, token.LBRACE: token.RBRACE}
OPENING_BRACKETS = set(BRACKET.keys())
CLOSING_BRACKETS = set(BRACKET.values())
BRACKETS = OPENING_BRACKETS | CLOSING_BRACKETS
ALWAYS_NO_SPACE = CLOSING_BRACKETS | {token.COMMA, STANDALONE_COMMENT}
def whitespace(leaf: Leaf, *, complex_subscript: bool) -> str:  # noqa: C901
    """Return whitespace prefix if needed for the given `leaf`.

    `complex_subscript` signals whether the given leaf is part of a subscription
    which has non-trivial arguments, like arithmetic expressions or function calls.
    """
    NO = ""
    SPACE = " "
    DOUBLESPACE = "  "
    t = leaf.type
    p = leaf.parent
    v = leaf.value
    if t in ALWAYS_NO_SPACE:
        return NO

    if t == token.COMMENT:
        return DOUBLESPACE

    assert p is not None, f"INTERNAL ERROR: hand-made leaf without parent: {leaf!r}"
    if t == token.COLON and p.type not in {
        syms.subscript,
        syms.subscriptlist,
        syms.sliceop,
    }:
        return NO

    prev = leaf.prev_sibling
    if not prev:
        prevp = preceding_leaf(p)
        if not prevp or prevp.type in OPENING_BRACKETS:
            return NO

        if t == token.COLON:
            if prevp.type == token.COLON:
                return NO
            elif prevp.type != token.COMMA and not complex_subscript:
                return NO

            return SPACE

        if prevp.type == token.EQUAL:
            if prevp.parent:
                if prevp.parent.type in {
                    syms.arglist,
                    syms.argument,
                    syms.parameters,
                    syms.varargslist,
                }:
                    return NO
                elif prevp.parent.type == syms.typedargslist:
                    # A bit hacky: if the equal sign has whitespace, it means we
                    # previously found it's a typed argument. So, we're using
                    # that, too.
                    return prevp.prefix

        elif prevp.type in STARS:
            if is_vararg(prevp, within=VARARGS_PARENTS | UNPACKING_PARENTS):
                return NO

        elif prevp.type == token.COLON:
            if prevp.parent and prevp.parent.type in {syms.subscript, syms.sliceop}:
                return SPACE if complex_subscript else NO

        elif (
            prevp.parent
            and prevp.parent.type == syms.factor
            and prevp.type in MATH_OPERATORS
        ):
            return NO

        elif (
            prevp.type == token.RIGHTSHIFT
            and prevp.parent
            and prevp.parent.type == syms.shift_expr
            and prevp.prev_sibling
            and prevp.prev_sibling.type == token.NAME
            and prevp.prev_sibling.value == "print"  # type: ignore
        ):
            # Python 2 print chevron
            return NO

    elif prev.type in OPENING_BRACKETS:
        return NO

    if p.type in {syms.parameters, syms.arglist}:
        # untyped function signatures or calls
        if not prev or prev.type != token.COMMA:
            return NO

    elif p.type == syms.varargslist:
        # lambdas
        if prev and prev.type != token.COMMA:
            return NO

    elif p.type == syms.typedargslist:
        # typed function signatures
        if not prev:
            return NO

        if t == token.EQUAL:
            if prev.type != syms.tname:
                return NO
        elif prev.type == token.EQUAL:
            # A bit hacky: if the equal sign has whitespace, it means we
            # previously found it's a typed argument. So, we're using that, too.
            return prev.prefix
        elif prev.type != token.COMMA:
            return NO

    elif p.type == syms.tname:
        # type names
        if not prev:
            prevp = preceding_leaf(p)
            if not prevp or prevp.type != token.COMMA:
                return NO

    elif p.type == syms.trailer:
        # attributes and calls
        if t == token.LPAR or t == token.RPAR:
            return NO

        if not prev:
            if t == token.DOT:
                prevp = preceding_leaf(p)
                if not prevp or prevp.type != token.NUMBER:
                    return NO
            elif t == token.LSQB:
                return NO
        elif prev.type != token.COMMA:
            return NO

    elif p.type == syms.argument:
        # single argument
        if t == token.EQUAL:
            return NO

        if not prev:
            prevp = preceding_leaf(p)
            if not prevp or prevp.type == token.LPAR:
                return NO
        elif prev.type in {token.EQUAL} | STARS:
            return NO

    elif p.type == syms.decorator:
        # decorators
        return NO

    elif p.type == syms.dotted_name:
        if prev:
            return NO

        prevp = preceding_leaf(p)
        if not prevp or prevp.type == token.AT or prevp.type == token.DOT:
            return NO

    elif p.type == syms.classdef:
        if t == token.LPAR:
            return NO

        if prev and prev.type == token.LPAR:
            return NO

    elif p.type in {syms.subscript, syms.sliceop}:
        # indexing
        if not prev:
            assert p.parent is not None, "subscripts are always parented"
            if p.parent.type == syms.subscriptlist:
                return SPACE

            return NO
        elif not complex_subscript:
            return NO

    elif p.type == syms.atom:
        if prev and t == token.DOT:
            # dots, but not the first one.
            return NO

    elif p.type == syms.dictsetmaker:
        if prev and prev.type == token.DOUBLESTAR:
            return NO

    elif p.type in {syms.factor, syms.star_expr}:
        # unary ops
        if not prev:
            prevp = preceding_leaf(p)
            if not prevp or prevp.type in OPENING_BRACKETS:
                return NO

            prevp_parent = prevp.parent
            assert prevp_parent is not None
            if prevp.type == token.COLON and prevp_parent.type in {
                syms.subscript,
                syms.sliceop,
            }:
                return NO
            elif prevp.type == token.EQUAL and prevp_parent.type == syms.argument:
                return NO
        elif t in {token.NAME, token.NUMBER, token.STRING}:
            return NO

    elif p.type == syms.import_from:
        if t == token.DOT:
            if prev and prev.type == token.DOT:
                return NO
        elif t == token.NAME:
            if v == "import":
                return SPACE

            if prev and prev.type == token.DOT:
                return NO

    elif p.type == syms.sliceop:
        return NO

    return SPACE
def preceding_leaf(node: Optional[LN]) -> Optional[Leaf]:
    """Return the first leaf that precedes `node`, if any."""
    while node:
        res = node.prev_sibling
        if res:
            if isinstance(res, Leaf):
                return res
            try:
                return list(res.leaves())[-1]
            except IndexError:
                return None
        node = node.parent
    return None


def child_towards(ancestor: Node, descendant: LN) -> Optional[LN]:
    """Return the child of `ancestor` that contains `descendant`."""
    node: Optional[LN] = descendant
    while node and node.parent != ancestor:
        node = node.parent
    return node
def container_of(leaf: Leaf) -> LN:
    """Return `leaf` or one of its ancestors that is the topmost container of it.

    By "container" we mean a node where `leaf` is the very first child.
    """
    same_prefix = leaf.prefix
    container: LN = leaf
    while container:
        parent = container.parent
        if parent is None:
            break
        if parent.children[0].prefix != same_prefix:
            break
        if parent.type == syms.file_input:
            break
        if parent.prev_sibling is not None and parent.prev_sibling.type in BRACKETS:
            break
        container = parent
    return container
def is_split_after_delimiter(leaf: Leaf, previous: Optional[Leaf] = None) -> Priority:
    """Return the priority of the `leaf` delimiter, given a line break after it.

    The delimiter priorities returned here are from those delimiters that would
    cause a line break after themselves.

    Higher numbers are higher priority.
    """
    if leaf.type == token.COMMA:
        return COMMA_PRIORITY

    return 0
def is_split_before_delimiter(leaf: Leaf, previous: Optional[Leaf] = None) -> Priority:
    """Return the priority of the `leaf` delimiter, given a line break before it.

    The delimiter priorities returned here are from those delimiters that would
    cause a line break before themselves.

    Higher numbers are higher priority.
    """
    if is_vararg(leaf, within=VARARGS_PARENTS | UNPACKING_PARENTS):
        # * and ** might also be MATH_OPERATORS but in this case they are not.
        # Don't treat them as a delimiter.
        return 0

    if (
        leaf.type == token.DOT
        and leaf.parent
        and leaf.parent.type not in {syms.import_from, syms.dotted_name}
        and (previous is None or previous.type in CLOSING_BRACKETS)
    ):
        return DOT_PRIORITY

    if (
        leaf.type in MATH_OPERATORS
        and leaf.parent
        and leaf.parent.type not in {syms.factor, syms.star_expr}
    ):
        return MATH_PRIORITIES[leaf.type]

    if leaf.type in COMPARATORS:
        return COMPARATOR_PRIORITY

    if (
        leaf.type == token.STRING
        and previous is not None
        and previous.type == token.STRING
    ):
        return STRING_PRIORITY

    if leaf.type not in {token.NAME, token.ASYNC}:
        return 0

    if (
        leaf.value == "for"
        and leaf.parent
        and leaf.parent.type in {syms.comp_for, syms.old_comp_for}
        or leaf.type == token.ASYNC
    ):
        if (
            not isinstance(leaf.prev_sibling, Leaf)
            or leaf.prev_sibling.value != "async"
        ):
            return COMPREHENSION_PRIORITY

    if (
        leaf.value == "if"
        and leaf.parent
        and leaf.parent.type in {syms.comp_if, syms.old_comp_if}
    ):
        return COMPREHENSION_PRIORITY

    if leaf.value in {"if", "else"} and leaf.parent and leaf.parent.type == syms.test:
        return TERNARY_PRIORITY

    if leaf.value == "is":
        return COMPARATOR_PRIORITY

    if (
        leaf.value == "in"
        and leaf.parent
        and leaf.parent.type in {syms.comp_op, syms.comparison}
        and not (
            previous is not None
            and previous.type == token.NAME
            and previous.value == "not"
        )
    ):
        return COMPARATOR_PRIORITY

    if (
        leaf.value == "not"
        and leaf.parent
        and leaf.parent.type == syms.comp_op
        and not (
            previous is not None
            and previous.type == token.NAME
            and previous.value == "is"
        )
    ):
        return COMPARATOR_PRIORITY

    if leaf.value in LOGIC_OPERATORS and leaf.parent:
        return LOGIC_PRIORITY

    return 0
FMT_OFF = {"# fmt: off", "# fmt:off", "# yapf: disable"}
FMT_ON = {"# fmt: on", "# fmt:on", "# yapf: enable"}
def generate_comments(leaf: LN) -> Iterator[Leaf]:
    """Clean the prefix of the `leaf` and generate comments from it, if any.

    Comments in lib2to3 are shoved into the whitespace prefix. This happens
    in `pgen2/driver.py:Driver.parse_tokens()`. This was a brilliant implementation
    move because it does away with modifying the grammar to include all the
    possible places in which comments can be placed.

    The sad consequence for us though is that comments don't "belong" anywhere.
    This is why this function generates simple parentless Leaf objects for
    comments. We simply don't know what the correct parent should be.

    No matter though, we can live without this. We really only need to
    differentiate between inline and standalone comments. The latter don't
    share the line with any code.

    Inline comments are emitted as regular token.COMMENT leaves. Standalone
    comments are emitted with a fake STANDALONE_COMMENT token identifier.
    """
    for pc in list_comments(leaf.prefix, is_endmarker=leaf.type == token.ENDMARKER):
        yield Leaf(pc.type, pc.value, prefix="\n" * pc.newlines)
2211 """Describes a piece of syntax that is a comment.
2213 It's not a :class:`blib2to3.pytree.Leaf` so that:
2215 * it can be cached (`Leaf` objects should not be reused more than once as
2216 they store their lineno, column, prefix, and parent information);
2217 * `newlines` and `consumed` fields are kept separate from the `value`. This
2218 simplifies handling of special marker comments like ``# fmt: off/on``.
2221 type: int # token.COMMENT or STANDALONE_COMMENT
2222 value: str # content of the comment
2223 newlines: int # how many newlines before the comment
2224 consumed: int # how many characters of the original leaf's prefix did we consume
@lru_cache(maxsize=4096)
def list_comments(prefix: str, *, is_endmarker: bool) -> List[ProtoComment]:
    """Return a list of :class:`ProtoComment` objects parsed from the given `prefix`."""
    result: List[ProtoComment] = []
    if not prefix or "#" not in prefix:
        return result

    consumed = 0
    nlines = 0
    ignored_lines = 0
    for index, line in enumerate(prefix.split("\n")):
        consumed += len(line) + 1  # adding the length of the split '\n'
        line = line.lstrip()
        if not line:
            nlines += 1
        if not line.startswith("#"):
            # Escaped newlines outside of a comment are not really newlines at
            # all. We treat a single-line comment following an escaped newline
            # as a simple trailing comment.
            if line.endswith("\\"):
                ignored_lines += 1
            continue

        if index == ignored_lines and not is_endmarker:
            comment_type = token.COMMENT  # simple trailing comment
        else:
            comment_type = STANDALONE_COMMENT
        comment = make_comment(line)
        result.append(
            ProtoComment(
                type=comment_type, value=comment, newlines=nlines, consumed=consumed
            )
        )
        nlines = 0
    return result
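# Editor's note: a rough illustrative sketch, not part of the original file
# (the exact repr is assumed):
#
#     >>> list_comments("# hello", is_endmarker=False)
#     [ProtoComment(type=token.COMMENT, value='# hello', newlines=0, consumed=8)]
#
# `consumed` counts the prefix characters plus the implied newline.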
def make_comment(content: str) -> str:
    """Return a consistently formatted comment from the given `content` string.

    All comments (except for "##", "#!", "#:", "#'", "#%%") should have a single
    space between the hash sign and the content.

    If `content` didn't start with a hash sign, one is provided.
    """
    content = content.rstrip()
    if not content:
        return "#"

    if content[0] == "#":
        content = content[1:]
    if content and content[0] not in " !:#'%":
        content = " " + content
    return "#" + content
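# Editor's note: illustrative examples, not part of the original file:
#
#     >>> make_comment("comment")
#     '# comment'
#     >>> make_comment("#!/usr/bin/env python")  # "#!" is exempt from the space rule
#     '#!/usr/bin/env python'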
def split_line(
    line: Line,
    line_length: int,
    inner: bool = False,
    features: Collection[Feature] = (),
) -> Iterator[Line]:
    """Split a `line` into potentially many lines.

    They should fit in the allotted `line_length` but might not be able to.
    `inner` signifies that there were a pair of brackets somewhere around the
    current `line`, possibly transitively. This means we can fall back to
    splitting by delimiters if the LHS/RHS don't yield any results.

    `features` are syntactical features that may be used in the output.
    """
    if line.is_comment:
        yield line
        return

    line_str = str(line).strip("\n")

    if (
        not line.contains_inner_type_comments()
        and not line.should_explode
        and is_line_short_enough(line, line_length=line_length, line_str=line_str)
    ):
        yield line
        return

    split_funcs: List[SplitFunc]
    if line.is_def:
        split_funcs = [left_hand_split]
    else:

        def rhs(line: Line, features: Collection[Feature]) -> Iterator[Line]:
            for omit in generate_trailers_to_omit(line, line_length):
                lines = list(right_hand_split(line, line_length, features, omit=omit))
                if is_line_short_enough(lines[0], line_length=line_length):
                    yield from lines
                    return

            # All splits failed, best effort split with no omits.
            # This mostly happens to multiline strings that are by definition
            # reported as not fitting a single line.
            yield from right_hand_split(line, line_length, features=features)

        if line.inside_brackets:
            split_funcs = [delimiter_split, standalone_comment_split, rhs]
        else:
            split_funcs = [rhs]
    for split_func in split_funcs:
        # We are accumulating lines in `result` because we might want to abort
        # mission and return the original line in the end, or attempt a different
        # split altogether.
        result: List[Line] = []
        try:
            for l in split_func(line, features):
                if str(l).strip("\n") == line_str:
                    raise CannotSplit("Split function returned an unchanged result")

                result.extend(
                    split_line(
                        l, line_length=line_length, inner=True, features=features
                    )
                )
        except CannotSplit:
            continue
        else:
            yield from result
            return
    else:
        yield line
2359 def left_hand_split(line: Line, features: Collection[Feature] = ()) -> Iterator[Line]:
2360 """Split line into many lines, starting with the first matching bracket pair.
2362 Note: this usually looks weird; only use it for function definitions.
2363 Prefer RHS otherwise. This is why this function is not symmetrical with
2364 :func:`right_hand_split` which also handles optional parentheses.
2366 tail_leaves: List[Leaf] = []
2367 body_leaves: List[Leaf] = []
2368 head_leaves: List[Leaf] = []
2369 current_leaves = head_leaves
2370 matching_bracket = None
2371 for leaf in line.leaves:
2373 current_leaves is body_leaves
2374 and leaf.type in CLOSING_BRACKETS
2375 and leaf.opening_bracket is matching_bracket
2377 current_leaves = tail_leaves if body_leaves else head_leaves
2378 current_leaves.append(leaf)
2379 if current_leaves is head_leaves:
2380 if leaf.type in OPENING_BRACKETS:
2381 matching_bracket = leaf
2382 current_leaves = body_leaves
2383 if not matching_bracket:
2384 raise CannotSplit("No brackets found")
2386 head = bracket_split_build_line(head_leaves, line, matching_bracket)
2387 body = bracket_split_build_line(body_leaves, line, matching_bracket, is_body=True)
2388 tail = bracket_split_build_line(tail_leaves, line, matching_bracket)
2389 bracket_split_succeeded_or_raise(head, body, tail)
2390 for result in (head, body, tail):
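# Illustrative shape of a left-hand split on a function definition: the head
# ends with the first opening bracket, the tail starts at its matching closing
# bracket, and the body lands in between, one indent level deeper:
#
#   def process(self, important_arg: str, other_arg: int = 5) -> None: ...
#
# becomes
#
#   def process(
#       self, important_arg: str, other_arg: int = 5
#   ) -> None: ...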
2395 def right_hand_split(
2398 features: Collection[Feature] = (),
2399 omit: Collection[LeafID] = (),
2400 ) -> Iterator[Line]:
2401 """Split line into many lines, starting with the last matching bracket pair.
2403 If the split was by optional parentheses, attempt splitting without them, too.
2404 `omit` is a collection of closing bracket IDs that shouldn't be considered for
2405 this split.
2407 Note: running this function modifies `bracket_depth` on the leaves of `line`.
2409 tail_leaves: List[Leaf] = []
2410 body_leaves: List[Leaf] = []
2411 head_leaves: List[Leaf] = []
2412 current_leaves = tail_leaves
2413 opening_bracket = None
2414 closing_bracket = None
2415 for leaf in reversed(line.leaves):
2416 if current_leaves is body_leaves:
2417 if leaf is opening_bracket:
2418 current_leaves = head_leaves if body_leaves else tail_leaves
2419 current_leaves.append(leaf)
2420 if current_leaves is tail_leaves:
2421 if leaf.type in CLOSING_BRACKETS and id(leaf) not in omit:
2422 opening_bracket = leaf.opening_bracket
2423 closing_bracket = leaf
2424 current_leaves = body_leaves
2425 if not (opening_bracket and closing_bracket and head_leaves):
2426 # If there is no `opening_bracket` or `closing_bracket`, the split failed and
2427 # all content is in the tail. Otherwise, if `head_leaves` is empty, it means
2428 # the matching `opening_bracket` wasn't available on `line` anymore.
2429 raise CannotSplit("No brackets found")
2431 tail_leaves.reverse()
2432 body_leaves.reverse()
2433 head_leaves.reverse()
2434 head = bracket_split_build_line(head_leaves, line, opening_bracket)
2435 body = bracket_split_build_line(body_leaves, line, opening_bracket, is_body=True)
2436 tail = bracket_split_build_line(tail_leaves, line, opening_bracket)
2437 bracket_split_succeeded_or_raise(head, body, tail)
2439 # the body shouldn't be exploded
2440 not body.should_explode
2441 # the opening bracket is an optional paren
2442 and opening_bracket.type == token.LPAR
2443 and not opening_bracket.value
2444 # the closing bracket is an optional paren
2445 and closing_bracket.type == token.RPAR
2446 and not closing_bracket.value
2447 # it's not an import (optional parens are the only thing we can split on
2448 # in this case; attempting a split without them is a waste of time)
2449 and not line.is_import
2450 # there are no standalone comments in the body
2451 and not body.contains_standalone_comments(0)
2452 # and we can actually remove the parens
2453 and can_omit_invisible_parens(body, line_length)
2455 omit = {id(closing_bracket), *omit}
2457 yield from right_hand_split(line, line_length, features=features, omit=omit)
2463 or is_line_short_enough(body, line_length=line_length)
2466 "Splitting failed, body is still too long and can't be split."
2469 elif head.contains_multiline_strings() or tail.contains_multiline_strings():
2471 "The current optional pair of parentheses is bound to fail to "
2472 "satisfy the splitting algorithm because the head or the tail "
2473 "contains multiline strings which by definition never fit one "
2477 ensure_visible(opening_bracket)
2478 ensure_visible(closing_bracket)
2479 for result in (head, body, tail):
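# Illustrative shape of a right-hand split, which works from the *last*
# bracket pair backwards:
#
#   result = some_function(argument_one, argument_two, argument_three)
#
# becomes
#
#   result = some_function(
#       argument_one, argument_two, argument_three
#   )
#
# When the brackets are invisible (optional) parens, the recursive call above
# first retries the split with them omitted.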
2484 def bracket_split_succeeded_or_raise(head: Line, body: Line, tail: Line) -> None:
2485 """Raise :exc:`CannotSplit` if the last left- or right-hand split failed.
2487 Do nothing otherwise.
2489 A left- or right-hand split is based on a pair of brackets. Content before
2490 (and including) the opening bracket is left on one line, content inside the
2491 brackets is put on a separate line, and finally content starting with and
2492 following the closing bracket is put on a separate line.
2494 Those are called `head`, `body`, and `tail`, respectively. If the split
2495 produced the same line (all content in `head`) or ended up with an empty `body`
2496 and the `tail` is just the closing bracket, then it's considered failed.
2498 tail_len = len(str(tail).strip())
2501 raise CannotSplit("Splitting brackets produced the same line")
2505 f"Splitting brackets on an empty body to save "
2506 f"{tail_len} characters is not worth it"
2510 def bracket_split_build_line(
2511 leaves: List[Leaf], original: Line, opening_bracket: Leaf, *, is_body: bool = False
2513 """Return a new line with given `leaves` and respective comments from `original`.
2515 If `is_body` is True, the result line is one-indented inside brackets and as such
2516 has its first leaf's prefix normalized and a trailing comma added when expected.
2518 result = Line(depth=original.depth)
2520 result.inside_brackets = True
2523 # Since body is a new indent level, remove spurious leading whitespace.
2524 normalize_prefix(leaves[0], inside_brackets=True)
2525 # Ensure a trailing comma for imports and standalone function arguments, but
2526 # be careful not to add one after any comments.
2527 no_commas = original.is_def and not any(
2528 l.type == token.COMMA for l in leaves
2531 if original.is_import or no_commas:
2532 for i in range(len(leaves) - 1, -1, -1):
2533 if leaves[i].type == STANDALONE_COMMENT:
2535 elif leaves[i].type == token.COMMA:
2538 leaves.insert(i + 1, Leaf(token.COMMA, ","))
2542 result.append(leaf, preformatted=True)
2543 for comment_after in original.comments_after(leaf):
2544 result.append(comment_after, preformatted=True)
2546 result.should_explode = should_explode(result, opening_bracket)
2550 def dont_increase_indentation(split_func: SplitFunc) -> SplitFunc:
2551 """Normalize prefix of the first leaf in every line returned by `split_func`.
2553 This is a decorator over relevant split functions.
2557 def split_wrapper(line: Line, features: Collection[Feature] = ()) -> Iterator[Line]:
2558 for l in split_func(line, features):
2559 normalize_prefix(l.leaves[0], inside_brackets=True)
2562 return split_wrapper
2565 @dont_increase_indentation
2566 def delimiter_split(line: Line, features: Collection[Feature] = ()) -> Iterator[Line]:
2567 """Split according to delimiters of the highest priority.
2569 If the appropriate Features are given, the split will add trailing commas
2570 also in function signatures and calls that contain `*` and `**`.
2573 last_leaf = line.leaves[-1]
2575 raise CannotSplit("Line empty")
2577 bt = line.bracket_tracker
2579 delimiter_priority = bt.max_delimiter_priority(exclude={id(last_leaf)})
2581 raise CannotSplit("No delimiters found")
2583 if delimiter_priority == DOT_PRIORITY:
2584 if bt.delimiter_count_with_priority(delimiter_priority) == 1:
2585 raise CannotSplit("Splitting a single attribute from its owner looks wrong")
2587 current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
2588 lowest_depth = sys.maxsize
2589 trailing_comma_safe = True
2591 def append_to_line(leaf: Leaf) -> Iterator[Line]:
2592 """Append `leaf` to current line or to new line if appending impossible."""
2593 nonlocal current_line
2595 current_line.append_safe(leaf, preformatted=True)
2599 current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
2600 current_line.append(leaf)
2602 for leaf in line.leaves:
2603 yield from append_to_line(leaf)
2605 for comment_after in line.comments_after(leaf):
2606 yield from append_to_line(comment_after)
2608 lowest_depth = min(lowest_depth, leaf.bracket_depth)
2609 if leaf.bracket_depth == lowest_depth:
2610 if is_vararg(leaf, within={syms.typedargslist}):
2611 trailing_comma_safe = (
2612 trailing_comma_safe and Feature.TRAILING_COMMA_IN_DEF in features
2614 elif is_vararg(leaf, within={syms.arglist, syms.argument}):
2615 trailing_comma_safe = (
2616 trailing_comma_safe and Feature.TRAILING_COMMA_IN_CALL in features
2619 leaf_priority = bt.delimiters.get(id(leaf))
2620 if leaf_priority == delimiter_priority:
2623 current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
2627 and delimiter_priority == COMMA_PRIORITY
2628 and current_line.leaves[-1].type != token.COMMA
2629 and current_line.leaves[-1].type != STANDALONE_COMMENT
2631 current_line.append(Leaf(token.COMMA, ","))
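# Illustrative result of a comma-priority delimiter split, with the trailing
# comma appended when it is deemed safe above:
#
#   [first_item, second_item, third_item]
#
# becomes
#
#   [
#       first_item,
#       second_item,
#       third_item,
#   ]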
2635 @dont_increase_indentation
2636 def standalone_comment_split(
2637 line: Line, features: Collection[Feature] = ()
2638 ) -> Iterator[Line]:
2639 """Split standalone comments from the rest of the line."""
2640 if not line.contains_standalone_comments(0):
2641 raise CannotSplit("Line does not have any standalone comments")
2643 current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
2645 def append_to_line(leaf: Leaf) -> Iterator[Line]:
2646 """Append `leaf` to current line or to new line if appending impossible."""
2647 nonlocal current_line
2649 current_line.append_safe(leaf, preformatted=True)
2653 current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
2654 current_line.append(leaf)
2656 for leaf in line.leaves:
2657 yield from append_to_line(leaf)
2659 for comment_after in line.comments_after(leaf):
2660 yield from append_to_line(comment_after)
2666 def is_import(leaf: Leaf) -> bool:
2667 """Return True if the given leaf starts an import statement."""
2674 (v == "import" and p and p.type == syms.import_name)
2675 or (v == "from" and p and p.type == syms.import_from)
2680 def is_type_comment(leaf: Leaf) -> bool:
2681 """Return True if the given leaf is a special comment.
2682 Only returns True for type comments for now."""
2683 t = leaf.type
2684 v = leaf.value
2685 return t in {token.COMMENT, STANDALONE_COMMENT} and v.startswith("# type:")
2688 def normalize_prefix(leaf: Leaf, *, inside_brackets: bool) -> None:
2689 """Leave existing extra newlines if not `inside_brackets`. Remove everything
2692 Note: don't use backslashes for formatting or you'll lose your voting rights.
2694 if not inside_brackets:
2695 spl = leaf.prefix.split("#")
2696 if "\\" not in spl[0]:
2697 nl_count = spl[-1].count("\n")
2700 leaf.prefix = "\n" * nl_count
2706 def normalize_string_prefix(leaf: Leaf, remove_u_prefix: bool = False) -> None:
2707 """Make all string prefixes lowercase.
2709 If remove_u_prefix is given, also removes any u prefix from the string.
2711 Note: Mutates its argument.
2713 match = re.match(r"^([furbFURB]*)(.*)$", leaf.value, re.DOTALL)
2714 assert match is not None, f"failed to match string {leaf.value!r}"
2715 orig_prefix = match.group(1)
2716 new_prefix = orig_prefix.lower()
2718 new_prefix = new_prefix.replace("u", "")
2719 leaf.value = f"{new_prefix}{match.group(2)}"
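# Illustrative prefix normalizations (only the prefix changes):
#
#   F"text"  -> f"text"
#   RB"data" -> rb"data"
#   u"text"  -> "text"    (only when remove_u_prefix=True)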
2722 def normalize_string_quotes(leaf: Leaf) -> None:
2723 """Prefer double quotes but only if it doesn't cause more escaping.
2725 Adds or removes backslashes as appropriate. Doesn't parse and fix
2726 strings nested in f-strings (yet).
2728 Note: Mutates its argument.
2730 value = leaf.value.lstrip("furbFURB")
2731 if value[:3] == '"""':
2734 elif value[:3] == "'''":
2737 elif value[0] == '"':
2743 first_quote_pos = leaf.value.find(orig_quote)
2744 if first_quote_pos == -1:
2745 return # There's an internal error
2747 prefix = leaf.value[:first_quote_pos]
2748 unescaped_new_quote = re.compile(rf"(([^\\]|^)(\\\\)*){new_quote}")
2749 escaped_new_quote = re.compile(rf"([^\\]|^)\\((?:\\\\)*){new_quote}")
2750 escaped_orig_quote = re.compile(rf"([^\\]|^)\\((?:\\\\)*){orig_quote}")
2751 body = leaf.value[first_quote_pos + len(orig_quote) : -len(orig_quote)]
2752 if "r" in prefix.casefold():
2753 if unescaped_new_quote.search(body):
2754 # There's at least one unescaped new_quote in this raw string
2755 # so converting is impossible
2758 # Do not introduce or remove backslashes in raw strings
2761 # remove unnecessary escapes
2762 new_body = sub_twice(escaped_new_quote, rf"\1\2{new_quote}", body)
2763 if body != new_body:
2764 # Consider the string without unnecessary escapes as the original
2766 leaf.value = f"{prefix}{orig_quote}{body}{orig_quote}"
2767 new_body = sub_twice(escaped_orig_quote, rf"\1\2{orig_quote}", new_body)
2768 new_body = sub_twice(unescaped_new_quote, rf"\1\\{new_quote}", new_body)
2769 if "f" in prefix.casefold():
2770 matches = re.findall(
2772 (?:[^{]|^)\{ # start of the string or a non-{ followed by a single {
2773 ([^{].*?) # contents of the brackets except if it begins with {{
2774 \}(?:[^}]|$) # A } followed by end of the string or a non-}
2781 # Do not introduce backslashes in interpolated expressions
2783 if new_quote == '"""' and new_body[-1:] == '"':
2785 new_body = new_body[:-1] + '\\"'
2786 orig_escape_count = body.count("\\")
2787 new_escape_count = new_body.count("\\")
2788 if new_escape_count > orig_escape_count:
2789 return # Do not introduce more escaping
2791 if new_escape_count == orig_escape_count and orig_quote == '"':
2792 return # Prefer double quotes
2794 leaf.value = f"{prefix}{new_quote}{new_body}{new_quote}"
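# Illustrative quote normalizations (double quotes win unless switching would
# add escapes):
#
#   'snake'          -> "snake"
#   'it\'s'          -> "it's"       (one escape removed)
#   'she said "hi"'  -> unchanged    (switching would introduce escapes)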
2797 def normalize_numeric_literal(leaf: Leaf) -> None:
2798 """Normalizes numeric (float, int, and complex) literals.
2800 All letters used in the representation are normalized to lowercase (except
2801 in Python 2 long literals).
2803 text = leaf.value.lower()
2804 if text.startswith(("0o", "0b")):
2805 # Leave octal and binary literals alone.
2807 elif text.startswith("0x"):
2808 # Change hex literals to upper case.
2809 before, after = text[:2], text[2:]
2810 text = f"{before}{after.upper()}"
2812 before, after = text.split("e")
2814 if after.startswith("-"):
2817 elif after.startswith("+"):
2819 before = format_float_or_int_string(before)
2820 text = f"{before}e{sign}{after}"
2821 elif text.endswith(("j", "l")):
2824 # Capitalize in "2L" because "l" looks too similar to "1".
2827 text = f"{format_float_or_int_string(number)}{suffix}"
2829 text = format_float_or_int_string(text)
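# Illustrative normalizations (assuming the elided branches that fill in
# `sign`, `number`/`suffix`, and the final assignment back to `leaf.value`):
#
#   0XAB -> 0xAB    (prefix lower-cased, hex digits upper-cased)
#   1E5  -> 1e5
#   1.   -> 1.0     (via format_float_or_int_string below)
#   .5   -> 0.5
#   10l  -> 10L     ("l" capitalized so it isn't mistaken for "1")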
2833 def format_float_or_int_string(text: str) -> str:
2834 """Formats a float string like "1.0"."""
2838 before, after = text.split(".")
2839 return f"{before or 0}.{after or 0}"
2842 def normalize_invisible_parens(node: Node, parens_after: Set[str]) -> None:
2843 """Make existing optional parentheses invisible or create new ones.
2845 `parens_after` is a set of string leaf values immediately after which parens
2846 should be put.
2848 Standardizes on visible parentheses for single-element tuples, and keeps
2849 existing visible parentheses for other tuples and generator expressions.
2851 for pc in list_comments(node.prefix, is_endmarker=False):
2852 if pc.value in FMT_OFF:
2853 # This `node` has a prefix with `# fmt: off`, don't mess with parens.
2857 for index, child in enumerate(list(node.children)):
2858 # Add parentheses around long tuple unpacking in assignments.
2861 and isinstance(child, Node)
2862 and child.type == syms.testlist_star_expr
2867 if child.type == syms.atom:
2868 if maybe_make_parens_invisible_in_atom(child, parent=node):
2869 lpar = Leaf(token.LPAR, "")
2870 rpar = Leaf(token.RPAR, "")
2871 index = child.remove() or 0
2872 node.insert_child(index, Node(syms.atom, [lpar, child, rpar]))
2873 elif is_one_tuple(child):
2874 # wrap child in visible parentheses
2875 lpar = Leaf(token.LPAR, "(")
2876 rpar = Leaf(token.RPAR, ")")
2878 node.insert_child(index, Node(syms.atom, [lpar, child, rpar]))
2879 elif node.type == syms.import_from:
2880 # "import from" nodes store parentheses directly as part of
2882 if child.type == token.LPAR:
2883 # make parentheses invisible
2884 child.value = "" # type: ignore
2885 node.children[-1].value = "" # type: ignore
2886 elif child.type != token.STAR:
2887 # insert invisible parentheses
2888 node.insert_child(index, Leaf(token.LPAR, ""))
2889 node.append_child(Leaf(token.RPAR, ""))
2892 elif not (isinstance(child, Leaf) and is_multiline_string(child)):
2893 # wrap child in invisible parentheses
2894 lpar = Leaf(token.LPAR, "")
2895 rpar = Leaf(token.RPAR, "")
2896 index = child.remove() or 0
2897 prefix = child.prefix
2899 new_child = Node(syms.atom, [lpar, child, rpar])
2900 new_child.prefix = prefix
2901 node.insert_child(index, new_child)
2903 check_lpar = isinstance(child, Leaf) and child.value in parens_after
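# Illustrative shapes for a hypothetical call with parens_after={"return"}:
#
#   return (1, 2)  ->  return 1, 2    (existing parens kept but made invisible)
#   return 1,      ->  return (1,)    (one-tuples get visible parentheses)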
2906 def normalize_fmt_off(node: Node) -> None:
2907 """Convert content between `# fmt: off`/`# fmt: on` into standalone comments."""
2910 try_again = convert_one_fmt_off_pair(node)
2913 def convert_one_fmt_off_pair(node: Node) -> bool:
2914 """Convert content of a single `# fmt: off`/`# fmt: on` into a standalone comment.
2916 Returns True if a pair was converted.
2918 for leaf in node.leaves():
2919 previous_consumed = 0
2920 for comment in list_comments(leaf.prefix, is_endmarker=False):
2921 if comment.value in FMT_OFF:
2922 # We only want standalone comments. If there's no previous leaf or
2923 # the previous leaf is indentation, it's a standalone comment in
2924 # disguise.
2925 if comment.type != STANDALONE_COMMENT:
2926 prev = preceding_leaf(leaf)
2927 if prev and prev.type not in WHITESPACE:
2930 ignored_nodes = list(generate_ignored_nodes(leaf))
2931 if not ignored_nodes:
2934 first = ignored_nodes[0] # Can be a container node with the `leaf`.
2935 parent = first.parent
2936 prefix = first.prefix
2937 first.prefix = prefix[comment.consumed :]
2939 comment.value + "\n" + "".join(str(n) for n in ignored_nodes)
2941 if hidden_value.endswith("\n"):
2942 # That happens when one of the `ignored_nodes` ended with a NEWLINE
2943 # leaf (possibly followed by a DEDENT).
2944 hidden_value = hidden_value[:-1]
2946 for ignored in ignored_nodes:
2947 index = ignored.remove()
2948 if first_idx is None:
2950 assert parent is not None, "INTERNAL ERROR: fmt: on/off handling (1)"
2951 assert first_idx is not None, "INTERNAL ERROR: fmt: on/off handling (2)"
2952 parent.insert_child(
2957 prefix=prefix[:previous_consumed] + "\n" * comment.newlines,
2962 previous_consumed = comment.consumed
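# Illustrative input: everything between the markers below is glued into a
# single STANDALONE_COMMENT leaf whose value reproduces the original text
# verbatim, so later formatting passes leave it untouched:
#
#   # fmt: off
#   matrix = [
#       1, 0,
#       0, 1,
#   ]
#   # fmt: on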
2967 def generate_ignored_nodes(leaf: Leaf) -> Iterator[LN]:
2968 """Starting from the container of `leaf`, generate all leaves until `# fmt: on`.
2970 Stops at the end of the block.
2972 container: Optional[LN] = container_of(leaf)
2973 while container is not None and container.type != token.ENDMARKER:
2974 for comment in list_comments(container.prefix, is_endmarker=False):
2975 if comment.value in FMT_ON:
2980 container = container.next_sibling
2983 def maybe_make_parens_invisible_in_atom(node: LN, parent: LN) -> bool:
2984 """If it's safe, make the parens in the atom `node` invisible, recursively.
2986 Returns whether the node should itself be wrapped in invisible parentheses.
2990 node.type != syms.atom
2991 or is_empty_tuple(node)
2992 or is_one_tuple(node)
2993 or (is_yield(node) and parent.type != syms.expr_stmt)
2994 or max_delimiter_priority_in_atom(node) >= COMMA_PRIORITY
2998 first = node.children[0]
2999 last = node.children[-1]
3000 if first.type == token.LPAR and last.type == token.RPAR:
3001 # make parentheses invisible
3002 first.value = "" # type: ignore
3003 last.value = "" # type: ignore
3004 if len(node.children) > 1:
3005 maybe_make_parens_invisible_in_atom(node.children[1], parent=parent)
3011 def is_empty_tuple(node: LN) -> bool:
3012 """Return True if `node` holds an empty tuple."""
3014 node.type == syms.atom
3015 and len(node.children) == 2
3016 and node.children[0].type == token.LPAR
3017 and node.children[1].type == token.RPAR
3021 def is_one_tuple(node: LN) -> bool:
3022 """Return True if `node` holds a tuple with one element, with or without parens."""
3023 if node.type == syms.atom:
3024 if len(node.children) != 3:
3027 lpar, gexp, rpar = node.children
3029 lpar.type == token.LPAR
3030 and gexp.type == syms.testlist_gexp
3031 and rpar.type == token.RPAR
3035 return len(gexp.children) == 2 and gexp.children[1].type == token.COMMA
3038 node.type in IMPLICIT_TUPLE
3039 and len(node.children) == 2
3040 and node.children[1].type == token.COMMA
3044 def is_yield(node: LN) -> bool:
3045 """Return True if `node` holds a `yield` or `yield from` expression."""
3046 if node.type == syms.yield_expr:
3049 if node.type == token.NAME and node.value == "yield": # type: ignore
3052 if node.type != syms.atom:
3055 if len(node.children) != 3:
3058 lpar, expr, rpar = node.children
3059 if lpar.type == token.LPAR and rpar.type == token.RPAR:
3060 return is_yield(expr)
3065 def is_vararg(leaf: Leaf, within: Set[NodeType]) -> bool:
3066 """Return True if `leaf` is a star or double star in a vararg or kwarg.
3068 If `within` includes VARARGS_PARENTS, this applies to function signatures.
3069 If `within` includes UNPACKING_PARENTS, it applies to right hand-side
3070 extended iterable unpacking (PEP 3132) and additional unpacking
3071 generalizations (PEP 448).
3073 if leaf.type not in STARS or not leaf.parent:
3077 if p.type == syms.star_expr:
3078 # Star expressions are also used as assignment targets in extended
3079 # iterable unpacking (PEP 3132). See what its parent is instead.
3085 return p.type in within
3088 def is_multiline_string(leaf: Leaf) -> bool:
3089 """Return True if `leaf` is a multiline string that actually spans many lines."""
3090 value = leaf.value.lstrip("furbFURB")
3091 return value[:3] in {'"""', "'''"} and "\n" in value
3094 def is_stub_suite(node: Node) -> bool:
3095 """Return True if `node` is a suite with a stub body."""
3097 len(node.children) != 4
3098 or node.children[0].type != token.NEWLINE
3099 or node.children[1].type != token.INDENT
3100 or node.children[3].type != token.DEDENT
3104 return is_stub_body(node.children[2])
3107 def is_stub_body(node: LN) -> bool:
3108 """Return True if `node` is a simple statement containing an ellipsis."""
3109 if not isinstance(node, Node) or node.type != syms.simple_stmt:
3112 if len(node.children) != 2:
3115 child = node.children[0]
3117 child.type == syms.atom
3118 and len(child.children) == 3
3119 and all(leaf == Leaf(token.DOT, ".") for leaf in child.children)
3123 def max_delimiter_priority_in_atom(node: LN) -> Priority:
3124 """Return maximum delimiter priority inside `node`.
3126 This is specific to atoms with contents contained in a pair of parentheses.
3127 If `node` isn't an atom or there are no enclosing parentheses, returns 0.
3129 if node.type != syms.atom:
3132 first = node.children[0]
3133 last = node.children[-1]
3134 if not (first.type == token.LPAR and last.type == token.RPAR):
3137 bt = BracketTracker()
3138 for c in node.children[1:-1]:
3139 if isinstance(c, Leaf):
3142 for leaf in c.leaves():
3145 return bt.max_delimiter_priority()
3151 def ensure_visible(leaf: Leaf) -> None:
3152 """Make sure parentheses are visible.
3154 They could be invisible as part of some statements (see
3155 :func:`normalize_invisible_parens` and :func:`visit_import_from`).
3157 if leaf.type == token.LPAR:
3159 elif leaf.type == token.RPAR:
3163 def should_explode(line: Line, opening_bracket: Leaf) -> bool:
3164 """Should `line` immediately be split with `delimiter_split()` after RHS?"""
3167 opening_bracket.parent
3168 and opening_bracket.parent.type in {syms.atom, syms.import_from}
3169 and opening_bracket.value in "[{("
3174 last_leaf = line.leaves[-1]
3175 exclude = {id(last_leaf)} if last_leaf.type == token.COMMA else set()
3176 max_priority = line.bracket_tracker.max_delimiter_priority(exclude=exclude)
3177 except (IndexError, ValueError):
3180 return max_priority == COMMA_PRIORITY
3183 def get_features_used(node: Node) -> Set[Feature]:
3184 """Return a set of (relatively) new Python features used in this file.
3186 Currently looking for:
3187 - f-strings;
3188 - underscores in numeric literals; and
3189 - trailing commas after * or ** in function signatures and calls.
3191 features: Set[Feature] = set()
3192 for n in node.pre_order():
3193 if n.type == token.STRING:
3194 value_head = n.value[:2] # type: ignore
3195 if value_head in {'f"', 'F"', "f'", "F'", "rf", "fr", "RF", "FR"}:
3196 features.add(Feature.F_STRINGS)
3198 elif n.type == token.NUMBER:
3199 if "_" in n.value: # type: ignore
3200 features.add(Feature.NUMERIC_UNDERSCORES)
3203 n.type in {syms.typedargslist, syms.arglist}
3205 and n.children[-1].type == token.COMMA
3207 if n.type == syms.typedargslist:
3208 feature = Feature.TRAILING_COMMA_IN_DEF
3210 feature = Feature.TRAILING_COMMA_IN_CALL
3212 for ch in n.children:
3213 if ch.type in STARS:
3214 features.add(feature)
3216 if ch.type == syms.argument:
3217 for argch in ch.children:
3218 if argch.type in STARS:
3219 features.add(feature)
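# A sketch of snippets and the features they trigger (each line stands for a
# parsed fragment of the file):
#
#   f"{x}"        -> Feature.F_STRINGS
#   1_000_000     -> Feature.NUMERIC_UNDERSCORES
#   f(a, *b,)     -> Feature.TRAILING_COMMA_IN_CALL
#   def f(*a,):   -> Feature.TRAILING_COMMA_IN_DEF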
3224 def detect_target_versions(node: Node) -> Set[TargetVersion]:
3225 """Detect the version to target based on the nodes used."""
3226 features = get_features_used(node)
3228 version for version in TargetVersion if features <= VERSION_TO_FEATURES[version]
3232 def generate_trailers_to_omit(line: Line, line_length: int) -> Iterator[Set[LeafID]]:
3233 """Generate sets of closing bracket IDs that should be omitted in a RHS.
3235 Brackets can be omitted if the entire trailer up to and including
3236 a preceding closing bracket fits in one line.
3238 Yielded sets are cumulative (contain results of previous yields, too). First
3239 set is empty.
3242 omit: Set[LeafID] = set()
3245 length = 4 * line.depth
3246 opening_bracket = None
3247 closing_bracket = None
3248 inner_brackets: Set[LeafID] = set()
3249 for index, leaf, leaf_length in enumerate_with_length(line, reversed=True):
3250 length += leaf_length
3251 if length > line_length:
3254 has_inline_comment = leaf_length > len(leaf.value) + len(leaf.prefix)
3255 if leaf.type == STANDALONE_COMMENT or has_inline_comment:
3259 if leaf is opening_bracket:
3260 opening_bracket = None
3261 elif leaf.type in CLOSING_BRACKETS:
3262 inner_brackets.add(id(leaf))
3263 elif leaf.type in CLOSING_BRACKETS:
3264 if index > 0 and line.leaves[index - 1].type in OPENING_BRACKETS:
3265 # Empty brackets would fail a split so treat them as "inner"
3266 # brackets (e.g. only add them to the `omit` set if another
3267 # pair of brackets was good enough).
3268 inner_brackets.add(id(leaf))
3272 omit.add(id(closing_bracket))
3273 omit.update(inner_brackets)
3274 inner_brackets.clear()
3278 opening_bracket = leaf.opening_bracket
3279 closing_bracket = leaf
3282 def get_future_imports(node: Node) -> Set[str]:
3283 """Return a set of __future__ imports in the file."""
3284 imports: Set[str] = set()
3286 def get_imports_from_children(children: List[LN]) -> Generator[str, None, None]:
3287 for child in children:
3288 if isinstance(child, Leaf):
3289 if child.type == token.NAME:
3291 elif child.type == syms.import_as_name:
3292 orig_name = child.children[0]
3293 assert isinstance(orig_name, Leaf), "Invalid syntax parsing imports"
3294 assert orig_name.type == token.NAME, "Invalid syntax parsing imports"
3295 yield orig_name.value
3296 elif child.type == syms.import_as_names:
3297 yield from get_imports_from_children(child.children)
3299 raise AssertionError("Invalid syntax parsing imports")
3301 for child in node.children:
3302 if child.type != syms.simple_stmt:
3304 first_child = child.children[0]
3305 if isinstance(first_child, Leaf):
3306 # Continue looking if we see a docstring; otherwise stop.
3308 len(child.children) == 2
3309 and first_child.type == token.STRING
3310 and child.children[1].type == token.NEWLINE
3315 elif first_child.type == syms.import_from:
3316 module_name = first_child.children[1]
3317 if not isinstance(module_name, Leaf) or module_name.value != "__future__":
3319 imports |= set(get_imports_from_children(first_child.children[3:]))
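# For example, a module beginning with
#
#   """Docstring."""
#   from __future__ import division, print_function
#
# yields {"division", "print_function"}; the docstring itself is skipped by the
# "continue looking" branch above.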
3325 def gen_python_files_in_dir(
3328 include: Pattern[str],
3329 exclude: Pattern[str],
3331 ) -> Iterator[Path]:
3332 """Generate all files under `path` whose paths are not excluded by the
3333 `exclude` regex, but are included by the `include` regex.
3335 Symbolic links pointing outside of the `root` directory are ignored.
3337 `report` is where output about exclusions goes.
3339 assert root.is_absolute(), f"INTERNAL ERROR: `root` must be absolute but is {root}"
3340 for child in path.iterdir():
3342 normalized_path = "/" + child.resolve().relative_to(root).as_posix()
3344 if child.is_symlink():
3345 report.path_ignored(
3346 child, f"is a symbolic link that points outside {root}"
3353 normalized_path += "/"
3354 exclude_match = exclude.search(normalized_path)
3355 if exclude_match and exclude_match.group(0):
3356 report.path_ignored(child, "matches the --exclude regular expression")
3360 yield from gen_python_files_in_dir(child, root, include, exclude, report)
3362 elif child.is_file():
3363 include_match = include.search(normalized_path)
3369 def find_project_root(srcs: Iterable[str]) -> Path:
3370 """Return a directory containing .git, .hg, or pyproject.toml.
3372 That directory can be one of the directories passed in `srcs` or their
3375 If no directory in the tree contains a marker that would specify it's the
3376 project root, the root of the file system is returned.
3379 return Path("/").resolve()
3381 common_base = min(Path(src).resolve() for src in srcs)
3382 if common_base.is_dir():
3383 # Append a fake file so `parents` below returns `common_base_dir`, too.
3384 common_base /= "fake-file"
3385 for directory in common_base.parents:
3386 if (directory / ".git").is_dir():
3389 if (directory / ".hg").is_dir():
3392 if (directory / "pyproject.toml").is_file():
3400 """Provides a reformatting counter. Can be rendered with `str(report)`."""
3404 verbose: bool = False
3405 change_count: int = 0
3407 failure_count: int = 0
3409 def done(self, src: Path, changed: Changed) -> None:
3410 """Increment the counter for successful reformatting. Write out a message."""
3411 if changed is Changed.YES:
3412 reformatted = "would reformat" if self.check else "reformatted"
3413 if self.verbose or not self.quiet:
3414 out(f"{reformatted} {src}")
3415 self.change_count += 1
3418 if changed is Changed.NO:
3419 msg = f"{src} already well formatted, good job."
3421 msg = f"{src} wasn't modified on disk since last run."
3422 out(msg, bold=False)
3423 self.same_count += 1
3425 def failed(self, src: Path, message: str) -> None:
3426 """Increment the counter for failed reformatting. Write out a message."""
3427 err(f"error: cannot format {src}: {message}")
3428 self.failure_count += 1
3430 def path_ignored(self, path: Path, message: str) -> None:
3432 out(f"{path} ignored: {message}", bold=False)
3435 def return_code(self) -> int:
3436 """Return the exit code that the app should use.
3438 This considers the current state of changed files and failures:
3439 - if there were any failures, return 123;
3440 - if any files were changed and --check is being used, return 1;
3441 - otherwise return 0.
3443 # According to http://tldp.org/LDP/abs/html/exitcodes.html starting with
3444 # 126 we have special return codes reserved by the shell.
3445 if self.failure_count:
3448 elif self.change_count and self.check:
3453 def __str__(self) -> str:
3454 """Render a color report of the current state.
3456 Use `click.unstyle` to remove colors.
3459 reformatted = "would be reformatted"
3460 unchanged = "would be left unchanged"
3461 failed = "would fail to reformat"
3463 reformatted = "reformatted"
3464 unchanged = "left unchanged"
3465 failed = "failed to reformat"
3467 if self.change_count:
3468 s = "s" if self.change_count > 1 else ""
3470 click.style(f"{self.change_count} file{s} {reformatted}", bold=True)
3473 s = "s" if self.same_count > 1 else ""
3474 report.append(f"{self.same_count} file{s} {unchanged}")
3475 if self.failure_count:
3476 s = "s" if self.failure_count > 1 else ""
3478 click.style(f"{self.failure_count} file{s} {failed}", fg="red")
3480 return ", ".join(report) + "."
3483 def parse_ast(src: str) -> Union[ast3.AST, ast27.AST]:
3484 for feature_version in (7, 6):
3486 return ast3.parse(src, feature_version=feature_version)
3490 return ast27.parse(src)
3493 def assert_equivalent(src: str, dst: str) -> None:
3494 """Raise AssertionError if `src` and `dst` aren't equivalent."""
3496 def _v(node: Union[ast3.AST, ast27.AST], depth: int = 0) -> Iterator[str]:
3497 """Simple visitor generating strings to compare ASTs by content."""
3498 yield f"{' ' * depth}{node.__class__.__name__}("
3500 for field in sorted(node._fields):
3501 # TypeIgnore has only one field 'lineno' which breaks this comparison
3502 if isinstance(node, (ast3.TypeIgnore, ast27.TypeIgnore)):
3505 # Ignore the str "kind" field, which is case sensitive and ignores unicode_literals
3506 if isinstance(node, (ast3.Str, ast27.Str, ast3.Bytes)) and field == "kind":
3510 value = getattr(node, field)
3511 except AttributeError:
3514 yield f"{' ' * (depth+1)}{field}="
3516 if isinstance(value, list):
3518 # Ignore nested tuples within del statements, because we may insert
3519 # parentheses and they change the AST.
3522 and isinstance(node, (ast3.Delete, ast27.Delete))
3523 and isinstance(item, (ast3.Tuple, ast27.Tuple))
3525 for item in item.elts:
3526 yield from _v(item, depth + 2)
3527 elif isinstance(item, (ast3.AST, ast27.AST)):
3528 yield from _v(item, depth + 2)
3530 elif isinstance(value, (ast3.AST, ast27.AST)):
3531 yield from _v(value, depth + 2)
3534 yield f"{' ' * (depth+2)}{value!r}, # {value.__class__.__name__}"
3536 yield f"{' ' * depth}) # /{node.__class__.__name__}"
3539 src_ast = parse_ast(src)
3540 except Exception as exc:
3541 raise AssertionError(
3542 f"cannot use --safe with this file; failed to parse source file. "
3543 f"AST error message: {exc}"
3547 dst_ast = parse_ast(dst)
3548 except Exception as exc:
3549 log = dump_to_file("".join(traceback.format_tb(exc.__traceback__)), dst)
3550 raise AssertionError(
3551 f"INTERNAL ERROR: Black produced invalid code: {exc}. "
3552 f"Please report a bug on https://github.com/python/black/issues. "
3553 f"This invalid output might be helpful: {log}"
3556 src_ast_str = "\n".join(_v(src_ast))
3557 dst_ast_str = "\n".join(_v(dst_ast))
3558 if src_ast_str != dst_ast_str:
3559 log = dump_to_file(diff(src_ast_str, dst_ast_str, "src", "dst"))
3560 raise AssertionError(
3561 f"INTERNAL ERROR: Black produced code that is not equivalent to "
3563 f"Please report a bug on https://github.com/python/black/issues. "
3564 f"This diff might be helpful: {log}"
3568 def assert_stable(src: str, dst: str, mode: FileMode) -> None:
3569 """Raise AssertionError if `dst` reformats differently the second time."""
3570 newdst = format_str(dst, mode=mode)
3573 diff(src, dst, "source", "first pass"),
3574 diff(dst, newdst, "first pass", "second pass"),
3576 raise AssertionError(
3577 f"INTERNAL ERROR: Black produced different code on the second pass "
3578 f"of the formatter. "
3579 f"Please report a bug on https://github.com/python/black/issues. "
3580 f"This diff might be helpful: {log}"
3584 def dump_to_file(*output: str) -> str:
3585 """Dump `output` to a temporary file. Return path to the file."""
3586 with tempfile.NamedTemporaryFile(
3587 mode="w", prefix="blk_", suffix=".log", delete=False, encoding="utf8"
3589 for lines in output:
3591 if lines and lines[-1] != "\n":
3596 def diff(a: str, b: str, a_name: str, b_name: str) -> str:
3597 """Return a unified diff string between strings `a` and `b`."""
3600 a_lines = [line + "\n" for line in a.split("\n")]
3601 b_lines = [line + "\n" for line in b.split("\n")]
3603 difflib.unified_diff(a_lines, b_lines, fromfile=a_name, tofile=b_name, n=5)
3607 def cancel(tasks: Iterable[asyncio.Task]) -> None:
3608 """asyncio signal handler that cancels all `tasks` and reports to stderr."""
3614 def shutdown(loop: asyncio.AbstractEventLoop) -> None:
3615 """Cancel all pending tasks on `loop`, wait for them, and close the loop."""
3617 if sys.version_info[:2] >= (3, 7):
3618 all_tasks = asyncio.all_tasks
3620 all_tasks = asyncio.Task.all_tasks
3621 # This part is borrowed from asyncio/runners.py in Python 3.7b2.
3622 to_cancel = [task for task in all_tasks(loop) if not task.done()]
3626 for task in to_cancel:
3628 loop.run_until_complete(
3629 asyncio.gather(*to_cancel, loop=loop, return_exceptions=True)
3632 # `concurrent.futures.Future` objects cannot be cancelled once they
3633 # are already running. There might be some when the `shutdown()` happened.
3634 # Silence their logger's spew about the event loop being closed.
3635 cf_logger = logging.getLogger("concurrent.futures")
3636 cf_logger.setLevel(logging.CRITICAL)
3640 def sub_twice(regex: Pattern[str], replacement: str, original: str) -> str:
3641 """Replace `regex` with `replacement` twice on `original`.
3643 This is used by string normalization to perform replaces on
3644 overlapping matches.
3646 return regex.sub(replacement, regex.sub(replacement, original))
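# Why twice: re.sub() never matches inside text it has just consumed, so
# adjacent overlapping occurrences survive a single pass. A hypothetical
# pattern (not one used above) makes this visible:
#
#   pattern = re.compile(r"(^|[^x])y")
#   pattern.sub(r"\1z", "yyy")         # -> "zyz"  (middle "y" used as context)
#   sub_twice(pattern, r"\1z", "yyy")  # -> "zzz"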
3649 def re_compile_maybe_verbose(regex: str) -> Pattern[str]:
3650 """Compile a regular expression string in `regex`.
3652 If it contains newlines, use verbose mode.
3655 regex = "(?x)" + regex
3656 return re.compile(regex)
3659 def enumerate_reversed(sequence: Sequence[T]) -> Iterator[Tuple[Index, T]]:
3660 """Like `reversed(enumerate(sequence))` if that were possible."""
3661 index = len(sequence) - 1
3662 for element in reversed(sequence):
3663 yield (index, element)
3667 def enumerate_with_length(
3668 line: Line, reversed: bool = False
3669 ) -> Iterator[Tuple[Index, Leaf, int]]:
3670 """Return an enumeration of leaves with their length.
3672 Stops prematurely on multiline strings and standalone comments.
3675 Callable[[Sequence[Leaf]], Iterator[Tuple[Index, Leaf]]],
3676 enumerate_reversed if reversed else enumerate,
3678 for index, leaf in op(line.leaves):
3679 length = len(leaf.prefix) + len(leaf.value)
3680 if "\n" in leaf.value:
3681 return # Multiline strings, we can't continue.
3683 for comment in line.comments_after(leaf):
3684 length += len(comment.value)
3686 yield index, leaf, length
3689 def is_line_short_enough(line: Line, *, line_length: int, line_str: str = "") -> bool:
3690 """Return True if `line` is no longer than `line_length`.
3692 Uses the provided `line_str` rendering, if any, otherwise computes a new one.
3695 line_str = str(line).strip("\n")
3697 len(line_str) <= line_length
3698 and "\n" not in line_str # multiline strings
3699 and not line.contains_standalone_comments()
3703 def can_be_split(line: Line) -> bool:
3704 """Return False if the line cannot be split *for sure*.
3706 This is not an exhaustive search but a cheap heuristic that we can use to
3707 avoid some unfortunate formattings (mostly around wrapping unsplittable code
3708 in unnecessary parentheses).
3710 leaves = line.leaves
3714 if leaves[0].type == token.STRING and leaves[1].type == token.DOT:
3718 for leaf in leaves[-2::-1]:
3719 if leaf.type in OPENING_BRACKETS:
3720 if next.type not in CLOSING_BRACKETS:
3724 elif leaf.type == token.DOT:
3726 elif leaf.type == token.NAME:
3727 if not (next.type == token.DOT or next.type in OPENING_BRACKETS):
3730 elif leaf.type not in CLOSING_BRACKETS:
3733 if dot_count > 1 and call_count > 1:
3739 def can_omit_invisible_parens(line: Line, line_length: int) -> bool:
3740 """Does `line` have a shape safe to reformat without optional parens around it?
3742 Returns True for only a subset of potentially nice-looking formattings but
3743 the point is to not return false positives that end up producing lines that
3744 are too long.
3746 bt = line.bracket_tracker
3747 if not bt.delimiters:
3748 # Without delimiters the optional parentheses are useless.
3751 max_priority = bt.max_delimiter_priority()
3752 if bt.delimiter_count_with_priority(max_priority) > 1:
3753 # With more than one delimiter of a kind the optional parentheses read better.
3756 if max_priority == DOT_PRIORITY:
3757 # A single stranded method call doesn't require optional parentheses.
3760 assert len(line.leaves) >= 2, "Stranded delimiter"
3762 first = line.leaves[0]
3763 second = line.leaves[1]
3764 penultimate = line.leaves[-2]
3765 last = line.leaves[-1]
3767 # With a single delimiter, omit if the expression starts or ends with
3768 # a bracket.
3769 if first.type in OPENING_BRACKETS and second.type not in CLOSING_BRACKETS:
3771 length = 4 * line.depth
3772 for _index, leaf, leaf_length in enumerate_with_length(line):
3773 if leaf.type in CLOSING_BRACKETS and leaf.opening_bracket is first:
3776 length += leaf_length
3777 if length > line_length:
3780 if leaf.type in OPENING_BRACKETS:
3781 # There are brackets we can further split on.
3785 # checked the entire string and line length wasn't exceeded
3786 if len(line.leaves) == _index + 1:
3789 # Note: we are not returning False here because a line might have *both*
3790 # a leading opening bracket and a trailing closing bracket. If the
3791 # opening bracket doesn't match our rule, maybe the closing will.
3794 last.type == token.RPAR
3795 or last.type == token.RBRACE
3797 # don't use indexing for omitting optional parentheses;
3798 # it looks weird
3799 last.type == token.RSQB
3801 and last.parent.type != syms.trailer
3804 if penultimate.type in OPENING_BRACKETS:
3805 # Empty brackets don't help.
3808 if is_multiline_string(first):
3809 # Additional wrapping of a multiline string in this situation is
3810 # unnecessary.
3813 length = 4 * line.depth
3814 seen_other_brackets = False
3815 for _index, leaf, leaf_length in enumerate_with_length(line):
3816 length += leaf_length
3817 if leaf is last.opening_bracket:
3818 if seen_other_brackets or length <= line_length:
3821 elif leaf.type in OPENING_BRACKETS:
3822 # There are brackets we can further split on.
3823 seen_other_brackets = True
3828 def get_cache_file(mode: FileMode) -> Path:
3829 return CACHE_DIR / f"cache.{mode.get_cache_key()}.pickle"
3832 def read_cache(mode: FileMode) -> Cache:
3833 """Read the cache if it exists and is well formed.
3835 If it is not well formed, the call to write_cache later should resolve the issue.
3837 cache_file = get_cache_file(mode)
3838 if not cache_file.exists():
3841 with cache_file.open("rb") as fobj:
3843 cache: Cache = pickle.load(fobj)
3844 except pickle.UnpicklingError:
3850 def get_cache_info(path: Path) -> CacheInfo:
3851 """Return the information used to check if a file is already formatted or not."""
3853 return stat.st_mtime, stat.st_size
3856 def filter_cached(cache: Cache, sources: Iterable[Path]) -> Tuple[Set[Path], Set[Path]]:
3857 """Split an iterable of paths in `sources` into two sets.
3859 The first contains paths of files that were modified on disk or are not in the
3860 cache. The other contains paths to non-modified files.
3862 todo, done = set(), set()
3865 if cache.get(src) != get_cache_info(src):
3872 def write_cache(cache: Cache, sources: Iterable[Path], mode: FileMode) -> None:
3873 """Update the cache file."""
3874 cache_file = get_cache_file(mode)
3876 CACHE_DIR.mkdir(parents=True, exist_ok=True)
3877 new_cache = {**cache, **{src.resolve(): get_cache_info(src) for src in sources}}
3878 with tempfile.NamedTemporaryFile(dir=str(cache_file.parent), delete=False) as f:
3879 pickle.dump(new_cache, f, protocol=pickle.HIGHEST_PROTOCOL)
3880 os.replace(f.name, cache_file)
3885 def patch_click() -> None:
3886 """Make Click not crash.
3888 On certain misconfigured environments, Python 3 selects the ASCII encoding as the
3889 default which restricts paths that it can access during the lifetime of the
3890 application. Click refuses to work in this scenario by raising a RuntimeError.
3892 In case of Black the likelihood that non-ASCII characters are going to be used in
3893 file paths is minimal since it's Python source code. Moreover, this crash was
3894 spurious on Python 3.7 thanks to PEP 538 and PEP 540.
3897 from click import core
3898 from click import _unicodefun # type: ignore
3899 except ModuleNotFoundError:
3902 for module in (core, _unicodefun):
3903 if hasattr(module, "_verify_python3_env"):
3904 module._verify_python3_env = lambda: None
3907 def patched_main() -> None:
3913 if __name__ == "__main__":