import asyncio
from concurrent.futures import Executor, ProcessPoolExecutor
from contextlib import contextmanager
from datetime import datetime
from enum import Enum
from functools import lru_cache, partial, wraps
import io
import itertools
from multiprocessing import Manager, freeze_support
import os
from pathlib import Path
import re
import signal
import sys
import tokenize
from typing import (
    Any,
    Callable,
    Collection,
    Dict,
    Generic,
    Iterable,
    Iterator,
    List,
    Optional,
    Set,
    Tuple,
    TypeVar,
    Union,
)

from appdirs import user_cache_dir
from attr import dataclass, evolve, Factory
import click
import toml
from typed_ast import ast3, ast27

# lib2to3 fork
from blib2to3.pytree import Node, Leaf, type_repr
from blib2to3 import pygram, pytree
from blib2to3.pgen2 import driver, token
from blib2to3.pgen2.grammar import Grammar
from blib2to3.pgen2.parse import ParseError

from _version import get_versions

v = get_versions()
__version__ = v.get("closest-tag", v["version"])
__git_version__ = v.get("full-revisionid")

DEFAULT_LINE_LENGTH = 88
DEFAULT_EXCLUDES = (
    r"/(\.eggs|\.git|\.hg|\.mypy_cache|\.nox|\.tox|\.venv|_build|buck-out|build|dist)/"
)
DEFAULT_INCLUDES = r"\.pyi?$"
CACHE_DIR = Path(user_cache_dir("black", version=__git_version__))

# types
T = TypeVar("T")
FileContent = str
Encoding = str
NewLine = str
Depth = int
NodeType = int
LeafID = int
Priority = int
Timestamp = float
FileSize = int
LN = Union[Leaf, Node]
SplitFunc = Callable[["Line", Collection["Feature"]], Iterator["Line"]]
CacheInfo = Tuple[Timestamp, FileSize]
Cache = Dict[Path, CacheInfo]
out = partial(click.secho, bold=True, err=True)
err = partial(click.secho, fg="red", err=True)

pygram.initialize(CACHE_DIR)
syms = pygram.python_symbols
class NothingChanged(UserWarning):
    """Raised when reformatted code is the same as source."""


class CannotSplit(Exception):
    """A readable split that fits the allotted line length is impossible."""


class InvalidInput(ValueError):
    """Raised when input source code fails all parse attempts."""


class WriteBack(Enum):
    NO = 0
    YES = 1
    DIFF = 2
    CHECK = 3

    @classmethod
    def from_configuration(cls, *, check: bool, diff: bool) -> "WriteBack":
        if check and not diff:
            return cls.CHECK

        return cls.DIFF if diff else cls.YES
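
# Illustrative sketch (not part of the original module): how the (check, diff)
# flags map onto WriteBack values, assuming the member values filled in above.
#
#     check=True,  diff=False -> WriteBack.CHECK
#     check=*,     diff=True  -> WriteBack.DIFF
#     otherwise               -> WriteBack.YES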
class Changed(Enum):
    NO = 0
    CACHED = 1
    YES = 2


class TargetVersion(Enum):
    PY27 = 2
    PY33 = 3
    PY34 = 4
    PY35 = 5
    PY36 = 6
    PY37 = 7
    PY38 = 8

    def is_python2(self) -> bool:
        return self is TargetVersion.PY27


PY36_VERSIONS = {TargetVersion.PY36, TargetVersion.PY37, TargetVersion.PY38}
class Feature(Enum):
    # All string literals are unicode
    UNICODE_LITERALS = 1
    F_STRINGS = 2
    NUMERIC_UNDERSCORES = 3
    TRAILING_COMMA_IN_CALL = 4
    TRAILING_COMMA_IN_DEF = 5
    # The following two feature-flags are mutually exclusive, and exactly one should be
    # set for every version of python.
    ASYNC_IDENTIFIERS = 6
    ASYNC_KEYWORDS = 7
    ASSIGNMENT_EXPRESSIONS = 8
    POS_ONLY_ARGUMENTS = 9
VERSION_TO_FEATURES: Dict[TargetVersion, Set[Feature]] = {
    TargetVersion.PY27: {Feature.ASYNC_IDENTIFIERS},
    TargetVersion.PY33: {Feature.UNICODE_LITERALS, Feature.ASYNC_IDENTIFIERS},
    TargetVersion.PY34: {Feature.UNICODE_LITERALS, Feature.ASYNC_IDENTIFIERS},
    TargetVersion.PY35: {
        Feature.UNICODE_LITERALS,
        Feature.TRAILING_COMMA_IN_CALL,
        Feature.ASYNC_IDENTIFIERS,
    },
    TargetVersion.PY36: {
        Feature.UNICODE_LITERALS,
        Feature.F_STRINGS,
        Feature.NUMERIC_UNDERSCORES,
        Feature.TRAILING_COMMA_IN_CALL,
        Feature.TRAILING_COMMA_IN_DEF,
        Feature.ASYNC_IDENTIFIERS,
    },
    TargetVersion.PY37: {
        Feature.UNICODE_LITERALS,
        Feature.F_STRINGS,
        Feature.NUMERIC_UNDERSCORES,
        Feature.TRAILING_COMMA_IN_CALL,
        Feature.TRAILING_COMMA_IN_DEF,
        Feature.ASYNC_KEYWORDS,
    },
    TargetVersion.PY38: {
        Feature.UNICODE_LITERALS,
        Feature.F_STRINGS,
        Feature.NUMERIC_UNDERSCORES,
        Feature.TRAILING_COMMA_IN_CALL,
        Feature.TRAILING_COMMA_IN_DEF,
        Feature.ASYNC_KEYWORDS,
        Feature.ASSIGNMENT_EXPRESSIONS,
        Feature.POS_ONLY_ARGUMENTS,
    },
}
@dataclass
class FileMode:
    target_versions: Set[TargetVersion] = Factory(set)
    line_length: int = DEFAULT_LINE_LENGTH
    string_normalization: bool = True
    is_pyi: bool = False

    def get_cache_key(self) -> str:
        if self.target_versions:
            version_str = ",".join(
                str(version.value)
                for version in sorted(self.target_versions, key=lambda v: v.value)
            )
        else:
            version_str = "-"
        parts = [
            version_str,
            str(self.line_length),
            str(int(self.string_normalization)),
            str(int(self.is_pyi)),
        ]
        return ".".join(parts)
def supports_feature(target_versions: Set[TargetVersion], feature: Feature) -> bool:
    return all(feature in VERSION_TO_FEATURES[version] for version in target_versions)
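
# Illustrative sketch (not part of the original module): `supports_feature`
# only returns True when *every* requested target supports the feature; with
# an empty set of targets, `all()` is vacuously True.
#
#     >>> supports_feature({TargetVersion.PY36, TargetVersion.PY37},
#     ...                  Feature.NUMERIC_UNDERSCORES)
#     True
#     >>> supports_feature({TargetVersion.PY27, TargetVersion.PY36},
#     ...                  Feature.NUMERIC_UNDERSCORES)
#     False
#     >>> supports_feature(set(), Feature.NUMERIC_UNDERSCORES)
#     True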
def read_pyproject_toml(
    ctx: click.Context, param: click.Parameter, value: Union[str, int, bool, None]
) -> Optional[str]:
    """Inject Black configuration from "pyproject.toml" into defaults in `ctx`.

    Returns the path to a successfully found and read configuration file, None
    otherwise.
    """
    assert not isinstance(value, (int, bool)), "Invalid parameter type passed"
    if not value:
        root = find_project_root(ctx.params.get("src", ()))
        path = root / "pyproject.toml"
        if path.is_file():
            value = str(path)
        else:
            return None

    try:
        pyproject_toml = toml.load(value)
        config = pyproject_toml.get("tool", {}).get("black", {})
    except (toml.TomlDecodeError, OSError) as e:
        raise click.FileError(
            filename=value, hint=f"Error reading configuration file: {e}"
        )

    if not config:
        return value

    if ctx.default_map is None:
        ctx.default_map = {}
    ctx.default_map.update(  # type: ignore  # bad types in .pyi
        {k.replace("--", "").replace("-", "_"): v for k, v in config.items()}
    )
    return value
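
# Illustrative sketch (not part of the original module): keys under
# [tool.black] in pyproject.toml mirror the long command-line flags, with
# dashes mapped to underscores by the dict comprehension above. For example:
#
#     [tool.black]
#     line-length = 100
#     target-version = ["py36", "py37"]
#     skip-string-normalization = true
#
# becomes defaults for the `line_length`, `target_version`, and
# `skip_string_normalization` parameters of `main`.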
@click.command(context_settings=dict(help_option_names=["-h", "--help"]))
@click.option("-c", "--code", type=str, help="Format the code passed in as a string.")
@click.option(
    "-l",
    "--line-length",
    type=int,
    default=DEFAULT_LINE_LENGTH,
    help="How many characters per line to allow.",
    show_default=True,
)
@click.option(
    "-t",
    "--target-version",
    type=click.Choice([v.name.lower() for v in TargetVersion]),
    callback=lambda c, p, v: [TargetVersion[val.upper()] for val in v],
    multiple=True,
    help=(
        "Python versions that should be supported by Black's output. [default: "
        "per-file auto-detection]"
    ),
)
@click.option(
    "--py36",
    is_flag=True,
    help=(
        "Allow using Python 3.6-only syntax on all input files. This will put "
        "trailing commas in function signatures and calls also after *args and "
        "**kwargs. Deprecated; use --target-version instead. "
        "[default: per-file auto-detection]"
    ),
)
@click.option(
    "--pyi",
    is_flag=True,
    help=(
        "Format all input files like typing stubs regardless of file extension "
        "(useful when piping source on standard input)."
    ),
)
@click.option(
    "-S",
    "--skip-string-normalization",
    is_flag=True,
    help="Don't normalize string quotes or prefixes.",
)
@click.option(
    "--check",
    is_flag=True,
    help=(
        "Don't write the files back, just return the status. Return code 0 "
        "means nothing would change. Return code 1 means some files would be "
        "reformatted. Return code 123 means there was an internal error."
    ),
)
@click.option(
    "--diff",
    is_flag=True,
    help="Don't write the files back, just output a diff for each file on stdout.",
)
@click.option(
    "--fast/--safe",
    is_flag=True,
    help="If --fast given, skip temporary sanity checks. [default: --safe]",
)
@click.option(
    "--include",
    type=str,
    default=DEFAULT_INCLUDES,
    help=(
        "A regular expression that matches files and directories that should be "
        "included on recursive searches. An empty value means all files are "
        "included regardless of the name. Use forward slashes for directories on "
        "all platforms (Windows, too). Exclusions are calculated first, inclusions "
        "later."
    ),
    show_default=True,
)
@click.option(
    "--exclude",
    type=str,
    default=DEFAULT_EXCLUDES,
    help=(
        "A regular expression that matches files and directories that should be "
        "excluded on recursive searches. An empty value means no paths are excluded. "
        "Use forward slashes for directories on all platforms (Windows, too). "
        "Exclusions are calculated first, inclusions later."
    ),
    show_default=True,
)
@click.option(
    "-q",
    "--quiet",
    is_flag=True,
    help=(
        "Don't emit non-error messages to stderr. Errors are still emitted; "
        "silence those with 2>/dev/null."
    ),
)
@click.option(
    "-v",
    "--verbose",
    is_flag=True,
    help=(
        "Also emit messages to stderr about files that were not changed or were "
        "ignored due to --exclude=."
    ),
)
@click.version_option(version=__version__)
@click.argument(
    "src",
    nargs=-1,
    type=click.Path(
        exists=True, file_okay=True, dir_okay=True, readable=True, allow_dash=True
    ),
    is_eager=True,
)
@click.option(
    "--config",
    type=click.Path(
        exists=False, file_okay=True, dir_okay=False, readable=True, allow_dash=False
    ),
    is_eager=True,
    callback=read_pyproject_toml,
    help="Read configuration from PATH.",
)
@click.pass_context
def main(
    ctx: click.Context,
    code: Optional[str],
    line_length: int,
    target_version: List[TargetVersion],
    check: bool,
    diff: bool,
    fast: bool,
    pyi: bool,
    py36: bool,
    skip_string_normalization: bool,
    quiet: bool,
    verbose: bool,
    include: str,
    exclude: str,
    src: Tuple[str],
    config: Optional[str],
) -> None:
    """The uncompromising code formatter."""
    write_back = WriteBack.from_configuration(check=check, diff=diff)
    if target_version:
        if py36:
            err(f"Cannot use both --target-version and --py36")
            ctx.exit(2)
        else:
            versions = set(target_version)
    elif py36:
        err(
            "--py36 is deprecated and will be removed in a future version. "
            "Use --target-version py36 instead."
        )
        versions = PY36_VERSIONS
    else:
        # We'll autodetect later.
        versions = set()
    mode = FileMode(
        target_versions=versions,
        line_length=line_length,
        is_pyi=pyi,
        string_normalization=not skip_string_normalization,
    )
    if config and verbose:
        out(f"Using configuration from {config}.", bold=False, fg="blue")
    if code is not None:
        print(format_str(code, mode=mode))
        ctx.exit(0)
    try:
        include_regex = re_compile_maybe_verbose(include)
    except re.error:
        err(f"Invalid regular expression for include given: {include!r}")
        ctx.exit(2)
    try:
        exclude_regex = re_compile_maybe_verbose(exclude)
    except re.error:
        err(f"Invalid regular expression for exclude given: {exclude!r}")
        ctx.exit(2)
    report = Report(check=check, quiet=quiet, verbose=verbose)
    root = find_project_root(src)
    sources: Set[Path] = set()
    for s in src:
        p = Path(s)
        if p.is_dir():
            sources.update(
                gen_python_files_in_dir(p, root, include_regex, exclude_regex, report)
            )
        elif p.is_file() or s == "-":
            # if a file was explicitly given, we don't care about its extension
            sources.add(p)
        else:
            err(f"invalid path: {s}")
    if len(sources) == 0:
        if verbose or not quiet:
            out("No paths given. Nothing to do 😴")
        ctx.exit(0)

    if len(sources) == 1:
        reformat_one(
            src=sources.pop(),
            fast=fast,
            write_back=write_back,
            mode=mode,
            report=report,
        )
    else:
        reformat_many(
            sources=sources, fast=fast, write_back=write_back, mode=mode, report=report
        )

    if verbose or not quiet:
        out("Oh no! 💥 💔 💥" if report.return_code else "All done! ✨ 🍰 ✨")
        click.secho(str(report), err=True)
    ctx.exit(report.return_code)
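
# Illustrative sketch (not part of the original module): typical invocations
# of this entry point from a shell.
#
#     black file.py            # reformat in place
#     black --check file.py    # exit 1 if reformatting would change the file
#     black --diff file.py     # print a unified diff instead of writing
#     black -c "x = {  1: 2 }" # format a code string passed with -c/--code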
def reformat_one(
    src: Path, fast: bool, write_back: WriteBack, mode: FileMode, report: "Report"
) -> None:
    """Reformat a single file under `src` without spawning child processes.

    `fast`, `write_back`, and `mode` options are passed to
    :func:`format_file_in_place` or :func:`format_stdin_to_stdout`.
    """
    try:
        changed = Changed.NO
        if not src.is_file() and str(src) == "-":
            if format_stdin_to_stdout(fast=fast, write_back=write_back, mode=mode):
                changed = Changed.YES
        else:
            cache: Cache = {}
            if write_back != WriteBack.DIFF:
                cache = read_cache(mode)
                res_src = src.resolve()
                if res_src in cache and cache[res_src] == get_cache_info(res_src):
                    changed = Changed.CACHED
            if changed is not Changed.CACHED and format_file_in_place(
                src, fast=fast, write_back=write_back, mode=mode
            ):
                changed = Changed.YES
            if (write_back is WriteBack.YES and changed is not Changed.CACHED) or (
                write_back is WriteBack.CHECK and changed is Changed.NO
            ):
                write_cache(cache, [src], mode)
        report.done(src, changed)
    except Exception as exc:
        report.failed(src, str(exc))
def reformat_many(
    sources: Set[Path],
    fast: bool,
    write_back: WriteBack,
    mode: FileMode,
    report: "Report",
) -> None:
    """Reformat multiple files using a ProcessPoolExecutor."""
    loop = asyncio.get_event_loop()
    worker_count = os.cpu_count()
    if sys.platform == "win32":
        # Work around https://bugs.python.org/issue26903
        worker_count = min(worker_count, 61)
    executor = ProcessPoolExecutor(max_workers=worker_count)
    try:
        loop.run_until_complete(
            schedule_formatting(
                sources=sources,
                fast=fast,
                write_back=write_back,
                mode=mode,
                report=report,
                loop=loop,
                executor=executor,
            )
        )
    finally:
        shutdown(loop)
async def schedule_formatting(
    sources: Set[Path],
    fast: bool,
    write_back: WriteBack,
    mode: FileMode,
    report: "Report",
    loop: asyncio.AbstractEventLoop,
    executor: Executor,
) -> None:
    """Run formatting of `sources` in parallel using the provided `executor`.

    (Use ProcessPoolExecutors for actual parallelism.)

    `write_back`, `fast`, and `mode` options are passed to
    :func:`format_file_in_place`.
    """
    cache: Cache = {}
    if write_back != WriteBack.DIFF:
        cache = read_cache(mode)
        sources, cached = filter_cached(cache, sources)
        for src in sorted(cached):
            report.done(src, Changed.CACHED)
    if not sources:
        return

    cancelled = []
    sources_to_cache = []
    lock = None
    if write_back == WriteBack.DIFF:
        # For diff output, we need locks to ensure we don't interleave output
        # from different processes.
        manager = Manager()
        lock = manager.Lock()
    tasks = {
        asyncio.ensure_future(
            loop.run_in_executor(
                executor, format_file_in_place, src, fast, mode, write_back, lock
            )
        ): src
        for src in sorted(sources)
    }
    pending: Iterable[asyncio.Future] = tasks.keys()
    try:
        loop.add_signal_handler(signal.SIGINT, cancel, pending)
        loop.add_signal_handler(signal.SIGTERM, cancel, pending)
    except NotImplementedError:
        # There are no good alternatives for these on Windows.
        pass
    while pending:
        done, _ = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            src = tasks.pop(task)
            if task.cancelled():
                cancelled.append(task)
            elif task.exception():
                report.failed(src, str(task.exception()))
            else:
                changed = Changed.YES if task.result() else Changed.NO
                # If the file was written back or was successfully checked as
                # well-formatted, store this information in the cache.
                if write_back is WriteBack.YES or (
                    write_back is WriteBack.CHECK and changed is Changed.NO
                ):
                    sources_to_cache.append(src)
                report.done(src, changed)
    if cancelled:
        await asyncio.gather(*cancelled, loop=loop, return_exceptions=True)
    if sources_to_cache:
        write_cache(cache, sources_to_cache, mode)
def format_file_in_place(
    src: Path,
    fast: bool,
    mode: FileMode,
    write_back: WriteBack = WriteBack.NO,
    lock: Any = None,  # multiprocessing.Manager().Lock() is some crazy proxy
) -> bool:
    """Format file under `src` path. Return True if changed.

    If `write_back` is DIFF, write a diff to stdout. If it is YES, write reformatted
    code to the file.
    `mode` and `fast` options are passed to :func:`format_file_contents`.
    """
    if src.suffix == ".pyi":
        mode = evolve(mode, is_pyi=True)

    then = datetime.utcfromtimestamp(src.stat().st_mtime)
    with open(src, "rb") as buf:
        src_contents, encoding, newline = decode_bytes(buf.read())
    try:
        dst_contents = format_file_contents(src_contents, fast=fast, mode=mode)
    except NothingChanged:
        return False

    if write_back == write_back.YES:
        with open(src, "w", encoding=encoding, newline=newline) as f:
            f.write(dst_contents)
    elif write_back == write_back.DIFF:
        now = datetime.utcnow()
        src_name = f"{src}\t{then} +0000"
        dst_name = f"{src}\t{now} +0000"
        diff_contents = diff(src_contents, dst_contents, src_name, dst_name)

        with lock or nullcontext():
            f = io.TextIOWrapper(
                sys.stdout.buffer,
                encoding=encoding,
                newline=newline,
                write_through=True,
            )
            f.write(diff_contents)
            f.detach()

    return True
def format_stdin_to_stdout(
    fast: bool, *, write_back: WriteBack = WriteBack.NO, mode: FileMode
) -> bool:
    """Format file on stdin. Return True if changed.

    If `write_back` is YES, write reformatted code back to stdout. If it is DIFF,
    write a diff to stdout. The `mode` argument is passed to
    :func:`format_file_contents`.
    """
    then = datetime.utcnow()
    src, encoding, newline = decode_bytes(sys.stdin.buffer.read())
    dst = src
    try:
        dst = format_file_contents(src, fast=fast, mode=mode)
        return True

    except NothingChanged:
        return False

    finally:
        f = io.TextIOWrapper(
            sys.stdout.buffer, encoding=encoding, newline=newline, write_through=True
        )
        if write_back == WriteBack.YES:
            f.write(dst)
        elif write_back == WriteBack.DIFF:
            now = datetime.utcnow()
            src_name = f"STDIN\t{then} +0000"
            dst_name = f"STDOUT\t{now} +0000"
            f.write(diff(src, dst, src_name, dst_name))
        f.detach()
def format_file_contents(
    src_contents: str, *, fast: bool, mode: FileMode
) -> FileContent:
    """Reformat contents of a file and return new contents.

    If `fast` is False, additionally confirm that the reformatted code is
    valid by calling :func:`assert_equivalent` and :func:`assert_stable` on it.
    `mode` is passed to :func:`format_str`.
    """
    if src_contents.strip() == "":
        raise NothingChanged

    dst_contents = format_str(src_contents, mode=mode)
    if src_contents == dst_contents:
        raise NothingChanged

    if not fast:
        assert_equivalent(src_contents, dst_contents)
        assert_stable(src_contents, dst_contents, mode=mode)
    return dst_contents
def format_str(src_contents: str, *, mode: FileMode) -> FileContent:
    """Reformat a string and return new contents.

    `mode` determines formatting options, such as how many characters per line are
    allowed.
    """
    src_node = lib2to3_parse(src_contents.lstrip(), mode.target_versions)
    dst_contents = []
    future_imports = get_future_imports(src_node)
    if mode.target_versions:
        versions = mode.target_versions
    else:
        versions = detect_target_versions(src_node)
    normalize_fmt_off(src_node)
    lines = LineGenerator(
        remove_u_prefix="unicode_literals" in future_imports
        or supports_feature(versions, Feature.UNICODE_LITERALS),
        is_pyi=mode.is_pyi,
        normalize_strings=mode.string_normalization,
    )
    elt = EmptyLineTracker(is_pyi=mode.is_pyi)
    empty_line = Line()
    after = 0
    split_line_features = {
        feature
        for feature in {Feature.TRAILING_COMMA_IN_CALL, Feature.TRAILING_COMMA_IN_DEF}
        if supports_feature(versions, feature)
    }
    for current_line in lines.visit(src_node):
        for _ in range(after):
            dst_contents.append(str(empty_line))
        before, after = elt.maybe_empty_lines(current_line)
        for _ in range(before):
            dst_contents.append(str(empty_line))
        for line in split_line(
            current_line, line_length=mode.line_length, features=split_line_features
        ):
            dst_contents.append(str(line))
    return "".join(dst_contents)
def decode_bytes(src: bytes) -> Tuple[FileContent, Encoding, NewLine]:
    """Return a tuple of (decoded_contents, encoding, newline).

    `newline` is either CRLF or LF but `decoded_contents` is decoded with
    universal newlines (i.e. only contains LF).
    """
    srcbuf = io.BytesIO(src)
    encoding, lines = tokenize.detect_encoding(srcbuf.readline)
    if not lines:
        return "", encoding, "\n"

    newline = "\r\n" if b"\r\n" == lines[0][-2:] else "\n"
    srcbuf.seek(0)
    with io.TextIOWrapper(srcbuf, encoding) as tiow:
        return tiow.read(), encoding, newline
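
# Illustrative sketch (not part of the original module): CRLF input is
# detected via the first line, while the decoded text itself is normalized
# to LF by the TextIOWrapper's universal-newline mode.
#
#     >>> decode_bytes(b"x = 1\r\ny = 2\r\n")
#     ('x = 1\ny = 2\n', 'utf-8', '\r\n')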
def get_grammars(target_versions: Set[TargetVersion]) -> List[Grammar]:
    if not target_versions:
        # No target_version specified, so try all grammars.
        return [
            # Python 3.7+
            pygram.python_grammar_no_print_statement_no_exec_statement_async_keywords,
            # Python 3.0-3.6
            pygram.python_grammar_no_print_statement_no_exec_statement,
            # Python 2.7 with future print_function import
            pygram.python_grammar_no_print_statement,
            # Python 2.7
            pygram.python_grammar,
        ]
    elif all(version.is_python2() for version in target_versions):
        # Python 2-only code, so try Python 2 grammars.
        return [
            # Python 2.7 with future print_function import
            pygram.python_grammar_no_print_statement,
            # Python 2.7
            pygram.python_grammar,
        ]
    else:
        # Python 3-compatible code, so only try Python 3 grammar.
        grammars = []
        # If we have to parse both, try to parse async as a keyword first
        if not supports_feature(target_versions, Feature.ASYNC_IDENTIFIERS):
            # Python 3.7+
            grammars.append(
                pygram.python_grammar_no_print_statement_no_exec_statement_async_keywords  # noqa: B950
            )
        if not supports_feature(target_versions, Feature.ASYNC_KEYWORDS):
            # Python 3.0-3.6
            grammars.append(pygram.python_grammar_no_print_statement_no_exec_statement)
        # At least one of the above branches must have been taken, because every Python
        # version has exactly one of the two 'ASYNC_*' flags
        return grammars
def lib2to3_parse(src_txt: str, target_versions: Iterable[TargetVersion] = ()) -> Node:
    """Given a string with source, return the lib2to3 Node."""
    if src_txt[-1:] != "\n":
        src_txt += "\n"

    for grammar in get_grammars(set(target_versions)):
        drv = driver.Driver(grammar, pytree.convert)
        try:
            result = drv.parse_string(src_txt, True)
            break

        except ParseError as pe:
            lineno, column = pe.context[1]
            lines = src_txt.splitlines()
            try:
                faulty_line = lines[lineno - 1]
            except IndexError:
                faulty_line = "<line number missing in source>"
            exc = InvalidInput(f"Cannot parse: {lineno}:{column}: {faulty_line}")
    else:
        raise exc from None

    if isinstance(result, Leaf):
        result = Node(syms.file_input, [result])
    return result


def lib2to3_unparse(node: Node) -> str:
    """Given a lib2to3 node, return its string representation."""
    code = str(node)
    return code
class Visitor(Generic[T]):
    """Basic lib2to3 visitor that yields things of type `T` on `visit()`."""

    def visit(self, node: LN) -> Iterator[T]:
        """Main method to visit `node` and its children.

        It tries to find a `visit_*()` method for the given `node.type`, like
        `visit_simple_stmt` for Node objects or `visit_INDENT` for Leaf objects.
        If no dedicated `visit_*()` method is found, chooses `visit_default()`
        instead.

        Then yields objects of type `T` from the selected visitor.
        """
        if node.type < 256:
            name = token.tok_name[node.type]
        else:
            name = type_repr(node.type)
        yield from getattr(self, f"visit_{name}", self.visit_default)(node)

    def visit_default(self, node: LN) -> Iterator[T]:
        """Default `visit_*()` implementation. Recurses to children of `node`."""
        if isinstance(node, Node):
            for child in node.children:
                yield from self.visit(child)
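
# Illustrative sketch (not part of the original module): a minimal Visitor
# subclass that yields the value of every NAME leaf in a tree. The
# `visit_NAME` method is resolved dynamically by `Visitor.visit` via
# `token.tok_name`.
#
#     class NameCollector(Visitor[str]):
#         def visit_NAME(self, node: LN) -> Iterator[str]:
#             yield node.value  # type: ignore  # NAME nodes are Leaf instances
#
#     >>> list(NameCollector().visit(lib2to3_parse("x = y\n")))
#     ['x', 'y']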
@dataclass
class DebugVisitor(Visitor[T]):
    tree_depth: int = 0

    def visit_default(self, node: LN) -> Iterator[T]:
        indent = " " * (2 * self.tree_depth)
        if isinstance(node, Node):
            _type = type_repr(node.type)
            out(f"{indent}{_type}", fg="yellow")
            self.tree_depth += 1
            for child in node.children:
                yield from self.visit(child)

            self.tree_depth -= 1
            out(f"{indent}/{_type}", fg="yellow", bold=False)
        else:
            _type = token.tok_name.get(node.type, str(node.type))
            out(f"{indent}{_type}", fg="blue", nl=False)
            if node.prefix:
                # We don't have to handle prefixes for `Node` objects since
                # that delegates to the first child anyway.
                out(f" {node.prefix!r}", fg="green", bold=False, nl=False)
            out(f" {node.value!r}", fg="blue", bold=False)

    @classmethod
    def show(cls, code: Union[str, Leaf, Node]) -> None:
        """Pretty-print the lib2to3 AST of a given string of `code`.

        Convenience method for debugging.
        """
        v: DebugVisitor[None] = DebugVisitor()
        if isinstance(code, str):
            code = lib2to3_parse(code)
        list(v.visit(code))
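
# Illustrative sketch (not part of the original module): `DebugVisitor.show`
# prints a colorized, indented dump of the blib2to3 tree, roughly like:
#
#     >>> DebugVisitor.show("x = 1\n")  # doctest: +SKIP
#     file_input
#       simple_stmt
#         expr_stmt
#           NAME 'x'
#           ...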
WHITESPACE = {token.DEDENT, token.INDENT, token.NEWLINE}
STATEMENT = {
    syms.if_stmt, syms.while_stmt, syms.for_stmt, syms.try_stmt,
    syms.except_clause, syms.with_stmt, syms.funcdef, syms.classdef,
}
STANDALONE_COMMENT = 153
token.tok_name[STANDALONE_COMMENT] = "STANDALONE_COMMENT"
LOGIC_OPERATORS = {"and", "or"}
COMPARATORS = {
    token.GREATER, token.GREATEREQUAL, token.LESS, token.LESSEQUAL,
    token.NOTEQUAL, token.EQEQUAL,
}
MATH_OPERATORS = {
    token.VBAR, token.CIRCUMFLEX, token.AMPER, token.LEFTSHIFT, token.RIGHTSHIFT,
    token.PLUS, token.MINUS, token.STAR, token.SLASH, token.DOUBLESLASH,
    token.PERCENT, token.AT, token.TILDE, token.DOUBLESTAR,
}
STARS = {token.STAR, token.DOUBLESTAR}
VARARGS_SPECIALS = STARS | {token.SLASH}
VARARGS_PARENTS = {
    syms.arglist,
    syms.argument,  # double star in arglist
    syms.trailer,  # single argument to call
    syms.typedargslist,
    syms.varargslist,  # lambdas
}
UNPACKING_PARENTS = {
    syms.atom,  # single element of a list or set literal
    syms.dictsetmaker,
    syms.listmaker,
    syms.testlist_gexp,
    syms.testlist_star_expr,
}
TEST_DESCENDANTS = {
    syms.test, syms.lambdef, syms.or_test, syms.and_test, syms.not_test,
    syms.comparison, syms.star_expr, syms.expr, syms.xor_expr, syms.and_expr,
    syms.shift_expr, syms.arith_expr, syms.trailer, syms.term, syms.power,
}
ASSIGNMENTS = {
    "=", "+=", "-=", "*=", "@=", "/=", "%=", "**=", ">>=", "<<=", "&=", "^=",
    "|=", "//=",
}
COMPREHENSION_PRIORITY = 20
COMMA_PRIORITY = 18
TERNARY_PRIORITY = 16
LOGIC_PRIORITY = 14
STRING_PRIORITY = 12
COMPARATOR_PRIORITY = 10
MATH_PRIORITIES = {
    token.VBAR: 9,
    token.CIRCUMFLEX: 8,
    token.AMPER: 7,
    token.LEFTSHIFT: 6,
    token.RIGHTSHIFT: 6,
    token.PLUS: 5,
    token.MINUS: 5,
    token.STAR: 4,
    token.SLASH: 4,
    token.DOUBLESLASH: 4,
    token.PERCENT: 4,
    token.AT: 4,
    token.TILDE: 4,
    token.DOUBLESTAR: 2,
}
DOT_PRIORITY = 1
@dataclass
class BracketTracker:
    """Keeps track of brackets on a line."""

    depth: int = 0
    bracket_match: Dict[Tuple[Depth, NodeType], Leaf] = Factory(dict)
    delimiters: Dict[LeafID, Priority] = Factory(dict)
    previous: Optional[Leaf] = None
    _for_loop_depths: List[int] = Factory(list)
    _lambda_argument_depths: List[int] = Factory(list)

    def mark(self, leaf: Leaf) -> None:
        """Mark `leaf` with bracket-related metadata. Keep track of delimiters.

        All leaves receive an int `bracket_depth` field that stores how deep
        within brackets a given leaf is. 0 means there are no enclosing brackets
        that started on this line.

        If a leaf is itself a closing bracket, it receives an `opening_bracket`
        field that it forms a pair with. This is a one-directional link to
        avoid reference cycles.

        If a leaf is a delimiter (a token on which Black can split the line if
        needed) and it's on depth 0, its `id()` is stored in the tracker's
        `delimiters` field.
        """
        if leaf.type == token.COMMENT:
            return

        self.maybe_decrement_after_for_loop_variable(leaf)
        self.maybe_decrement_after_lambda_arguments(leaf)
        if leaf.type in CLOSING_BRACKETS:
            self.depth -= 1
            opening_bracket = self.bracket_match.pop((self.depth, leaf.type))
            leaf.opening_bracket = opening_bracket
        leaf.bracket_depth = self.depth
        if self.depth == 0:
            delim = is_split_before_delimiter(leaf, self.previous)
            if delim and self.previous is not None:
                self.delimiters[id(self.previous)] = delim

            delim = is_split_after_delimiter(leaf, self.previous)
            if delim:
                self.delimiters[id(leaf)] = delim
        if leaf.type in OPENING_BRACKETS:
            self.bracket_match[self.depth, BRACKET[leaf.type]] = leaf
            self.depth += 1
        self.previous = leaf
        self.maybe_increment_lambda_arguments(leaf)
        self.maybe_increment_for_loop_variable(leaf)

    def any_open_brackets(self) -> bool:
        """Return True if there is a yet unmatched open bracket on the line."""
        return bool(self.bracket_match)

    def max_delimiter_priority(self, exclude: Iterable[LeafID] = ()) -> Priority:
        """Return the highest priority of a delimiter found on the line.

        Values are consistent with what `is_split_*_delimiter()` return.
        Raises ValueError on no delimiters.
        """
        return max(v for k, v in self.delimiters.items() if k not in exclude)

    def delimiter_count_with_priority(self, priority: Priority = 0) -> int:
        """Return the number of delimiters with the given `priority`.

        If no `priority` is passed, defaults to max priority on the line.
        """
        if not self.delimiters:
            return 0

        priority = priority or self.max_delimiter_priority()
        return sum(1 for p in self.delimiters.values() if p == priority)

    def maybe_increment_for_loop_variable(self, leaf: Leaf) -> bool:
        """In a for loop, or comprehension, the variables are often unpacks.

        To avoid splitting on the comma in this situation, increase the depth of
        tokens between `for` and `in`.
        """
        if leaf.type == token.NAME and leaf.value == "for":
            self.depth += 1
            self._for_loop_depths.append(self.depth)
            return True

        return False

    def maybe_decrement_after_for_loop_variable(self, leaf: Leaf) -> bool:
        """See `maybe_increment_for_loop_variable` above for explanation."""
        if (
            self._for_loop_depths
            and self._for_loop_depths[-1] == self.depth
            and leaf.type == token.NAME
            and leaf.value == "in"
        ):
            self.depth -= 1
            self._for_loop_depths.pop()
            return True

        return False

    def maybe_increment_lambda_arguments(self, leaf: Leaf) -> bool:
        """In a lambda expression, there might be more than one argument.

        To avoid splitting on the comma in this situation, increase the depth of
        tokens between `lambda` and `:`.
        """
        if leaf.type == token.NAME and leaf.value == "lambda":
            self.depth += 1
            self._lambda_argument_depths.append(self.depth)
            return True

        return False

    def maybe_decrement_after_lambda_arguments(self, leaf: Leaf) -> bool:
        """See `maybe_increment_lambda_arguments` above for explanation."""
        if (
            self._lambda_argument_depths
            and self._lambda_argument_depths[-1] == self.depth
            and leaf.type == token.COLON
        ):
            self.depth -= 1
            self._lambda_argument_depths.pop()
            return True

        return False

    def get_open_lsqb(self) -> Optional[Leaf]:
        """Return the most recent opening square bracket (if any)."""
        return self.bracket_match.get((self.depth - 1, token.RSQB))
1151 """Holds leaves and comments. Can be printed with `str(line)`."""
1154 leaves: List[Leaf] = Factory(list)
1155 comments: Dict[LeafID, List[Leaf]] = Factory(dict) # keys ordered like `leaves`
1156 bracket_tracker: BracketTracker = Factory(BracketTracker)
1157 inside_brackets: bool = False
1158 should_explode: bool = False
1160 def append(self, leaf: Leaf, preformatted: bool = False) -> None:
1161 """Add a new `leaf` to the end of the line.
1163 Unless `preformatted` is True, the `leaf` will receive a new consistent
1164 whitespace prefix and metadata applied by :class:`BracketTracker`.
1165 Trailing commas are maybe removed, unpacked for loop variables are
1166 demoted from being delimiters.
1168 Inline comments are put aside.
1170 has_value = leaf.type in BRACKETS or bool(leaf.value.strip())
1174 if token.COLON == leaf.type and self.is_class_paren_empty:
1175 del self.leaves[-2:]
1176 if self.leaves and not preformatted:
1177 # Note: at this point leaf.prefix should be empty except for
1178 # imports, for which we only preserve newlines.
1179 leaf.prefix += whitespace(
1180 leaf, complex_subscript=self.is_complex_subscript(leaf)
1182 if self.inside_brackets or not preformatted:
1183 self.bracket_tracker.mark(leaf)
1184 self.maybe_remove_trailing_comma(leaf)
1185 if not self.append_comment(leaf):
1186 self.leaves.append(leaf)
1188 def append_safe(self, leaf: Leaf, preformatted: bool = False) -> None:
1189 """Like :func:`append()` but disallow invalid standalone comment structure.
1191 Raises ValueError when any `leaf` is appended after a standalone comment
1192 or when a standalone comment is not the first leaf on the line.
1194 if self.bracket_tracker.depth == 0:
1196 raise ValueError("cannot append to standalone comments")
1198 if self.leaves and leaf.type == STANDALONE_COMMENT:
1200 "cannot append standalone comments to a populated line"
1203 self.append(leaf, preformatted=preformatted)
    @property
    def is_comment(self) -> bool:
        """Is this line a standalone comment?"""
        return len(self.leaves) == 1 and self.leaves[0].type == STANDALONE_COMMENT

    @property
    def is_decorator(self) -> bool:
        """Is this line a decorator?"""
        return bool(self) and self.leaves[0].type == token.AT

    @property
    def is_import(self) -> bool:
        """Is this an import line?"""
        return bool(self) and is_import(self.leaves[0])

    @property
    def is_class(self) -> bool:
        """Is this line a class definition?"""
        return (
            bool(self)
            and self.leaves[0].type == token.NAME
            and self.leaves[0].value == "class"
        )

    @property
    def is_stub_class(self) -> bool:
        """Is this line a class definition with a body consisting only of "..."?"""
        return self.is_class and self.leaves[-3:] == [
            Leaf(token.DOT, ".") for _ in range(3)
        ]

    @property
    def is_def(self) -> bool:
        """Is this a function definition? (Also returns True for async defs.)"""
        try:
            first_leaf = self.leaves[0]
        except IndexError:
            return False

        try:
            second_leaf: Optional[Leaf] = self.leaves[1]
        except IndexError:
            second_leaf = None
        return (first_leaf.type == token.NAME and first_leaf.value == "def") or (
            first_leaf.type == token.ASYNC
            and second_leaf is not None
            and second_leaf.type == token.NAME
            and second_leaf.value == "def"
        )

    @property
    def is_class_paren_empty(self) -> bool:
        """Is this a class with no base classes but using parentheses?

        Those are unnecessary and should be removed.
        """
        return (
            bool(self)
            and len(self.leaves) == 4
            and self.is_class
            and self.leaves[2].type == token.LPAR
            and self.leaves[2].value == "("
            and self.leaves[3].type == token.RPAR
            and self.leaves[3].value == ")"
        )

    @property
    def is_triple_quoted_string(self) -> bool:
        """Is the line a triple quoted string?"""
        return (
            bool(self)
            and self.leaves[0].type == token.STRING
            and self.leaves[0].value.startswith(('"""', "'''"))
        )

    def contains_standalone_comments(self, depth_limit: int = sys.maxsize) -> bool:
        """If so, needs to be split before emitting."""
        for leaf in self.leaves:
            if leaf.type == STANDALONE_COMMENT:
                if leaf.bracket_depth <= depth_limit:
                    return True

        return False

    def contains_inner_type_comments(self) -> bool:
        ignored_ids = set()
        try:
            last_leaf = self.leaves[-1]
            ignored_ids.add(id(last_leaf))
            if last_leaf.type == token.COMMA or (
                last_leaf.type == token.RPAR and not last_leaf.value
            ):
                # When trailing commas or optional parens are inserted by Black for
                # consistency, comments after the previous last element are not moved
                # (they don't have to, rendering will still be correct). So we ignore
                # trailing commas and invisible parens.
                last_leaf = self.leaves[-2]
                ignored_ids.add(id(last_leaf))
        except IndexError:
            return False

        for leaf_id, comments in self.comments.items():
            if leaf_id in ignored_ids:
                continue

            for comment in comments:
                if is_type_comment(comment):
                    return True

        return False

    def contains_multiline_strings(self) -> bool:
        for leaf in self.leaves:
            if is_multiline_string(leaf):
                return True

        return False
    def maybe_remove_trailing_comma(self, closing: Leaf) -> bool:
        """Remove trailing comma if there is one and it's safe."""
        if not (
            self.leaves
            and self.leaves[-1].type == token.COMMA
            and closing.type in CLOSING_BRACKETS
        ):
            return False

        if closing.type == token.RBRACE:
            self.remove_trailing_comma()
            return True

        if closing.type == token.RSQB:
            comma = self.leaves[-1]
            if comma.parent and comma.parent.type == syms.listmaker:
                self.remove_trailing_comma()
                return True

        # For parens let's check if it's safe to remove the comma.
        # Imports are always safe.
        if self.is_import:
            self.remove_trailing_comma()
            return True

        # Otherwise, if the trailing one is the only one, we might mistakenly
        # change a tuple into a different type by removing the comma.
        depth = closing.bracket_depth + 1
        commas = 0
        opening = closing.opening_bracket
        for _opening_index, leaf in enumerate(self.leaves):
            if leaf is opening:
                break

        else:
            return False

        for leaf in self.leaves[_opening_index + 1 :]:
            if leaf is closing:
                break

            bracket_depth = leaf.bracket_depth
            if bracket_depth == depth and leaf.type == token.COMMA:
                commas += 1
                if leaf.parent and leaf.parent.type in {
                    syms.arglist,
                    syms.typedargslist,
                }:
                    commas += 1
                    break

        if commas > 1:
            self.remove_trailing_comma()
            return True

        return False

    def append_comment(self, comment: Leaf) -> bool:
        """Add an inline or standalone comment to the line."""
        if (
            comment.type == STANDALONE_COMMENT
            and self.bracket_tracker.any_open_brackets()
        ):
            comment.prefix = ""
            return False

        if comment.type != token.COMMENT:
            return False

        if not self.leaves:
            comment.type = STANDALONE_COMMENT
            comment.prefix = ""
            return False

        last_leaf = self.leaves[-1]
        if (
            last_leaf.type == token.RPAR
            and not last_leaf.value
            and last_leaf.parent
            and len(list(last_leaf.parent.leaves())) <= 3
            and not is_type_comment(comment)
        ):
            # Comments on an optional parens wrapping a single leaf should belong to
            # the wrapped node except if it's a type comment. Pinning the comment like
            # this avoids unstable formatting caused by comment migration.
            if len(self.leaves) < 2:
                comment.type = STANDALONE_COMMENT
                comment.prefix = ""
                return False
            last_leaf = self.leaves[-2]
        self.comments.setdefault(id(last_leaf), []).append(comment)
        return True

    def comments_after(self, leaf: Leaf) -> List[Leaf]:
        """Generate comments that should appear directly after `leaf`."""
        return self.comments.get(id(leaf), [])

    def remove_trailing_comma(self) -> None:
        """Remove the trailing comma and move the comments attached to it."""
        trailing_comma = self.leaves.pop()
        trailing_comma_comments = self.comments.pop(id(trailing_comma), [])
        self.comments.setdefault(id(self.leaves[-1]), []).extend(
            trailing_comma_comments
        )

    def is_complex_subscript(self, leaf: Leaf) -> bool:
        """Return True iff `leaf` is part of a slice with non-trivial exprs."""
        open_lsqb = self.bracket_tracker.get_open_lsqb()
        if open_lsqb is None:
            return False

        subscript_start = open_lsqb.next_sibling

        if isinstance(subscript_start, Node):
            if subscript_start.type == syms.listmaker:
                return False

            if subscript_start.type == syms.subscriptlist:
                subscript_start = child_towards(subscript_start, leaf)
        return subscript_start is not None and any(
            n.type in TEST_DESCENDANTS for n in subscript_start.pre_order()
        )

    def __str__(self) -> str:
        """Render the line."""
        if not self:
            return "\n"

        indent = "    " * self.depth
        leaves = iter(self.leaves)
        first = next(leaves)
        res = f"{first.prefix}{indent}{first.value}"
        for leaf in leaves:
            res += str(leaf)
        for comment in itertools.chain.from_iterable(self.comments.values()):
            res += str(comment)
        return res + "\n"

    def __bool__(self) -> bool:
        """Return True if the line has leaves or comments."""
        return bool(self.leaves or self.comments)
@dataclass
class EmptyLineTracker:
    """Provides a stateful method that returns the number of potential extra
    empty lines needed before and after the currently processed line.

    Note: this tracker works on lines that haven't been split yet. It assumes
    the prefix of the first leaf consists of optional newlines. Those newlines
    are consumed by `maybe_empty_lines()` and included in the computation.
    """

    is_pyi: bool = False
    previous_line: Optional[Line] = None
    previous_after: int = 0
    previous_defs: List[int] = Factory(list)

    def maybe_empty_lines(self, current_line: Line) -> Tuple[int, int]:
        """Return the number of extra empty lines before and after the `current_line`.

        This is for separating `def`, `async def` and `class` with extra empty
        lines (two on module-level).
        """
        before, after = self._maybe_empty_lines(current_line)
        before = (
            # Black should not insert empty lines at the beginning
            # of the file.
            0
            if self.previous_line is None
            else before - self.previous_after
        )
        self.previous_after = after
        self.previous_line = current_line
        return before, after

    def _maybe_empty_lines(self, current_line: Line) -> Tuple[int, int]:
        max_allowed = 1
        if current_line.depth == 0:
            max_allowed = 1 if self.is_pyi else 2
        if current_line.leaves:
            # Consume the first leaf's extra newlines.
            first_leaf = current_line.leaves[0]
            before = first_leaf.prefix.count("\n")
            before = min(before, max_allowed)
            first_leaf.prefix = ""
        else:
            before = 0
        depth = current_line.depth
        while self.previous_defs and self.previous_defs[-1] >= depth:
            self.previous_defs.pop()
            if self.is_pyi:
                before = 0 if depth else 1
            else:
                before = 1 if depth else 2
        if current_line.is_decorator or current_line.is_def or current_line.is_class:
            return self._maybe_empty_lines_for_class_or_def(current_line, before)

        if (
            self.previous_line
            and self.previous_line.is_import
            and not current_line.is_import
            and depth == self.previous_line.depth
        ):
            return (before or 1), 0

        if (
            self.previous_line
            and self.previous_line.is_class
            and current_line.is_triple_quoted_string
        ):
            return before, 1

        return before, 0

    def _maybe_empty_lines_for_class_or_def(
        self, current_line: Line, before: int
    ) -> Tuple[int, int]:
        if not current_line.is_decorator:
            self.previous_defs.append(current_line.depth)
        if self.previous_line is None:
            # Don't insert empty lines before the first line in the file.
            return 0, 0

        if self.previous_line.is_decorator:
            return 0, 0

        if self.previous_line.depth < current_line.depth and (
            self.previous_line.is_class or self.previous_line.is_def
        ):
            return 0, 0

        if (
            self.previous_line.is_comment
            and self.previous_line.depth == current_line.depth
            and before == 0
        ):
            return 0, 0

        if self.is_pyi:
            if self.previous_line.depth > current_line.depth:
                newlines = 1
            elif current_line.is_class or self.previous_line.is_class:
                if current_line.is_stub_class and self.previous_line.is_stub_class:
                    # No blank line between classes with an empty body
                    newlines = 0
                else:
                    newlines = 1
            elif current_line.is_def and not self.previous_line.is_def:
                # Blank line between a block of functions and a block of non-functions
                newlines = 1
            else:
                newlines = 0
        else:
            newlines = 2
        if current_line.depth and newlines:
            newlines -= 1
        return 0, newlines
@dataclass
class LineGenerator(Visitor[Line]):
    """Generates reformatted Line objects. Empty lines are not emitted.

    Note: destroys the tree it's visiting by mutating prefixes of its leaves
    in ways that will no longer stringify to valid Python code on the tree.
    """

    is_pyi: bool = False
    normalize_strings: bool = True
    current_line: Line = Factory(Line)
    remove_u_prefix: bool = False

    def line(self, indent: int = 0) -> Iterator[Line]:
        """Generate a line.

        If the line is empty, only emit if it makes sense.
        If the line is too long, split it first and then generate.

        If any lines were generated, set up a new current_line.
        """
        if not self.current_line:
            self.current_line.depth += indent
            return  # Line is empty, don't emit. Creating a new one unnecessary.

        complete_line = self.current_line
        self.current_line = Line(depth=complete_line.depth + indent)
        yield complete_line

    def visit_default(self, node: LN) -> Iterator[Line]:
        """Default `visit_*()` implementation. Recurses to children of `node`."""
        if isinstance(node, Leaf):
            any_open_brackets = self.current_line.bracket_tracker.any_open_brackets()
            for comment in generate_comments(node):
                if any_open_brackets:
                    # any comment within brackets is subject to splitting
                    self.current_line.append(comment)
                elif comment.type == token.COMMENT:
                    # regular trailing comment
                    self.current_line.append(comment)
                    yield from self.line()

                else:
                    # regular standalone comment
                    yield from self.line()

                    self.current_line.append(comment)
                    yield from self.line()

            normalize_prefix(node, inside_brackets=any_open_brackets)
            if self.normalize_strings and node.type == token.STRING:
                normalize_string_prefix(node, remove_u_prefix=self.remove_u_prefix)
                normalize_string_quotes(node)
            if node.type == token.NUMBER:
                normalize_numeric_literal(node)
            if node.type not in WHITESPACE:
                self.current_line.append(node)
        yield from super().visit_default(node)

    def visit_atom(self, node: Node) -> Iterator[Line]:
        # Always make parentheses invisible around a single node, because it should
        # not be needed (except in the case of yield, where removing the parentheses
        # produces a SyntaxError).
        if (
            len(node.children) == 3
            and isinstance(node.children[0], Leaf)
            and node.children[0].type == token.LPAR
            and isinstance(node.children[2], Leaf)
            and node.children[2].type == token.RPAR
            and isinstance(node.children[1], Leaf)
            and not (
                node.children[1].type == token.NAME
                and node.children[1].value == "yield"
            )
        ):
            node.children[0].value = ""
            node.children[2].value = ""
        yield from super().visit_default(node)

    def visit_factor(self, node: Node) -> Iterator[Line]:
        """Force parentheses between a unary op and a binary power:

        -2 ** 8 -> -(2 ** 8)
        """
        child = node.children[1]
        if child.type == syms.power and len(child.children) == 3:
            lpar = Leaf(token.LPAR, "(")
            rpar = Leaf(token.RPAR, ")")
            index = child.remove() or 0
            node.insert_child(index, Node(syms.atom, [lpar, child, rpar]))
        yield from self.visit_default(node)

    def visit_INDENT(self, node: Node) -> Iterator[Line]:
        """Increase indentation level, maybe yield a line."""
        # In blib2to3 INDENT never holds comments.
        yield from self.line(+1)
        yield from self.visit_default(node)

    def visit_DEDENT(self, node: Node) -> Iterator[Line]:
        """Decrease indentation level, maybe yield a line."""
        # The current line might still wait for trailing comments. At DEDENT time
        # there won't be any (they would be prefixes on the preceding NEWLINE).
        # Emit the line then.
        yield from self.line()

        # While DEDENT has no value, its prefix may contain standalone comments
        # that belong to the current indentation level. Get 'em.
        yield from self.visit_default(node)

        # Finally, emit the dedent.
        yield from self.line(-1)
    def visit_stmt(
        self, node: Node, keywords: Set[str], parens: Set[str]
    ) -> Iterator[Line]:
        """Visit a statement.

        This implementation is shared for `if`, `while`, `for`, `try`, `except`,
        `def`, `with`, `class`, `assert` and assignments.

        The relevant Python language `keywords` for a given statement will be
        NAME leaves within it. This method puts those on a separate line.

        `parens` holds a set of string leaf values immediately after which
        invisible parens should be put.
        """
        normalize_invisible_parens(node, parens_after=parens)
        for child in node.children:
            if child.type == token.NAME and child.value in keywords:  # type: ignore
                yield from self.line()

            yield from self.visit(child)

    def visit_suite(self, node: Node) -> Iterator[Line]:
        """Visit a suite."""
        if self.is_pyi and is_stub_suite(node):
            yield from self.visit(node.children[2])
        else:
            yield from self.visit_default(node)

    def visit_simple_stmt(self, node: Node) -> Iterator[Line]:
        """Visit a statement without nested statements."""
        is_suite_like = node.parent and node.parent.type in STATEMENT
        if is_suite_like:
            if self.is_pyi and is_stub_body(node):
                yield from self.visit_default(node)
            else:
                yield from self.line(+1)
                yield from self.visit_default(node)
                yield from self.line(-1)

        else:
            if not self.is_pyi or not node.parent or not is_stub_suite(node.parent):
                yield from self.line()
            yield from self.visit_default(node)

    def visit_async_stmt(self, node: Node) -> Iterator[Line]:
        """Visit `async def`, `async for`, `async with`."""
        yield from self.line()

        children = iter(node.children)
        for child in children:
            yield from self.visit(child)

            if child.type == token.ASYNC:
                break

        internal_stmt = next(children)
        for child in internal_stmt.children:
            yield from self.visit(child)

    def visit_decorators(self, node: Node) -> Iterator[Line]:
        """Visit decorators."""
        for child in node.children:
            yield from self.line()
            yield from self.visit(child)

    def visit_SEMI(self, leaf: Leaf) -> Iterator[Line]:
        """Remove a semicolon and put the other statement on a separate line."""
        yield from self.line()

    def visit_ENDMARKER(self, leaf: Leaf) -> Iterator[Line]:
        """End of file. Process outstanding comments and end with a newline."""
        yield from self.visit_default(leaf)
        yield from self.line()

    def visit_STANDALONE_COMMENT(self, leaf: Leaf) -> Iterator[Line]:
        if not self.current_line.bracket_tracker.any_open_brackets():
            yield from self.line()
        yield from self.visit_default(leaf)

    def __attrs_post_init__(self) -> None:
        """You are in a twisty little maze of passages."""
        v = self.visit_stmt
        Ø: Set[str] = set()
        self.visit_assert_stmt = partial(v, keywords={"assert"}, parens={"assert", ","})
        self.visit_if_stmt = partial(
            v, keywords={"if", "else", "elif"}, parens={"if", "elif"}
        )
        self.visit_while_stmt = partial(v, keywords={"while", "else"}, parens={"while"})
        self.visit_for_stmt = partial(v, keywords={"for", "else"}, parens={"for", "in"})
        self.visit_try_stmt = partial(
            v, keywords={"try", "except", "else", "finally"}, parens=Ø
        )
        self.visit_except_clause = partial(v, keywords={"except"}, parens=Ø)
        self.visit_with_stmt = partial(v, keywords={"with"}, parens=Ø)
        self.visit_funcdef = partial(v, keywords={"def"}, parens=Ø)
        self.visit_classdef = partial(v, keywords={"class"}, parens=Ø)
        self.visit_expr_stmt = partial(v, keywords=Ø, parens=ASSIGNMENTS)
        self.visit_return_stmt = partial(v, keywords={"return"}, parens={"return"})
        self.visit_import_from = partial(v, keywords=Ø, parens={"import"})
        self.visit_del_stmt = partial(v, keywords=Ø, parens={"del"})
        self.visit_async_funcdef = self.visit_async_stmt
        self.visit_decorated = self.visit_decorators
IMPLICIT_TUPLE = {syms.testlist, syms.testlist_star_expr, syms.exprlist}
BRACKET = {token.LPAR: token.RPAR, token.LSQB: token.RSQB, token.LBRACE: token.RBRACE}
OPENING_BRACKETS = set(BRACKET.keys())
CLOSING_BRACKETS = set(BRACKET.values())
BRACKETS = OPENING_BRACKETS | CLOSING_BRACKETS
ALWAYS_NO_SPACE = CLOSING_BRACKETS | {token.COMMA, STANDALONE_COMMENT}
def whitespace(leaf: Leaf, *, complex_subscript: bool) -> str:  # noqa: C901
    """Return whitespace prefix if needed for the given `leaf`.

    `complex_subscript` signals whether the given leaf is part of a subscription
    which has non-trivial arguments, like arithmetic expressions or function calls.
    """
    NO = ""
    SPACE = " "
    DOUBLESPACE = "  "
    t = leaf.type
    p = leaf.parent
    v = leaf.value
    if t in ALWAYS_NO_SPACE:
        return NO

    if t == token.COMMENT:
        return DOUBLESPACE

    assert p is not None, f"INTERNAL ERROR: hand-made leaf without parent: {leaf!r}"
    if t == token.COLON and p.type not in {
        syms.subscript,
        syms.subscriptlist,
        syms.sliceop,
    }:
        return NO

    prev = leaf.prev_sibling
    if not prev:
        prevp = preceding_leaf(p)
        if not prevp or prevp.type in OPENING_BRACKETS:
            return NO

        if t == token.COLON:
            if prevp.type == token.COLON:
                return NO

            elif prevp.type != token.COMMA and not complex_subscript:
                return NO

            return SPACE

        if prevp.type == token.EQUAL:
            if prevp.parent:
                if prevp.parent.type in {
                    syms.arglist,
                    syms.argument,
                    syms.parameters,
                    syms.varargslist,
                }:
                    return NO

                elif prevp.parent.type == syms.typedargslist:
                    # A bit hacky: if the equal sign has whitespace, it means we
                    # previously found it's a typed argument. So, we're using
                    # that, too.
                    return prevp.prefix

        elif prevp.type in VARARGS_SPECIALS:
            if is_vararg(prevp, within=VARARGS_PARENTS | UNPACKING_PARENTS):
                return NO

        elif prevp.type == token.COLON:
            if prevp.parent and prevp.parent.type in {syms.subscript, syms.sliceop}:
                return SPACE if complex_subscript else NO

        elif (
            prevp.parent
            and prevp.parent.type == syms.factor
            and prevp.type in MATH_OPERATORS
        ):
            return NO

        elif (
            prevp.type == token.RIGHTSHIFT
            and prevp.parent
            and prevp.parent.type == syms.shift_expr
            and prevp.prev_sibling
            and prevp.prev_sibling.type == token.NAME
            and prevp.prev_sibling.value == "print"  # type: ignore
        ):
            # Python 2 print chevron
            return NO

    elif prev.type in OPENING_BRACKETS:
        return NO

    if p.type in {syms.parameters, syms.arglist}:
        # untyped function signatures or calls
        if not prev or prev.type != token.COMMA:
            return NO

    elif p.type == syms.varargslist:
        # lambdas
        if prev and prev.type != token.COMMA:
            return NO

    elif p.type == syms.typedargslist:
        # typed function signatures
        if not prev:
            return NO

        if t == token.EQUAL:
            if prev.type != syms.tname:
                return NO

        elif prev.type == token.EQUAL:
            # A bit hacky: if the equal sign has whitespace, it means we
            # previously found it's a typed argument. So, we're using that, too.
            return prev.prefix

        elif prev.type != token.COMMA:
            return NO

    elif p.type == syms.tname:
        # type names
        if not prev:
            prevp = preceding_leaf(p)
            if not prevp or prevp.type != token.COMMA:
                return NO

    elif p.type == syms.trailer:
        # attributes and calls
        if t == token.LPAR or t == token.RPAR:
            return NO

        if not prev:
            if t == token.DOT:
                prevp = preceding_leaf(p)
                if not prevp or prevp.type != token.NUMBER:
                    return NO

            elif t == token.LSQB:
                return NO

        elif prev.type != token.COMMA:
            return NO

    elif p.type == syms.argument:
        # single argument
        if t == token.EQUAL:
            return NO

        if not prev:
            prevp = preceding_leaf(p)
            if not prevp or prevp.type == token.LPAR:
                return NO

        elif prev.type in {token.EQUAL} | VARARGS_SPECIALS:
            return NO

    elif p.type == syms.decorator:
        # decorators
        return NO

    elif p.type == syms.dotted_name:
        if prev:
            return NO

        prevp = preceding_leaf(p)
        if not prevp or prevp.type == token.AT or prevp.type == token.DOT:
            return NO

    elif p.type == syms.classdef:
        if t == token.LPAR:
            return NO

        if prev and prev.type == token.LPAR:
            return NO

    elif p.type in {syms.subscript, syms.sliceop}:
        # indexing
        if not prev:
            assert p.parent is not None, "subscripts are always parented"
            if p.parent.type == syms.subscriptlist:
                return SPACE

            return NO

        elif not complex_subscript:
            return NO

    elif p.type == syms.atom:
        if prev and t == token.DOT:
            # dots, but not the first one.
            return NO

    elif p.type == syms.dictsetmaker:
        if prev and prev.type == token.DOUBLESTAR:
            return NO

    elif p.type in {syms.factor, syms.star_expr}:
        # unary ops
        if not prev:
            prevp = preceding_leaf(p)
            if not prevp or prevp.type in OPENING_BRACKETS:
                return NO

            prevp_parent = prevp.parent
            assert prevp_parent is not None
            if prevp.type == token.COLON and prevp_parent.type in {
                syms.subscript,
                syms.sliceop,
            }:
                return NO

            elif prevp.type == token.EQUAL and prevp_parent.type == syms.argument:
                return NO

        elif t in {token.NAME, token.NUMBER, token.STRING}:
            return NO

    elif p.type == syms.import_from:
        if t == token.DOT:
            if prev and prev.type == token.DOT:
                return NO

        elif t == token.NAME:
            if v == "import":
                return SPACE

            if prev and prev.type == token.DOT:
                return NO

    elif p.type == syms.sliceop:
        return NO

    return SPACE
def preceding_leaf(node: Optional[LN]) -> Optional[Leaf]:
    """Return the first leaf that precedes `node`, if any."""
    while node:
        res = node.prev_sibling
        if res:
            if isinstance(res, Leaf):
                return res

            try:
                return list(res.leaves())[-1]

            except IndexError:
                return None

        node = node.parent
    return None


def child_towards(ancestor: Node, descendant: LN) -> Optional[LN]:
    """Return the child of `ancestor` that contains `descendant`."""
    node: Optional[LN] = descendant
    while node and node.parent != ancestor:
        node = node.parent
    return node
def container_of(leaf: Leaf) -> LN:
    """Return `leaf` or one of its ancestors that is the topmost container of it.

    By "container" we mean a node where `leaf` is the very first child.
    """
    same_prefix = leaf.prefix
    container: LN = leaf
    while container:
        parent = container.parent
        if parent is None:
            break

        if parent.children[0].prefix != same_prefix:
            break

        if parent.type == syms.file_input:
            break

        if parent.prev_sibling is not None and parent.prev_sibling.type in BRACKETS:
            break

        container = parent
    return container
def is_split_after_delimiter(leaf: Leaf, previous: Optional[Leaf] = None) -> Priority:
    """Return the priority of the `leaf` delimiter, given a line break after it.

    The delimiter priorities returned here are from those delimiters that would
    cause a line break after themselves.

    Higher numbers are higher priority.
    """
    if leaf.type == token.COMMA:
        return COMMA_PRIORITY

    return 0
def is_split_before_delimiter(leaf: Leaf, previous: Optional[Leaf] = None) -> Priority:
    """Return the priority of the `leaf` delimiter, given a line break before it.

    The delimiter priorities returned here are from those delimiters that would
    cause a line break before themselves.

    Higher numbers are higher priority.
    """
    if is_vararg(leaf, within=VARARGS_PARENTS | UNPACKING_PARENTS):
        # * and ** might also be MATH_OPERATORS but in this case they are not.
        # Don't treat them as a delimiter.
        return 0

    if (
        leaf.type == token.DOT
        and leaf.parent
        and leaf.parent.type not in {syms.import_from, syms.dotted_name}
        and (previous is None or previous.type in CLOSING_BRACKETS)
    ):
        return DOT_PRIORITY

    if (
        leaf.type in MATH_OPERATORS
        and leaf.parent
        and leaf.parent.type not in {syms.factor, syms.star_expr}
    ):
        return MATH_PRIORITIES[leaf.type]

    if leaf.type in COMPARATORS:
        return COMPARATOR_PRIORITY

    if (
        leaf.type == token.STRING
        and previous is not None
        and previous.type == token.STRING
    ):
        return STRING_PRIORITY

    if leaf.type not in {token.NAME, token.ASYNC}:
        return 0

    if (
        leaf.value == "for"
        and leaf.parent
        and leaf.parent.type in {syms.comp_for, syms.old_comp_for}
        or leaf.type == token.ASYNC
    ):
        if (
            not isinstance(leaf.prev_sibling, Leaf)
            or leaf.prev_sibling.value != "async"
        ):
            return COMPREHENSION_PRIORITY

    if (
        leaf.value == "if"
        and leaf.parent
        and leaf.parent.type in {syms.comp_if, syms.old_comp_if}
    ):
        return COMPREHENSION_PRIORITY

    if leaf.value in {"if", "else"} and leaf.parent and leaf.parent.type == syms.test:
        return TERNARY_PRIORITY

    if leaf.value == "is":
        return COMPARATOR_PRIORITY

    if (
        leaf.value == "in"
        and leaf.parent
        and leaf.parent.type in {syms.comp_op, syms.comparison}
        and not (
            previous is not None
            and previous.type == token.NAME
            and previous.value == "not"
        )
    ):
        return COMPARATOR_PRIORITY

    if (
        leaf.value == "not"
        and leaf.parent
        and leaf.parent.type == syms.comp_op
        and not (
            previous is not None
            and previous.type == token.NAME
            and previous.value == "is"
        )
    ):
        return COMPARATOR_PRIORITY

    if leaf.value in LOGIC_OPERATORS and leaf.parent:
        return LOGIC_PRIORITY

    return 0
FMT_OFF = {"# fmt: off", "# fmt:off", "# yapf: disable"}
FMT_ON = {"# fmt: on", "# fmt:on", "# yapf: enable"}
def generate_comments(leaf: LN) -> Iterator[Leaf]:
    """Clean the prefix of the `leaf` and generate comments from it, if any.

    Comments in lib2to3 are shoved into the whitespace prefix. This happens
    in `pgen2/driver.py:Driver.parse_tokens()`. This was a brilliant implementation
    move because it does away with modifying the grammar to include all the
    possible places in which comments can be placed.

    The sad consequence for us though is that comments don't "belong" anywhere.
    This is why this function generates simple parentless Leaf objects for
    comments. We simply don't know what the correct parent should be.

    No matter though, we can live without this. We really only need to
    differentiate between inline and standalone comments. The latter don't
    share the line with any code.

    Inline comments are emitted as regular token.COMMENT leaves. Standalone
    are emitted with a fake STANDALONE_COMMENT token identifier.
    """
    for pc in list_comments(leaf.prefix, is_endmarker=leaf.type == token.ENDMARKER):
        yield Leaf(pc.type, pc.value, prefix="\n" * pc.newlines)
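
# Illustrative sketch (not part of the original module): a comment that
# shares a line with code stays a token.COMMENT leaf; a comment on its own
# line comes out with the fake STANDALONE_COMMENT type, its blank lines
# preserved in the generated leaf's prefix.
#
#     x = 1  # inline       -> Leaf(token.COMMENT, "# inline")
#     # on its own line     -> Leaf(STANDALONE_COMMENT, "# on its own line")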
2226 """Describes a piece of syntax that is a comment.
2228 It's not a :class:`blib2to3.pytree.Leaf` so that:
2230 * it can be cached (`Leaf` objects should not be reused more than once as
2231 they store their lineno, column, prefix, and parent information);
2232 * `newlines` and `consumed` fields are kept separate from the `value`. This
2233 simplifies handling of special marker comments like ``# fmt: off/on``.
2236 type: int # token.COMMENT or STANDALONE_COMMENT
2237 value: str # content of the comment
2238 newlines: int # how many newlines before the comment
2239 consumed: int # how many characters of the original leaf's prefix did we consume
@lru_cache(maxsize=4096)
def list_comments(prefix: str, *, is_endmarker: bool) -> List[ProtoComment]:
    """Return a list of :class:`ProtoComment` objects parsed from the given `prefix`."""
    result: List[ProtoComment] = []
    if not prefix or "#" not in prefix:
        return result

    consumed = 0
    nlines = 0
    ignored_lines = 0
    for index, line in enumerate(prefix.split("\n")):
        consumed += len(line) + 1  # adding the length of the split '\n'
        line = line.lstrip()
        if not line:
            nlines += 1
        if not line.startswith("#"):
            # Escaped newlines outside of a comment are not really newlines at
            # all. We treat a single-line comment following an escaped newline
            # as a simple trailing comment.
            if line.endswith("\\"):
                ignored_lines += 1
            continue

        if index == ignored_lines and not is_endmarker:
            comment_type = token.COMMENT  # simple trailing comment
        else:
            comment_type = STANDALONE_COMMENT
        comment = make_comment(line)
        result.append(
            ProtoComment(
                type=comment_type, value=comment, newlines=nlines, consumed=consumed
            )
        )
        nlines = 0
    return result
def make_comment(content: str) -> str:
    """Return a consistently formatted comment from the given `content` string.

    All comments (except for "##", "#!", "#:", '#'", "#%%") should have a single
    space between the hash sign and the content.

    If `content` didn't start with a hash sign, one is provided.
    """
    content = content.rstrip()
    if not content:
        return "#"

    if content[0] == "#":
        content = content[1:]
    if content and content[0] not in " !:#'%":
        content = " " + content
    return "#" + content
2301 inner: bool = False,
2302 features: Collection[Feature] = (),
2303 ) -> Iterator[Line]:
2304 """Split a `line` into potentially many lines.
2306 They should fit in the allotted `line_length` but might not be able to.
2307 `inner` signifies that there were a pair of brackets somewhere around the
2308 current `line`, possibly transitively. This means we can fallback to splitting
2309 by delimiters if the LHS/RHS don't yield any results.
2311 `features` are syntactical features that may be used in the output.
2317 line_str = str(line).strip("\n")
2320 not line.contains_inner_type_comments()
2321 and not line.should_explode
2322 and is_line_short_enough(line, line_length=line_length, line_str=line_str)
2327 split_funcs: List[SplitFunc]
2329 split_funcs = [left_hand_split]
2332 def rhs(line: Line, features: Collection[Feature]) -> Iterator[Line]:
2333 for omit in generate_trailers_to_omit(line, line_length):
2334 lines = list(right_hand_split(line, line_length, features, omit=omit))
2335 if is_line_short_enough(lines[0], line_length=line_length):
2339 # All splits failed, best effort split with no omits.
2340 # This mostly happens to multiline strings that are by definition
2341 # reported as not fitting a single line.
2342 yield from right_hand_split(line, line_length, features=features)
2344 if line.inside_brackets:
2345 split_funcs = [delimiter_split, standalone_comment_split, rhs]
2348 for split_func in split_funcs:
2349 # We are accumulating lines in `result` because we might want to abort
2350 # mission and return the original line in the end, or attempt a different split altogether.
2352 result: List[Line] = []
2354 for l in split_func(line, features):
2355 if str(l).strip("\n") == line_str:
2356 raise CannotSplit("Split function returned an unchanged result")
2360 l, line_length=line_length, inner=True, features=features
2374 def left_hand_split(line: Line, features: Collection[Feature] = ()) -> Iterator[Line]:
2375 """Split line into many lines, starting with the first matching bracket pair.
2377 Note: this usually looks weird; only use it for function definitions.
2378 Prefer RHS otherwise. This is why this function is not symmetrical with
2379 :func:`right_hand_split` which also handles optional parentheses.
2381 tail_leaves: List[Leaf] = []
2382 body_leaves: List[Leaf] = []
2383 head_leaves: List[Leaf] = []
2384 current_leaves = head_leaves
2385 matching_bracket = None
2386 for leaf in line.leaves:
2388 current_leaves is body_leaves
2389 and leaf.type in CLOSING_BRACKETS
2390 and leaf.opening_bracket is matching_bracket
2392 current_leaves = tail_leaves if body_leaves else head_leaves
2393 current_leaves.append(leaf)
2394 if current_leaves is head_leaves:
2395 if leaf.type in OPENING_BRACKETS:
2396 matching_bracket = leaf
2397 current_leaves = body_leaves
2398 if not matching_bracket:
2399 raise CannotSplit("No brackets found")
2401 head = bracket_split_build_line(head_leaves, line, matching_bracket)
2402 body = bracket_split_build_line(body_leaves, line, matching_bracket, is_body=True)
2403 tail = bracket_split_build_line(tail_leaves, line, matching_bracket)
2404 bracket_split_succeeded_or_raise(head, body, tail)
2405 for result in (head, body, tail):
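# Editor's sketch (not part of black.py): on a definition such as
#     def process(self, data, *, verbose=False): ...
# the first bracket pair splits the line into head "def process(", body
# "self, data, *, verbose=False", and tail "): ...", which renders as the
# familiar hanging-indent layout:
#     def process(
#         self, data, *, verbose=False
#     ): ...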
2410 def right_hand_split(
2413 features: Collection[Feature] = (),
2414 omit: Collection[LeafID] = (),
2415 ) -> Iterator[Line]:
2416 """Split line into many lines, starting with the last matching bracket pair.
2418 If the split was by optional parentheses, attempt splitting without them, too.
2419 `omit` is a collection of closing bracket IDs that shouldn't be considered for this split.
2422 Note: running this function modifies `bracket_depth` on the leaves of `line`.
2424 tail_leaves: List[Leaf] = []
2425 body_leaves: List[Leaf] = []
2426 head_leaves: List[Leaf] = []
2427 current_leaves = tail_leaves
2428 opening_bracket = None
2429 closing_bracket = None
2430 for leaf in reversed(line.leaves):
2431 if current_leaves is body_leaves:
2432 if leaf is opening_bracket:
2433 current_leaves = head_leaves if body_leaves else tail_leaves
2434 current_leaves.append(leaf)
2435 if current_leaves is tail_leaves:
2436 if leaf.type in CLOSING_BRACKETS and id(leaf) not in omit:
2437 opening_bracket = leaf.opening_bracket
2438 closing_bracket = leaf
2439 current_leaves = body_leaves
2440 if not (opening_bracket and closing_bracket and head_leaves):
2441 # If there is no opening or closing bracket, that means the split failed and
2442 # all content is in the tail. Otherwise, if `head_leaves` is empty, it means
2443 # the matching `opening_bracket` wasn't available on `line` anymore.
2444 raise CannotSplit("No brackets found")
2446 tail_leaves.reverse()
2447 body_leaves.reverse()
2448 head_leaves.reverse()
2449 head = bracket_split_build_line(head_leaves, line, opening_bracket)
2450 body = bracket_split_build_line(body_leaves, line, opening_bracket, is_body=True)
2451 tail = bracket_split_build_line(tail_leaves, line, opening_bracket)
2452 bracket_split_succeeded_or_raise(head, body, tail)
2454 # the body shouldn't be exploded
2455 not body.should_explode
2456 # the opening bracket is an optional paren
2457 and opening_bracket.type == token.LPAR
2458 and not opening_bracket.value
2459 # the closing bracket is an optional paren
2460 and closing_bracket.type == token.RPAR
2461 and not closing_bracket.value
2462 # it's not an import (optional parens are the only thing we can split on
2463 # in this case; attempting a split without them is a waste of time)
2464 and not line.is_import
2465 # there are no standalone comments in the body
2466 and not body.contains_standalone_comments(0)
2467 # and we can actually remove the parens
2468 and can_omit_invisible_parens(body, line_length)
2470 omit = {id(closing_bracket), *omit}
2472 yield from right_hand_split(line, line_length, features=features, omit=omit)
2478 or is_line_short_enough(body, line_length=line_length)
2481 "Splitting failed, body is still too long and can't be split."
2484 elif head.contains_multiline_strings() or tail.contains_multiline_strings():
2486 "The current optional pair of parentheses is bound to fail to "
2487 "satisfy the splitting algorithm because the head or the tail "
2488 "contains multiline strings which by definition never fit one "
2492 ensure_visible(opening_bracket)
2493 ensure_visible(closing_bracket)
2494 for result in (head, body, tail):
2499 def bracket_split_succeeded_or_raise(head: Line, body: Line, tail: Line) -> None:
2500 """Raise :exc:`CannotSplit` if the last left- or right-hand split failed.
2502 Do nothing otherwise.
2504 A left- or right-hand split is based on a pair of brackets. Content before
2505 (and including) the opening bracket is left on one line, content inside the
2506 brackets is put on a separate line, and finally content starting with and
2507 following the closing bracket is put on a separate line.
2509 Those are called `head`, `body`, and `tail`, respectively. If the split
2510 produced the same line (all content in `head`) or ended up with an empty `body`
2511 and the `tail` is just the closing bracket, then it's considered failed.
2513 tail_len = len(str(tail).strip())
2516 raise CannotSplit("Splitting brackets produced the same line")
2520 f"Splitting brackets on an empty body to save "
2521 f"{tail_len} characters is not worth it"
2525 def bracket_split_build_line(
2526 leaves: List[Leaf], original: Line, opening_bracket: Leaf, *, is_body: bool = False
2528 """Return a new line with given `leaves` and respective comments from `original`.
2530 If `is_body` is True, the result line is one-indented inside brackets and as such
2531 has its first leaf's prefix normalized and a trailing comma added when expected.
2533 result = Line(depth=original.depth)
2535 result.inside_brackets = True
2538 # Since body is a new indent level, remove spurious leading whitespace.
2539 normalize_prefix(leaves[0], inside_brackets=True)
2540 # Ensure a trailing comma for imports and standalone function arguments, but
2541 # be careful not to add one after any comments.
2542 no_commas = original.is_def and not any(
2543 l.type == token.COMMA for l in leaves
2546 if original.is_import or no_commas:
2547 for i in range(len(leaves) - 1, -1, -1):
2548 if leaves[i].type == STANDALONE_COMMENT:
2550 elif leaves[i].type == token.COMMA:
2553 leaves.insert(i + 1, Leaf(token.COMMA, ","))
2557 result.append(leaf, preformatted=True)
2558 for comment_after in original.comments_after(leaf):
2559 result.append(comment_after, preformatted=True)
2561 result.should_explode = should_explode(result, opening_bracket)
2565 def dont_increase_indentation(split_func: SplitFunc) -> SplitFunc:
2566 """Normalize prefix of the first leaf in every line returned by `split_func`.
2568 This is a decorator over relevant split functions.
2572 def split_wrapper(line: Line, features: Collection[Feature] = ()) -> Iterator[Line]:
2573 for l in split_func(line, features):
2574 normalize_prefix(l.leaves[0], inside_brackets=True)
2577 return split_wrapper
2580 @dont_increase_indentation
2581 def delimiter_split(line: Line, features: Collection[Feature] = ()) -> Iterator[Line]:
2582 """Split according to delimiters of the highest priority.
2584 If the appropriate Features are given, the split will also add trailing
2585 commas in function signatures and calls that contain `*` and `**`.
2588 last_leaf = line.leaves[-1]
2590 raise CannotSplit("Line empty")
2592 bt = line.bracket_tracker
2594 delimiter_priority = bt.max_delimiter_priority(exclude={id(last_leaf)})
2596 raise CannotSplit("No delimiters found")
2598 if delimiter_priority == DOT_PRIORITY:
2599 if bt.delimiter_count_with_priority(delimiter_priority) == 1:
2600 raise CannotSplit("Splitting a single attribute from its owner looks wrong")
2602 current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
2603 lowest_depth = sys.maxsize
2604 trailing_comma_safe = True
2606 def append_to_line(leaf: Leaf) -> Iterator[Line]:
2607 """Append `leaf` to current line or to new line if appending impossible."""
2608 nonlocal current_line
2610 current_line.append_safe(leaf, preformatted=True)
2614 current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
2615 current_line.append(leaf)
2617 for leaf in line.leaves:
2618 yield from append_to_line(leaf)
2620 for comment_after in line.comments_after(leaf):
2621 yield from append_to_line(comment_after)
2623 lowest_depth = min(lowest_depth, leaf.bracket_depth)
2624 if leaf.bracket_depth == lowest_depth:
2625 if is_vararg(leaf, within={syms.typedargslist}):
2626 trailing_comma_safe = (
2627 trailing_comma_safe and Feature.TRAILING_COMMA_IN_DEF in features
2629 elif is_vararg(leaf, within={syms.arglist, syms.argument}):
2630 trailing_comma_safe = (
2631 trailing_comma_safe and Feature.TRAILING_COMMA_IN_CALL in features
2634 leaf_priority = bt.delimiters.get(id(leaf))
2635 if leaf_priority == delimiter_priority:
2638 current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
2642 and delimiter_priority == COMMA_PRIORITY
2643 and current_line.leaves[-1].type != token.COMMA
2644 and current_line.leaves[-1].type != STANDALONE_COMMENT
2646 current_line.append(Leaf(token.COMMA, ","))
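# Editor's sketch (not part of black.py): with COMMA_PRIORITY as the highest
# delimiter, a bracketed body like `a, b, c` is emitted one element per line
# ("a,", "b,", "c"), and the code above appends a trailing comma to the last
# line when the target versions make that safe.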
2650 @dont_increase_indentation
2651 def standalone_comment_split(
2652 line: Line, features: Collection[Feature] = ()
2653 ) -> Iterator[Line]:
2654 """Split standalone comments from the rest of the line."""
2655 if not line.contains_standalone_comments(0):
2656 raise CannotSplit("Line does not have any standalone comments")
2658 current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
2660 def append_to_line(leaf: Leaf) -> Iterator[Line]:
2661 """Append `leaf` to current line or to new line if appending impossible."""
2662 nonlocal current_line
2664 current_line.append_safe(leaf, preformatted=True)
2668 current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
2669 current_line.append(leaf)
2671 for leaf in line.leaves:
2672 yield from append_to_line(leaf)
2674 for comment_after in line.comments_after(leaf):
2675 yield from append_to_line(comment_after)
2681 def is_import(leaf: Leaf) -> bool:
2682 """Return True if the given leaf starts an import statement."""
2689 (v == "import" and p and p.type == syms.import_name)
2690 or (v == "from" and p and p.type == syms.import_from)
2695 def is_type_comment(leaf: Leaf) -> bool:
2696 """Return True if the given leaf is a special comment.
2697 Only returns True for type comments for now."""
2700 return t in {token.COMMENT, STANDALONE_COMMENT} and v.startswith("# type:")
2703 def normalize_prefix(leaf: Leaf, *, inside_brackets: bool) -> None:
2704 """Leave existing extra newlines if not `inside_brackets`. Remove everything
2707 Note: don't use backslashes for formatting or you'll lose your voting rights.
2709 if not inside_brackets:
2710 spl = leaf.prefix.split("#")
2711 if "\\" not in spl[0]:
2712 nl_count = spl[-1].count("\n")
2715 leaf.prefix = "\n" * nl_count
2721 def normalize_string_prefix(leaf: Leaf, remove_u_prefix: bool = False) -> None:
2722 """Make all string prefixes lowercase.
2724 If remove_u_prefix is given, also removes any u prefix from the string.
2726 Note: Mutates its argument.
2728 match = re.match(r"^([furbFURB]*)(.*)$", leaf.value, re.DOTALL)
2729 assert match is not None, f"failed to match string {leaf.value!r}"
2730 orig_prefix = match.group(1)
2731 new_prefix = orig_prefix.lower()
2733 new_prefix = new_prefix.replace("u", "")
2734 leaf.value = f"{new_prefix}{match.group(2)}"
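# Editor's illustration (not part of black.py): prefix normalization on
# hypothetical leaves; `_demo_normalize_string_prefix` exists only here.
def _demo_normalize_string_prefix() -> None:
    leaf = Leaf(token.STRING, 'F"x"')
    normalize_string_prefix(leaf)
    assert leaf.value == 'f"x"'  # prefix lowercased
    leaf = Leaf(token.STRING, 'u"x"')
    normalize_string_prefix(leaf, remove_u_prefix=True)
    assert leaf.value == '"x"'  # redundant u prefix dropped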
2737 def normalize_string_quotes(leaf: Leaf) -> None:
2738 """Prefer double quotes but only if it doesn't cause more escaping.
2740 Adds or removes backslashes as appropriate. Doesn't parse and fix
2741 strings nested in f-strings (yet).
2743 Note: Mutates its argument.
2745 value = leaf.value.lstrip("furbFURB")
2746 if value[:3] == '"""':
2749 elif value[:3] == "'''":
2752 elif value[0] == '"':
2758 first_quote_pos = leaf.value.find(orig_quote)
2759 if first_quote_pos == -1:
2760 return # There's an internal error
2762 prefix = leaf.value[:first_quote_pos]
2763 unescaped_new_quote = re.compile(rf"(([^\\]|^)(\\\\)*){new_quote}")
2764 escaped_new_quote = re.compile(rf"([^\\]|^)\\((?:\\\\)*){new_quote}")
2765 escaped_orig_quote = re.compile(rf"([^\\]|^)\\((?:\\\\)*){orig_quote}")
2766 body = leaf.value[first_quote_pos + len(orig_quote) : -len(orig_quote)]
2767 if "r" in prefix.casefold():
2768 if unescaped_new_quote.search(body):
2769 # There's at least one unescaped new_quote in this raw string
2770 # so converting is impossible
2773 # Do not introduce or remove backslashes in raw strings
2776 # remove unnecessary escapes
2777 new_body = sub_twice(escaped_new_quote, rf"\1\2{new_quote}", body)
2778 if body != new_body:
2779 # Consider the string without unnecessary escapes as the original
2781 leaf.value = f"{prefix}{orig_quote}{body}{orig_quote}"
2782 new_body = sub_twice(escaped_orig_quote, rf"\1\2{orig_quote}", new_body)
2783 new_body = sub_twice(unescaped_new_quote, rf"\1\\{new_quote}", new_body)
2784 if "f" in prefix.casefold():
2785 matches = re.findall(
2787 (?:[^{]|^)\{ # start of the string or a non-{ followed by a single {
2788 ([^{].*?) # contents of the brackets except if begins with {{
2789 \}(?:[^}]|$) # A } followed by end of the string or a non-}
2796 # Do not introduce backslashes in interpolated expressions
2798 if new_quote == '"""' and new_body[-1:] == '"':
2800 new_body = new_body[:-1] + '\\"'
2801 orig_escape_count = body.count("\\")
2802 new_escape_count = new_body.count("\\")
2803 if new_escape_count > orig_escape_count:
2804 return # Do not introduce more escaping
2806 if new_escape_count == orig_escape_count and orig_quote == '"':
2807 return # Prefer double quotes
2809 leaf.value = f"{prefix}{new_quote}{new_body}{new_quote}"
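# Editor's illustration (not part of black.py): the double-quote preference
# only applies when it does not increase escaping.
def _demo_normalize_string_quotes() -> None:
    leaf = Leaf(token.STRING, "'hello'")
    normalize_string_quotes(leaf)
    assert leaf.value == '"hello"'  # double quotes preferred...
    leaf = Leaf(token.STRING, "'say \"hi\"'")
    normalize_string_quotes(leaf)
    assert leaf.value == "'say \"hi\"'"  # ...unless switching would add backslashes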
2812 def normalize_numeric_literal(leaf: Leaf) -> None:
2813 """Normalizes numeric (float, int, and complex) literals.
2815 All letters used in the representation are normalized to lowercase (except
2816 in Python 2 long literals).
2818 text = leaf.value.lower()
2819 if text.startswith(("0o", "0b")):
2820 # Leave octal and binary literals alone.
2822 elif text.startswith("0x"):
2823 # Change hex literals to upper case.
2824 before, after = text[:2], text[2:]
2825 text = f"{before}{after.upper()}"
2827 before, after = text.split("e")
2829 if after.startswith("-"):
2832 elif after.startswith("+"):
2834 before = format_float_or_int_string(before)
2835 text = f"{before}e{sign}{after}"
2836 elif text.endswith(("j", "l")):
2839 # Capitalize in "2L" because "l" looks too similar to "1".
2842 text = f"{format_float_or_int_string(number)}{suffix}"
2844 text = format_float_or_int_string(text)
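# Editor's illustration (not part of black.py): numeric normalization per the
# code above; `_demo_normalize_numeric_literal` is a hypothetical helper.
def _demo_normalize_numeric_literal() -> None:
    leaf = Leaf(token.NUMBER, "0XAB")
    normalize_numeric_literal(leaf)
    assert leaf.value == "0xAB"  # lowercase prefix, uppercase hex digits
    leaf = Leaf(token.NUMBER, "1E9")
    normalize_numeric_literal(leaf)
    assert leaf.value == "1e9"  # lowercase exponent marker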
2848 def format_float_or_int_string(text: str) -> str:
2849 """Formats a float string like "1.0"."""
2853 before, after = text.split(".")
2854 return f"{before or 0}.{after or 0}"
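# Editor's illustration (not part of black.py): dangling dots get an explicit
# zero on either side.
def _demo_format_float_or_int_string() -> None:
    assert format_float_or_int_string("1.") == "1.0"
    assert format_float_or_int_string(".5") == "0.5"
    assert format_float_or_int_string("10") == "10"  # no dot: returned unchanged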
2857 def normalize_invisible_parens(node: Node, parens_after: Set[str]) -> None:
2858 """Make existing optional parentheses invisible or create new ones.
2860 `parens_after` is a set of string leaf values immediately after which parens should be put.
2863 Standardizes on visible parentheses for single-element tuples, and keeps
2864 existing visible parentheses for other tuples and generator expressions.
2866 for pc in list_comments(node.prefix, is_endmarker=False):
2867 if pc.value in FMT_OFF:
2868 # This `node` has a prefix with `# fmt: off`, don't mess with parens.
2872 for index, child in enumerate(list(node.children)):
2873 # Add parentheses around long tuple unpacking in assignments.
2876 and isinstance(child, Node)
2877 and child.type == syms.testlist_star_expr
2882 if is_walrus_assignment(child):
2884 if child.type == syms.atom:
2885 # Determines if the underlying atom should be surrounded with
2886 # invisible parens - also makes parens invisible recursively
2887 # within the atom and removes repeated invisible parens within the atom.
2889 should_surround_with_parens = maybe_make_parens_invisible_in_atom(
2893 if should_surround_with_parens:
2894 lpar = Leaf(token.LPAR, "")
2895 rpar = Leaf(token.RPAR, "")
2896 index = child.remove() or 0
2897 node.insert_child(index, Node(syms.atom, [lpar, child, rpar]))
2898 elif is_one_tuple(child):
2899 # wrap child in visible parentheses
2900 lpar = Leaf(token.LPAR, "(")
2901 rpar = Leaf(token.RPAR, ")")
2903 node.insert_child(index, Node(syms.atom, [lpar, child, rpar]))
2904 elif node.type == syms.import_from:
2905 # "import from" nodes store parentheses directly as part of
2907 if child.type == token.LPAR:
2908 # make parentheses invisible
2909 child.value = "" # type: ignore
2910 node.children[-1].value = "" # type: ignore
2911 elif child.type != token.STAR:
2912 # insert invisible parentheses
2913 node.insert_child(index, Leaf(token.LPAR, ""))
2914 node.append_child(Leaf(token.RPAR, ""))
2917 elif not (isinstance(child, Leaf) and is_multiline_string(child)):
2918 # wrap child in invisible parentheses
2919 lpar = Leaf(token.LPAR, "")
2920 rpar = Leaf(token.RPAR, "")
2921 index = child.remove() or 0
2922 prefix = child.prefix
2924 new_child = Node(syms.atom, [lpar, child, rpar])
2925 new_child.prefix = prefix
2926 node.insert_child(index, new_child)
2928 check_lpar = isinstance(child, Leaf) and child.value in parens_after
2931 def normalize_fmt_off(node: Node) -> None:
2932 """Convert content between `# fmt: off`/`# fmt: on` into standalone comments."""
2935 try_again = convert_one_fmt_off_pair(node)
2938 def convert_one_fmt_off_pair(node: Node) -> bool:
2939 """Convert content of a single `# fmt: off`/`# fmt: on` into a standalone comment.
2941 Returns True if a pair was converted.
2943 for leaf in node.leaves():
2944 previous_consumed = 0
2945 for comment in list_comments(leaf.prefix, is_endmarker=False):
2946 if comment.value in FMT_OFF:
2947 # We only want standalone comments. If there's no previous leaf or
2948 # the previous leaf is indentation, it's a standalone comment in disguise.
2950 if comment.type != STANDALONE_COMMENT:
2951 prev = preceding_leaf(leaf)
2952 if prev and prev.type not in WHITESPACE:
2955 ignored_nodes = list(generate_ignored_nodes(leaf))
2956 if not ignored_nodes:
2959 first = ignored_nodes[0] # Can be a container node with the `leaf`.
2960 parent = first.parent
2961 prefix = first.prefix
2962 first.prefix = prefix[comment.consumed :]
2964 comment.value + "\n" + "".join(str(n) for n in ignored_nodes)
2966 if hidden_value.endswith("\n"):
2967 # That happens when one of the `ignored_nodes` ended with a NEWLINE
2968 # leaf (possibly followed by a DEDENT).
2969 hidden_value = hidden_value[:-1]
2971 for ignored in ignored_nodes:
2972 index = ignored.remove()
2973 if first_idx is None:
2975 assert parent is not None, "INTERNAL ERROR: fmt: on/off handling (1)"
2976 assert first_idx is not None, "INTERNAL ERROR: fmt: on/off handling (2)"
2977 parent.insert_child(
2982 prefix=prefix[:previous_consumed] + "\n" * comment.newlines,
2987 previous_consumed = comment.consumed
2992 def generate_ignored_nodes(leaf: Leaf) -> Iterator[LN]:
2993 """Starting from the container of `leaf`, generate all leaves until `# fmt: on`.
2995 Stops at the end of the block.
2997 container: Optional[LN] = container_of(leaf)
2998 while container is not None and container.type != token.ENDMARKER:
2999 for comment in list_comments(container.prefix, is_endmarker=False):
3000 if comment.value in FMT_ON:
3005 container = container.next_sibling
3008 def maybe_make_parens_invisible_in_atom(node: LN, parent: LN) -> bool:
3009 """If it's safe, make the parens in the atom `node` invisible, recursively.
3010 Additionally, remove repeated, adjacent invisible parens from the atom `node`
3011 as they are redundant.
3013 Returns whether the node should itself be wrapped in invisible parentheses.
3017 node.type != syms.atom
3018 or is_empty_tuple(node)
3019 or is_one_tuple(node)
3020 or (is_yield(node) and parent.type != syms.expr_stmt)
3021 or max_delimiter_priority_in_atom(node) >= COMMA_PRIORITY
3025 first = node.children[0]
3026 last = node.children[-1]
3027 if first.type == token.LPAR and last.type == token.RPAR:
3028 middle = node.children[1]
3029 # make parentheses invisible
3030 first.value = "" # type: ignore
3031 last.value = "" # type: ignore
3032 maybe_make_parens_invisible_in_atom(middle, parent=parent)
3034 if is_atom_with_invisible_parens(middle):
3035 # Strip the invisible parens from `middle` by replacing
3036 # it with the child in-between the invisible parens
3037 middle.replace(middle.children[1])
3044 def is_atom_with_invisible_parens(node: LN) -> bool:
3045 """Given a `LN`, determines whether it's an atom `node` with invisible
3046 parens. Useful in deduplicating and normalizing parens.
3048 if isinstance(node, Leaf) or node.type != syms.atom:
3051 first, last = node.children[0], node.children[-1]
3053 isinstance(first, Leaf)
3054 and first.type == token.LPAR
3055 and first.value == ""
3056 and isinstance(last, Leaf)
3057 and last.type == token.RPAR
3058 and last.value == ""
3062 def is_empty_tuple(node: LN) -> bool:
3063 """Return True if `node` holds an empty tuple."""
3065 node.type == syms.atom
3066 and len(node.children) == 2
3067 and node.children[0].type == token.LPAR
3068 and node.children[1].type == token.RPAR
3072 def unwrap_singleton_parenthesis(node: LN) -> Optional[LN]:
3073 """Returns `wrapped` if `node` is of the shape ( wrapped ).
3075 Parentheses can be optional. Returns None otherwise."""
3076 if len(node.children) != 3:
3078 lpar, wrapped, rpar = node.children
3079 if not (lpar.type == token.LPAR and rpar.type == token.RPAR):
3085 def is_one_tuple(node: LN) -> bool:
3086 """Return True if `node` holds a tuple with one element, with or without parens."""
3087 if node.type == syms.atom:
3088 gexp = unwrap_singleton_parenthesis(node)
3089 if gexp is None or gexp.type != syms.testlist_gexp:
3092 return len(gexp.children) == 2 and gexp.children[1].type == token.COMMA
3095 node.type in IMPLICIT_TUPLE
3096 and len(node.children) == 2
3097 and node.children[1].type == token.COMMA
3101 def is_walrus_assignment(node: LN) -> bool:
3102 """Return True iff `node` is of the shape ( test := test )"""
3103 inner = unwrap_singleton_parenthesis(node)
3104 return inner is not None and inner.type == syms.namedexpr_test
3107 def is_yield(node: LN) -> bool:
3108 """Return True if `node` holds a `yield` or `yield from` expression."""
3109 if node.type == syms.yield_expr:
3112 if node.type == token.NAME and node.value == "yield": # type: ignore
3115 if node.type != syms.atom:
3118 if len(node.children) != 3:
3121 lpar, expr, rpar = node.children
3122 if lpar.type == token.LPAR and rpar.type == token.RPAR:
3123 return is_yield(expr)
3128 def is_vararg(leaf: Leaf, within: Set[NodeType]) -> bool:
3129 """Return True if `leaf` is a star or double star in a vararg or kwarg.
3131 If `within` includes VARARGS_PARENTS, this applies to function signatures.
3132 If `within` includes UNPACKING_PARENTS, it applies to right-hand-side
3133 extended iterable unpacking (PEP 3132) and additional unpacking
3134 generalizations (PEP 448).
3136 if leaf.type not in VARARGS_SPECIALS or not leaf.parent:
3140 if p.type == syms.star_expr:
3141 # Star expressions are also used as assignment targets in extended
3142 # iterable unpacking (PEP 3132). See what its parent is instead.
3148 return p.type in within
3151 def is_multiline_string(leaf: Leaf) -> bool:
3152 """Return True if `leaf` is a multiline string that actually spans many lines."""
3153 value = leaf.value.lstrip("furbFURB")
3154 return value[:3] in {'"""', "'''"} and "\n" in value
3157 def is_stub_suite(node: Node) -> bool:
3158 """Return True if `node` is a suite with a stub body."""
3160 len(node.children) != 4
3161 or node.children[0].type != token.NEWLINE
3162 or node.children[1].type != token.INDENT
3163 or node.children[3].type != token.DEDENT
3167 return is_stub_body(node.children[2])
3170 def is_stub_body(node: LN) -> bool:
3171 """Return True if `node` is a simple statement containing an ellipsis."""
3172 if not isinstance(node, Node) or node.type != syms.simple_stmt:
3175 if len(node.children) != 2:
3178 child = node.children[0]
3180 child.type == syms.atom
3181 and len(child.children) == 3
3182 and all(leaf == Leaf(token.DOT, ".") for leaf in child.children)
3186 def max_delimiter_priority_in_atom(node: LN) -> Priority:
3187 """Return maximum delimiter priority inside `node`.
3189 This is specific to atoms with contents contained in a pair of parentheses.
3190 If `node` isn't an atom or there are no enclosing parentheses, returns 0.
3192 if node.type != syms.atom:
3195 first = node.children[0]
3196 last = node.children[-1]
3197 if not (first.type == token.LPAR and last.type == token.RPAR):
3200 bt = BracketTracker()
3201 for c in node.children[1:-1]:
3202 if isinstance(c, Leaf):
3205 for leaf in c.leaves():
3208 return bt.max_delimiter_priority()
3214 def ensure_visible(leaf: Leaf) -> None:
3215 """Make sure parentheses are visible.
3217 They could be invisible as part of some statements (see
3218 :func:`normalize_invisible_parens` and :func:`visit_import_from`).
3220 if leaf.type == token.LPAR:
3222 elif leaf.type == token.RPAR:
3226 def should_explode(line: Line, opening_bracket: Leaf) -> bool:
3227 """Should `line` immediately be split with `delimiter_split()` after RHS?"""
3230 opening_bracket.parent
3231 and opening_bracket.parent.type in {syms.atom, syms.import_from}
3232 and opening_bracket.value in "[{("
3237 last_leaf = line.leaves[-1]
3238 exclude = {id(last_leaf)} if last_leaf.type == token.COMMA else set()
3239 max_priority = line.bracket_tracker.max_delimiter_priority(exclude=exclude)
3240 except (IndexError, ValueError):
3243 return max_priority == COMMA_PRIORITY
3246 def get_features_used(node: Node) -> Set[Feature]:
3247 """Return a set of (relatively) new Python features used in this file.
3249 Currently looking for:
3251 - underscores in numeric literals;
3252 - trailing commas after * or ** in function signatures and calls;
3253 - positional only arguments in function signatures and lambdas;
3255 features: Set[Feature] = set()
3256 for n in node.pre_order():
3257 if n.type == token.STRING:
3258 value_head = n.value[:2] # type: ignore
3259 if value_head in {'f"', 'F"', "f'", "F'", "rf", "fr", "RF", "FR"}:
3260 features.add(Feature.F_STRINGS)
3262 elif n.type == token.NUMBER:
3263 if "_" in n.value: # type: ignore
3264 features.add(Feature.NUMERIC_UNDERSCORES)
3266 elif n.type == token.SLASH:
3267 if n.parent and n.parent.type in {syms.typedargslist, syms.arglist}:
3268 features.add(Feature.POS_ONLY_ARGUMENTS)
3270 elif n.type == token.COLONEQUAL:
3271 features.add(Feature.ASSIGNMENT_EXPRESSIONS)
3274 n.type in {syms.typedargslist, syms.arglist}
3276 and n.children[-1].type == token.COMMA
3278 if n.type == syms.typedargslist:
3279 feature = Feature.TRAILING_COMMA_IN_DEF
3281 feature = Feature.TRAILING_COMMA_IN_CALL
3283 for ch in n.children:
3284 if ch.type in STARS:
3285 features.add(feature)
3287 if ch.type == syms.argument:
3288 for argch in ch.children:
3289 if argch.type in STARS:
3290 features.add(feature)
3295 def detect_target_versions(node: Node) -> Set[TargetVersion]:
3296 """Detect the version to target based on the nodes used."""
3297 features = get_features_used(node)
3299 version for version in TargetVersion if features <= VERSION_TO_FEATURES[version]
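# Editor's note (sketch, not part of black.py): for example, a file containing
# `x = 1_000_000` and `print(f"{x}")` uses NUMERIC_UNDERSCORES and F_STRINGS,
# so detect_target_versions() keeps only the versions whose feature sets
# include both, i.e. PY36 and newer.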
3303 def generate_trailers_to_omit(line: Line, line_length: int) -> Iterator[Set[LeafID]]:
3304 """Generate sets of closing bracket IDs that should be omitted in a RHS.
3306 Brackets can be omitted if the entire trailer up to and including
3307 a preceding closing bracket fits in one line.
3309 Yielded sets are cumulative (contain results of previous yields, too). The first set is empty.
3313 omit: Set[LeafID] = set()
3316 length = 4 * line.depth
3317 opening_bracket = None
3318 closing_bracket = None
3319 inner_brackets: Set[LeafID] = set()
3320 for index, leaf, leaf_length in enumerate_with_length(line, reversed=True):
3321 length += leaf_length
3322 if length > line_length:
3325 has_inline_comment = leaf_length > len(leaf.value) + len(leaf.prefix)
3326 if leaf.type == STANDALONE_COMMENT or has_inline_comment:
3330 if leaf is opening_bracket:
3331 opening_bracket = None
3332 elif leaf.type in CLOSING_BRACKETS:
3333 inner_brackets.add(id(leaf))
3334 elif leaf.type in CLOSING_BRACKETS:
3335 if index > 0 and line.leaves[index - 1].type in OPENING_BRACKETS:
3336 # Empty brackets would fail a split so treat them as "inner"
3337 # brackets (e.g. only add them to the `omit` set if another
3338 # pair of brackets was good enough).
3339 inner_brackets.add(id(leaf))
3343 omit.add(id(closing_bracket))
3344 omit.update(inner_brackets)
3345 inner_brackets.clear()
3349 opening_bracket = leaf.opening_bracket
3350 closing_bracket = leaf
3353 def get_future_imports(node: Node) -> Set[str]:
3354 """Return a set of __future__ imports in the file."""
3355 imports: Set[str] = set()
3357 def get_imports_from_children(children: List[LN]) -> Generator[str, None, None]:
3358 for child in children:
3359 if isinstance(child, Leaf):
3360 if child.type == token.NAME:
3362 elif child.type == syms.import_as_name:
3363 orig_name = child.children[0]
3364 assert isinstance(orig_name, Leaf), "Invalid syntax parsing imports"
3365 assert orig_name.type == token.NAME, "Invalid syntax parsing imports"
3366 yield orig_name.value
3367 elif child.type == syms.import_as_names:
3368 yield from get_imports_from_children(child.children)
3370 raise AssertionError("Invalid syntax parsing imports")
3372 for child in node.children:
3373 if child.type != syms.simple_stmt:
3375 first_child = child.children[0]
3376 if isinstance(first_child, Leaf):
3377 # Continue looking if we see a docstring; otherwise stop.
3379 len(child.children) == 2
3380 and first_child.type == token.STRING
3381 and child.children[1].type == token.NEWLINE
3386 elif first_child.type == syms.import_from:
3387 module_name = first_child.children[1]
3388 if not isinstance(module_name, Leaf) or module_name.value != "__future__":
3390 imports |= set(get_imports_from_children(first_child.children[3:]))
3396 def gen_python_files_in_dir(
3399 include: Pattern[str],
3400 exclude: Pattern[str],
3402 ) -> Iterator[Path]:
3403 """Generate all files under `path` whose paths are not excluded by the
3404 `exclude` regex, but are included by the `include` regex.
3406 Symbolic links pointing outside of the `root` directory are ignored.
3408 `report` is where output about exclusions goes.
3410 assert root.is_absolute(), f"INTERNAL ERROR: `root` must be absolute but is {root}"
3411 for child in path.iterdir():
3413 normalized_path = "/" + child.resolve().relative_to(root).as_posix()
3415 if child.is_symlink():
3416 report.path_ignored(
3417 child, f"is a symbolic link that points outside {root}"
3424 normalized_path += "/"
3425 exclude_match = exclude.search(normalized_path)
3426 if exclude_match and exclude_match.group(0):
3427 report.path_ignored(child, "matches the --exclude regular expression")
3431 yield from gen_python_files_in_dir(child, root, include, exclude, report)
3433 elif child.is_file():
3434 include_match = include.search(normalized_path)
3440 def find_project_root(srcs: Iterable[str]) -> Path:
3441 """Return a directory containing .git, .hg, or pyproject.toml.
3443 That directory can be one of the directories passed in `srcs` or their
3446 If no directory in the tree contains a marker that would specify it's the
3447 project root, the root of the file system is returned.
3450 return Path("/").resolve()
3452 common_base = min(Path(src).resolve() for src in srcs)
3453 if common_base.is_dir():
3454 # Append a fake file so `parents` below returns `common_base_dir`, too.
3455 common_base /= "fake-file"
3456 for directory in common_base.parents:
3457 if (directory / ".git").is_dir():
3460 if (directory / ".hg").is_dir():
3463 if (directory / "pyproject.toml").is_file():
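# Editor's sketch (not part of black.py): starting from the lowest of the
# resolved `srcs` paths, the loop climbs toward the filesystem root and
# returns the first directory that contains .git, .hg, or pyproject.toml;
# for ["/repo/pkg/a.py", "/repo/tests"] that is typically /repo.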
3471 """Provides a reformatting counter. Can be rendered with `str(report)`."""
3475 verbose: bool = False
3476 change_count: int = 0
3478 failure_count: int = 0
3480 def done(self, src: Path, changed: Changed) -> None:
3481 """Increment the counter for successful reformatting. Write out a message."""
3482 if changed is Changed.YES:
3483 reformatted = "would reformat" if self.check else "reformatted"
3484 if self.verbose or not self.quiet:
3485 out(f"{reformatted} {src}")
3486 self.change_count += 1
3489 if changed is Changed.NO:
3490 msg = f"{src} already well formatted, good job."
3492 msg = f"{src} wasn't modified on disk since last run."
3493 out(msg, bold=False)
3494 self.same_count += 1
3496 def failed(self, src: Path, message: str) -> None:
3497 """Increment the counter for failed reformatting. Write out a message."""
3498 err(f"error: cannot format {src}: {message}")
3499 self.failure_count += 1
3501 def path_ignored(self, path: Path, message: str) -> None:
3503 out(f"{path} ignored: {message}", bold=False)
3506 def return_code(self) -> int:
3507 """Return the exit code that the app should use.
3509 This considers the current state of changed files and failures:
3510 - if there were any failures, return 123;
3511 - if any files were changed and --check is being used, return 1;
3512 - otherwise return 0.
3514 # According to http://tldp.org/LDP/abs/html/exitcodes.html, return codes
3515 # starting with 126 are reserved by the shell.
3516 if self.failure_count:
3519 elif self.change_count and self.check:
3524 def __str__(self) -> str:
3525 """Render a color report of the current state.
3527 Use `click.unstyle` to remove colors.
3530 reformatted = "would be reformatted"
3531 unchanged = "would be left unchanged"
3532 failed = "would fail to reformat"
3534 reformatted = "reformatted"
3535 unchanged = "left unchanged"
3536 failed = "failed to reformat"
3538 if self.change_count:
3539 s = "s" if self.change_count > 1 else ""
3541 click.style(f"{self.change_count} file{s} {reformatted}", bold=True)
3544 s = "s" if self.same_count > 1 else ""
3545 report.append(f"{self.same_count} file{s} {unchanged}")
3546 if self.failure_count:
3547 s = "s" if self.failure_count > 1 else ""
3549 click.style(f"{self.failure_count} file{s} {failed}", fg="red")
3551 return ", ".join(report) + "."
3554 def parse_ast(src: str) -> Union[ast.AST, ast3.AST, ast27.AST]:
3555 filename = "<unknown>"
3556 if sys.version_info >= (3, 8):
3557 # TODO: support Python 4+ ;)
3558 for minor_version in range(sys.version_info[1], 4, -1):
3560 return ast.parse(src, filename, feature_version=(3, minor_version))
3564 for feature_version in (7, 6):
3566 return ast3.parse(src, filename, feature_version=feature_version)
3570 return ast27.parse(src)
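# Editor's illustration (not part of black.py): the ladder above falls through
# grammars from newest to oldest; assumes typed_ast is installed and
# `_demo_parse_ast` is a hypothetical helper.
def _demo_parse_ast() -> None:
    assert parse_ast("x = 1\n") is not None    # parsed by a Python 3 grammar
    tree = parse_ast("print 'hi'\n")           # Python 2-only syntax
    assert isinstance(tree, ast27.AST)         # handled by the ast27 fallback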
3573 def _fixup_ast_constants(
3574 node: Union[ast.AST, ast3.AST, ast27.AST]
3575 ) -> Union[ast.AST, ast3.AST, ast27.AST]:
3576 """Map ast nodes deprecated in 3.8 to Constant."""
3577 # casts are required until this is released:
3578 # https://github.com/python/typeshed/pull/3142
3579 if isinstance(node, (ast.Str, ast3.Str, ast27.Str, ast.Bytes, ast3.Bytes)):
3580 return cast(ast.AST, ast.Constant(value=node.s))
3581 elif isinstance(node, (ast.Num, ast3.Num, ast27.Num)):
3582 return cast(ast.AST, ast.Constant(value=node.n))
3583 elif isinstance(node, (ast.NameConstant, ast3.NameConstant)):
3584 return cast(ast.AST, ast.Constant(value=node.value))
3588 def assert_equivalent(src: str, dst: str) -> None:
3589 """Raise AssertionError if `src` and `dst` aren't equivalent."""
3591 def _v(node: Union[ast.AST, ast3.AST, ast27.AST], depth: int = 0) -> Iterator[str]:
3592 """Simple visitor generating strings to compare ASTs by content."""
3594 node = _fixup_ast_constants(node)
3596 yield f"{' ' * depth}{node.__class__.__name__}("
3598 for field in sorted(node._fields):
3599 # TypeIgnore has only one field 'lineno' which breaks this comparison
3600 type_ignore_classes = (ast3.TypeIgnore, ast27.TypeIgnore)
3601 if sys.version_info >= (3, 8):
3602 type_ignore_classes += (ast.TypeIgnore,)
3603 if isinstance(node, type_ignore_classes):
3607 value = getattr(node, field)
3608 except AttributeError:
3611 yield f"{' ' * (depth+1)}{field}="
3613 if isinstance(value, list):
3615 # Ignore nested tuples within del statements, because we may insert
3616 # parentheses and they change the AST.
3619 and isinstance(node, (ast.Delete, ast3.Delete, ast27.Delete))
3620 and isinstance(item, (ast.Tuple, ast3.Tuple, ast27.Tuple))
3622 for item in item.elts:
3623 yield from _v(item, depth + 2)
3624 elif isinstance(item, (ast.AST, ast3.AST, ast27.AST)):
3625 yield from _v(item, depth + 2)
3627 elif isinstance(value, (ast.AST, ast3.AST, ast27.AST)):
3628 yield from _v(value, depth + 2)
3631 yield f"{' ' * (depth+2)}{value!r}, # {value.__class__.__name__}"
3633 yield f"{' ' * depth}) # /{node.__class__.__name__}"
3636 src_ast = parse_ast(src)
3637 except Exception as exc:
3638 raise AssertionError(
3639 f"cannot use --safe with this file; failed to parse source file. "
3640 f"AST error message: {exc}"
3644 dst_ast = parse_ast(dst)
3645 except Exception as exc:
3646 log = dump_to_file("".join(traceback.format_tb(exc.__traceback__)), dst)
3647 raise AssertionError(
3648 f"INTERNAL ERROR: Black produced invalid code: {exc}. "
3649 f"Please report a bug on https://github.com/psf/black/issues. "
3650 f"This invalid output might be helpful: {log}"
3653 src_ast_str = "\n".join(_v(src_ast))
3654 dst_ast_str = "\n".join(_v(dst_ast))
3655 if src_ast_str != dst_ast_str:
3656 log = dump_to_file(diff(src_ast_str, dst_ast_str, "src", "dst"))
3657 raise AssertionError(
3658 f"INTERNAL ERROR: Black produced code that is not equivalent to "
3660 f"Please report a bug on https://github.com/psf/black/issues. "
3661 f"This diff might be helpful: {log}"
3665 def assert_stable(src: str, dst: str, mode: FileMode) -> None:
3666 """Raise AssertionError if `dst` reformats differently the second time."""
3667 newdst = format_str(dst, mode=mode)
3670 diff(src, dst, "source", "first pass"),
3671 diff(dst, newdst, "first pass", "second pass"),
3673 raise AssertionError(
3674 f"INTERNAL ERROR: Black produced different code on the second pass "
3675 f"of the formatter. "
3676 f"Please report a bug on https://github.com/psf/black/issues. "
3677 f"This diff might be helpful: {log}"
3681 def dump_to_file(*output: str) -> str:
3682 """Dump `output` to a temporary file. Return path to the file."""
3683 with tempfile.NamedTemporaryFile(
3684 mode="w", prefix="blk_", suffix=".log", delete=False, encoding="utf8"
3686 for lines in output:
3688 if lines and lines[-1] != "\n":
3694 def nullcontext() -> Iterator[None]:
3695 """Return context manager that does nothing.
3696 Similar to `nullcontext` from python 3.7"""
3700 def diff(a: str, b: str, a_name: str, b_name: str) -> str:
3701 """Return a unified diff string between strings `a` and `b`."""
3704 a_lines = [line + "\n" for line in a.split("\n")]
3705 b_lines = [line + "\n" for line in b.split("\n")]
3707 difflib.unified_diff(a_lines, b_lines, fromfile=a_name, tofile=b_name, n=5)
3711 def cancel(tasks: Iterable[asyncio.Task]) -> None:
3712 """asyncio signal handler that cancels all `tasks` and reports to stderr."""
3718 def shutdown(loop: asyncio.AbstractEventLoop) -> None:
3719 """Cancel all pending tasks on `loop`, wait for them, and close the loop."""
3721 if sys.version_info[:2] >= (3, 7):
3722 all_tasks = asyncio.all_tasks
3724 all_tasks = asyncio.Task.all_tasks
3725 # This part is borrowed from asyncio/runners.py in Python 3.7b2.
3726 to_cancel = [task for task in all_tasks(loop) if not task.done()]
3730 for task in to_cancel:
3732 loop.run_until_complete(
3733 asyncio.gather(*to_cancel, loop=loop, return_exceptions=True)
3736 # `concurrent.futures.Future` objects cannot be cancelled once they
3737 # are already running. There might be some when the `shutdown()` happened.
3738 # Silence their logger's spew about the event loop being closed.
3739 cf_logger = logging.getLogger("concurrent.futures")
3740 cf_logger.setLevel(logging.CRITICAL)
3744 def sub_twice(regex: Pattern[str], replacement: str, original: str) -> str:
3745 """Replace `regex` with `replacement` twice on `original`.
3747 This is used by string normalization to perform replacements on
3748 overlapping matches.
3750 return regex.sub(replacement, regex.sub(replacement, original))
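# Editor's illustration (not part of black.py): why a single pass is not
# enough for overlapping matches; uses the escaped-quote pattern from
# normalize_string_quotes() above.
def _demo_sub_twice() -> None:
    pat = re.compile(r'([^\\]|^)\\((?:\\\\)*)"')      # an escaped double quote
    body = 'a\\"\\"b'                                 # two escaped quotes back to back
    assert pat.sub(r'\1\2"', body) == 'a"\\"b'        # one pass misses the second quote
    assert sub_twice(pat, r'\1\2"', body) == 'a""b'   # the second pass catches it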
3753 def re_compile_maybe_verbose(regex: str) -> Pattern[str]:
3754 """Compile a regular expression string in `regex`.
3756 If it contains newlines, use verbose mode.
3759 regex = "(?x)" + regex
3760 return re.compile(regex)
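# Editor's illustration (not part of black.py): multiline patterns are
# compiled in verbose mode, where whitespace and `#` comments are ignored.
def _demo_re_compile_maybe_verbose() -> None:
    assert re_compile_maybe_verbose(r"\.pyi?$").search("module.py")
    multiline = re_compile_maybe_verbose("\\.pyi?$\n# matches .py and .pyi files")
    assert multiline.search("stub.pyi")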
3763 def enumerate_reversed(sequence: Sequence[T]) -> Iterator[Tuple[Index, T]]:
3764 """Like `reversed(enumerate(sequence))` if that were possible."""
3765 index = len(sequence) - 1
3766 for element in reversed(sequence):
3767 yield (index, element)
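# Editor's illustration (not part of black.py):
def _demo_enumerate_reversed() -> None:
    assert list(enumerate_reversed("ab")) == [(1, "b"), (0, "a")]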
3771 def enumerate_with_length(
3772 line: Line, reversed: bool = False
3773 ) -> Iterator[Tuple[Index, Leaf, int]]:
3774 """Return an enumeration of leaves with their length.
3776 Stops prematurely on multiline strings and standalone comments.
3779 Callable[[Sequence[Leaf]], Iterator[Tuple[Index, Leaf]]],
3780 enumerate_reversed if reversed else enumerate,
3782 for index, leaf in op(line.leaves):
3783 length = len(leaf.prefix) + len(leaf.value)
3784 if "\n" in leaf.value:
3785 return # Multiline strings, we can't continue.
3787 for comment in line.comments_after(leaf):
3788 length += len(comment.value)
3790 yield index, leaf, length
3793 def is_line_short_enough(line: Line, *, line_length: int, line_str: str = "") -> bool:
3794 """Return True if `line` is no longer than `line_length`.
3796 Uses the provided `line_str` rendering, if any, otherwise computes a new one.
3799 line_str = str(line).strip("\n")
3801 len(line_str) <= line_length
3802 and "\n" not in line_str # multiline strings
3803 and not line.contains_standalone_comments()
3807 def can_be_split(line: Line) -> bool:
3808 """Return False if the line cannot be split *for sure*.
3810 This is not an exhaustive search but a cheap heuristic that we can use to
3811 avoid some unfortunate formattings (mostly around wrapping unsplittable code
3812 in unnecessary parentheses).
3814 leaves = line.leaves
3818 if leaves[0].type == token.STRING and leaves[1].type == token.DOT:
3822 for leaf in leaves[-2::-1]:
3823 if leaf.type in OPENING_BRACKETS:
3824 if next.type not in CLOSING_BRACKETS:
3828 elif leaf.type == token.DOT:
3830 elif leaf.type == token.NAME:
3831 if not (next.type == token.DOT or next.type in OPENING_BRACKETS):
3834 elif leaf.type not in CLOSING_BRACKETS:
3837 if dot_count > 1 and call_count > 1:
3843 def can_omit_invisible_parens(line: Line, line_length: int) -> bool:
3844 """Does `line` have a shape safe to reformat without optional parens around it?
3846 Returns True for only a subset of potentially nice-looking formattings, but
3847 the point is to not return false positives that end up producing lines that are too long.
3850 bt = line.bracket_tracker
3851 if not bt.delimiters:
3852 # Without delimiters the optional parentheses are useless.
3855 max_priority = bt.max_delimiter_priority()
3856 if bt.delimiter_count_with_priority(max_priority) > 1:
3857 # With more than one delimiter of a kind the optional parentheses read better.
3860 if max_priority == DOT_PRIORITY:
3861 # A single stranded method call doesn't require optional parentheses.
3864 assert len(line.leaves) >= 2, "Stranded delimiter"
3866 first = line.leaves[0]
3867 second = line.leaves[1]
3868 penultimate = line.leaves[-2]
3869 last = line.leaves[-1]
3871 # With a single delimiter, omit if the expression starts or ends with a bracket.
3873 if first.type in OPENING_BRACKETS and second.type not in CLOSING_BRACKETS:
3875 length = 4 * line.depth
3876 for _index, leaf, leaf_length in enumerate_with_length(line):
3877 if leaf.type in CLOSING_BRACKETS and leaf.opening_bracket is first:
3880 length += leaf_length
3881 if length > line_length:
3884 if leaf.type in OPENING_BRACKETS:
3885 # There are brackets we can further split on.
3889 # checked the entire string and line length wasn't exceeded
3890 if len(line.leaves) == _index + 1:
3893 # Note: we are not returning False here because a line might have *both*
3894 # a leading opening bracket and a trailing closing bracket. If the
3895 # opening bracket doesn't match our rule, maybe the closing will.
3898 last.type == token.RPAR
3899 or last.type == token.RBRACE
3901 # don't use indexing for omitting optional parentheses;
3903 last.type == token.RSQB
3905 and last.parent.type != syms.trailer
3908 if penultimate.type in OPENING_BRACKETS:
3909 # Empty brackets don't help.
3912 if is_multiline_string(first):
3913 # Additional wrapping of a multiline string in this situation is
3917 length = 4 * line.depth
3918 seen_other_brackets = False
3919 for _index, leaf, leaf_length in enumerate_with_length(line):
3920 length += leaf_length
3921 if leaf is last.opening_bracket:
3922 if seen_other_brackets or length <= line_length:
3925 elif leaf.type in OPENING_BRACKETS:
3926 # There are brackets we can further split on.
3927 seen_other_brackets = True
3932 def get_cache_file(mode: FileMode) -> Path:
3933 return CACHE_DIR / f"cache.{mode.get_cache_key()}.pickle"
3936 def read_cache(mode: FileMode) -> Cache:
3937 """Read the cache if it exists and is well formed.
3939 If it is not well formed, the call to write_cache later should resolve the issue.
3941 cache_file = get_cache_file(mode)
3942 if not cache_file.exists():
3945 with cache_file.open("rb") as fobj:
3947 cache: Cache = pickle.load(fobj)
3948 except pickle.UnpicklingError:
3954 def get_cache_info(path: Path) -> CacheInfo:
3955 """Return the information used to check if a file is already formatted or not."""
3957 return stat.st_mtime, stat.st_size
3960 def filter_cached(cache: Cache, sources: Iterable[Path]) -> Tuple[Set[Path], Set[Path]]:
3961 """Split an iterable of paths in `sources` into two sets.
3963 The first contains paths of files that were modified on disk or are not in the
3964 cache. The other contains paths to non-modified files.
3966 todo, done = set(), set()
3969 if cache.get(src) != get_cache_info(src):
3976 def write_cache(cache: Cache, sources: Iterable[Path], mode: FileMode) -> None:
3977 """Update the cache file."""
3978 cache_file = get_cache_file(mode)
3980 CACHE_DIR.mkdir(parents=True, exist_ok=True)
3981 new_cache = {**cache, **{src.resolve(): get_cache_info(src) for src in sources}}
3982 with tempfile.NamedTemporaryFile(dir=str(cache_file.parent), delete=False) as f:
3983 pickle.dump(new_cache, f, protocol=pickle.HIGHEST_PROTOCOL)
3984 os.replace(f.name, cache_file)
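# Editor's illustration (not part of black.py): the cache is a pickled
# {resolved path: (mtime, size)} mapping keyed by the formatting mode. A
# minimal sketch; assumes a writable `tmp_path` directory (e.g. from pytest)
# and FileMode's defaults.
def _demo_cache_roundtrip(tmp_path: Path) -> None:
    src = tmp_path / "example.py"
    src.write_text("x = 1\n")
    mode = FileMode()
    write_cache({}, [src], mode)
    assert read_cache(mode)[src.resolve()] == get_cache_info(src)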
3989 def patch_click() -> None:
3990 """Make Click not crash.
3992 On certain misconfigured environments, Python 3 selects the ASCII encoding as the
3993 default which restricts paths that it can access during the lifetime of the
3994 application. Click refuses to work in this scenario by raising a RuntimeError.
3996 In the case of Black, the likelihood that non-ASCII characters are going to be used in
3997 file paths is minimal since it's Python source code. Moreover, this crash was
3998 spurious on Python 3.7 thanks to PEP 538 and PEP 540.
4001 from click import core
4002 from click import _unicodefun # type: ignore
4003 except ModuleNotFoundError:
4006 for module in (core, _unicodefun):
4007 if hasattr(module, "_verify_python3_env"):
4008 module._verify_python3_env = lambda: None
4011 def patched_main() -> None:
4017 if __name__ == "__main__":