All patches and comments are welcome. Please squash your changes into logical
commits before using git-format-patch and git-send-email to send them to
patches@git.madduck.net.
If you'd read over the Git project's submission guidelines and adhered to them,
I'd be especially grateful.
2 from asyncio.base_events import BaseEventLoop
3 from concurrent.futures import Executor, ProcessPoolExecutor
4 from datetime import datetime
6 from functools import lru_cache, partial, wraps
10 from multiprocessing import Manager, freeze_support
12 from pathlib import Path
39 from appdirs import user_cache_dir
40 from attr import dataclass, evolve, Factory
43 from typed_ast import ast3, ast27
46 from blib2to3.pytree import Node, Leaf, type_repr
47 from blib2to3 import pygram, pytree
48 from blib2to3.pgen2 import driver, token
49 from blib2to3.pgen2.grammar import Grammar
50 from blib2to3.pgen2.parse import ParseError
53 __version__ = "19.3b0"
54 DEFAULT_LINE_LENGTH = 88
56 r"/(\.eggs|\.git|\.hg|\.mypy_cache|\.nox|\.tox|\.venv|_build|buck-out|build|dist)/"
58 DEFAULT_INCLUDES = r"\.pyi?$"
59 CACHE_DIR = Path(user_cache_dir("black", version=__version__))
71 LN = Union[Leaf, Node]
72 SplitFunc = Callable[["Line", Collection["Feature"]], Iterator["Line"]]
75 CacheInfo = Tuple[Timestamp, FileSize]
76 Cache = Dict[Path, CacheInfo]
77 out = partial(click.secho, bold=True, err=True)
78 err = partial(click.secho, fg="red", err=True)
80 pygram.initialize(CACHE_DIR)
81 syms = pygram.python_symbols
class NothingChanged(UserWarning):
    """Signals that the reformatted code came out identical to the source."""
class CannotSplit(Exception):
    """No readable split exists that fits within the allotted line length."""
class InvalidInput(ValueError):
    """Signals that every attempt to parse the input source code failed."""
96 class WriteBack(Enum):
103 def from_configuration(cls, *, check: bool, diff: bool) -> "WriteBack":
104 if check and not diff:
107 return cls.DIFF if diff else cls.YES
116 class TargetVersion(Enum):
def is_python2(self) -> bool:
    """Return True only when this member is `TargetVersion.PY27`."""
    return TargetVersion.PY27 is self
129 PY36_VERSIONS = {TargetVersion.PY36, TargetVersion.PY37, TargetVersion.PY38}
133 # All string literals are unicode
136 NUMERIC_UNDERSCORES = 3
137 TRAILING_COMMA_IN_CALL = 4
138 TRAILING_COMMA_IN_DEF = 5
139 # The following two feature-flags are mutually exclusive, and exactly one should be
140 # set for every version of python.
141 ASYNC_IDENTIFIERS = 6
145 VERSION_TO_FEATURES: Dict[TargetVersion, Set[Feature]] = {
146 TargetVersion.PY27: {Feature.ASYNC_IDENTIFIERS},
147 TargetVersion.PY33: {Feature.UNICODE_LITERALS, Feature.ASYNC_IDENTIFIERS},
148 TargetVersion.PY34: {Feature.UNICODE_LITERALS, Feature.ASYNC_IDENTIFIERS},
149 TargetVersion.PY35: {
150 Feature.UNICODE_LITERALS,
151 Feature.TRAILING_COMMA_IN_CALL,
152 Feature.ASYNC_IDENTIFIERS,
154 TargetVersion.PY36: {
155 Feature.UNICODE_LITERALS,
157 Feature.NUMERIC_UNDERSCORES,
158 Feature.TRAILING_COMMA_IN_CALL,
159 Feature.TRAILING_COMMA_IN_DEF,
160 Feature.ASYNC_IDENTIFIERS,
162 TargetVersion.PY37: {
163 Feature.UNICODE_LITERALS,
165 Feature.NUMERIC_UNDERSCORES,
166 Feature.TRAILING_COMMA_IN_CALL,
167 Feature.TRAILING_COMMA_IN_DEF,
168 Feature.ASYNC_KEYWORDS,
170 TargetVersion.PY38: {
171 Feature.UNICODE_LITERALS,
173 Feature.NUMERIC_UNDERSCORES,
174 Feature.TRAILING_COMMA_IN_CALL,
175 Feature.TRAILING_COMMA_IN_DEF,
176 Feature.ASYNC_KEYWORDS,
183 target_versions: Set[TargetVersion] = Factory(set)
184 line_length: int = DEFAULT_LINE_LENGTH
185 string_normalization: bool = True
188 def get_cache_key(self) -> str:
189 if self.target_versions:
190 version_str = ",".join(
192 for version in sorted(self.target_versions, key=lambda v: v.value)
198 str(self.line_length),
199 str(int(self.string_normalization)),
200 str(int(self.is_pyi)),
202 return ".".join(parts)
def supports_feature(target_versions: Set[TargetVersion], feature: Feature) -> bool:
    """Tell whether every version in `target_versions` provides `feature`.

    Each version is looked up in `VERSION_TO_FEATURES`.  An empty
    `target_versions` yields True, since no version is missing the feature.
    """
    for version in target_versions:
        if feature not in VERSION_TO_FEATURES[version]:
            return False
    return True
209 def read_pyproject_toml(
210 ctx: click.Context, param: click.Parameter, value: Union[str, int, bool, None]
212 """Inject Black configuration from "pyproject.toml" into defaults in `ctx`.
214 Returns the path to a successfully found and read configuration file, None
217 assert not isinstance(value, (int, bool)), "Invalid parameter type passed"
219 root = find_project_root(ctx.params.get("src", ()))
220 path = root / "pyproject.toml"
227 pyproject_toml = toml.load(value)
228 config = pyproject_toml.get("tool", {}).get("black", {})
229 except (toml.TomlDecodeError, OSError) as e:
230 raise click.FileError(
231 filename=value, hint=f"Error reading configuration file: {e}"
237 if ctx.default_map is None:
239 ctx.default_map.update( # type: ignore # bad types in .pyi
240 {k.replace("--", "").replace("-", "_"): v for k, v in config.items()}
245 @click.command(context_settings=dict(help_option_names=["-h", "--help"]))
246 @click.option("-c", "--code", type=str, help="Format the code passed in as a string.")
251 default=DEFAULT_LINE_LENGTH,
252 help="How many characters per line to allow.",
258 type=click.Choice([v.name.lower() for v in TargetVersion]),
259 callback=lambda c, p, v: [TargetVersion[val.upper()] for val in v],
262 "Python versions that should be supported by Black's output. [default: "
263 "per-file auto-detection]"
270 "Allow using Python 3.6-only syntax on all input files. This will put "
271 "trailing commas in function signatures and calls also after *args and "
272 "**kwargs. Deprecated; use --target-version instead. "
273 "[default: per-file auto-detection]"
280 "Format all input files like typing stubs regardless of file extension "
281 "(useful when piping source on standard input)."
286 "--skip-string-normalization",
288 help="Don't normalize string quotes or prefixes.",
294 "Don't write the files back, just return the status. Return code 0 "
295 "means nothing would change. Return code 1 means some files would be "
296 "reformatted. Return code 123 means there was an internal error."
302 help="Don't write the files back, just output a diff for each file on stdout.",
307 help="If --fast given, skip temporary sanity checks. [default: --safe]",
312 default=DEFAULT_INCLUDES,
314 "A regular expression that matches files and directories that should be "
315 "included on recursive searches. An empty value means all files are "
316 "included regardless of the name. Use forward slashes for directories on "
317 "all platforms (Windows, too). Exclusions are calculated first, inclusions "
325 default=DEFAULT_EXCLUDES,
327 "A regular expression that matches files and directories that should be "
328 "excluded on recursive searches. An empty value means no paths are excluded. "
329 "Use forward slashes for directories on all platforms (Windows, too). "
330 "Exclusions are calculated first, inclusions later."
339 "Don't emit non-error messages to stderr. Errors are still emitted, "
340 "silence those with 2>/dev/null."
348 "Also emit messages to stderr about files that were not changed or were "
349 "ignored due to --exclude=."
352 @click.version_option(version=__version__)
357 exists=True, file_okay=True, dir_okay=True, readable=True, allow_dash=True
364 exists=False, file_okay=True, dir_okay=False, readable=True, allow_dash=False
367 callback=read_pyproject_toml,
368 help="Read configuration from PATH.",
375 target_version: List[TargetVersion],
381 skip_string_normalization: bool,
387 config: Optional[str],
389 """The uncompromising code formatter."""
390 write_back = WriteBack.from_configuration(check=check, diff=diff)
393 err(f"Cannot use both --target-version and --py36")
396 versions = set(target_version)
399 "--py36 is deprecated and will be removed in a future version. "
400 "Use --target-version py36 instead."
402 versions = PY36_VERSIONS
404 # We'll autodetect later.
407 target_versions=versions,
408 line_length=line_length,
410 string_normalization=not skip_string_normalization,
412 if config and verbose:
413 out(f"Using configuration from {config}.", bold=False, fg="blue")
415 print(format_str(code, mode=mode))
418 include_regex = re_compile_maybe_verbose(include)
420 err(f"Invalid regular expression for include given: {include!r}")
423 exclude_regex = re_compile_maybe_verbose(exclude)
425 err(f"Invalid regular expression for exclude given: {exclude!r}")
427 report = Report(check=check, quiet=quiet, verbose=verbose)
428 root = find_project_root(src)
429 sources: Set[Path] = set()
434 gen_python_files_in_dir(p, root, include_regex, exclude_regex, report)
436 elif p.is_file() or s == "-":
437 # if a file was explicitly given, we don't care about its extension
440 err(f"invalid path: {s}")
441 if len(sources) == 0:
442 if verbose or not quiet:
443 out("No paths given. Nothing to do 😴")
446 if len(sources) == 1:
450 write_back=write_back,
456 sources=sources, fast=fast, write_back=write_back, mode=mode, report=report
459 if verbose or not quiet:
460 out("Oh no! 💥 💔 💥" if report.return_code else "All done! ✨ 🍰 ✨")
461 click.secho(str(report), err=True)
462 ctx.exit(report.return_code)
466 src: Path, fast: bool, write_back: WriteBack, mode: FileMode, report: "Report"
468 """Reformat a single file under `src` without spawning child processes.
470 If `quiet` is True, non-error messages are not output. `line_length`,
471 `write_back`, `fast` and `pyi` options are passed to
472 :func:`format_file_in_place` or :func:`format_stdin_to_stdout`.
476 if not src.is_file() and str(src) == "-":
477 if format_stdin_to_stdout(fast=fast, write_back=write_back, mode=mode):
478 changed = Changed.YES
481 if write_back != WriteBack.DIFF:
482 cache = read_cache(mode)
483 res_src = src.resolve()
484 if res_src in cache and cache[res_src] == get_cache_info(res_src):
485 changed = Changed.CACHED
486 if changed is not Changed.CACHED and format_file_in_place(
487 src, fast=fast, write_back=write_back, mode=mode
489 changed = Changed.YES
490 if (write_back is WriteBack.YES and changed is not Changed.CACHED) or (
491 write_back is WriteBack.CHECK and changed is Changed.NO
493 write_cache(cache, [src], mode)
494 report.done(src, changed)
495 except Exception as exc:
496 report.failed(src, str(exc))
502 write_back: WriteBack,
506 """Reformat multiple files using a ProcessPoolExecutor."""
507 loop = asyncio.get_event_loop()
508 worker_count = os.cpu_count()
509 if sys.platform == "win32":
510 # Work around https://bugs.python.org/issue26903
511 worker_count = min(worker_count, 61)
512 executor = ProcessPoolExecutor(max_workers=worker_count)
514 loop.run_until_complete(
518 write_back=write_back,
529 async def schedule_formatting(
532 write_back: WriteBack,
538 """Run formatting of `sources` in parallel using the provided `executor`.
540 (Use ProcessPoolExecutors for actual parallelism.)
542 `line_length`, `write_back`, `fast`, and `pyi` options are passed to
543 :func:`format_file_in_place`.
546 if write_back != WriteBack.DIFF:
547 cache = read_cache(mode)
548 sources, cached = filter_cached(cache, sources)
549 for src in sorted(cached):
550 report.done(src, Changed.CACHED)
555 sources_to_cache = []
557 if write_back == WriteBack.DIFF:
558 # For diff output, we need locks to ensure we don't interleave output
559 # from different processes.
561 lock = manager.Lock()
563 asyncio.ensure_future(
564 loop.run_in_executor(
565 executor, format_file_in_place, src, fast, mode, write_back, lock
568 for src in sorted(sources)
570 pending: Iterable[asyncio.Future] = tasks.keys()
572 loop.add_signal_handler(signal.SIGINT, cancel, pending)
573 loop.add_signal_handler(signal.SIGTERM, cancel, pending)
574 except NotImplementedError:
575 # There are no good alternatives for these on Windows.
578 done, _ = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
580 src = tasks.pop(task)
582 cancelled.append(task)
583 elif task.exception():
584 report.failed(src, str(task.exception()))
586 changed = Changed.YES if task.result() else Changed.NO
587 # If the file was written back or was successfully checked as
588 # well-formatted, store this information in the cache.
589 if write_back is WriteBack.YES or (
590 write_back is WriteBack.CHECK and changed is Changed.NO
592 sources_to_cache.append(src)
593 report.done(src, changed)
595 await asyncio.gather(*cancelled, loop=loop, return_exceptions=True)
597 write_cache(cache, sources_to_cache, mode)
600 def format_file_in_place(
604 write_back: WriteBack = WriteBack.NO,
605 lock: Any = None, # multiprocessing.Manager().Lock() is some crazy proxy
607 """Format file under `src` path. Return True if changed.
609 If `write_back` is DIFF, write a diff to stdout. If it is YES, write reformatted
611 `line_length` and `fast` options are passed to :func:`format_file_contents`.
613 if src.suffix == ".pyi":
614 mode = evolve(mode, is_pyi=True)
616 then = datetime.utcfromtimestamp(src.stat().st_mtime)
617 with open(src, "rb") as buf:
618 src_contents, encoding, newline = decode_bytes(buf.read())
620 dst_contents = format_file_contents(src_contents, fast=fast, mode=mode)
621 except NothingChanged:
624 if write_back == write_back.YES:
625 with open(src, "w", encoding=encoding, newline=newline) as f:
626 f.write(dst_contents)
627 elif write_back == write_back.DIFF:
628 now = datetime.utcnow()
629 src_name = f"{src}\t{then} +0000"
630 dst_name = f"{src}\t{now} +0000"
631 diff_contents = diff(src_contents, dst_contents, src_name, dst_name)
635 f = io.TextIOWrapper(
641 f.write(diff_contents)
649 def format_stdin_to_stdout(
650 fast: bool, *, write_back: WriteBack = WriteBack.NO, mode: FileMode
652 """Format file on stdin. Return True if changed.
654 If `write_back` is YES, write reformatted code back to stdout. If it is DIFF,
655 write a diff to stdout. The `mode` argument is passed to
656 :func:`format_file_contents`.
658 then = datetime.utcnow()
659 src, encoding, newline = decode_bytes(sys.stdin.buffer.read())
662 dst = format_file_contents(src, fast=fast, mode=mode)
665 except NothingChanged:
669 f = io.TextIOWrapper(
670 sys.stdout.buffer, encoding=encoding, newline=newline, write_through=True
672 if write_back == WriteBack.YES:
674 elif write_back == WriteBack.DIFF:
675 now = datetime.utcnow()
676 src_name = f"STDIN\t{then} +0000"
677 dst_name = f"STDOUT\t{now} +0000"
678 f.write(diff(src, dst, src_name, dst_name))
682 def format_file_contents(
683 src_contents: str, *, fast: bool, mode: FileMode
685 """Reformat contents a file and return new contents.
687 If `fast` is False, additionally confirm that the reformatted code is
688 valid by calling :func:`assert_equivalent` and :func:`assert_stable` on it.
689 `line_length` is passed to :func:`format_str`.
691 if src_contents.strip() == "":
694 dst_contents = format_str(src_contents, mode=mode)
695 if src_contents == dst_contents:
699 assert_equivalent(src_contents, dst_contents)
700 assert_stable(src_contents, dst_contents, mode=mode)
704 def format_str(src_contents: str, *, mode: FileMode) -> FileContent:
705 """Reformat a string and return new contents.
707 `line_length` determines how many characters per line are allowed.
709 src_node = lib2to3_parse(src_contents.lstrip(), mode.target_versions)
711 future_imports = get_future_imports(src_node)
712 if mode.target_versions:
713 versions = mode.target_versions
715 versions = detect_target_versions(src_node)
716 normalize_fmt_off(src_node)
717 lines = LineGenerator(
718 remove_u_prefix="unicode_literals" in future_imports
719 or supports_feature(versions, Feature.UNICODE_LITERALS),
721 normalize_strings=mode.string_normalization,
723 elt = EmptyLineTracker(is_pyi=mode.is_pyi)
726 split_line_features = {
728 for feature in {Feature.TRAILING_COMMA_IN_CALL, Feature.TRAILING_COMMA_IN_DEF}
729 if supports_feature(versions, feature)
731 for current_line in lines.visit(src_node):
732 for _ in range(after):
733 dst_contents.append(str(empty_line))
734 before, after = elt.maybe_empty_lines(current_line)
735 for _ in range(before):
736 dst_contents.append(str(empty_line))
737 for line in split_line(
738 current_line, line_length=mode.line_length, features=split_line_features
740 dst_contents.append(str(line))
741 return "".join(dst_contents)
744 def decode_bytes(src: bytes) -> Tuple[FileContent, Encoding, NewLine]:
745 """Return a tuple of (decoded_contents, encoding, newline).
747 `newline` is either CRLF or LF but `decoded_contents` is decoded with
748 universal newlines (i.e. only contains LF).
750 srcbuf = io.BytesIO(src)
751 encoding, lines = tokenize.detect_encoding(srcbuf.readline)
753 return "", encoding, "\n"
755 newline = "\r\n" if b"\r\n" == lines[0][-2:] else "\n"
757 with io.TextIOWrapper(srcbuf, encoding) as tiow:
758 return tiow.read(), encoding, newline
761 def get_grammars(target_versions: Set[TargetVersion]) -> List[Grammar]:
762 if not target_versions:
763 # No target_version specified, so try all grammars.
766 pygram.python_grammar_no_print_statement_no_exec_statement_async_keywords,
768 pygram.python_grammar_no_print_statement_no_exec_statement,
769 # Python 2.7 with future print_function import
770 pygram.python_grammar_no_print_statement,
772 pygram.python_grammar,
774 elif all(version.is_python2() for version in target_versions):
775 # Python 2-only code, so try Python 2 grammars.
777 # Python 2.7 with future print_function import
778 pygram.python_grammar_no_print_statement,
780 pygram.python_grammar,
783 # Python 3-compatible code, so only try Python 3 grammar.
785 # If we have to parse both, try to parse async as a keyword first
786 if not supports_feature(target_versions, Feature.ASYNC_IDENTIFIERS):
789 pygram.python_grammar_no_print_statement_no_exec_statement_async_keywords # noqa: B950
791 if not supports_feature(target_versions, Feature.ASYNC_KEYWORDS):
793 grammars.append(pygram.python_grammar_no_print_statement_no_exec_statement)
794 # At least one of the above branches must have been taken, because every Python
795 # version has exactly one of the two 'ASYNC_*' flags
799 def lib2to3_parse(src_txt: str, target_versions: Iterable[TargetVersion] = ()) -> Node:
800 """Given a string with source, return the lib2to3 Node."""
801 if src_txt[-1:] != "\n":
804 for grammar in get_grammars(set(target_versions)):
805 drv = driver.Driver(grammar, pytree.convert)
807 result = drv.parse_string(src_txt, True)
810 except ParseError as pe:
811 lineno, column = pe.context[1]
812 lines = src_txt.splitlines()
814 faulty_line = lines[lineno - 1]
816 faulty_line = "<line number missing in source>"
817 exc = InvalidInput(f"Cannot parse: {lineno}:{column}: {faulty_line}")
821 if isinstance(result, Leaf):
822 result = Node(syms.file_input, [result])
826 def lib2to3_unparse(node: Node) -> str:
827 """Given a lib2to3 node, return its string representation."""
835 class Visitor(Generic[T]):
836 """Basic lib2to3 visitor that yields things of type `T` on `visit()`."""
838 def visit(self, node: LN) -> Iterator[T]:
839 """Main method to visit `node` and its children.
841 It tries to find a `visit_*()` method for the given `node.type`, like
842 `visit_simple_stmt` for Node objects or `visit_INDENT` for Leaf objects.
843 If no dedicated `visit_*()` method is found, chooses `visit_default()`
846 Then yields objects of type `T` from the selected visitor.
849 name = token.tok_name[node.type]
851 name = type_repr(node.type)
852 yield from getattr(self, f"visit_{name}", self.visit_default)(node)
def visit_default(self, node: LN) -> Iterator[T]:
    """Fallback `visit_*()` handler: visit each child of a `Node` in order.

    Anything that is not a `Node` yields nothing.
    """
    if not isinstance(node, Node):
        return

    for child in node.children:
        yield from self.visit(child)
862 class DebugVisitor(Visitor[T]):
865 def visit_default(self, node: LN) -> Iterator[T]:
866 indent = " " * (2 * self.tree_depth)
867 if isinstance(node, Node):
868 _type = type_repr(node.type)
869 out(f"{indent}{_type}", fg="yellow")
871 for child in node.children:
872 yield from self.visit(child)
875 out(f"{indent}/{_type}", fg="yellow", bold=False)
877 _type = token.tok_name.get(node.type, str(node.type))
878 out(f"{indent}{_type}", fg="blue", nl=False)
880 # We don't have to handle prefixes for `Node` objects since
881 # that delegates to the first child anyway.
882 out(f" {node.prefix!r}", fg="green", bold=False, nl=False)
883 out(f" {node.value!r}", fg="blue", bold=False)
886 def show(cls, code: Union[str, Leaf, Node]) -> None:
887 """Pretty-print the lib2to3 AST of a given string of `code`.
889 Convenience method for debugging.
891 v: DebugVisitor[None] = DebugVisitor()
892 if isinstance(code, str):
893 code = lib2to3_parse(code)
897 WHITESPACE = {token.DEDENT, token.INDENT, token.NEWLINE}
908 STANDALONE_COMMENT = 153
909 token.tok_name[STANDALONE_COMMENT] = "STANDALONE_COMMENT"
910 LOGIC_OPERATORS = {"and", "or"}
935 STARS = {token.STAR, token.DOUBLESTAR}
938 syms.argument, # double star in arglist
939 syms.trailer, # single argument to call
941 syms.varargslist, # lambdas
943 UNPACKING_PARENTS = {
944 syms.atom, # single element of a list or set literal
948 syms.testlist_star_expr,
983 COMPREHENSION_PRIORITY = 20
985 TERNARY_PRIORITY = 16
988 COMPARATOR_PRIORITY = 10
999 token.DOUBLESLASH: 4,
1003 token.DOUBLESTAR: 2,
1009 class BracketTracker:
1010 """Keeps track of brackets on a line."""
1013 bracket_match: Dict[Tuple[Depth, NodeType], Leaf] = Factory(dict)
1014 delimiters: Dict[LeafID, Priority] = Factory(dict)
1015 previous: Optional[Leaf] = None
1016 _for_loop_depths: List[int] = Factory(list)
1017 _lambda_argument_depths: List[int] = Factory(list)
1019 def mark(self, leaf: Leaf) -> None:
1020 """Mark `leaf` with bracket-related metadata. Keep track of delimiters.
1022 All leaves receive an int `bracket_depth` field that stores how deep
1023 within brackets a given leaf is. 0 means there are no enclosing brackets
1024 that started on this line.
1026 If a leaf is itself a closing bracket, it receives an `opening_bracket`
1027 field that it forms a pair with. This is a one-directional link to
1028 avoid reference cycles.
1030 If a leaf is a delimiter (a token on which Black can split the line if
1031 needed) and it's on depth 0, its `id()` is stored in the tracker's
1034 if leaf.type == token.COMMENT:
1037 self.maybe_decrement_after_for_loop_variable(leaf)
1038 self.maybe_decrement_after_lambda_arguments(leaf)
1039 if leaf.type in CLOSING_BRACKETS:
1041 opening_bracket = self.bracket_match.pop((self.depth, leaf.type))
1042 leaf.opening_bracket = opening_bracket
1043 leaf.bracket_depth = self.depth
1045 delim = is_split_before_delimiter(leaf, self.previous)
1046 if delim and self.previous is not None:
1047 self.delimiters[id(self.previous)] = delim
1049 delim = is_split_after_delimiter(leaf, self.previous)
1051 self.delimiters[id(leaf)] = delim
1052 if leaf.type in OPENING_BRACKETS:
1053 self.bracket_match[self.depth, BRACKET[leaf.type]] = leaf
1055 self.previous = leaf
1056 self.maybe_increment_lambda_arguments(leaf)
1057 self.maybe_increment_for_loop_variable(leaf)
def any_open_brackets(self) -> bool:
    """Return True if there is a yet unmatched open bracket on the line."""
    return len(self.bracket_match) > 0
def max_delimiter_priority(self, exclude: Iterable[LeafID] = ()) -> int:
    """Return the highest priority of a delimiter found on the line.

    Delimiters whose leaf id appears in `exclude` are left out of the
    computation.  Values are consistent with what `is_split_*_delimiter()`
    return.

    Raises ValueError when no delimiter remains to be considered.
    """
    # `exclude` is only promised to be an iterable.  Testing membership on it
    # once per delimiter would drain a one-shot iterator on the first test (so
    # later ids would never be excluded) and rescan lists/tuples every time.
    # Materialize it once; sets are used as they are.
    if not isinstance(exclude, (set, frozenset)):
        exclude = frozenset(exclude)
    return max(
        priority
        for leaf_id, priority in self.delimiters.items()
        if leaf_id not in exclude
    )
def delimiter_count_with_priority(self, priority: int = 0) -> int:
    """Count the delimiters on the line whose priority equals `priority`.

    A falsy `priority` (the default) means "use the maximum priority found
    on the line".  With no delimiters recorded the count is zero.
    """
    if not self.delimiters:
        return 0

    wanted = priority if priority else self.max_delimiter_priority()
    return list(self.delimiters.values()).count(wanted)
1082 def maybe_increment_for_loop_variable(self, leaf: Leaf) -> bool:
1083 """In a for loop, or comprehension, the variables are often unpacks.
1085 To avoid splitting on the comma in this situation, increase the depth of
1086 tokens between `for` and `in`.
1088 if leaf.type == token.NAME and leaf.value == "for":
1090 self._for_loop_depths.append(self.depth)
1095 def maybe_decrement_after_for_loop_variable(self, leaf: Leaf) -> bool:
1096 """See `maybe_increment_for_loop_variable` above for explanation."""
1098 self._for_loop_depths
1099 and self._for_loop_depths[-1] == self.depth
1100 and leaf.type == token.NAME
1101 and leaf.value == "in"
1104 self._for_loop_depths.pop()
1109 def maybe_increment_lambda_arguments(self, leaf: Leaf) -> bool:
1110 """In a lambda expression, there might be more than one argument.
1112 To avoid splitting on the comma in this situation, increase the depth of
1113 tokens between `lambda` and `:`.
1115 if leaf.type == token.NAME and leaf.value == "lambda":
1117 self._lambda_argument_depths.append(self.depth)
1122 def maybe_decrement_after_lambda_arguments(self, leaf: Leaf) -> bool:
1123 """See `maybe_increment_lambda_arguments` above for explanation."""
1125 self._lambda_argument_depths
1126 and self._lambda_argument_depths[-1] == self.depth
1127 and leaf.type == token.COLON
1130 self._lambda_argument_depths.pop()
def get_open_lsqb(self) -> Optional[Leaf]:
    """Return the most recent opening square bracket (if any).

    The bracket is looked up in `bracket_match` one level above the current
    depth, under the closing-square-bracket token type; None when absent.
    """
    key = (self.depth - 1, token.RSQB)
    return self.bracket_match.get(key)
1142 """Holds leaves and comments. Can be printed with `str(line)`."""
1145 leaves: List[Leaf] = Factory(list)
1146 comments: Dict[LeafID, List[Leaf]] = Factory(dict) # keys ordered like `leaves`
1147 bracket_tracker: BracketTracker = Factory(BracketTracker)
1148 inside_brackets: bool = False
1149 should_explode: bool = False
1151 def append(self, leaf: Leaf, preformatted: bool = False) -> None:
1152 """Add a new `leaf` to the end of the line.
1154 Unless `preformatted` is True, the `leaf` will receive a new consistent
1155 whitespace prefix and metadata applied by :class:`BracketTracker`.
1156 Trailing commas are maybe removed, unpacked for loop variables are
1157 demoted from being delimiters.
1159 Inline comments are put aside.
1161 has_value = leaf.type in BRACKETS or bool(leaf.value.strip())
1165 if token.COLON == leaf.type and self.is_class_paren_empty:
1166 del self.leaves[-2:]
1167 if self.leaves and not preformatted:
1168 # Note: at this point leaf.prefix should be empty except for
1169 # imports, for which we only preserve newlines.
1170 leaf.prefix += whitespace(
1171 leaf, complex_subscript=self.is_complex_subscript(leaf)
1173 if self.inside_brackets or not preformatted:
1174 self.bracket_tracker.mark(leaf)
1175 self.maybe_remove_trailing_comma(leaf)
1176 if not self.append_comment(leaf):
1177 self.leaves.append(leaf)
1179 def append_safe(self, leaf: Leaf, preformatted: bool = False) -> None:
1180 """Like :func:`append()` but disallow invalid standalone comment structure.
1182 Raises ValueError when any `leaf` is appended after a standalone comment
1183 or when a standalone comment is not the first leaf on the line.
1185 if self.bracket_tracker.depth == 0:
1187 raise ValueError("cannot append to standalone comments")
1189 if self.leaves and leaf.type == STANDALONE_COMMENT:
1191 "cannot append standalone comments to a populated line"
1194 self.append(leaf, preformatted=preformatted)
def is_comment(self) -> bool:
    """Does this line consist of exactly one standalone comment leaf?"""
    if len(self.leaves) != 1:
        return False
    return self.leaves[0].type == STANDALONE_COMMENT
def is_decorator(self) -> bool:
    """Is this a non-empty line whose first leaf is an `@` token?"""
    if not self:
        return False
    return self.leaves[0].type == token.AT
def is_import(self) -> bool:
    """Is this a non-empty line whose first leaf `is_import()` accepts?"""
    if not self:
        return False
    return is_import(self.leaves[0])
1212 def is_class(self) -> bool:
1213 """Is this line a class definition?"""
1216 and self.leaves[0].type == token.NAME
1217 and self.leaves[0].value == "class"
def is_stub_class(self) -> bool:
    """Is this a class definition line that ends with the three dots of "..."?

    True when `is_class` holds and the last three leaves compare equal to
    three `.` leaves.
    """
    three_dots = [Leaf(token.DOT, ".") for _ in range(3)]
    return self.is_class and self.leaves[-3:] == three_dots
1228 def is_def(self) -> bool:
1229 """Is this a function definition? (Also returns True for async defs.)"""
1231 first_leaf = self.leaves[0]
1236 second_leaf: Optional[Leaf] = self.leaves[1]
1239 return (first_leaf.type == token.NAME and first_leaf.value == "def") or (
1240 first_leaf.type == token.ASYNC
1241 and second_leaf is not None
1242 and second_leaf.type == token.NAME
1243 and second_leaf.value == "def"
1247 def is_class_paren_empty(self) -> bool:
1248 """Is this a class with no base classes but using parentheses?
1250 Those are unnecessary and should be removed.
1254 and len(self.leaves) == 4
1256 and self.leaves[2].type == token.LPAR
1257 and self.leaves[2].value == "("
1258 and self.leaves[3].type == token.RPAR
1259 and self.leaves[3].value == ")"
1263 def is_triple_quoted_string(self) -> bool:
1264 """Is the line a triple quoted string?"""
1267 and self.leaves[0].type == token.STRING
1268 and self.leaves[0].value.startswith(('"""', "'''"))
def contains_standalone_comments(self, depth_limit: int = sys.maxsize) -> bool:
    """If so, needs to be split before emitting.

    Only standalone comments whose `bracket_depth` does not exceed
    `depth_limit` are taken into account.
    """
    return any(
        leaf.type == STANDALONE_COMMENT and leaf.bracket_depth <= depth_limit
        for leaf in self.leaves
    )
1279 def contains_inner_type_comments(self) -> bool:
1282 last_leaf = self.leaves[-1]
1283 ignored_ids.add(id(last_leaf))
1284 if last_leaf.type == token.COMMA:
1285 # When trailing commas are inserted by Black for consistency, comments
1286 # after the previous last element are not moved (they don't have to,
1287 # rendering will still be correct). So we ignore trailing commas.
1288 last_leaf = self.leaves[-2]
1289 ignored_ids.add(id(last_leaf))
1293 for leaf_id, comments in self.comments.items():
1294 if leaf_id in ignored_ids:
1297 for comment in comments:
1298 if is_type_comment(comment):
def contains_multiline_strings(self) -> bool:
    """Return True if `is_multiline_string()` accepts any leaf on the line."""
    return any(is_multiline_string(leaf) for leaf in self.leaves)
1310 def maybe_remove_trailing_comma(self, closing: Leaf) -> bool:
1311 """Remove trailing comma if there is one and it's safe."""
1314 and self.leaves[-1].type == token.COMMA
1315 and closing.type in CLOSING_BRACKETS
1319 if closing.type == token.RBRACE:
1320 self.remove_trailing_comma()
1323 if closing.type == token.RSQB:
1324 comma = self.leaves[-1]
1325 if comma.parent and comma.parent.type == syms.listmaker:
1326 self.remove_trailing_comma()
1329 # For parens let's check if it's safe to remove the comma.
1330 # Imports are always safe.
1332 self.remove_trailing_comma()
1335 # Otherwise, if the trailing one is the only one, we might mistakenly
1336 # change a tuple into a different type by removing the comma.
1337 depth = closing.bracket_depth + 1
1339 opening = closing.opening_bracket
1340 for _opening_index, leaf in enumerate(self.leaves):
1347 for leaf in self.leaves[_opening_index + 1 :]:
1351 bracket_depth = leaf.bracket_depth
1352 if bracket_depth == depth and leaf.type == token.COMMA:
1354 if leaf.parent and leaf.parent.type == syms.arglist:
1359 self.remove_trailing_comma()
1364 def append_comment(self, comment: Leaf) -> bool:
1365 """Add an inline or standalone comment to the line."""
1367 comment.type == STANDALONE_COMMENT
1368 and self.bracket_tracker.any_open_brackets()
1373 if comment.type != token.COMMENT:
1377 comment.type = STANDALONE_COMMENT
1381 self.comments.setdefault(id(self.leaves[-1]), []).append(comment)
def comments_after(self, leaf: Leaf) -> List[Leaf]:
    """Return the comments stored for `leaf`, or an empty list if none are."""
    leaf_id = id(leaf)
    if leaf_id in self.comments:
        return self.comments[leaf_id]
    return []
def remove_trailing_comma(self) -> None:
    """Drop the last leaf (the trailing comma) and re-home its comments.

    Comments keyed by the removed comma are appended to the comment list of
    the leaf that is now last on the line.
    """
    comma = self.leaves.pop()
    orphaned = self.comments.pop(id(comma), [])
    new_last = self.leaves[-1]
    self.comments.setdefault(id(new_last), []).extend(orphaned)
def is_complex_subscript(self, leaf: Leaf) -> bool:
    """Return True iff `leaf` is part of a slice with non-trivial exprs."""
    lsqb = self.bracket_tracker.get_open_lsqb()
    if lsqb is None:
        # No square bracket is currently open.
        return False

    start = lsqb.next_sibling
    if isinstance(start, Node):
        if start.type == syms.listmaker:
            return False

        if start.type == syms.subscriptlist:
            # Narrow down to the child of the subscript list that holds `leaf`.
            start = child_towards(start, leaf)

    if start is None:
        return False

    return any(n.type in TEST_DESCENDANTS for n in start.pre_order())
def __str__(self) -> str:
    """Render the line as text, always terminated by a newline.

    An empty line (per `__bool__`) renders as a lone newline. Otherwise the
    first leaf is written as prefix + indentation + value, the remaining
    leaves are appended via `str()`, and all stored comments follow.
    """
    if not self:
        return "\n"

    # NOTE(review): the indent unit below is the literal as it appears in this
    # copy of the file; confirm its width was not collapsed in transit.
    indent = " " * self.depth
    remaining = iter(self.leaves)
    head = next(remaining)
    pieces = [f"{head.prefix}{indent}{head.value}"]
    pieces.extend(str(leaf) for leaf in remaining)
    pieces.extend(
        str(comment)
        for comment in itertools.chain.from_iterable(self.comments.values())
    )
    pieces.append("\n")
    return "".join(pieces)
def __bool__(self) -> bool:
    """Return True if the line has leaves or comments."""
    if self.leaves:
        return True
    return bool(self.comments)
@dataclass
class EmptyLineTracker:
    """Stateful helper computing how many empty lines surround each line.

    Feed it lines in order through `maybe_empty_lines()`; it returns the number
    of potential extra empty lines needed before and after the given line.

    Note: this tracker works on lines that haven't been split yet. It assumes
    the prefix of the first leaf consists of optional newlines. Those newlines
    are consumed by `maybe_empty_lines()` and included in the computation.
    """

    # Selects the tighter spacing rules used below when True.
    is_pyi: bool = False
    # The line handed to the previous `maybe_empty_lines()` call, if any.
    previous_line: Optional[Line] = None
    # The `after` count returned for the previous line.
    previous_after: int = 0
    # Depths of the class/def lines that are still "open".
    previous_defs: List[int] = Factory(list)

    def maybe_empty_lines(self, current_line: Line) -> Tuple[int, int]:
        """Return the number of extra empty lines before and after the `current_line`.

        This is for separating `def`, `async def` and `class` with extra empty
        lines (two on module-level). The `after` count already promised for the
        previous line is subtracted from this line's `before` count.
        """
        raw_before, after = self._maybe_empty_lines(current_line)
        before = raw_before - self.previous_after
        self.previous_after = after
        self.previous_line = current_line
        return before, after

    def _maybe_empty_lines(self, current_line: Line) -> Tuple[int, int]:
        """Compute the raw (before, after) pair for `current_line`.

        Side effects: clears the prefix of the first leaf and pops entries
        from `previous_defs` that are at the same or a deeper depth.
        """
        depth = current_line.depth
        top_level_regular = depth == 0 and not self.is_pyi
        max_allowed = 2 if top_level_regular else 1

        if current_line.leaves:
            # Consume the first leaf's extra newlines.
            first_leaf = current_line.leaves[0]
            before = min(first_leaf.prefix.count("\n"), max_allowed)
            first_leaf.prefix = ""
        else:
            before = 0

        while self.previous_defs and self.previous_defs[-1] >= depth:
            self.previous_defs.pop()
            if self.is_pyi:
                before = 0 if depth else 1
            else:
                before = 1 if depth else 2

        if current_line.is_decorator or current_line.is_def or current_line.is_class:
            return self._maybe_empty_lines_for_class_or_def(current_line, before)

        previous = self.previous_line
        if (
            previous
            and previous.is_import
            and not current_line.is_import
            and depth == previous.depth
        ):
            # At least one empty line after a block of imports.
            return (before or 1), 0

        if previous and previous.is_class and current_line.is_triple_quoted_string:
            return before, 1

        return before, 0

    def _maybe_empty_lines_for_class_or_def(
        self, current_line: Line, before: int
    ) -> Tuple[int, int]:
        """Compute the (before, after) pair for a decorator, `def` or `class` line."""
        if not current_line.is_decorator:
            self.previous_defs.append(current_line.depth)

        previous = self.previous_line
        if previous is None:
            # Don't insert empty lines before the first line in the file.
            return 0, 0

        if previous.is_decorator:
            return 0, 0

        if previous.depth < current_line.depth and (
            previous.is_class or previous.is_def
        ):
            return 0, 0

        if (
            previous.is_comment
            and previous.depth == current_line.depth
            and before == 0
        ):
            return 0, 0

        if not self.is_pyi:
            newlines = 2
        elif previous.depth > current_line.depth:
            newlines = 1
        elif current_line.is_class or previous.is_class:
            # No blank line between classes with an empty body
            both_stubs = current_line.is_stub_class and previous.is_stub_class
            newlines = 0 if both_stubs else 1
        elif current_line.is_def and not previous.is_def:
            # Blank line between a block of functions and a block of non-functions
            newlines = 1
        else:
            newlines = 0

        if current_line.depth and newlines:
            newlines -= 1
        return newlines, 0
@dataclass
class LineGenerator(Visitor[Line]):
    """Visitor that turns a syntax tree into a stream of `Line` objects.

    Empty lines are not emitted.

    Note: destroys the tree it's visiting by mutating prefixes of its leaves
    in ways that will no longer stringify to valid Python code on the tree.
    """

    is_pyi: bool = False
    normalize_strings: bool = True
    current_line: Line = Factory(Line)
    remove_u_prefix: bool = False

    def line(self, indent: int = 0) -> Iterator[Line]:
        """Emit the line being built, if any, and start a new one.

        When the current line is empty nothing is emitted; only its depth is
        adjusted by `indent`. Otherwise the current line is yielded and a fresh
        `Line` is set up at the old depth plus `indent`.
        """
        if not self.current_line:
            self.current_line.depth += indent
            return  # Line is empty, don't emit. Creating a new one unnecessary.

        finished = self.current_line
        self.current_line = Line(depth=finished.depth + indent)
        yield finished

    def visit_default(self, node: LN) -> Iterator[Line]:
        """Default `visit_*()` implementation. Recurses to children of `node`."""
        if isinstance(node, Leaf):
            in_brackets = self.current_line.bracket_tracker.any_open_brackets()
            for comment in generate_comments(node):
                if in_brackets:
                    # any comment within brackets is subject to splitting
                    self.current_line.append(comment)
                    continue

                if comment.type == token.COMMENT:
                    # regular trailing comment
                    self.current_line.append(comment)
                    yield from self.line()
                    continue

                # regular standalone comment
                yield from self.line()

                self.current_line.append(comment)
                yield from self.line()

            normalize_prefix(node, inside_brackets=in_brackets)
            if self.normalize_strings and node.type == token.STRING:
                normalize_string_prefix(node, remove_u_prefix=self.remove_u_prefix)
                normalize_string_quotes(node)
            if node.type == token.NUMBER:
                normalize_numeric_literal(node)
            if node.type not in WHITESPACE:
                self.current_line.append(node)
        yield from super().visit_default(node)

    def visit_atom(self, node: Node) -> Iterator[Line]:
        """Blank out the parentheses around a single parenthesized leaf."""
        # Always make parentheses invisible around a single node, because it should
        # not be needed (except in the case of yield, where removing the parentheses
        # produces a SyntaxError).
        if len(node.children) == 3:
            lpar, inner, rpar = node.children
            wraps_single_leaf = (
                isinstance(lpar, Leaf)
                and lpar.type == token.LPAR
                and isinstance(rpar, Leaf)
                and rpar.type == token.RPAR
                and isinstance(inner, Leaf)
            )
            if wraps_single_leaf and not (
                inner.type == token.NAME and inner.value == "yield"
            ):
                lpar.value = ""
                rpar.value = ""
        yield from super().visit_default(node)

    def visit_INDENT(self, node: Node) -> Iterator[Line]:
        """Increase indentation level, maybe yield a line."""
        # In blib2to3 INDENT never holds comments.
        yield from self.line(+1)
        yield from self.visit_default(node)

    def visit_DEDENT(self, node: Node) -> Iterator[Line]:
        """Decrease indentation level, maybe yield a line."""
        # The current line might still wait for trailing comments. At DEDENT time
        # there won't be any (they would be prefixes on the preceding NEWLINE).
        # Emit the line then.
        yield from self.line()

        # While DEDENT has no value, its prefix may contain standalone comments
        # that belong to the current indentation level. Get 'em.
        yield from self.visit_default(node)

        # Finally, emit the dedent.
        yield from self.line(-1)

    def visit_stmt(
        self, node: Node, keywords: Set[str], parens: Set[str]
    ) -> Iterator[Line]:
        """Visit a statement.

        This implementation is shared for `if`, `while`, `for`, `try`, `except`,
        `def`, `with`, `class`, `assert` and assignments.

        The relevant Python language `keywords` for a given statement will be
        NAME leaves within it. This method puts those on a separate line.

        `parens` holds a set of string leaf values immediately after which
        invisible parens should be put.
        """
        normalize_invisible_parens(node, parens_after=parens)
        for child in node.children:
            is_keyword = (
                child.type == token.NAME and child.value in keywords  # type: ignore
            )
            if is_keyword:
                yield from self.line()

            yield from self.visit(child)

    def visit_suite(self, node: Node) -> Iterator[Line]:
        """Visit a suite."""
        if self.is_pyi and is_stub_suite(node):
            # Only the third child is visited for a stub suite.
            yield from self.visit(node.children[2])
        else:
            yield from self.visit_default(node)

    def visit_simple_stmt(self, node: Node) -> Iterator[Line]:
        """Visit a statement without nested statements."""
        parent = node.parent
        if parent and parent.type in STATEMENT:
            # The statement stands in for a suite, directly under a compound one.
            if self.is_pyi and is_stub_body(node):
                yield from self.visit_default(node)
            else:
                yield from self.line(+1)
                yield from self.visit_default(node)
                yield from self.line(-1)
            return

        stays_on_line = self.is_pyi and node.parent and is_stub_suite(node.parent)
        if not stays_on_line:
            yield from self.line()
        yield from self.visit_default(node)

    def visit_async_stmt(self, node: Node) -> Iterator[Line]:
        """Visit `async def`, `async for`, `async with`."""
        yield from self.line()

        remaining = iter(node.children)
        for child in remaining:
            yield from self.visit(child)

            if child.type == token.ASYNC:
                break

        # The child right after ASYNC is visited child by child rather than as
        # a whole.
        internal_stmt = next(remaining)
        for child in internal_stmt.children:
            yield from self.visit(child)

    def visit_decorators(self, node: Node) -> Iterator[Line]:
        """Visit decorators."""
        for child in node.children:
            yield from self.line()
            yield from self.visit(child)

    def visit_SEMI(self, leaf: Leaf) -> Iterator[Line]:
        """Remove a semicolon and put the other statement on a separate line."""
        yield from self.line()

    def visit_ENDMARKER(self, leaf: Leaf) -> Iterator[Line]:
        """End of file. Process outstanding comments and end with a newline."""
        yield from self.visit_default(leaf)
        yield from self.line()

    def visit_STANDALONE_COMMENT(self, leaf: Leaf) -> Iterator[Line]:
        """Visit a standalone comment, on its own line unless brackets are open."""
        if not self.current_line.bracket_tracker.any_open_brackets():
            yield from self.line()
        yield from self.visit_default(leaf)

    def __attrs_post_init__(self) -> None:
        """You are in a twisty little maze of passages."""
        # Every statement visitor below is `visit_stmt` with preset arguments.
        stmt = self.visit_stmt
        nothing: Set[str] = set()
        self.visit_assert_stmt = partial(
            stmt, keywords={"assert"}, parens={"assert", ","}
        )
        self.visit_if_stmt = partial(
            stmt, keywords={"if", "else", "elif"}, parens={"if", "elif"}
        )
        self.visit_while_stmt = partial(
            stmt, keywords={"while", "else"}, parens={"while"}
        )
        self.visit_for_stmt = partial(
            stmt, keywords={"for", "else"}, parens={"for", "in"}
        )
        self.visit_try_stmt = partial(
            stmt, keywords={"try", "except", "else", "finally"}, parens=nothing
        )
        self.visit_except_clause = partial(stmt, keywords={"except"}, parens=nothing)
        self.visit_with_stmt = partial(stmt, keywords={"with"}, parens=nothing)
        self.visit_funcdef = partial(stmt, keywords={"def"}, parens=nothing)
        self.visit_classdef = partial(stmt, keywords={"class"}, parens=nothing)
        self.visit_expr_stmt = partial(stmt, keywords=nothing, parens=ASSIGNMENTS)
        self.visit_return_stmt = partial(
            stmt, keywords={"return"}, parens={"return"}
        )
        self.visit_import_from = partial(stmt, keywords=nothing, parens={"import"})
        self.visit_del_stmt = partial(stmt, keywords=nothing, parens={"del"})
        self.visit_async_funcdef = self.visit_async_stmt
        self.visit_decorated = self.visit_decorators
IMPLICIT_TUPLE = {syms.testlist, syms.testlist_star_expr, syms.exprlist}

# Maps every opening bracket token to the token that closes it.
BRACKET = {
    token.LPAR: token.RPAR,
    token.LSQB: token.RSQB,
    token.LBRACE: token.RBRACE,
}
OPENING_BRACKETS = set(BRACKET)
CLOSING_BRACKETS = set(BRACKET.values())
BRACKETS = OPENING_BRACKETS | CLOSING_BRACKETS

# Leaf types included here get no whitespace prefix from `whitespace()`.
ALWAYS_NO_SPACE = CLOSING_BRACKETS | {token.COMMA, STANDALONE_COMMENT}
1756 def whitespace(leaf: Leaf, *, complex_subscript: bool) -> str: # noqa: C901
1757 """Return whitespace prefix if needed for the given `leaf`.
1759 `complex_subscript` signals whether the given leaf is part of a subscription
1760 which has non-trivial arguments, like arithmetic expressions or function calls.
1768 if t in ALWAYS_NO_SPACE:
1771 if t == token.COMMENT:
1774 assert p is not None, f"INTERNAL ERROR: hand-made leaf without parent: {leaf!r}"
1775 if t == token.COLON and p.type not in {
1782 prev = leaf.prev_sibling
1784 prevp = preceding_leaf(p)
1785 if not prevp or prevp.type in OPENING_BRACKETS:
1788 if t == token.COLON:
1789 if prevp.type == token.COLON:
1792 elif prevp.type != token.COMMA and not complex_subscript:
1797 if prevp.type == token.EQUAL:
1799 if prevp.parent.type in {
1807 elif prevp.parent.type == syms.typedargslist:
1808 # A bit hacky: if the equal sign has whitespace, it means we
1809 # previously found it's a typed argument. So, we're using
1813 elif prevp.type in STARS:
1814 if is_vararg(prevp, within=VARARGS_PARENTS | UNPACKING_PARENTS):
1817 elif prevp.type == token.COLON:
1818 if prevp.parent and prevp.parent.type in {syms.subscript, syms.sliceop}:
1819 return SPACE if complex_subscript else NO
1823 and prevp.parent.type == syms.factor
1824 and prevp.type in MATH_OPERATORS
1829 prevp.type == token.RIGHTSHIFT
1831 and prevp.parent.type == syms.shift_expr
1832 and prevp.prev_sibling
1833 and prevp.prev_sibling.type == token.NAME
1834 and prevp.prev_sibling.value == "print" # type: ignore
1836 # Python 2 print chevron
1839 elif prev.type in OPENING_BRACKETS:
1842 if p.type in {syms.parameters, syms.arglist}:
1843 # untyped function signatures or calls
1844 if not prev or prev.type != token.COMMA:
1847 elif p.type == syms.varargslist:
1849 if prev and prev.type != token.COMMA:
1852 elif p.type == syms.typedargslist:
1853 # typed function signatures
1857 if t == token.EQUAL:
1858 if prev.type != syms.tname:
1861 elif prev.type == token.EQUAL:
1862 # A bit hacky: if the equal sign has whitespace, it means we
1863 # previously found it's a typed argument. So, we're using that, too.
1866 elif prev.type != token.COMMA:
1869 elif p.type == syms.tname:
1872 prevp = preceding_leaf(p)
1873 if not prevp or prevp.type != token.COMMA:
1876 elif p.type == syms.trailer:
1877 # attributes and calls
1878 if t == token.LPAR or t == token.RPAR:
1883 prevp = preceding_leaf(p)
1884 if not prevp or prevp.type != token.NUMBER:
1887 elif t == token.LSQB:
1890 elif prev.type != token.COMMA:
1893 elif p.type == syms.argument:
1895 if t == token.EQUAL:
1899 prevp = preceding_leaf(p)
1900 if not prevp or prevp.type == token.LPAR:
1903 elif prev.type in {token.EQUAL} | STARS:
1906 elif p.type == syms.decorator:
1910 elif p.type == syms.dotted_name:
1914 prevp = preceding_leaf(p)
1915 if not prevp or prevp.type == token.AT or prevp.type == token.DOT:
1918 elif p.type == syms.classdef:
1922 if prev and prev.type == token.LPAR:
1925 elif p.type in {syms.subscript, syms.sliceop}:
1928 assert p.parent is not None, "subscripts are always parented"
1929 if p.parent.type == syms.subscriptlist:
1934 elif not complex_subscript:
1937 elif p.type == syms.atom:
1938 if prev and t == token.DOT:
1939 # dots, but not the first one.
1942 elif p.type == syms.dictsetmaker:
1944 if prev and prev.type == token.DOUBLESTAR:
1947 elif p.type in {syms.factor, syms.star_expr}:
1950 prevp = preceding_leaf(p)
1951 if not prevp or prevp.type in OPENING_BRACKETS:
1954 prevp_parent = prevp.parent
1955 assert prevp_parent is not None
1956 if prevp.type == token.COLON and prevp_parent.type in {
1962 elif prevp.type == token.EQUAL and prevp_parent.type == syms.argument:
1965 elif t in {token.NAME, token.NUMBER, token.STRING}:
1968 elif p.type == syms.import_from:
1970 if prev and prev.type == token.DOT:
1973 elif t == token.NAME:
1977 if prev and prev.type == token.DOT:
1980 elif p.type == syms.sliceop:
def preceding_leaf(node: Optional[LN]) -> Optional[Leaf]:
    """Return the first leaf that precedes `node`, if any.

    Walks up through the parents of `node` until one of them has a previous
    sibling, then returns that sibling (if it is a leaf) or its last leaf.
    """
    current = node
    while current:
        sibling = current.prev_sibling
        if not sibling:
            current = current.parent
            continue

        if isinstance(sibling, Leaf):
            return sibling

        leaves = list(sibling.leaves())
        return leaves[-1] if leaves else None

    return None
def child_towards(ancestor: Node, descendant: LN) -> Optional[LN]:
    """Return the child of `ancestor` that contains `descendant`.

    The result is `descendant` itself when it is a direct child. If the walk
    up the parents runs out before reaching `ancestor`, the falsy end of the
    chain (normally None) is returned.
    """
    current: Optional[LN] = descendant
    while current:
        parent = current.parent
        if parent != ancestor:
            current = parent
        else:
            return current
    return current
def container_of(leaf: Leaf) -> LN:
    """Return `leaf` or one of its ancestors that is the topmost container of it.

    By "container" we mean a node where `leaf` is the very first child.
    Climbing stops at the first parent whose first child has a different
    prefix than `leaf`, which is the file input node, or which directly
    follows a bracket.
    """
    same_prefix = leaf.prefix
    container: LN = leaf
    while container:
        parent = container.parent
        if (
            parent is None
            or parent.children[0].prefix != same_prefix
            or parent.type == syms.file_input
            or (
                parent.prev_sibling is not None
                and parent.prev_sibling.type in BRACKETS
            )
        ):
            break

        container = parent
    return container
def is_split_after_delimiter(leaf: Leaf, previous: Optional[Leaf] = None) -> int:
    """Return the priority of the `leaf` delimiter, given a line break after it.

    The delimiter priorities returned here are from those delimiters that would
    cause a line break after themselves. Only commas qualify; 0 is returned
    for every other leaf. `previous` is accepted but not consulted.

    Higher numbers are higher priority.
    """
    return COMMA_PRIORITY if leaf.type == token.COMMA else 0
def is_split_before_delimiter(leaf: Leaf, previous: Optional[Leaf] = None) -> int:
    """Return the priority of the `leaf` delimiter, given a line break before it.

    The delimiter priorities returned here are from those delimiters that would
    cause a line break before themselves. 0 means `leaf` is not such a
    delimiter. `previous` is the leaf that comes right before `leaf`, if known.

    Higher numbers are higher priority.
    """
    if is_vararg(leaf, within=VARARGS_PARENTS | UNPACKING_PARENTS):
        # * and ** might also be MATH_OPERATORS but in this case they are not.
        # Don't treat them as a delimiter.
        return 0

    kind = leaf.type
    parent = leaf.parent

    if (
        kind == token.DOT
        and parent
        and parent.type not in {syms.import_from, syms.dotted_name}
        and (previous is None or previous.type in CLOSING_BRACKETS)
    ):
        return DOT_PRIORITY

    if (
        kind in MATH_OPERATORS
        and parent
        and parent.type not in {syms.factor, syms.star_expr}
    ):
        return MATH_PRIORITIES[kind]

    if kind in COMPARATORS:
        return COMPARATOR_PRIORITY

    if kind == token.STRING and previous is not None and previous.type == token.STRING:
        return STRING_PRIORITY

    if kind not in {token.NAME, token.ASYNC}:
        return 0

    # From here on `leaf` is a NAME or ASYNC leaf.
    word = leaf.value
    previous_is_name = previous is not None and previous.type == token.NAME

    if (
        word == "for"
        and parent
        and parent.type in {syms.comp_for, syms.old_comp_for}
        or kind == token.ASYNC
    ):
        before = leaf.prev_sibling
        if not isinstance(before, Leaf) or before.value != "async":
            return COMPREHENSION_PRIORITY

    if word == "if" and parent and parent.type in {syms.comp_if, syms.old_comp_if}:
        return COMPREHENSION_PRIORITY

    if word in {"if", "else"} and parent and parent.type == syms.test:
        return TERNARY_PRIORITY

    if word == "is":
        return COMPARATOR_PRIORITY

    if (
        word == "in"
        and parent
        and parent.type in {syms.comp_op, syms.comparison}
        # the "in" of "not in" is not a delimiter of its own
        and not (previous_is_name and previous.value == "not")
    ):
        return COMPARATOR_PRIORITY

    if (
        word == "not"
        and parent
        and parent.type == syms.comp_op
        # the "not" of "is not" is not a delimiter of its own
        and not (previous_is_name and previous.value == "is")
    ):
        return COMPARATOR_PRIORITY

    if word in LOGIC_OPERATORS and parent:
        return LOGIC_PRIORITY

    return 0
# Exact comment texts that are collected as the "off" and "on" marker sets.
FMT_OFF = {
    "# fmt: off",
    "# fmt:off",
    "# yapf: disable",
}
FMT_ON = {
    "# fmt: on",
    "# fmt:on",
    "# yapf: enable",
}
def generate_comments(leaf: LN) -> Iterator[Leaf]:
    """Generate comment leaves from the prefix of `leaf`, if it holds any.

    Comments in lib2to3 are shoved into the whitespace prefix. This happens
    in `pgen2/driver.py:Driver.parse_tokens()`. As a consequence comments don't
    "belong" anywhere in the tree, which is why this function yields simple
    parentless Leaf objects for them.

    We really only need to differentiate between inline and standalone
    comments. The latter don't share the line with any code.

    Inline comments are emitted as regular token.COMMENT leaves. Standalone
    are emitted with a fake STANDALONE_COMMENT token identifier. Each yielded
    leaf gets a prefix of as many newlines as `list_comments()` counted
    before that comment.
    """
    at_endmarker = leaf.type == token.ENDMARKER
    for proto in list_comments(leaf.prefix, is_endmarker=at_endmarker):
        yield Leaf(proto.type, proto.value, prefix="\n" * proto.newlines)
2176 """Describes a piece of syntax that is a comment.
2178 It's not a :class:`blib2to3.pytree.Leaf` so that:
2180 * it can be cached (`Leaf` objects should not be reused more than once as
2181 they store their lineno, column, prefix, and parent information);
2182 * `newlines` and `consumed` fields are kept separate from the `value`. This
2183 simplifies handling of special marker comments like ``# fmt: off/on``.
2186 type: int # token.COMMENT or STANDALONE_COMMENT
2187 value: str # content of the comment
2188 newlines: int # how many newlines before the comment
2189 consumed: int # how many characters of the original leaf's prefix did we consume
@lru_cache(maxsize=4096)
def list_comments(prefix: str, *, is_endmarker: bool) -> List[ProtoComment]:
    """Return a list of :class:`ProtoComment` objects parsed from the given `prefix`.

    The prefix is scanned line by line. Every line that starts with "#" (after
    stripping leading whitespace) becomes one entry, recording the number of
    blank lines seen since the previous comment and how many characters of
    `prefix` were consumed up to and including that line.

    Results are memoized, so the returned list is shared between calls with
    equal arguments.
    """
    found: List[ProtoComment] = []
    if not prefix or "#" not in prefix:
        return found

    consumed = 0
    blank_lines = 0
    ignored_lines = 0
    for index, raw_line in enumerate(prefix.split("\n")):
        consumed += len(raw_line) + 1  # adding the length of the split '\n'
        text = raw_line.lstrip()
        if not text:
            blank_lines += 1
        if not text.startswith("#"):
            # Escaped newlines outside of a comment are not really newlines at
            # all. We treat a single-line comment following an escaped newline
            # as a simple trailing comment.
            if text.endswith("\\"):
                ignored_lines += 1
            continue

        if index == ignored_lines and not is_endmarker:
            comment_type = token.COMMENT  # simple trailing comment
        else:
            comment_type = STANDALONE_COMMENT
        found.append(
            ProtoComment(
                type=comment_type,
                value=make_comment(text),
                newlines=blank_lines,
                consumed=consumed,
            )
        )
        blank_lines = 0
    return found
def make_comment(content: str) -> str:
    """Return a consistently formatted comment from the given `content` string.

    All comments (except for "##", "#!", "#:", "#'", "#%%") should have a single
    space between the hash sign and the content.

    If `content` didn't start with a hash sign, one is provided. Trailing
    whitespace is always removed; blank input yields a bare "#".
    """
    stripped = content.rstrip()
    if stripped == "":
        return "#"

    body = stripped[1:] if stripped.startswith("#") else stripped
    # These first characters mark the exceptions that must stay glued to "#".
    glued = (" ", "!", ":", "#", "'", "%")
    if body and not body.startswith(glued):
        return "# " + body
    return "#" + body
2251 inner: bool = False,
2252 features: Collection[Feature] = (),
2253 ) -> Iterator[Line]:
2254 """Split a `line` into potentially many lines.
2256 They should fit in the allotted `line_length` but might not be able to.
2257 `inner` signifies that there were a pair of brackets somewhere around the
2258 current `line`, possibly transitively. This means we can fallback to splitting
2259 by delimiters if the LHS/RHS don't yield any results.
2261 `features` are syntactical features that may be used in the output.
2267 line_str = str(line).strip("\n")
2270 not line.contains_inner_type_comments()
2271 and not line.should_explode
2272 and is_line_short_enough(line, line_length=line_length, line_str=line_str)
2277 split_funcs: List[SplitFunc]
2279 split_funcs = [left_hand_split]
2282 def rhs(line: Line, features: Collection[Feature]) -> Iterator[Line]:
2283 for omit in generate_trailers_to_omit(line, line_length):
2284 lines = list(right_hand_split(line, line_length, features, omit=omit))
2285 if is_line_short_enough(lines[0], line_length=line_length):
2289 # All splits failed, best effort split with no omits.
2290 # This mostly happens to multiline strings that are by definition
2291 # reported as not fitting a single line.
2292 yield from right_hand_split(line, line_length, features=features)
2294 if line.inside_brackets:
2295 split_funcs = [delimiter_split, standalone_comment_split, rhs]
2298 for split_func in split_funcs:
2299 # We are accumulating lines in `result` because we might want to abort
2300 # mission and return the original line in the end, or attempt a different
2302 result: List[Line] = []
2304 for l in split_func(line, features):
2305 if str(l).strip("\n") == line_str:
2306 raise CannotSplit("Split function returned an unchanged result")
2310 l, line_length=line_length, inner=True, features=features
def left_hand_split(line: Line, features: Collection[Feature] = ()) -> Iterator[Line]:
    """Split line into many lines, starting with the first matching bracket pair.

    The leaves are distributed over a head (up to and including the first
    opening bracket), a body (up to its matching closing bracket) and a tail
    (the rest). Non-empty parts are yielded in that order.

    Raises :exc:`CannotSplit` when the line has no opening bracket or when
    `bracket_split_succeeded_or_raise()` rejects the result.

    Note: this usually looks weird, only use this for function definitions.
    Prefer RHS otherwise. This is why this function is not symmetrical with
    :func:`right_hand_split` which also handles optional parentheses.
    """
    head_leaves: List[Leaf] = []
    body_leaves: List[Leaf] = []
    tail_leaves: List[Leaf] = []
    bucket = head_leaves
    matching_bracket = None
    for leaf in line.leaves:
        closes_body = (
            bucket is body_leaves
            and leaf.type in CLOSING_BRACKETS
            and leaf.opening_bracket is matching_bracket
        )
        if closes_body:
            # An empty body sends the closing bracket back to the head.
            bucket = tail_leaves if body_leaves else head_leaves
        bucket.append(leaf)
        if bucket is head_leaves and leaf.type in OPENING_BRACKETS:
            matching_bracket = leaf
            bucket = body_leaves

    if not matching_bracket:
        raise CannotSplit("No brackets found")

    head = bracket_split_build_line(head_leaves, line, matching_bracket)
    body = bracket_split_build_line(body_leaves, line, matching_bracket, is_body=True)
    tail = bracket_split_build_line(tail_leaves, line, matching_bracket)
    bracket_split_succeeded_or_raise(head, body, tail)
    for part in (head, body, tail):
        if part:
            yield part
def right_hand_split(
    line: Line,
    line_length: int,
    features: Collection[Feature] = (),
    omit: Collection[LeafID] = (),
) -> Iterator[Line]:
    """Split line into many lines, starting with the last matching bracket pair.

    If the split was by optional parentheses, attempt splitting without them, too.
    `omit` is a collection of closing bracket IDs that shouldn't be considered for
    this split.

    Raises :exc:`CannotSplit` when no usable bracket pair is found or when the
    resulting split is rejected.

    Note: running this function modifies `bracket_depth` on the leaves of `line`.
    """
    head_leaves: List[Leaf] = []
    body_leaves: List[Leaf] = []
    tail_leaves: List[Leaf] = []
    bucket = tail_leaves
    opening_bracket = None
    closing_bracket = None
    # Walk backwards: tail first, then body, then head.
    for leaf in reversed(line.leaves):
        if bucket is body_leaves and leaf is opening_bracket:
            bucket = head_leaves if body_leaves else tail_leaves
        bucket.append(leaf)
        if (
            bucket is tail_leaves
            and leaf.type in CLOSING_BRACKETS
            and id(leaf) not in omit
        ):
            opening_bracket = leaf.opening_bracket
            closing_bracket = leaf
            bucket = body_leaves

    if not (opening_bracket and closing_bracket and head_leaves):
        # If there is no opening or closing_bracket that means the split failed and
        # all content is in the tail. Otherwise, if `head_leaves` are empty, it means
        # the matching `opening_bracket` wasn't available on `line` anymore.
        raise CannotSplit("No brackets found")

    # The leaves were collected back to front; restore source order.
    for collected in (tail_leaves, body_leaves, head_leaves):
        collected.reverse()

    head = bracket_split_build_line(head_leaves, line, opening_bracket)
    body = bracket_split_build_line(body_leaves, line, opening_bracket, is_body=True)
    tail = bracket_split_build_line(tail_leaves, line, opening_bracket)
    bracket_split_succeeded_or_raise(head, body, tail)

    may_drop_optional_parens = (
        # the body shouldn't be exploded
        not body.should_explode
        # the opening bracket is an optional paren
        and opening_bracket.type == token.LPAR
        and not opening_bracket.value
        # the closing bracket is an optional paren
        and closing_bracket.type == token.RPAR
        and not closing_bracket.value
        # it's not an import (optional parens are the only thing we can split on
        # in this case; attempting a split without them is a waste of time)
        and not line.is_import
        # there are no standalone comments in the body
        and not body.contains_standalone_comments(0)
        # and we can actually remove the parens
        and can_omit_invisible_parens(body, line_length)
    )
    if may_drop_optional_parens:
        omit = {id(closing_bracket), *omit}
        try:
            yield from right_hand_split(line, line_length, features=features, omit=omit)
            return

        except CannotSplit:
            body_is_workable = can_be_split(body) or is_line_short_enough(
                body, line_length=line_length
            )
            if not body_is_workable:
                raise CannotSplit(
                    "Splitting failed, body is still too long and can't be split."
                )

            if head.contains_multiline_strings() or tail.contains_multiline_strings():
                raise CannotSplit(
                    "The current optional pair of parentheses is bound to fail to "
                    "satisfy the splitting algorithm because the head or the tail "
                    "contains multiline strings which by definition never fit one "
                    "line."
                )

    ensure_visible(opening_bracket)
    ensure_visible(closing_bracket)
    for part in (head, body, tail):
        if part:
            yield part
def bracket_split_succeeded_or_raise(head: Line, body: Line, tail: Line) -> None:
    """Raise :exc:`CannotSplit` if the last left- or right-hand split failed.

    Do nothing otherwise.

    A left- or right-hand split is based on a pair of brackets. Content before
    (and including) the opening bracket is left on one line, content inside the
    brackets is put on a separate line, and finally content starting with and
    following the closing bracket is put on a separate line.

    Those are called `head`, `body`, and `tail`, respectively. The split is
    considered failed when `body` is empty and the stripped `tail` is shorter
    than three characters.
    """
    tail_len = len(str(tail).strip())
    if body:
        return

    if tail_len == 0:
        raise CannotSplit("Splitting brackets produced the same line")

    if tail_len < 3:
        raise CannotSplit(
            f"Splitting brackets on an empty body to save {tail_len} characters is"
            " not worth it"
        )
def bracket_split_build_line(
    leaves: List[Leaf], original: Line, opening_bracket: Leaf, *, is_body: bool = False
) -> Line:
    """Return a new line with given `leaves` and respective comments from `original`.

    If `is_body` is True, the result line is one-indented inside brackets and as such
    has its first leaf's prefix normalized and a trailing comma added when expected.

    Note: `leaves` may be modified in place (a comma can be inserted).
    """
    result = Line(depth=original.depth)
    if is_body:
        result.inside_brackets = True
        result.depth += 1
        if leaves:
            # Since body is a new indent level, remove spurious leading whitespace.
            normalize_prefix(leaves[0], inside_brackets=True)
            # Ensure a trailing comma for imports, but be careful not to add one after
            # any comments.
            if original.is_import:
                for index in reversed(range(len(leaves))):
                    kind = leaves[index].type
                    if kind == STANDALONE_COMMENT:
                        continue

                    if kind != token.COMMA:
                        leaves.insert(index + 1, Leaf(token.COMMA, ","))
                    break

    # Populate the line
    for leaf in leaves:
        result.append(leaf, preformatted=True)
        for comment_after in original.comments_after(leaf):
            result.append(comment_after, preformatted=True)

    if is_body:
        result.should_explode = should_explode(result, opening_bracket)
    return result
def dont_increase_indentation(split_func: SplitFunc) -> SplitFunc:
    """Normalize prefix of the first leaf in every line returned by `split_func`.

    This is a decorator over relevant split functions. The wrapped generator
    yields the same lines as `split_func`, after calling `normalize_prefix()`
    on each line's first leaf.
    """

    @wraps(split_func)
    def split_wrapper(line: Line, features: Collection[Feature] = ()) -> Iterator[Line]:
        for produced in split_func(line, features):
            first_leaf = produced.leaves[0]
            normalize_prefix(first_leaf, inside_brackets=True)
            yield produced

    return split_wrapper
@dont_increase_indentation
def delimiter_split(line: Line, features: Collection[Feature] = ()) -> Iterator[Line]:
    """Split according to delimiters of the highest priority.

    If the appropriate Features are given, the split will add trailing commas
    also in function signatures and calls that contain `*` and `**`.

    Raises :exc:`CannotSplit` for an empty line, for a line without
    delimiters, and when the only delimiter is a single attribute-access dot.
    """
    try:
        last_leaf = line.leaves[-1]
    except IndexError:
        raise CannotSplit("Line empty")

    tracker = line.bracket_tracker
    try:
        delimiter_priority = tracker.max_delimiter_priority(exclude={id(last_leaf)})
    except ValueError:
        raise CannotSplit("No delimiters found")

    if (
        delimiter_priority == DOT_PRIORITY
        and tracker.delimiter_count_with_priority(delimiter_priority) == 1
    ):
        raise CannotSplit("Splitting a single attribute from its owner looks wrong")

    def fresh_line() -> Line:
        """Start an empty line with the depth and bracket state of `line`."""
        return Line(depth=line.depth, inside_brackets=line.inside_brackets)

    current_line = fresh_line()
    lowest_depth = sys.maxsize
    trailing_comma_safe = True

    def append_to_line(leaf: Leaf) -> Iterator[Line]:
        """Append `leaf` to current line or to new line if appending impossible."""
        nonlocal current_line
        try:
            current_line.append_safe(leaf, preformatted=True)
        except ValueError:
            yield current_line

            current_line = fresh_line()
            current_line.append(leaf)

    for leaf in line.leaves:
        yield from append_to_line(leaf)

        for comment_after in line.comments_after(leaf):
            yield from append_to_line(comment_after)

        lowest_depth = min(lowest_depth, leaf.bracket_depth)
        if leaf.bracket_depth == lowest_depth:
            # A vararg at the outermost depth makes a trailing comma safe only
            # when the matching feature was passed in.
            if is_vararg(leaf, within={syms.typedargslist}):
                trailing_comma_safe = (
                    trailing_comma_safe and Feature.TRAILING_COMMA_IN_DEF in features
                )
            elif is_vararg(leaf, within={syms.arglist, syms.argument}):
                trailing_comma_safe = (
                    trailing_comma_safe and Feature.TRAILING_COMMA_IN_CALL in features
                )

        if tracker.delimiters.get(id(leaf)) == delimiter_priority:
            yield current_line

            current_line = fresh_line()

    if current_line:
        needs_trailing_comma = (
            trailing_comma_safe
            and delimiter_priority == COMMA_PRIORITY
            and current_line.leaves[-1].type != token.COMMA
            and current_line.leaves[-1].type != STANDALONE_COMMENT
        )
        if needs_trailing_comma:
            current_line.append(Leaf(token.COMMA, ","))
        yield current_line
2596 @dont_increase_indentation
2597 def standalone_comment_split(
2598 line: Line, features: Collection[Feature] = ()
2599 ) -> Iterator[Line]:
2600 """Split standalone comments from the rest of the line."""
2601 if not line.contains_standalone_comments(0):
2602 raise CannotSplit("Line does not have any standalone comments")
2604 current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
2606 def append_to_line(leaf: Leaf) -> Iterator[Line]:
2607 """Append `leaf` to current line or to new line if appending impossible."""
2608 nonlocal current_line
2610 current_line.append_safe(leaf, preformatted=True)
2614 current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
2615 current_line.append(leaf)
2617 for leaf in line.leaves:
2618 yield from append_to_line(leaf)
2620 for comment_after in line.comments_after(leaf):
2621 yield from append_to_line(comment_after)
def is_import(leaf: Leaf) -> bool:
    """Return True if the given leaf starts an import statement.

    That is the NAME `import` directly under an `import_name` node, or the NAME
    `from` directly under an `import_from` node.
    """
    if leaf.type != token.NAME:
        return False

    parent = leaf.parent
    if not parent:
        return False

    keyword = leaf.value
    if keyword == "import":
        return parent.type == syms.import_name

    if keyword == "from":
        return parent.type == syms.import_from

    return False
def is_type_comment(leaf: Leaf) -> bool:
    """Return True if the given leaf is a special comment.

    Only returns true for type comments for now: the leaf has to be a regular
    or a standalone comment whose text starts with `# type:`.
    """
    t = leaf.type
    v = leaf.value
    # The set has to hold the two token types themselves. The previous spelling
    # `{token.COMMENT, t == STANDALONE_COMMENT}` stored a boolean as the second
    # member, so standalone type comments were never recognized.
    return t in {token.COMMENT, STANDALONE_COMMENT} and v.startswith("# type:")
def normalize_prefix(leaf: Leaf, *, inside_brackets: bool) -> None:
    """Reduce the prefix of `leaf` to newlines only, or to nothing at all.

    Outside of brackets the newlines found after the last `#` of the prefix are
    kept (one fewer when the prefix contains a `#`). Inside brackets, or when
    the part of the prefix before the first `#` contains a backslash, the
    prefix is emptied.

    Note: don't use backslashes for formatting or you'll lose your voting rights.
    """
    new_prefix = ""
    if not inside_brackets:
        pieces = leaf.prefix.split("#")
        if "\\" not in pieces[0]:
            newline_count = pieces[-1].count("\n")
            if len(pieces) > 1:
                # A comment is present in the prefix: keep one newline fewer.
                newline_count -= 1
            new_prefix = "\n" * newline_count
    leaf.prefix = new_prefix
def normalize_string_prefix(leaf: Leaf, remove_u_prefix: bool = False) -> None:
    """Lowercase the prefix of the string literal stored in `leaf.value`.

    With `remove_u_prefix` set, every `u` is dropped from the prefix as well.

    Note: Mutates its argument.
    """
    match = re.match(r"^([furbFURB]*)(.*)$", leaf.value, re.DOTALL)
    assert match is not None, f"failed to match string {leaf.value!r}"
    prefix, rest = match.groups()
    prefix = prefix.lower()
    if remove_u_prefix:
        prefix = prefix.replace("u", "")
    leaf.value = prefix + rest
2683 def normalize_string_quotes(leaf: Leaf) -> None:
2684 """Prefer double quotes but only if it doesn't cause more escaping.
2686 Adds or removes backslashes as appropriate. Doesn't parse and fix
2687 strings nested in f-strings (yet).
2689 Note: Mutates its argument.
2691 value = leaf.value.lstrip("furbFURB")
2692 if value[:3] == '"""':
2695 elif value[:3] == "'''":
2698 elif value[0] == '"':
2704 first_quote_pos = leaf.value.find(orig_quote)
2705 if first_quote_pos == -1:
2706 return # There's an internal error
2708 prefix = leaf.value[:first_quote_pos]
2709 unescaped_new_quote = re.compile(rf"(([^\\]|^)(\\\\)*){new_quote}")
2710 escaped_new_quote = re.compile(rf"([^\\]|^)\\((?:\\\\)*){new_quote}")
2711 escaped_orig_quote = re.compile(rf"([^\\]|^)\\((?:\\\\)*){orig_quote}")
2712 body = leaf.value[first_quote_pos + len(orig_quote) : -len(orig_quote)]
2713 if "r" in prefix.casefold():
2714 if unescaped_new_quote.search(body):
2715 # There's at least one unescaped new_quote in this raw string
2716 # so converting is impossible
2719 # Do not introduce or remove backslashes in raw strings
2722 # remove unnecessary escapes
2723 new_body = sub_twice(escaped_new_quote, rf"\1\2{new_quote}", body)
2724 if body != new_body:
2725 # Consider the string without unnecessary escapes as the original
2727 leaf.value = f"{prefix}{orig_quote}{body}{orig_quote}"
2728 new_body = sub_twice(escaped_orig_quote, rf"\1\2{orig_quote}", new_body)
2729 new_body = sub_twice(unescaped_new_quote, rf"\1\\{new_quote}", new_body)
2730 if "f" in prefix.casefold():
2731 matches = re.findall(
2733 (?:[^{]|^)\{ # start of the string or a non-{ followed by a single {
2734 ([^{].*?) # contents of the brackets except if begins with {{
2735 \}(?:[^}]|$) # A } followed by end of the string or a non-}
2742 # Do not introduce backslashes in interpolated expressions
2744 if new_quote == '"""' and new_body[-1:] == '"':
2746 new_body = new_body[:-1] + '\\"'
2747 orig_escape_count = body.count("\\")
2748 new_escape_count = new_body.count("\\")
2749 if new_escape_count > orig_escape_count:
2750 return # Do not introduce more escaping
2752 if new_escape_count == orig_escape_count and orig_quote == '"':
2753 return # Prefer double quotes
2755 leaf.value = f"{prefix}{new_quote}{new_body}{new_quote}"
def normalize_numeric_literal(leaf: Leaf) -> None:
    """Normalize the numeric (float, int, and complex) literal in `leaf.value`.

    The text is lowercased first. Hex digits are then upper-cased, a leading
    `+` on an exponent is dropped, a trailing `l` suffix becomes `L`, and the
    numeric part is passed through `format_float_or_int_string()`.

    Note: Mutates its argument.
    """
    text = leaf.value.lower()
    if text.startswith(("0o", "0b")):
        # Leave octal and binary literals alone (beyond lowercasing).
        leaf.value = text
        return

    if text.startswith("0x"):
        # Change hex digits to upper case; the "0x" marker stays lower case.
        leaf.value = text[:2] + text[2:].upper()
        return

    if "e" in text:
        mantissa, exponent = text.split("e")
        sign = ""
        if exponent.startswith("-"):
            sign, exponent = "-", exponent[1:]
        elif exponent.startswith("+"):
            exponent = exponent[1:]
        mantissa = format_float_or_int_string(mantissa)
        leaf.value = f"{mantissa}e{sign}{exponent}"
        return

    if text.endswith(("j", "l")):
        number, suffix = text[:-1], text[-1]
        if suffix == "l":
            # Capitalize in "2L" because "l" looks too similar to "1".
            suffix = "L"
        leaf.value = f"{format_float_or_int_string(number)}{suffix}"
        return

    leaf.value = format_float_or_int_string(text)
def format_float_or_int_string(text: str) -> str:
    """Return `text` with a digit on both sides of the decimal point.

    Text without a "." is returned unchanged. An empty integral or fractional
    part is replaced with 0, so ".5" becomes "0.5" and "1." becomes "1.0".
    """
    if "." not in text:
        return text

    integral, fractional = text.split(".")
    return f"{integral or 0}.{fractional or 0}"
2803 def normalize_invisible_parens(node: Node, parens_after: Set[str]) -> None:
2804 """Make existing optional parentheses invisible or create new ones.
2806 `parens_after` is a set of string leaf values immediately after which parens
2809 Standardizes on visible parentheses for single-element tuples, and keeps
2810 existing visible parentheses for other tuples and generator expressions.
2812 for pc in list_comments(node.prefix, is_endmarker=False):
2813 if pc.value in FMT_OFF:
2814 # This `node` has a prefix with `# fmt: off`, don't mess with parens.
2818 for index, child in enumerate(list(node.children)):
2819 # Add parentheses around long tuple unpacking in assignments.
2822 and isinstance(child, Node)
2823 and child.type == syms.testlist_star_expr
2828 if child.type == syms.atom:
2829 if maybe_make_parens_invisible_in_atom(child, parent=node):
2830 lpar = Leaf(token.LPAR, "")
2831 rpar = Leaf(token.RPAR, "")
2832 index = child.remove() or 0
2833 node.insert_child(index, Node(syms.atom, [lpar, child, rpar]))
2834 elif is_one_tuple(child):
2835 # wrap child in visible parentheses
2836 lpar = Leaf(token.LPAR, "(")
2837 rpar = Leaf(token.RPAR, ")")
2839 node.insert_child(index, Node(syms.atom, [lpar, child, rpar]))
2840 elif node.type == syms.import_from:
2841 # "import from" nodes store parentheses directly as part of
2843 if child.type == token.LPAR:
2844 # make parentheses invisible
2845 child.value = "" # type: ignore
2846 node.children[-1].value = "" # type: ignore
2847 elif child.type != token.STAR:
2848 # insert invisible parentheses
2849 node.insert_child(index, Leaf(token.LPAR, ""))
2850 node.append_child(Leaf(token.RPAR, ""))
2853 elif not (isinstance(child, Leaf) and is_multiline_string(child)):
2854 # wrap child in invisible parentheses
2855 lpar = Leaf(token.LPAR, "")
2856 rpar = Leaf(token.RPAR, "")
2857 index = child.remove() or 0
2858 prefix = child.prefix
2860 new_child = Node(syms.atom, [lpar, child, rpar])
2861 new_child.prefix = prefix
2862 node.insert_child(index, new_child)
2864 check_lpar = isinstance(child, Leaf) and child.value in parens_after
def normalize_fmt_off(node: Node) -> None:
    """Convert content between `# fmt: off`/`# fmt: on` into standalone comments.

    Keeps calling `convert_one_fmt_off_pair()` on `node` until it reports that
    nothing was converted.
    """
    while convert_one_fmt_off_pair(node):
        pass
2874 def convert_one_fmt_off_pair(node: Node) -> bool:
2875 """Convert content of a single `# fmt: off`/`# fmt: on` into a standalone comment.
2877 Returns True if a pair was converted.
2879 for leaf in node.leaves():
2880 previous_consumed = 0
2881 for comment in list_comments(leaf.prefix, is_endmarker=False):
2882 if comment.value in FMT_OFF:
2883 # We only want standalone comments. If there's no previous leaf or
2884 # the previous leaf is indentation, it's a standalone comment in
2886 if comment.type != STANDALONE_COMMENT:
2887 prev = preceding_leaf(leaf)
2888 if prev and prev.type not in WHITESPACE:
2891 ignored_nodes = list(generate_ignored_nodes(leaf))
2892 if not ignored_nodes:
2895 first = ignored_nodes[0] # Can be a container node with the `leaf`.
2896 parent = first.parent
2897 prefix = first.prefix
2898 first.prefix = prefix[comment.consumed :]
2900 comment.value + "\n" + "".join(str(n) for n in ignored_nodes)
2902 if hidden_value.endswith("\n"):
2903 # That happens when one of the `ignored_nodes` ended with a NEWLINE
2904 # leaf (possibly followed by a DEDENT).
2905 hidden_value = hidden_value[:-1]
2907 for ignored in ignored_nodes:
2908 index = ignored.remove()
2909 if first_idx is None:
2911 assert parent is not None, "INTERNAL ERROR: fmt: on/off handling (1)"
2912 assert first_idx is not None, "INTERNAL ERROR: fmt: on/off handling (2)"
2913 parent.insert_child(
2918 prefix=prefix[:previous_consumed] + "\n" * comment.newlines,
2923 previous_consumed = comment.consumed
def generate_ignored_nodes(leaf: Leaf) -> Iterator[LN]:
    """Yield the container of `leaf` and then its following siblings.

    Generation stops before the first node whose prefix holds a comment listed
    in `FMT_ON`, when an ENDMARKER is reached, or when there is no next sibling.
    """
    current: Optional[LN] = container_of(leaf)
    while current is not None and current.type != token.ENDMARKER:
        if any(
            comment.value in FMT_ON
            for comment in list_comments(current.prefix, is_endmarker=False)
        ):
            return

        yield current

        current = current.next_sibling
def maybe_make_parens_invisible_in_atom(node: LN, parent: LN) -> bool:
    """If it's safe, make the parens in the atom `node` invisible, recursively.

    Nothing is changed for non-atoms, empty tuples, one-tuples, yields whose
    `parent` is not an expression statement, and atoms whose contents have a
    delimiter priority of at least COMMA_PRIORITY.

    Returns whether the node should itself be wrapped in invisible parentheses.
    """
    if node.type != syms.atom:
        return False

    if is_empty_tuple(node) or is_one_tuple(node):
        return False

    if is_yield(node) and parent.type != syms.expr_stmt:
        return False

    if max_delimiter_priority_in_atom(node) >= COMMA_PRIORITY:
        return False

    opening, closing = node.children[0], node.children[-1]
    if not (opening.type == token.LPAR and closing.type == token.RPAR):
        return True

    # make parentheses invisible
    opening.value = ""  # type: ignore
    closing.value = ""  # type: ignore
    if len(node.children) > 1:
        maybe_make_parens_invisible_in_atom(node.children[1], parent=parent)
    return False
def is_empty_tuple(node: LN) -> bool:
    """Return True if `node` is an atom made of just `(` and `)`."""
    if node.type != syms.atom or len(node.children) != 2:
        return False

    opening, closing = node.children
    return opening.type == token.LPAR and closing.type == token.RPAR
def is_one_tuple(node: LN) -> bool:
    """Return True if `node` holds a tuple with one element, with or without parens."""
    if node.type != syms.atom:
        # Without parentheses: an implicit tuple made of one element and a comma.
        return (
            node.type in IMPLICIT_TUPLE
            and len(node.children) == 2
            and node.children[1].type == token.COMMA
        )

    if len(node.children) != 3:
        return False

    lpar, gexp, rpar = node.children
    if (
        lpar.type != token.LPAR
        or gexp.type != syms.testlist_gexp
        or rpar.type != token.RPAR
    ):
        return False

    return len(gexp.children) == 2 and gexp.children[1].type == token.COMMA
def is_yield(node: LN) -> bool:
    """Return True if `node` holds a `yield` or `yield from` expression.

    Any number of enclosing parenthesized atoms is looked through.
    """
    while True:
        if node.type == syms.yield_expr:
            return True

        if node.type == token.NAME and node.value == "yield":  # type: ignore
            return True

        if node.type != syms.atom or len(node.children) != 3:
            return False

        lpar, inner, rpar = node.children
        if not (lpar.type == token.LPAR and rpar.type == token.RPAR):
            return False

        # Parenthesized atom: examine what the parentheses enclose.
        node = inner
def is_vararg(leaf: Leaf, within: Set[NodeType]) -> bool:
    """Return True if `leaf` is a star or double star in a vararg or kwarg.

    If `within` includes VARARGS_PARENTS, this applies to function signatures.
    If `within` includes UNPACKING_PARENTS, it applies to right hand-side
    extended iterable unpacking (PEP 3132) and additional unpacking
    generalizations (PEP 448).
    """
    owner = leaf.parent if leaf.type in STARS else None
    if not owner:
        return False

    if owner.type == syms.star_expr:
        # Star expressions are also used as assignment targets in extended
        # iterable unpacking (PEP 3132). See what its parent is instead.
        owner = owner.parent
        if not owner:
            return False

    return owner.type in within
def is_multiline_string(leaf: Leaf) -> bool:
    """Return True if `leaf` is a triple-quoted string that contains a newline."""
    body = leaf.value.lstrip("furbFURB")
    return body.startswith(('"""', "'''")) and "\n" in body
def is_stub_suite(node: Node) -> bool:
    """Return True if `node` is a suite with a stub body.

    The suite has to consist of exactly NEWLINE, INDENT, one statement accepted
    by `is_stub_body()`, and DEDENT.
    """
    if len(node.children) != 4:
        return False

    newline, indent, body, dedent = node.children
    if (
        newline.type != token.NEWLINE
        or indent.type != token.INDENT
        or dedent.type != token.DEDENT
    ):
        return False

    return is_stub_body(body)
def is_stub_body(node: LN) -> bool:
    """Return True if `node` is a simple statement containing an ellipsis.

    The statement's first child has to be an atom made of exactly three `.`
    leaves.
    """
    if not isinstance(node, Node) or node.type != syms.simple_stmt:
        return False

    if len(node.children) != 2:
        return False

    statement = node.children[0]
    if statement.type != syms.atom or len(statement.children) != 3:
        return False

    dot = Leaf(token.DOT, ".")
    return all(leaf == dot for leaf in statement.children)
def max_delimiter_priority_in_atom(node: LN) -> int:
    """Return maximum delimiter priority inside `node`.

    This is specific to atoms with contents contained in a pair of parentheses.
    If `node` isn't an atom or there are no enclosing parentheses, returns 0.
    Also returns 0 when the bracket tracker raises ValueError while computing
    the maximum priority.
    """
    if node.type != syms.atom:
        return 0

    first, last = node.children[0], node.children[-1]
    if first.type != token.LPAR or last.type != token.RPAR:
        return 0

    tracker = BracketTracker()
    for child in node.children[1:-1]:
        # Feed every leaf between the parentheses to the tracker, in order.
        leaves = [child] if isinstance(child, Leaf) else child.leaves()
        for leaf in leaves:
            tracker.mark(leaf)
    try:
        return tracker.max_delimiter_priority()

    except ValueError:
        return 0
def ensure_visible(leaf: Leaf) -> None:
    """Make sure parentheses are visible.

    They could be invisible as part of some statements (see
    :func:`normalize_invisible_parens` and :func:`visit_import_from`).
    Leaves that are neither LPAR nor RPAR are left untouched.
    """
    visible = {token.LPAR: "(", token.RPAR: ")"}.get(leaf.type)
    if visible is not None:
        leaf.value = visible
def should_explode(line: Line, opening_bracket: Leaf) -> bool:
    """Should `line` immediately be split with `delimiter_split()` after RHS?

    True only when `opening_bracket` belongs to an atom or an import-from
    statement and the highest delimiter priority in `line` (ignoring a
    trailing comma) is COMMA_PRIORITY.
    """
    bracket_parent = opening_bracket.parent
    if not bracket_parent:
        return False

    if bracket_parent.type not in {syms.atom, syms.import_from}:
        return False

    if opening_bracket.value not in "[{(":
        return False

    try:
        last_leaf = line.leaves[-1]
        exclude = {id(last_leaf)} if last_leaf.type == token.COMMA else set()
        max_priority = line.bracket_tracker.max_delimiter_priority(exclude=exclude)
    except (IndexError, ValueError):
        # Empty line or no delimiters to split on.
        return False

    return max_priority == COMMA_PRIORITY
def get_features_used(node: Node) -> Set[Feature]:
    """Return a set of (relatively) new Python features used in this file.

    Currently looking for:
    - f-strings;
    - underscores in numeric literals; and
    - trailing commas after * or ** in function signatures and calls.
    """
    features: Set[Feature] = set()
    for n in node.pre_order():
        if n.type == token.STRING:
            # String prefixes are case-insensitive and may mix cases, so
            # compare the lowercased head; otherwise `Rf"..."` or `fR"..."`
            # would not be recognized as f-strings.
            value_head = n.value[:2].lower()  # type: ignore
            if value_head in {'f"', "f'", "rf", "fr"}:
                features.add(Feature.F_STRINGS)

        elif n.type == token.NUMBER:
            if "_" in n.value:  # type: ignore
                features.add(Feature.NUMERIC_UNDERSCORES)

        elif (
            n.type in {syms.typedargslist, syms.arglist}
            and n.children
            and n.children[-1].type == token.COMMA
        ):
            # The argument list ends with a comma; it only counts as a new
            # feature when a star or double star is among the arguments.
            if n.type == syms.typedargslist:
                feature = Feature.TRAILING_COMMA_IN_DEF
            else:
                feature = Feature.TRAILING_COMMA_IN_CALL

            for ch in n.children:
                if ch.type in STARS:
                    features.add(feature)

                if ch.type == syms.argument:
                    for argch in ch.children:
                        if argch.type in STARS:
                            features.add(feature)

    return features
def detect_target_versions(node: Node) -> Set[TargetVersion]:
    """Detect the version to target based on the nodes used.

    Returns every `TargetVersion` whose entry in `VERSION_TO_FEATURES` covers
    all features reported by `get_features_used()`.
    """
    used = get_features_used(node)
    compatible = set()
    for version in TargetVersion:
        if used <= VERSION_TO_FEATURES[version]:
            compatible.add(version)
    return compatible
3193 def generate_trailers_to_omit(line: Line, line_length: int) -> Iterator[Set[LeafID]]:
3194 """Generate sets of closing bracket IDs that should be omitted in a RHS.
3196 Brackets can be omitted if the entire trailer up to and including
3197 a preceding closing bracket fits in one line.
3199 Yielded sets are cumulative (contain results of previous yields, too). First
3203 omit: Set[LeafID] = set()
3206 length = 4 * line.depth
3207 opening_bracket = None
3208 closing_bracket = None
3209 inner_brackets: Set[LeafID] = set()
3210 for index, leaf, leaf_length in enumerate_with_length(line, reversed=True):
3211 length += leaf_length
3212 if length > line_length:
3215 has_inline_comment = leaf_length > len(leaf.value) + len(leaf.prefix)
3216 if leaf.type == STANDALONE_COMMENT or has_inline_comment:
3220 if leaf is opening_bracket:
3221 opening_bracket = None
3222 elif leaf.type in CLOSING_BRACKETS:
3223 inner_brackets.add(id(leaf))
3224 elif leaf.type in CLOSING_BRACKETS:
3225 if index > 0 and line.leaves[index - 1].type in OPENING_BRACKETS:
3226 # Empty brackets would fail a split so treat them as "inner"
3227 # brackets (e.g. only add them to the `omit` set if another
3228 # pair of brackets was good enough.
3229 inner_brackets.add(id(leaf))
3233 omit.add(id(closing_bracket))
3234 omit.update(inner_brackets)
3235 inner_brackets.clear()
3239 opening_bracket = leaf.opening_bracket
3240 closing_bracket = leaf
3243 def get_future_imports(node: Node) -> Set[str]:
3244 """Return a set of __future__ imports in the file."""
3245 imports: Set[str] = set()
3247 def get_imports_from_children(children: List[LN]) -> Generator[str, None, None]:
3248 for child in children:
3249 if isinstance(child, Leaf):
3250 if child.type == token.NAME:
3252 elif child.type == syms.import_as_name:
3253 orig_name = child.children[0]
3254 assert isinstance(orig_name, Leaf), "Invalid syntax parsing imports"
3255 assert orig_name.type == token.NAME, "Invalid syntax parsing imports"
3256 yield orig_name.value
3257 elif child.type == syms.import_as_names:
3258 yield from get_imports_from_children(child.children)
3260 raise AssertionError("Invalid syntax parsing imports")
3262 for child in node.children:
3263 if child.type != syms.simple_stmt:
3265 first_child = child.children[0]
3266 if isinstance(first_child, Leaf):
3267 # Continue looking if we see a docstring; otherwise stop.
3269 len(child.children) == 2
3270 and first_child.type == token.STRING
3271 and child.children[1].type == token.NEWLINE
3276 elif first_child.type == syms.import_from:
3277 module_name = first_child.children[1]
3278 if not isinstance(module_name, Leaf) or module_name.value != "__future__":
3280 imports |= set(get_imports_from_children(first_child.children[3:]))
3286 def gen_python_files_in_dir(
3289 include: Pattern[str],
3290 exclude: Pattern[str],
3292 ) -> Iterator[Path]:
3293 """Generate all files under `path` whose paths are not excluded by the
3294 `exclude` regex, but are included by the `include` regex.
3296 Symbolic links pointing outside of the `root` directory are ignored.
3298 `report` is where output about exclusions goes.
3300 assert root.is_absolute(), f"INTERNAL ERROR: `root` must be absolute but is {root}"
3301 for child in path.iterdir():
3303 normalized_path = "/" + child.resolve().relative_to(root).as_posix()
3305 if child.is_symlink():
3306 report.path_ignored(
3307 child, f"is a symbolic link that points outside {root}"
3314 normalized_path += "/"
3315 exclude_match = exclude.search(normalized_path)
3316 if exclude_match and exclude_match.group(0):
3317 report.path_ignored(child, f"matches the --exclude regular expression")
3321 yield from gen_python_files_in_dir(child, root, include, exclude, report)
3323 elif child.is_file():
3324 include_match = include.search(normalized_path)
def find_project_root(srcs: Iterable[str]) -> Path:
    """Return a directory containing .git, .hg, or pyproject.toml.

    That directory is a common parent of every path passed in `srcs`; it can be
    one of the directories passed in `srcs` itself.

    If no directory in the tree contains a marker that would specify it's the
    project root, the root of the file system is returned.
    """
    # Materialize first: `srcs` may be a one-shot iterator, which is always
    # truthy and would defeat an emptiness check.
    sources = [Path(src).resolve() for src in srcs]
    if not sources:
        return Path("/").resolve()

    # For each source collect the directories that contain it: the source
    # itself when it is a directory, plus all of its ancestors. The deepest
    # directory shared by all sources is the real common base (taking `min()`
    # of the paths only picks the lexicographically smallest one).
    containers = [
        set(source.parents) | ({source} if source.is_dir() else set())
        for source in sources
    ]
    shared = set.intersection(*containers)
    if not shared:
        # The sources have no common ancestor at all.
        return Path("/").resolve()

    common_base = max(shared, key=lambda candidate: len(candidate.parts))
    for directory in (common_base, *common_base.parents):
        if (directory / ".git").is_dir():
            return directory

        if (directory / ".hg").is_dir():
            return directory

        if (directory / "pyproject.toml").is_file():
            return directory

    # No marker found: `directory` is the last entry walked, the topmost parent.
    return directory
3361 """Provides a reformatting counter. Can be rendered with `str(report)`."""
3365 verbose: bool = False
3366 change_count: int = 0
3368 failure_count: int = 0
3370 def done(self, src: Path, changed: Changed) -> None:
3371 """Increment the counter for successful reformatting. Write out a message."""
3372 if changed is Changed.YES:
3373 reformatted = "would reformat" if self.check else "reformatted"
3374 if self.verbose or not self.quiet:
3375 out(f"{reformatted} {src}")
3376 self.change_count += 1
3379 if changed is Changed.NO:
3380 msg = f"{src} already well formatted, good job."
3382 msg = f"{src} wasn't modified on disk since last run."
3383 out(msg, bold=False)
3384 self.same_count += 1
3386 def failed(self, src: Path, message: str) -> None:
3387 """Increment the counter for failed reformatting. Write out a message."""
3388 err(f"error: cannot format {src}: {message}")
3389 self.failure_count += 1
3391 def path_ignored(self, path: Path, message: str) -> None:
3393 out(f"{path} ignored: {message}", bold=False)
3396 def return_code(self) -> int:
3397 """Return the exit code that the app should use.
3399 This considers the current state of changed files and failures:
3400 - if there were any failures, return 123;
3401 - if any files were changed and --check is being used, return 1;
3402 - otherwise return 0.
3404 # According to http://tldp.org/LDP/abs/html/exitcodes.html starting with
3405 # 126 we have special return codes reserved by the shell.
3406 if self.failure_count:
3409 elif self.change_count and self.check:
3414 def __str__(self) -> str:
3415 """Render a color report of the current state.
3417 Use `click.unstyle` to remove colors.
3420 reformatted = "would be reformatted"
3421 unchanged = "would be left unchanged"
3422 failed = "would fail to reformat"
3424 reformatted = "reformatted"
3425 unchanged = "left unchanged"
3426 failed = "failed to reformat"
3428 if self.change_count:
3429 s = "s" if self.change_count > 1 else ""
3431 click.style(f"{self.change_count} file{s} {reformatted}", bold=True)
3434 s = "s" if self.same_count > 1 else ""
3435 report.append(f"{self.same_count} file{s} {unchanged}")
3436 if self.failure_count:
3437 s = "s" if self.failure_count > 1 else ""
3439 click.style(f"{self.failure_count} file{s} {failed}", fg="red")
3441 return ", ".join(report) + "."
def parse_ast(src: str) -> Union[ast3.AST, ast27.AST]:
    """Parse `src` into a typed_ast tree.

    `ast3.parse()` is tried with feature versions 7 and 6, in that order; if
    both raise SyntaxError, the result of `ast27.parse()` is returned (and any
    error it raises propagates).
    """
    for feature_version in (7, 6):
        try:
            tree = ast3.parse(src, feature_version=feature_version)
        except SyntaxError:
            pass
        else:
            return tree

    return ast27.parse(src)
3454 def assert_equivalent(src: str, dst: str) -> None:
3455 """Raise AssertionError if `src` and `dst` aren't equivalent."""
3459 def _v(node: Union[ast3.AST, ast27.AST], depth: int = 0) -> Iterator[str]:
3460 """Simple visitor generating strings to compare ASTs by content."""
3461 yield f"{' ' * depth}{node.__class__.__name__}("
3463 for field in sorted(node._fields):
3464 # TypeIgnore has only one field 'lineno' which breaks this comparison
3465 if isinstance(node, (ast3.TypeIgnore, ast27.TypeIgnore)):
3468 # Ignore str kind which is case sensitive / and ignores unicode_literals
3469 if isinstance(node, (ast3.Str, ast27.Str, ast3.Bytes)) and field == "kind":
3473 value = getattr(node, field)
3474 except AttributeError:
3477 yield f"{' ' * (depth+1)}{field}="
3479 if isinstance(value, list):
3481 # Ignore nested tuples within del statements, because we may insert
3482 # parentheses and they change the AST.
3485 and isinstance(node, (ast3.Delete, ast27.Delete))
3486 and isinstance(item, (ast3.Tuple, ast27.Tuple))
3488 for item in item.elts:
3489 yield from _v(item, depth + 2)
3490 elif isinstance(item, (ast3.AST, ast27.AST)):
3491 yield from _v(item, depth + 2)
3493 elif isinstance(value, (ast3.AST, ast27.AST)):
3494 yield from _v(value, depth + 2)
3497 yield f"{' ' * (depth+2)}{value!r}, # {value.__class__.__name__}"
3499 yield f"{' ' * depth}) # /{node.__class__.__name__}"
3502 src_ast = parse_ast(src)
3503 except Exception as exc:
3504 raise AssertionError(
3505 f"cannot use --safe with this file; failed to parse source file. "
3506 f"AST error message: {exc}"
3510 dst_ast = parse_ast(dst)
3511 except Exception as exc:
3512 log = dump_to_file("".join(traceback.format_tb(exc.__traceback__)), dst)
3513 raise AssertionError(
3514 f"INTERNAL ERROR: Black produced invalid code: {exc}. "
3515 f"Please report a bug on https://github.com/python/black/issues. "
3516 f"This invalid output might be helpful: {log}"
3519 src_ast_str = "\n".join(_v(src_ast))
3520 dst_ast_str = "\n".join(_v(dst_ast))
3521 if src_ast_str != dst_ast_str:
3522 log = dump_to_file(diff(src_ast_str, dst_ast_str, "src", "dst"))
3523 raise AssertionError(
3524 f"INTERNAL ERROR: Black produced code that is not equivalent to "
3526 f"Please report a bug on https://github.com/python/black/issues. "
3527 f"This diff might be helpful: {log}"
def assert_stable(src: str, dst: str, mode: FileMode) -> None:
    """Raise AssertionError if `dst` reformats differently the second time.

    `dst` is formatted again with `mode`. When the result differs, the diffs
    source -> first pass and first pass -> second pass are dumped to a log file
    whose path is included in the error message.
    """
    second_pass = format_str(dst, mode=mode)
    if second_pass == dst:
        return

    log = dump_to_file(
        diff(src, dst, "source", "first pass"),
        diff(dst, second_pass, "first pass", "second pass"),
    )
    raise AssertionError(
        f"INTERNAL ERROR: Black produced different code on the second pass "
        f"of the formatter. "
        f"Please report a bug on https://github.com/python/black/issues. "
        f"This diff might be helpful: {log}"
    )
def dump_to_file(*output: str) -> str:
    """Dump `output` to a temporary file. Return path to the file.

    Every non-empty chunk that does not end with a newline gets one appended.
    The file is not deleted on close; removing it is up to the caller.
    """
    import tempfile

    with tempfile.NamedTemporaryFile(
        mode="w", prefix="blk_", suffix=".log", delete=False, encoding="utf8"
    ) as log_file:
        for chunk in output:
            log_file.write(chunk)
            if chunk and not chunk.endswith("\n"):
                log_file.write("\n")
    return log_file.name
def diff(a: str, b: str, a_name: str, b_name: str) -> str:
    """Return a unified diff string between strings `a` and `b`.

    `a_name` and `b_name` label the two sides in the diff header; five lines
    of context are shown around each change.
    """
    import difflib

    def as_lines(text: str) -> List[str]:
        # Split on "\n" and give every piece its newline back.
        return [piece + "\n" for piece in text.split("\n")]

    hunks = difflib.unified_diff(
        as_lines(a), as_lines(b), fromfile=a_name, tofile=b_name, n=5
    )
    return "".join(hunks)
3572 def cancel(tasks: Iterable[asyncio.Task]) -> None:
3573 """asyncio signal handler that cancels all `tasks` and reports to stderr."""
def shutdown(loop: BaseEventLoop) -> None:
    """Cancel all pending tasks on `loop`, wait for them, and close the loop.

    The loop is closed, and the "concurrent.futures" logger is set to
    CRITICAL, even when there is nothing to cancel or waiting fails.
    """
    try:
        if sys.version_info[:2] >= (3, 7):
            all_tasks = asyncio.all_tasks
        else:
            all_tasks = asyncio.Task.all_tasks
        # This part is borrowed from asyncio/runners.py in Python 3.7b2.
        to_cancel = [task for task in all_tasks(loop) if not task.done()]
        if not to_cancel:
            return

        for task in to_cancel:
            task.cancel()
        # No `loop=` argument here: it was deprecated in Python 3.8 and removed
        # in 3.10, where passing it raises TypeError. `gather()` takes the loop
        # from the tasks themselves.
        loop.run_until_complete(asyncio.gather(*to_cancel, return_exceptions=True))
    finally:
        # `concurrent.futures.Future` objects cannot be cancelled once they
        # are already running. There might be some when the `shutdown()` happened.
        # Silence their logger's spew about the event loop being closed.
        cf_logger = logging.getLogger("concurrent.futures")
        cf_logger.setLevel(logging.CRITICAL)
        loop.close()
def sub_twice(regex: Pattern[str], replacement: str, original: str) -> str:
    """Apply `regex.sub(replacement, ...)` to `original` two times in a row.

    This is used by string normalization to perform replaces on
    overlapping matches, which a single pass cannot handle.
    """
    first_pass = regex.sub(replacement, original)
    return regex.sub(replacement, first_pass)
def re_compile_maybe_verbose(regex: str) -> Pattern[str]:
    """Compile a regular expression string in `regex`.

    If it contains newlines, `(?x)` is prepended so that it is compiled in
    verbose mode.
    """
    pattern = f"(?x){regex}" if "\n" in regex else regex
    return re.compile(pattern)
def enumerate_reversed(sequence: Sequence[T]) -> Iterator[Tuple[Index, T]]:
    """Like `reversed(enumerate(sequence))` if that were possible.

    Yields `(index, element)` pairs starting from the last element, with
    `index` being the position of `element` in `sequence`.
    """
    position = len(sequence)
    for element in reversed(sequence):
        position -= 1
        yield (position, element)
def enumerate_with_length(
    line: Line, reversed: bool = False
) -> Iterator[Tuple[Index, Leaf, int]]:
    """Return an enumeration of leaves with their length.

    Each item is `(index, leaf, length)` where `length` adds up the leaf's
    prefix, its value, and the values of the comments that follow it. With
    `reversed` set, leaves are visited from the last one to the first.

    Stops prematurely at the first leaf whose value contains a newline
    (multiline strings).
    """
    enumerator = cast(
        Callable[[Sequence[Leaf]], Iterator[Tuple[Index, Leaf]]],
        enumerate_reversed if reversed else enumerate,
    )
    for index, leaf in enumerator(line.leaves):
        if "\n" in leaf.value:
            return  # Multiline strings, we can't continue.

        comments_length = sum(
            len(comment.value) for comment in line.comments_after(leaf)
        )
        yield index, leaf, len(leaf.prefix) + len(leaf.value) + comments_length
def is_line_short_enough(line: Line, *, line_length: int, line_str: str = "") -> bool:
    """Return True if `line` is no longer than `line_length`.

    Uses the provided `line_str` rendering, if any, otherwise computes a new one.
    Lines whose rendering contains a newline (multiline strings) and lines that
    contain standalone comments are never short enough.
    """
    rendered = line_str or str(line).strip("\n")
    if len(rendered) > line_length or "\n" in rendered:
        return False

    return not line.contains_standalone_comments()
3669 def can_be_split(line: Line) -> bool:
3670 """Return False if the line cannot be split *for sure*.
3672 This is not an exhaustive search but a cheap heuristic that we can use to
3673 avoid some unfortunate formattings (mostly around wrapping unsplittable code
3674 in unnecessary parentheses).
3676 leaves = line.leaves
3680 if leaves[0].type == token.STRING and leaves[1].type == token.DOT:
3684 for leaf in leaves[-2::-1]:
3685 if leaf.type in OPENING_BRACKETS:
3686 if next.type not in CLOSING_BRACKETS:
3690 elif leaf.type == token.DOT:
3692 elif leaf.type == token.NAME:
3693 if not (next.type == token.DOT or next.type in OPENING_BRACKETS):
3696 elif leaf.type not in CLOSING_BRACKETS:
3699 if dot_count > 1 and call_count > 1:
def _remainder_fits_after_opening(line: Line, first: Leaf, line_length: int) -> bool:
    """Check the part of `line` that starts at the bracket closing `first`.

    Leaf lengths are accumulated (on top of the indentation, `4 * line.depth`)
    from the closing bracket matching `first` until the next opening bracket.
    Returns False as soon as the running length exceeds `line_length`.
    Otherwise returns True only if the scan reached the last leaf of the line.
    """
    counting = False
    width = 4 * line.depth
    for _index, leaf, leaf_width in enumerate_with_length(line):
        if leaf.type in CLOSING_BRACKETS and leaf.opening_bracket is first:
            counting = True
        if not counting:
            continue

        width += leaf_width
        if width > line_length:
            return False

        if leaf.type in OPENING_BRACKETS:
            # There are brackets we can further split on.
            counting = False

    # The whole scan ran without exceeding the limit; it only counts if the
    # enumeration was not cut short before the final leaf.
    return len(line.leaves) == _index + 1


def _trailing_bracket_allows_omission(
    line: Line, first: Leaf, penultimate: Leaf, last: Leaf, line_length: int
) -> bool:
    """Decide omission for a `line` that ends with the closing bracket `last`."""
    if penultimate.type in OPENING_BRACKETS:
        # Empty brackets don't help.
        return False

    if is_multiline_string(first):
        # Wrapping a multiline string in extra parentheses here is
        # unnecessary.
        return True

    width = 4 * line.depth
    saw_other_brackets = False
    for _index, leaf, leaf_width in enumerate_with_length(line):
        width += leaf_width
        if leaf is last.opening_bracket:
            if saw_other_brackets or width <= line_length:
                return True

        elif leaf.type in OPENING_BRACKETS:
            # There are brackets we can further split on.
            saw_other_brackets = True

    return False


def can_omit_invisible_parens(line: Line, line_length: int) -> bool:
    """Does `line` have a shape safe to reformat without optional parens around it?

    Only a subset of the nice-looking formattings yields True. The aim is to
    avoid false positives that would produce lines longer than `line_length`.
    """
    tracker = line.bracket_tracker
    if not tracker.delimiters:
        # Without delimiters the optional parentheses are useless.
        return True

    top_priority = tracker.max_delimiter_priority()
    if tracker.delimiter_count_with_priority(top_priority) > 1:
        # With more than one delimiter of a kind the optional parentheses
        # read better.
        return False

    if top_priority == DOT_PRIORITY:
        # A single stranded method call doesn't require optional parentheses.
        return True

    assert len(line.leaves) >= 2, "Stranded delimiter"

    first, second = line.leaves[0], line.leaves[1]
    penultimate, last = line.leaves[-2], line.leaves[-1]

    # With a single delimiter, omit if the expression starts or ends with
    # a bracket.
    if first.type in OPENING_BRACKETS and second.type not in CLOSING_BRACKETS:
        if _remainder_fits_after_opening(line, first, line_length):
            return True

        # No `return False` here: a line may have *both* a leading opening
        # bracket and a trailing closing bracket. When the opening bracket
        # does not satisfy the rule, the closing one still might.

    ends_with_usable_bracket = (
        last.type == token.RPAR
        or last.type == token.RBRACE
        or (
            # Indexing is not used for omitting optional parentheses, so a
            # closing square bracket whose parent is a trailer is excluded.
            last.type == token.RSQB
            and last.parent
            and last.parent.type != syms.trailer
        )
    )
    if ends_with_usable_bracket:
        return _trailing_bracket_allows_omission(
            line, first, penultimate, last, line_length
        )

    return False
def get_cache_file(mode: FileMode) -> Path:
    """Return the path of the cache pickle that belongs to `mode`.

    The file lives in `CACHE_DIR` and embeds `mode.get_cache_key()` in its
    name.
    """
    file_name = f"cache.{mode.get_cache_key()}.pickle"
    return CACHE_DIR / file_name
def read_cache(mode: FileMode) -> Cache:
    """Read the cache if it exists and is well formed.

    If it is not well formed, the call to write_cache later should resolve the issue.

    Returns an empty cache when the cache file for `mode` is missing or when
    its contents cannot be unpickled.
    """
    cache_file = get_cache_file(mode)
    if not cache_file.exists():
        return {}

    with cache_file.open("rb") as fobj:
        try:
            # NOTE(review): unpickling is only acceptable if this file is
            # written solely by write_cache in our own cache directory; it
            # must never be pointed at untrusted data.
            cache: Cache = pickle.load(fobj)
        except (
            pickle.UnpicklingError,
            AttributeError,
            EOFError,
            ImportError,
            IndexError,
            ValueError,
        ):
            # A damaged file does not always surface as UnpicklingError: an
            # empty or truncated file raises EOFError, and a pickle written
            # with a newer protocol raises ValueError. The pickle docs list
            # the others as possible too. All of them mean "not well formed".
            return {}

    return cache
def get_cache_info(path: Path) -> CacheInfo:
    """Return the `(mtime, size)` pair used to tell if a file is already formatted."""
    file_stat = path.stat()
    modified_at, size = file_stat.st_mtime, file_stat.st_size
    return modified_at, size
def filter_cached(cache: Cache, sources: Iterable[Path]) -> Tuple[Set[Path], Set[Path]]:
    """Partition the paths in `sources` into two sets of resolved paths.

    The first set holds files that were modified on disk or are missing from
    `cache`. The second holds files whose cached info still matches.
    """
    changed: Set[Path] = set()
    unchanged: Set[Path] = set()
    for source in sources:
        resolved = source.resolve()
        is_stale = cache.get(resolved) != get_cache_info(resolved)
        bucket = changed if is_stale else unchanged
        bucket.add(resolved)
    return changed, unchanged
def write_cache(cache: Cache, sources: Iterable[Path], mode: FileMode) -> None:
    """Update the cache file for `mode` with fresh info about `sources`.

    The merged cache is pickled into a temporary file created next to the
    destination and then moved over it with `os.replace`. Writing the cache
    is best effort: any `OSError` along the way is deliberately ignored.
    """
    destination = get_cache_file(mode)
    try:
        CACHE_DIR.mkdir(parents=True, exist_ok=True)
        merged = dict(cache)
        merged.update({src.resolve(): get_cache_info(src) for src in sources})
        with tempfile.NamedTemporaryFile(
            dir=str(destination.parent), delete=False
        ) as handle:
            pickle.dump(merged, handle, protocol=pickle.HIGHEST_PROTOCOL)
        # The temporary file is closed by now, so it can be renamed.
        os.replace(handle.name, destination)
    except OSError:
        pass
def patch_click() -> None:
    """Make Click not crash.

    On certain misconfigured environments, Python 3 selects the ASCII encoding as the
    default which restricts paths that it can access during the lifetime of the
    application.  Click refuses to work in this scenario by raising a RuntimeError.

    In case of Black the likelihood that non-ASCII characters are going to be used in
    file paths is minimal since it's Python source code.  Moreover, this crash was
    spurious on Python 3.7 thanks to PEP 538 and PEP 540.

    Every candidate Click module is imported on its own, and a module that is
    missing is simply skipped. This keeps the patch working when only one of
    the two modules is present.
    """
    import importlib

    for submodule in ("core", "_unicodefun"):
        try:
            module = importlib.import_module(f"click.{submodule}")
        except ImportError:
            # Covers both "Click is not installed" and "this Click release
            # has no such submodule".
            continue

        # Replace whichever environment check this module exposes with a
        # no-op. Both spellings of the hook name are handled.
        for hook in ("_verify_python3_env", "_verify_python_env"):
            if hasattr(module, hook):
                setattr(module, hook, lambda: None)
def patched_main() -> None:
    """Run `main` after preparing the process environment.

    Calls `freeze_support()` and `patch_click()` before handing over to
    `main()`.
    """
    freeze_support()
    patch_click()
    main()


if __name__ == "__main__":
    patched_main()