import asyncio
from concurrent.futures import Executor, ProcessPoolExecutor
from contextlib import contextmanager
from datetime import datetime
from enum import Enum
from functools import lru_cache, partial, wraps
import io
import itertools
from multiprocessing import Manager, freeze_support
import os
from pathlib import Path
import re
import signal
import sys
import tokenize
from typing import (
    Any, Callable, Collection, Dict, Generic, Iterable, Iterator,
    List, Optional, Set, Tuple, TypeVar, Union,
)

from appdirs import user_cache_dir
from attr import dataclass, evolve, Factory
import click
import toml
from typed_ast import ast3, ast27

from blib2to3.pytree import Node, Leaf, type_repr
from blib2to3 import pygram, pytree
from blib2to3.pgen2 import driver, token
from blib2to3.pgen2.grammar import Grammar
from blib2to3.pgen2.parse import ParseError

__version__ = "19.3b0"
DEFAULT_LINE_LENGTH = 88
DEFAULT_EXCLUDES = (
    r"/(\.eggs|\.git|\.hg|\.mypy_cache|\.nox|\.tox|\.venv|_build|buck-out|build|dist)/"
)
DEFAULT_INCLUDES = r"\.pyi?$"
CACHE_DIR = Path(user_cache_dir("black", version=__version__))

# Type aliases and typing helpers used throughout this module.
FileContent = str
Encoding = str
NewLine = str
Depth = int
NodeType = int
LeafID = int
Priority = int
Timestamp = float
FileSize = int
T = TypeVar("T")
LN = Union[Leaf, Node]
SplitFunc = Callable[["Line", Collection["Feature"]], Iterator["Line"]]
CacheInfo = Tuple[Timestamp, FileSize]
Cache = Dict[Path, CacheInfo]
out = partial(click.secho, bold=True, err=True)
err = partial(click.secho, fg="red", err=True)

pygram.initialize(CACHE_DIR)
syms = pygram.python_symbols


class NothingChanged(UserWarning):
    """Raised when reformatted code is the same as source."""


class CannotSplit(Exception):
    """A readable split that fits the allotted line length is impossible."""


class InvalidInput(ValueError):
    """Raised when input source code fails all parse attempts."""


class WriteBack(Enum):
    NO = 0
    YES = 1
    DIFF = 2
    CHECK = 3

    @classmethod
    def from_configuration(cls, *, check: bool, diff: bool) -> "WriteBack":
        if check and not diff:
            return cls.CHECK

        return cls.DIFF if diff else cls.YES


class Changed(Enum):
    NO = 0
    CACHED = 1
    YES = 2


class TargetVersion(Enum):
    PY27 = 2
    PY33 = 3
    PY34 = 4
    PY35 = 5
    PY36 = 6
    PY37 = 7
    PY38 = 8

    def is_python2(self) -> bool:
        return self is TargetVersion.PY27


PY36_VERSIONS = {TargetVersion.PY36, TargetVersion.PY37, TargetVersion.PY38}


class Feature(Enum):
    # All string literals are unicode
    UNICODE_LITERALS = 1
    F_STRINGS = 2
    NUMERIC_UNDERSCORES = 3
    TRAILING_COMMA_IN_CALL = 4
    TRAILING_COMMA_IN_DEF = 5
    # The following two feature-flags are mutually exclusive, and exactly one should be
    # set for every version of python.
    ASYNC_IDENTIFIERS = 6
    ASYNC_KEYWORDS = 7
    ASSIGNMENT_EXPRESSIONS = 8


VERSION_TO_FEATURES: Dict[TargetVersion, Set[Feature]] = {
    TargetVersion.PY27: {Feature.ASYNC_IDENTIFIERS},
    TargetVersion.PY33: {Feature.UNICODE_LITERALS, Feature.ASYNC_IDENTIFIERS},
    TargetVersion.PY34: {Feature.UNICODE_LITERALS, Feature.ASYNC_IDENTIFIERS},
    TargetVersion.PY35: {
        Feature.UNICODE_LITERALS,
        Feature.TRAILING_COMMA_IN_CALL,
        Feature.ASYNC_IDENTIFIERS,
    },
    TargetVersion.PY36: {
        Feature.UNICODE_LITERALS,
        Feature.F_STRINGS,
        Feature.NUMERIC_UNDERSCORES,
        Feature.TRAILING_COMMA_IN_CALL,
        Feature.TRAILING_COMMA_IN_DEF,
        Feature.ASYNC_IDENTIFIERS,
    },
    TargetVersion.PY37: {
        Feature.UNICODE_LITERALS,
        Feature.F_STRINGS,
        Feature.NUMERIC_UNDERSCORES,
        Feature.TRAILING_COMMA_IN_CALL,
        Feature.TRAILING_COMMA_IN_DEF,
        Feature.ASYNC_KEYWORDS,
    },
    TargetVersion.PY38: {
        Feature.UNICODE_LITERALS,
        Feature.F_STRINGS,
        Feature.NUMERIC_UNDERSCORES,
        Feature.TRAILING_COMMA_IN_CALL,
        Feature.TRAILING_COMMA_IN_DEF,
        Feature.ASYNC_KEYWORDS,
        Feature.ASSIGNMENT_EXPRESSIONS,
    },
}


@dataclass
class FileMode:
    target_versions: Set[TargetVersion] = Factory(set)
    line_length: int = DEFAULT_LINE_LENGTH
    string_normalization: bool = True
    is_pyi: bool = False

    def get_cache_key(self) -> str:
        if self.target_versions:
            version_str = ",".join(
                str(version.value)
                for version in sorted(self.target_versions, key=lambda v: v.value)
            )
        else:
            version_str = "-"
        parts = [
            version_str,
            str(self.line_length),
            str(int(self.string_normalization)),
            str(int(self.is_pyi)),
        ]
        return ".".join(parts)
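
    # Illustrative (hypothetical values): with the defaults above and
    # target_versions={PY36, PY37}, get_cache_key() returns "6,7.88.1.0"
    # (version values, line length, string normalization, is_pyi).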


def supports_feature(target_versions: Set[TargetVersion], feature: Feature) -> bool:
    return all(feature in VERSION_TO_FEATURES[version] for version in target_versions)
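
# Example: supports_feature({TargetVersion.PY36, TargetVersion.PY37},
# Feature.TRAILING_COMMA_IN_CALL) is True because every listed version has the
# feature. With an empty set of target versions, all() is vacuously true, so
# every feature is reported as supported.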


def read_pyproject_toml(
    ctx: click.Context, param: click.Parameter, value: Union[str, int, bool, None]
) -> Optional[str]:
    """Inject Black configuration from "pyproject.toml" into defaults in `ctx`.

    Returns the path to a successfully found and read configuration file, None
    otherwise.
    """
    assert not isinstance(value, (int, bool)), "Invalid parameter type passed"
    if not value:
        root = find_project_root(ctx.params.get("src", ()))
        path = root / "pyproject.toml"
        if path.is_file():
            value = str(path)
        else:
            return None

    try:
        pyproject_toml = toml.load(value)
        config = pyproject_toml.get("tool", {}).get("black", {})
    except (toml.TomlDecodeError, OSError) as e:
        raise click.FileError(
            filename=value, hint=f"Error reading configuration file: {e}"
        )

    if not config:
        return None

    if ctx.default_map is None:
        ctx.default_map = {}
    ctx.default_map.update(  # type: ignore  # bad types in .pyi
        {k.replace("--", "").replace("-", "_"): v for k, v in config.items()}
    )
    return value
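
# A minimal pyproject.toml picked up by the callback above (illustrative):
#
#   [tool.black]
#   line-length = 100
#   target-version = ["py36", "py37"]
#
# Keys are mapped onto command-line defaults, e.g. "target-version" becomes the
# `target_version` parameter.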


@click.command(context_settings=dict(help_option_names=["-h", "--help"]))
@click.option("-c", "--code", type=str, help="Format the code passed in as a string.")
@click.option(
    "-l",
    "--line-length",
    type=int,
    default=DEFAULT_LINE_LENGTH,
    help="How many characters per line to allow.",
    show_default=True,
)
@click.option(
    "-t",
    "--target-version",
    type=click.Choice([v.name.lower() for v in TargetVersion]),
    callback=lambda c, p, v: [TargetVersion[val.upper()] for val in v],
    multiple=True,
    help=(
        "Python versions that should be supported by Black's output. [default: "
        "per-file auto-detection]"
    ),
)
@click.option(
    "--py36",
    is_flag=True,
    help=(
        "Allow using Python 3.6-only syntax on all input files. This will put "
        "trailing commas in function signatures and calls also after *args and "
        "**kwargs. Deprecated; use --target-version instead. "
        "[default: per-file auto-detection]"
    ),
)
@click.option(
    "--pyi",
    is_flag=True,
    help=(
        "Format all input files like typing stubs regardless of file extension "
        "(useful when piping source on standard input)."
    ),
)
@click.option(
    "-S",
    "--skip-string-normalization",
    is_flag=True,
    help="Don't normalize string quotes or prefixes.",
)
@click.option(
    "--check",
    is_flag=True,
    help=(
        "Don't write the files back, just return the status. Return code 0 "
        "means nothing would change. Return code 1 means some files would be "
        "reformatted. Return code 123 means there was an internal error."
    ),
)
@click.option(
    "--diff",
    is_flag=True,
    help="Don't write the files back, just output a diff for each file on stdout.",
)
@click.option(
    "--fast/--safe",
    is_flag=True,
    help="If --fast given, skip temporary sanity checks. [default: --safe]",
)
@click.option(
    "--include",
    type=str,
    default=DEFAULT_INCLUDES,
    help=(
        "A regular expression that matches files and directories that should be "
        "included on recursive searches. An empty value means all files are "
        "included regardless of the name. Use forward slashes for directories on "
        "all platforms (Windows, too). Exclusions are calculated first, inclusions "
        "later."
    ),
    show_default=True,
)
@click.option(
    "--exclude",
    type=str,
    default=DEFAULT_EXCLUDES,
    help=(
        "A regular expression that matches files and directories that should be "
        "excluded on recursive searches. An empty value means no paths are excluded. "
        "Use forward slashes for directories on all platforms (Windows, too). "
        "Exclusions are calculated first, inclusions later."
    ),
    show_default=True,
)
@click.option(
    "-q",
    "--quiet",
    is_flag=True,
    help=(
        "Don't emit non-error messages to stderr. Errors are still emitted; "
        "silence those with 2>/dev/null."
    ),
)
@click.option(
    "-v",
    "--verbose",
    is_flag=True,
    help=(
        "Also emit messages to stderr about files that were not changed or were "
        "ignored due to --exclude=."
    ),
)
@click.version_option(version=__version__)
@click.argument(
    "src",
    nargs=-1,
    type=click.Path(
        exists=True, file_okay=True, dir_okay=True, readable=True, allow_dash=True
    ),
    is_eager=True,
)
@click.option(
    "--config",
    type=click.Path(
        exists=False, file_okay=True, dir_okay=False, readable=True, allow_dash=False
    ),
    is_eager=True,
    callback=read_pyproject_toml,
    help="Read configuration from PATH.",
)
@click.pass_context
def main(
    ctx: click.Context,
    code: Optional[str],
    line_length: int,
    target_version: List[TargetVersion],
    check: bool,
    diff: bool,
    fast: bool,
    pyi: bool,
    py36: bool,
    skip_string_normalization: bool,
    quiet: bool,
    verbose: bool,
    include: str,
    exclude: str,
    src: Tuple[str],
    config: Optional[str],
) -> None:
    """The uncompromising code formatter."""
    write_back = WriteBack.from_configuration(check=check, diff=diff)
    if target_version:
        if py36:
            err("Cannot use both --target-version and --py36")
            ctx.exit(2)
        else:
            versions = set(target_version)
    elif py36:
        err(
            "--py36 is deprecated and will be removed in a future version. "
            "Use --target-version py36 instead."
        )
        versions = PY36_VERSIONS
    else:
        # We'll autodetect later.
        versions = set()
    mode = FileMode(
        target_versions=versions,
        line_length=line_length,
        is_pyi=pyi,
        string_normalization=not skip_string_normalization,
    )
    if config and verbose:
        out(f"Using configuration from {config}.", bold=False, fg="blue")
    if code is not None:
        print(format_str(code, mode=mode))
        ctx.exit(0)
    try:
        include_regex = re_compile_maybe_verbose(include)
    except re.error:
        err(f"Invalid regular expression for include given: {include!r}")
        ctx.exit(2)
    try:
        exclude_regex = re_compile_maybe_verbose(exclude)
    except re.error:
        err(f"Invalid regular expression for exclude given: {exclude!r}")
        ctx.exit(2)
    report = Report(check=check, quiet=quiet, verbose=verbose)
    root = find_project_root(src)
    sources: Set[Path] = set()
    for s in src:
        p = Path(s)
        if p.is_dir():
            sources.update(
                gen_python_files_in_dir(p, root, include_regex, exclude_regex, report)
            )
        elif p.is_file() or s == "-":
            # if a file was explicitly given, we don't care about its extension
            sources.add(p)
        else:
            err(f"invalid path: {s}")
    if len(sources) == 0:
        if verbose or not quiet:
            out("No paths given. Nothing to do 😴")
        ctx.exit(0)

    if len(sources) == 1:
        reformat_one(
            src=sources.pop(),
            fast=fast,
            write_back=write_back,
            mode=mode,
            report=report,
        )
    else:
        reformat_many(
            sources=sources, fast=fast, write_back=write_back, mode=mode, report=report
        )

    if verbose or not quiet:
        out("Oh no! 💥 💔 💥" if report.return_code else "All done! ✨ 🍰 ✨")
        click.secho(str(report), err=True)
    ctx.exit(report.return_code)


def reformat_one(
    src: Path, fast: bool, write_back: WriteBack, mode: FileMode, report: "Report"
) -> None:
    """Reformat a single file under `src` without spawning child processes.

    `fast`, `write_back`, and `mode` options are passed to
    :func:`format_file_in_place` or :func:`format_stdin_to_stdout`.
    """
    try:
        changed = Changed.NO
        if not src.is_file() and str(src) == "-":
            if format_stdin_to_stdout(fast=fast, write_back=write_back, mode=mode):
                changed = Changed.YES
        else:
            cache: Cache = {}
            if write_back != WriteBack.DIFF:
                cache = read_cache(mode)
                res_src = src.resolve()
                if res_src in cache and cache[res_src] == get_cache_info(res_src):
                    changed = Changed.CACHED
            if changed is not Changed.CACHED and format_file_in_place(
                src, fast=fast, write_back=write_back, mode=mode
            ):
                changed = Changed.YES
            if (write_back is WriteBack.YES and changed is not Changed.CACHED) or (
                write_back is WriteBack.CHECK and changed is Changed.NO
            ):
                write_cache(cache, [src], mode)
        report.done(src, changed)
    except Exception as exc:
        report.failed(src, str(exc))


def reformat_many(
    sources: Set[Path],
    fast: bool,
    write_back: WriteBack,
    mode: FileMode,
    report: "Report",
) -> None:
    """Reformat multiple files using a ProcessPoolExecutor."""
    loop = asyncio.get_event_loop()
    worker_count = os.cpu_count()
    if sys.platform == "win32":
        # Work around https://bugs.python.org/issue26903
        worker_count = min(worker_count or 1, 61)
    executor = ProcessPoolExecutor(max_workers=worker_count)
    try:
        loop.run_until_complete(
            schedule_formatting(
                sources=sources,
                fast=fast,
                write_back=write_back,
                mode=mode,
                report=report,
                loop=loop,
                executor=executor,
            )
        )
    finally:
        shutdown(loop)


async def schedule_formatting(
    sources: Set[Path],
    fast: bool,
    write_back: WriteBack,
    mode: FileMode,
    report: "Report",
    loop: asyncio.AbstractEventLoop,
    executor: Executor,
) -> None:
    """Run formatting of `sources` in parallel using the provided `executor`.

    (Use ProcessPoolExecutors for actual parallelism.)

    `write_back`, `fast`, and `mode` options are passed to
    :func:`format_file_in_place`.
    """
    cache: Cache = {}
    if write_back != WriteBack.DIFF:
        cache = read_cache(mode)
        sources, cached = filter_cached(cache, sources)
        for src in sorted(cached):
            report.done(src, Changed.CACHED)
    if not sources:
        return

    cancelled = []
    sources_to_cache = []
    lock = None
    if write_back == WriteBack.DIFF:
        # For diff output, we need locks to ensure we don't interleave output
        # from different processes.
        manager = Manager()
        lock = manager.Lock()
    tasks = {
        asyncio.ensure_future(
            loop.run_in_executor(
                executor, format_file_in_place, src, fast, mode, write_back, lock
            )
        ): src
        for src in sorted(sources)
    }
    pending: Iterable[asyncio.Future] = tasks.keys()
    try:
        loop.add_signal_handler(signal.SIGINT, cancel, pending)
        loop.add_signal_handler(signal.SIGTERM, cancel, pending)
    except NotImplementedError:
        # There are no good alternatives for these on Windows.
        pass
    while pending:
        done, _ = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            src = tasks.pop(task)
            if task.cancelled():
                cancelled.append(task)
            elif task.exception():
                report.failed(src, str(task.exception()))
            else:
                changed = Changed.YES if task.result() else Changed.NO
                # If the file was written back or was successfully checked as
                # well-formatted, store this information in the cache.
                if write_back is WriteBack.YES or (
                    write_back is WriteBack.CHECK and changed is Changed.NO
                ):
                    sources_to_cache.append(src)
                report.done(src, changed)
    if cancelled:
        await asyncio.gather(*cancelled, loop=loop, return_exceptions=True)
    if sources_to_cache:
        write_cache(cache, sources_to_cache, mode)


def format_file_in_place(
    src: Path,
    fast: bool,
    mode: FileMode,
    write_back: WriteBack = WriteBack.NO,
    lock: Any = None,  # multiprocessing.Manager().Lock() is some crazy proxy
) -> bool:
    """Format file under `src` path. Return True if changed.

    If `write_back` is DIFF, write a diff to stdout. If it is YES, write reformatted
    code to the file.
    `mode` and `fast` options are passed to :func:`format_file_contents`.
    """
    if src.suffix == ".pyi":
        mode = evolve(mode, is_pyi=True)

    then = datetime.utcfromtimestamp(src.stat().st_mtime)
    with open(src, "rb") as buf:
        src_contents, encoding, newline = decode_bytes(buf.read())
    try:
        dst_contents = format_file_contents(src_contents, fast=fast, mode=mode)
    except NothingChanged:
        return False

    if write_back == WriteBack.YES:
        with open(src, "w", encoding=encoding, newline=newline) as f:
            f.write(dst_contents)
    elif write_back == WriteBack.DIFF:
        now = datetime.utcnow()
        src_name = f"{src}\t{then} +0000"
        dst_name = f"{src}\t{now} +0000"
        diff_contents = diff(src_contents, dst_contents, src_name, dst_name)
        with lock or nullcontext():
            f = io.TextIOWrapper(
                sys.stdout.buffer,
                encoding=encoding,
                newline=newline,
                write_through=True,
            )
            f.write(diff_contents)
            f.detach()

    return True


def format_stdin_to_stdout(
    fast: bool, *, write_back: WriteBack = WriteBack.NO, mode: FileMode
) -> bool:
    """Format file on stdin. Return True if changed.

    If `write_back` is YES, write reformatted code back to stdout. If it is DIFF,
    write a diff to stdout. The `mode` argument is passed to
    :func:`format_file_contents`.
    """
    then = datetime.utcnow()
    src, encoding, newline = decode_bytes(sys.stdin.buffer.read())
    dst = src
    try:
        dst = format_file_contents(src, fast=fast, mode=mode)
        return True

    except NothingChanged:
        return False

    finally:
        f = io.TextIOWrapper(
            sys.stdout.buffer, encoding=encoding, newline=newline, write_through=True
        )
        if write_back == WriteBack.YES:
            f.write(dst)
        elif write_back == WriteBack.DIFF:
            now = datetime.utcnow()
            src_name = f"STDIN\t{then} +0000"
            dst_name = f"STDOUT\t{now} +0000"
            f.write(diff(src, dst, src_name, dst_name))
        f.detach()


def format_file_contents(
    src_contents: str, *, fast: bool, mode: FileMode
) -> FileContent:
    """Reformat contents of a file and return new contents.

    If `fast` is False, additionally confirm that the reformatted code is
    valid by calling :func:`assert_equivalent` and :func:`assert_stable` on it.
    `mode` is passed to :func:`format_str`.
    """
    if src_contents.strip() == "":
        raise NothingChanged

    dst_contents = format_str(src_contents, mode=mode)
    if src_contents == dst_contents:
        raise NothingChanged

    if not fast:
        assert_equivalent(src_contents, dst_contents)
        assert_stable(src_contents, dst_contents, mode=mode)
    return dst_contents


def format_str(src_contents: str, *, mode: FileMode) -> FileContent:
    """Reformat a string and return new contents.

    `mode` determines formatting options, such as how many characters per line are
    allowed.
    """
    src_node = lib2to3_parse(src_contents.lstrip(), mode.target_versions)
    dst_contents = []
    future_imports = get_future_imports(src_node)
    if mode.target_versions:
        versions = mode.target_versions
    else:
        versions = detect_target_versions(src_node)
    normalize_fmt_off(src_node)
    lines = LineGenerator(
        remove_u_prefix="unicode_literals" in future_imports
        or supports_feature(versions, Feature.UNICODE_LITERALS),
        is_pyi=mode.is_pyi,
        normalize_strings=mode.string_normalization,
    )
    elt = EmptyLineTracker(is_pyi=mode.is_pyi)
    empty_line = Line()
    after = 0
    split_line_features = {
        feature
        for feature in {Feature.TRAILING_COMMA_IN_CALL, Feature.TRAILING_COMMA_IN_DEF}
        if supports_feature(versions, feature)
    }
    for current_line in lines.visit(src_node):
        for _ in range(after):
            dst_contents.append(str(empty_line))
        before, after = elt.maybe_empty_lines(current_line)
        for _ in range(before):
            dst_contents.append(str(empty_line))
        for line in split_line(
            current_line, line_length=mode.line_length, features=split_line_features
        ):
            dst_contents.append(str(line))
    return "".join(dst_contents)
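
# Quick sanity check of the pipeline above (illustrative):
#
#   >>> format_str("x = {  'a':37,'b':42,}", mode=FileMode())
#   'x = {"a": 37, "b": 42}\n'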


def decode_bytes(src: bytes) -> Tuple[FileContent, Encoding, NewLine]:
    """Return a tuple of (decoded_contents, encoding, newline).

    `newline` is either CRLF or LF but `decoded_contents` is decoded with
    universal newlines (i.e. only contains LF).
    """
    srcbuf = io.BytesIO(src)
    encoding, lines = tokenize.detect_encoding(srcbuf.readline)
    if not lines:
        return "", encoding, "\n"

    newline = "\r\n" if b"\r\n" == lines[0][-2:] else "\n"
    srcbuf.seek(0)
    with io.TextIOWrapper(srcbuf, encoding) as tiow:
        return tiow.read(), encoding, newline
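
# For example (illustrative):
#
#   >>> decode_bytes(b"x = 1\r\ny = 2\r\n")
#   ('x = 1\ny = 2\n', 'utf-8', '\r\n')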


def get_grammars(target_versions: Set[TargetVersion]) -> List[Grammar]:
    if not target_versions:
        # No target_version specified, so try all grammars.
        return [
            # Python 3.7+
            pygram.python_grammar_no_print_statement_no_exec_statement_async_keywords,
            # Python 3.0-3.6
            pygram.python_grammar_no_print_statement_no_exec_statement,
            # Python 2.7 with future print_function import
            pygram.python_grammar_no_print_statement,
            # Python 2.7
            pygram.python_grammar,
        ]
    elif all(version.is_python2() for version in target_versions):
        # Python 2-only code, so try Python 2 grammars.
        return [
            # Python 2.7 with future print_function import
            pygram.python_grammar_no_print_statement,
            # Python 2.7
            pygram.python_grammar,
        ]
    else:
        # Python 3-compatible code, so only try Python 3 grammar.
        grammars = []
        # If we have to parse both, try to parse async as a keyword first
        if not supports_feature(target_versions, Feature.ASYNC_IDENTIFIERS):
            # Python 3.7+
            grammars.append(
                pygram.python_grammar_no_print_statement_no_exec_statement_async_keywords  # noqa: B950
            )
        if not supports_feature(target_versions, Feature.ASYNC_KEYWORDS):
            # Python 3.0-3.6
            grammars.append(pygram.python_grammar_no_print_statement_no_exec_statement)
        # At least one of the above branches must have been taken, because every Python
        # version has exactly one of the two 'ASYNC_*' flags
        return grammars
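
# Illustrative: {PY36} alone supports ASYNC_IDENTIFIERS, so only the plain
# Python 3 grammar is tried; {PY36, PY37} supports neither ASYNC_* feature on
# *all* versions, so both async variants are tried, keyword-first.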


def lib2to3_parse(src_txt: str, target_versions: Iterable[TargetVersion] = ()) -> Node:
    """Given a string with source, return the lib2to3 Node."""
    if src_txt[-1:] != "\n":
        src_txt += "\n"

    for grammar in get_grammars(set(target_versions)):
        drv = driver.Driver(grammar, pytree.convert)
        try:
            result = drv.parse_string(src_txt, True)
            break

        except ParseError as pe:
            lineno, column = pe.context[1]
            lines = src_txt.splitlines()
            try:
                faulty_line = lines[lineno - 1]
            except IndexError:
                faulty_line = "<line number missing in source>"
            exc = InvalidInput(f"Cannot parse: {lineno}:{column}: {faulty_line}")
    else:
        raise exc from None

    if isinstance(result, Leaf):
        result = Node(syms.file_input, [result])
    return result
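
# Because lib2to3 stores whitespace in leaf prefixes, stringifying the parse
# result reproduces the input exactly (illustrative):
#
#   >>> str(lib2to3_parse("x = 1\n"))
#   'x = 1\n'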


def lib2to3_unparse(node: Node) -> str:
    """Given a lib2to3 node, return its string representation."""
    return str(node)


class Visitor(Generic[T]):
    """Basic lib2to3 visitor that yields things of type `T` on `visit()`."""

    def visit(self, node: LN) -> Iterator[T]:
        """Main method to visit `node` and its children.

        It tries to find a `visit_*()` method for the given `node.type`, like
        `visit_simple_stmt` for Node objects or `visit_INDENT` for Leaf objects.
        If no dedicated `visit_*()` method is found, chooses `visit_default()`
        instead.

        Then yields objects of type `T` from the selected visitor.
        """
        if node.type < 256:
            name = token.tok_name[node.type]
        else:
            name = type_repr(node.type)
        yield from getattr(self, f"visit_{name}", self.visit_default)(node)

    def visit_default(self, node: LN) -> Iterator[T]:
        """Default `visit_*()` implementation. Recurses to children of `node`."""
        if isinstance(node, Node):
            for child in node.children:
                yield from self.visit(child)
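
    # Dispatch is name-based (illustrative): a Leaf of type token.INDENT is
    # routed to visit_INDENT, a Node whose type_repr() is "simple_stmt" to
    # visit_simple_stmt; anything without a dedicated method falls back to
    # visit_default().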


@dataclass
class DebugVisitor(Visitor[T]):
    tree_depth: int = 0

    def visit_default(self, node: LN) -> Iterator[T]:
        indent = " " * (2 * self.tree_depth)
        if isinstance(node, Node):
            _type = type_repr(node.type)
            out(f"{indent}{_type}", fg="yellow")
            self.tree_depth += 1
            for child in node.children:
                yield from self.visit(child)

            self.tree_depth -= 1
            out(f"{indent}/{_type}", fg="yellow", bold=False)
        else:
            _type = token.tok_name.get(node.type, str(node.type))
            out(f"{indent}{_type}", fg="blue", nl=False)
            if node.prefix:
                # We don't have to handle prefixes for `Node` objects since
                # that delegates to the first child anyway.
                out(f" {node.prefix!r}", fg="green", bold=False, nl=False)
            out(f" {node.value!r}", fg="blue", bold=False)

    @classmethod
    def show(cls, code: Union[str, Leaf, Node]) -> None:
        """Pretty-print the lib2to3 AST of a given string of `code`.

        Convenience method for debugging.
        """
        v: DebugVisitor[None] = DebugVisitor()
        if isinstance(code, str):
            code = lib2to3_parse(code)
        list(v.visit(code))


WHITESPACE = {token.DEDENT, token.INDENT, token.NEWLINE}

STATEMENT = {
    syms.if_stmt,
    syms.while_stmt,
    syms.for_stmt,
    syms.try_stmt,
    syms.except_clause,
    syms.with_stmt,
    syms.funcdef,
    syms.classdef,
}

STANDALONE_COMMENT = 153
token.tok_name[STANDALONE_COMMENT] = "STANDALONE_COMMENT"

LOGIC_OPERATORS = {"and", "or"}
COMPARATORS = {
    token.GREATER,
    token.GREATEREQUAL,
    token.LESS,
    token.LESSEQUAL,
    token.EQEQUAL,
    token.NOTEQUAL,
}
MATH_OPERATORS = {
    token.VBAR,
    token.CIRCUMFLEX,
    token.AMPER,
    token.LEFTSHIFT,
    token.RIGHTSHIFT,
    token.PLUS,
    token.MINUS,
    token.STAR,
    token.SLASH,
    token.DOUBLESLASH,
    token.PERCENT,
    token.AT,
    token.TILDE,
    token.DOUBLESTAR,
}
STARS = {token.STAR, token.DOUBLESTAR}
VARARGS_PARENTS = {
    syms.arglist,
    syms.argument,  # double star in arglist
    syms.trailer,  # single argument to call
    syms.typedargslist,
    syms.varargslist,  # lambdas
}
UNPACKING_PARENTS = {
    syms.atom,  # single element of a list or set literal
    syms.dictsetmaker,
    syms.exprlist,
    syms.testlist_star_expr,
}
TEST_DESCENDANTS = {
    syms.test,
    syms.lambdef,
    syms.or_test,
    syms.and_test,
    syms.not_test,
    syms.comparison,
    syms.star_expr,
    syms.expr,
    syms.xor_expr,
    syms.and_expr,
    syms.shift_expr,
    syms.arith_expr,
    syms.trailer,
    syms.term,
    syms.power,
}
ASSIGNMENTS = {
    "=",
    "%=",
    "&=",
    "*=",
    "**=",
    "+=",
    "-=",
    "/=",
    "//=",
    "<<=",
    ">>=",
    "@=",
    "^=",
    "|=",
}
COMPREHENSION_PRIORITY = 20
COMMA_PRIORITY = 18
TERNARY_PRIORITY = 16
LOGIC_PRIORITY = 14
STRING_PRIORITY = 12
COMPARATOR_PRIORITY = 10
MATH_PRIORITIES = {
    token.VBAR: 9,
    token.CIRCUMFLEX: 8,
    token.AMPER: 7,
    token.LEFTSHIFT: 6,
    token.RIGHTSHIFT: 6,
    token.PLUS: 5,
    token.MINUS: 5,
    token.STAR: 4,
    token.SLASH: 4,
    token.DOUBLESLASH: 4,
    token.PERCENT: 4,
    token.AT: 4,
    token.TILDE: 4,
    token.DOUBLESTAR: 2,
}
DOT_PRIORITY = 1


@dataclass
class BracketTracker:
    """Keeps track of brackets on a line."""

    depth: int = 0
    bracket_match: Dict[Tuple[Depth, NodeType], Leaf] = Factory(dict)
    delimiters: Dict[LeafID, Priority] = Factory(dict)
    previous: Optional[Leaf] = None
    _for_loop_depths: List[int] = Factory(list)
    _lambda_argument_depths: List[int] = Factory(list)

    def mark(self, leaf: Leaf) -> None:
        """Mark `leaf` with bracket-related metadata. Keep track of delimiters.

        All leaves receive an int `bracket_depth` field that stores how deep
        within brackets a given leaf is. 0 means there are no enclosing brackets
        that started on this line.

        If a leaf is itself a closing bracket, it receives an `opening_bracket`
        field that it forms a pair with. This is a one-directional link to
        avoid reference cycles.

        If a leaf is a delimiter (a token on which Black can split the line if
        needed) and it's on depth 0, its `id()` is stored in the tracker's
        `delimiters` field.
        """
        if leaf.type == token.COMMENT:
            return

        self.maybe_decrement_after_for_loop_variable(leaf)
        self.maybe_decrement_after_lambda_arguments(leaf)
        if leaf.type in CLOSING_BRACKETS:
            self.depth -= 1
            opening_bracket = self.bracket_match.pop((self.depth, leaf.type))
            leaf.opening_bracket = opening_bracket
        leaf.bracket_depth = self.depth
        if self.depth == 0:
            delim = is_split_before_delimiter(leaf, self.previous)
            if delim and self.previous is not None:
                self.delimiters[id(self.previous)] = delim

            delim = is_split_after_delimiter(leaf, self.previous)
            if delim:
                self.delimiters[id(leaf)] = delim

        if leaf.type in OPENING_BRACKETS:
            self.bracket_match[self.depth, BRACKET[leaf.type]] = leaf
            self.depth += 1
        self.previous = leaf
        self.maybe_increment_lambda_arguments(leaf)
        self.maybe_increment_for_loop_variable(leaf)
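
    # Illustrative: marking the leaves of `x = a + b` records a depth-0 split
    # point at the `+`, while in `f(a, b)` the comma sits at depth 1 (inside
    # the parentheses opened on this line) and is not recorded.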

    def any_open_brackets(self) -> bool:
        """Return True if there is a yet unmatched open bracket on the line."""
        return bool(self.bracket_match)

    def max_delimiter_priority(self, exclude: Iterable[LeafID] = ()) -> Priority:
        """Return the highest priority of a delimiter found on the line.

        Values are consistent with what `is_split_*_delimiter()` return.
        Raises ValueError on no delimiters.
        """
        return max(v for k, v in self.delimiters.items() if k not in exclude)

    def delimiter_count_with_priority(self, priority: Priority = 0) -> int:
        """Return the number of delimiters with the given `priority`.

        If no `priority` is passed, defaults to max priority on the line.
        """
        if not self.delimiters:
            return 0

        priority = priority or self.max_delimiter_priority()
        return sum(1 for p in self.delimiters.values() if p == priority)

    def maybe_increment_for_loop_variable(self, leaf: Leaf) -> bool:
        """In a for loop, or comprehension, the variables are often unpacks.

        To avoid splitting on the comma in this situation, increase the depth of
        tokens between `for` and `in`.
        """
        if leaf.type == token.NAME and leaf.value == "for":
            self.depth += 1
            self._for_loop_depths.append(self.depth)
            return True

        return False

    def maybe_decrement_after_for_loop_variable(self, leaf: Leaf) -> bool:
        """See `maybe_increment_for_loop_variable` above for explanation."""
        if (
            self._for_loop_depths
            and self._for_loop_depths[-1] == self.depth
            and leaf.type == token.NAME
            and leaf.value == "in"
        ):
            self.depth -= 1
            self._for_loop_depths.pop()
            return True

        return False

    def maybe_increment_lambda_arguments(self, leaf: Leaf) -> bool:
        """In a lambda expression, there might be more than one argument.

        To avoid splitting on the comma in this situation, increase the depth of
        tokens between `lambda` and `:`.
        """
        if leaf.type == token.NAME and leaf.value == "lambda":
            self.depth += 1
            self._lambda_argument_depths.append(self.depth)
            return True

        return False

    def maybe_decrement_after_lambda_arguments(self, leaf: Leaf) -> bool:
        """See `maybe_increment_lambda_arguments` above for explanation."""
        if (
            self._lambda_argument_depths
            and self._lambda_argument_depths[-1] == self.depth
            and leaf.type == token.COLON
        ):
            self.depth -= 1
            self._lambda_argument_depths.pop()
            return True

        return False

    def get_open_lsqb(self) -> Optional[Leaf]:
        """Return the most recent opening square bracket (if any)."""
        return self.bracket_match.get((self.depth - 1, token.RSQB))
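
    # Illustrative: in `for x, y in pairs:` the comma between `x` and `y` lives
    # at an artificially increased depth, so delimiter-based splitting never
    # breaks the unpacking target apart; `lambda a, b: a + b` gets the same
    # treatment between `lambda` and the colon.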


@dataclass
class Line:
    """Holds leaves and comments. Can be printed with `str(line)`."""

    depth: int = 0
    leaves: List[Leaf] = Factory(list)
    comments: Dict[LeafID, List[Leaf]] = Factory(dict)  # keys ordered like `leaves`
    bracket_tracker: BracketTracker = Factory(BracketTracker)
    inside_brackets: bool = False
    should_explode: bool = False

    def append(self, leaf: Leaf, preformatted: bool = False) -> None:
        """Add a new `leaf` to the end of the line.

        Unless `preformatted` is True, the `leaf` will receive a new consistent
        whitespace prefix and metadata applied by :class:`BracketTracker`.
        Trailing commas are maybe removed, unpacked for loop variables are
        demoted from being delimiters.

        Inline comments are put aside.
        """
        has_value = leaf.type in BRACKETS or bool(leaf.value.strip())
        if not has_value:
            return

        if token.COLON == leaf.type and self.is_class_paren_empty:
            del self.leaves[-2:]
        if self.leaves and not preformatted:
            # Note: at this point leaf.prefix should be empty except for
            # imports, for which we only preserve newlines.
            leaf.prefix += whitespace(
                leaf, complex_subscript=self.is_complex_subscript(leaf)
            )
        if self.inside_brackets or not preformatted:
            self.bracket_tracker.mark(leaf)
            self.maybe_remove_trailing_comma(leaf)
        if not self.append_comment(leaf):
            self.leaves.append(leaf)

    def append_safe(self, leaf: Leaf, preformatted: bool = False) -> None:
        """Like :func:`append()` but disallow invalid standalone comment structure.

        Raises ValueError when any `leaf` is appended after a standalone comment
        or when a standalone comment is not the first leaf on the line.
        """
        if self.bracket_tracker.depth == 0:
            if self.is_comment:
                raise ValueError("cannot append to standalone comments")

            if self.leaves and leaf.type == STANDALONE_COMMENT:
                raise ValueError(
                    "cannot append standalone comments to a populated line"
                )

        self.append(leaf, preformatted=preformatted)

    @property
    def is_comment(self) -> bool:
        """Is this line a standalone comment?"""
        return len(self.leaves) == 1 and self.leaves[0].type == STANDALONE_COMMENT

    @property
    def is_decorator(self) -> bool:
        """Is this line a decorator?"""
        return bool(self) and self.leaves[0].type == token.AT

    @property
    def is_import(self) -> bool:
        """Is this an import line?"""
        return bool(self) and is_import(self.leaves[0])

    @property
    def is_class(self) -> bool:
        """Is this line a class definition?"""
        return (
            bool(self)
            and self.leaves[0].type == token.NAME
            and self.leaves[0].value == "class"
        )

    @property
    def is_stub_class(self) -> bool:
        """Is this line a class definition with a body consisting only of "..."?"""
        return self.is_class and self.leaves[-3:] == [
            Leaf(token.DOT, ".") for _ in range(3)
        ]

    @property
    def is_def(self) -> bool:
        """Is this a function definition? (Also returns True for async defs.)"""
        try:
            first_leaf = self.leaves[0]
        except IndexError:
            return False

        try:
            second_leaf: Optional[Leaf] = self.leaves[1]
        except IndexError:
            second_leaf = None
        return (first_leaf.type == token.NAME and first_leaf.value == "def") or (
            first_leaf.type == token.ASYNC
            and second_leaf is not None
            and second_leaf.type == token.NAME
            and second_leaf.value == "def"
        )

    @property
    def is_class_paren_empty(self) -> bool:
        """Is this a class with no base classes but using parentheses?

        Those are unnecessary and should be removed.
        """
        return (
            bool(self)
            and len(self.leaves) == 4
            and self.is_class
            and self.leaves[2].type == token.LPAR
            and self.leaves[2].value == "("
            and self.leaves[3].type == token.RPAR
            and self.leaves[3].value == ")"
        )

    @property
    def is_triple_quoted_string(self) -> bool:
        """Is the line a triple quoted string?"""
        return (
            bool(self)
            and self.leaves[0].type == token.STRING
            and self.leaves[0].value.startswith(('"""', "'''"))
        )

    def contains_standalone_comments(self, depth_limit: int = sys.maxsize) -> bool:
        """If so, needs to be split before emitting."""
        for leaf in self.leaves:
            if leaf.type == STANDALONE_COMMENT:
                if leaf.bracket_depth <= depth_limit:
                    return True

        return False

    def contains_inner_type_comments(self) -> bool:
        ignored_ids = set()
        try:
            last_leaf = self.leaves[-1]
            ignored_ids.add(id(last_leaf))
            if last_leaf.type == token.COMMA or (
                last_leaf.type == token.RPAR and not last_leaf.value
            ):
                # When trailing commas or optional parens are inserted by Black for
                # consistency, comments after the previous last element are not moved
                # (they don't have to, rendering will still be correct). So we ignore
                # trailing commas and invisible parens.
                last_leaf = self.leaves[-2]
                ignored_ids.add(id(last_leaf))
        except IndexError:
            return False

        for leaf_id, comments in self.comments.items():
            if leaf_id in ignored_ids:
                continue

            for comment in comments:
                if is_type_comment(comment):
                    return True

        return False

    def contains_multiline_strings(self) -> bool:
        for leaf in self.leaves:
            if is_multiline_string(leaf):
                return True

        return False

    def maybe_remove_trailing_comma(self, closing: Leaf) -> bool:
        """Remove trailing comma if there is one and it's safe."""
        if not (
            self.leaves
            and self.leaves[-1].type == token.COMMA
            and closing.type in CLOSING_BRACKETS
        ):
            return False

        if closing.type == token.RBRACE:
            self.remove_trailing_comma()
            return True

        if closing.type == token.RSQB:
            comma = self.leaves[-1]
            if comma.parent and comma.parent.type == syms.listmaker:
                self.remove_trailing_comma()
                return True

        # For parens let's check if it's safe to remove the comma.
        # Imports are always safe.
        if self.is_import:
            self.remove_trailing_comma()
            return True

        # Otherwise, if the trailing one is the only one, we might mistakenly
        # change a tuple into a different type by removing the comma.
        depth = closing.bracket_depth + 1
        commas = 0
        opening = closing.opening_bracket
        for _opening_index, leaf in enumerate(self.leaves):
            if leaf is opening:
                break

        else:
            return False

        for leaf in self.leaves[_opening_index + 1 :]:
            if leaf is closing:
                break

            bracket_depth = leaf.bracket_depth
            if bracket_depth == depth and leaf.type == token.COMMA:
                commas += 1
                if leaf.parent and leaf.parent.type in {
                    syms.arglist,
                    syms.typedargslist,
                }:
                    commas += 1
                    break

        if commas > 1:
            self.remove_trailing_comma()
            return True

        return False

    def append_comment(self, comment: Leaf) -> bool:
        """Add an inline or standalone comment to the line."""
        if (
            comment.type == STANDALONE_COMMENT
            and self.bracket_tracker.any_open_brackets()
        ):
            comment.prefix = ""
            return False

        if comment.type != token.COMMENT:
            return False

        if not self.leaves:
            comment.type = STANDALONE_COMMENT
            comment.prefix = ""
            return False

        last_leaf = self.leaves[-1]
        if (
            last_leaf.type == token.RPAR
            and not last_leaf.value
            and last_leaf.parent
            and len(list(last_leaf.parent.leaves())) <= 3
            and not is_type_comment(comment)
        ):
            # Comments on an optional parens wrapping a single leaf should belong to
            # the wrapped node except if it's a type comment. Pinning the comment like
            # this avoids unstable formatting caused by comment migration.
            if len(self.leaves) < 2:
                comment.type = STANDALONE_COMMENT
                comment.prefix = ""
                return False

            last_leaf = self.leaves[-2]
        self.comments.setdefault(id(last_leaf), []).append(comment)
        return True

    def comments_after(self, leaf: Leaf) -> List[Leaf]:
        """Generate comments that should appear directly after `leaf`."""
        return self.comments.get(id(leaf), [])

    def remove_trailing_comma(self) -> None:
        """Remove the trailing comma and move the comments attached to it."""
        trailing_comma = self.leaves.pop()
        trailing_comma_comments = self.comments.pop(id(trailing_comma), [])
        self.comments.setdefault(id(self.leaves[-1]), []).extend(
            trailing_comma_comments
        )

    def is_complex_subscript(self, leaf: Leaf) -> bool:
        """Return True iff `leaf` is part of a slice with non-trivial exprs."""
        open_lsqb = self.bracket_tracker.get_open_lsqb()
        if open_lsqb is None:
            return False

        subscript_start = open_lsqb.next_sibling
        if isinstance(subscript_start, Node):
            if subscript_start.type == syms.listmaker:
                return False

            if subscript_start.type == syms.subscriptlist:
                subscript_start = child_towards(subscript_start, leaf)
        return subscript_start is not None and any(
            n.type in TEST_DESCENDANTS for n in subscript_start.pre_order()
        )

    def __str__(self) -> str:
        """Render the line."""
        if not self:
            return "\n"

        indent = "    " * self.depth
        leaves = iter(self.leaves)
        first = next(leaves)
        res = f"{first.prefix}{indent}{first.value}"
        for leaf in leaves:
            res += str(leaf)
        for comment in itertools.chain.from_iterable(self.comments.values()):
            res += str(comment)
        return res + "\n"

    def __bool__(self) -> bool:
        """Return True if the line has leaves or comments."""
        return bool(self.leaves or self.comments)


@dataclass
class EmptyLineTracker:
    """Provides a stateful method that returns the number of potential extra
    empty lines needed before and after the currently processed line.

    Note: this tracker works on lines that haven't been split yet. It assumes
    the prefix of the first leaf consists of optional newlines. Those newlines
    are consumed by `maybe_empty_lines()` and included in the computation.
    """

    is_pyi: bool = False
    previous_line: Optional[Line] = None
    previous_after: int = 0
    previous_defs: List[int] = Factory(list)

    def maybe_empty_lines(self, current_line: Line) -> Tuple[int, int]:
        """Return the number of extra empty lines before and after the `current_line`.

        This is for separating `def`, `async def` and `class` with extra empty
        lines (two on module-level).
        """
        before, after = self._maybe_empty_lines(current_line)
        before -= self.previous_after
        self.previous_after = after
        self.previous_line = current_line
        return before, after

    def _maybe_empty_lines(self, current_line: Line) -> Tuple[int, int]:
        max_allowed = 1
        if current_line.depth == 0:
            max_allowed = 1 if self.is_pyi else 2
        if current_line.leaves:
            # Consume the first leaf's extra newlines.
            first_leaf = current_line.leaves[0]
            before = first_leaf.prefix.count("\n")
            before = min(before, max_allowed)
            first_leaf.prefix = ""
        else:
            before = 0
        depth = current_line.depth
        while self.previous_defs and self.previous_defs[-1] >= depth:
            self.previous_defs.pop()
            if self.is_pyi:
                before = 0 if depth else 1
            else:
                before = 1 if depth else 2
        if current_line.is_decorator or current_line.is_def or current_line.is_class:
            return self._maybe_empty_lines_for_class_or_def(current_line, before)

        if (
            self.previous_line
            and self.previous_line.is_import
            and not current_line.is_import
            and depth == self.previous_line.depth
        ):
            return (before or 1), 0

        if (
            self.previous_line
            and self.previous_line.is_class
            and current_line.is_triple_quoted_string
        ):
            return before, 1

        return before, 0
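
    # Illustrative: two consecutive module-level `def`s end up separated by two
    # blank lines (one in .pyi stubs); nested definitions get a single one.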

    def _maybe_empty_lines_for_class_or_def(
        self, current_line: Line, before: int
    ) -> Tuple[int, int]:
        if not current_line.is_decorator:
            self.previous_defs.append(current_line.depth)
        if self.previous_line is None:
            # Don't insert empty lines before the first line in the file.
            return 0, 0

        if self.previous_line.is_decorator:
            return 0, 0

        if self.previous_line.depth < current_line.depth and (
            self.previous_line.is_class or self.previous_line.is_def
        ):
            return 0, 0

        if (
            self.previous_line.is_comment
            and self.previous_line.depth == current_line.depth
            and before == 0
        ):
            return 0, 0

        if self.is_pyi:
            if self.previous_line.depth > current_line.depth:
                newlines = 1
            elif current_line.is_class or self.previous_line.is_class:
                if current_line.is_stub_class and self.previous_line.is_stub_class:
                    # No blank line between classes with an empty body
                    newlines = 0
                else:
                    newlines = 1
            elif current_line.is_def and not self.previous_line.is_def:
                # Blank line between a block of functions and a block of non-functions
                newlines = 1
            else:
                newlines = 0
        else:
            newlines = 2
        if current_line.depth and newlines:
            newlines -= 1
        return newlines, 0


@dataclass
class LineGenerator(Visitor[Line]):
    """Generates reformatted Line objects. Empty lines are not emitted.

    Note: destroys the tree it's visiting by mutating prefixes of its leaves
    in ways that will no longer stringify to valid Python code on the tree.
    """

    is_pyi: bool = False
    normalize_strings: bool = True
    current_line: Line = Factory(Line)
    remove_u_prefix: bool = False

    def line(self, indent: int = 0) -> Iterator[Line]:
        """Generate a line.

        If the line is empty, only emit if it makes sense.
        If the line is too long, split it first and then generate.

        If any lines were generated, set up a new current_line.
        """
        if not self.current_line:
            self.current_line.depth += indent
            return  # Line is empty, don't emit. Creating a new one unnecessary.

        complete_line = self.current_line
        self.current_line = Line(depth=complete_line.depth + indent)
        yield complete_line

    def visit_default(self, node: LN) -> Iterator[Line]:
        """Default `visit_*()` implementation. Recurses to children of `node`."""
        if isinstance(node, Leaf):
            any_open_brackets = self.current_line.bracket_tracker.any_open_brackets()
            for comment in generate_comments(node):
                if any_open_brackets:
                    # any comment within brackets is subject to splitting
                    self.current_line.append(comment)
                elif comment.type == token.COMMENT:
                    # regular trailing comment
                    self.current_line.append(comment)
                    yield from self.line()

                else:
                    # regular standalone comment
                    yield from self.line()

                    self.current_line.append(comment)
                    yield from self.line()

            normalize_prefix(node, inside_brackets=any_open_brackets)
            if self.normalize_strings and node.type == token.STRING:
                normalize_string_prefix(node, remove_u_prefix=self.remove_u_prefix)
                normalize_string_quotes(node)
            if node.type == token.NUMBER:
                normalize_numeric_literal(node)
            if node.type not in WHITESPACE:
                self.current_line.append(node)
        yield from super().visit_default(node)

    def visit_atom(self, node: Node) -> Iterator[Line]:
        # Always make parentheses invisible around a single node, because it should
        # not be needed (except in the case of yield, where removing the parentheses
        # produces a SyntaxError).
        if (
            len(node.children) == 3
            and isinstance(node.children[0], Leaf)
            and node.children[0].type == token.LPAR
            and isinstance(node.children[2], Leaf)
            and node.children[2].type == token.RPAR
            and isinstance(node.children[1], Leaf)
            and not (
                node.children[1].type == token.NAME
                and node.children[1].value == "yield"
            )
        ):
            node.children[0].value = ""
            node.children[2].value = ""
        yield from super().visit_default(node)

    def visit_factor(self, node: Node) -> Iterator[Line]:
        """Force parentheses between a unary op and a binary power:

        -2 ** 8 -> -(2 ** 8)
        """
        child = node.children[1]
        if child.type == syms.power and len(child.children) == 3:
            lpar = Leaf(token.LPAR, "(")
            rpar = Leaf(token.RPAR, ")")
            index = child.remove() or 0
            node.insert_child(index, Node(syms.atom, [lpar, child, rpar]))
        yield from self.visit_default(node)

    def visit_INDENT(self, node: Node) -> Iterator[Line]:
        """Increase indentation level, maybe yield a line."""
        # In blib2to3 INDENT never holds comments.
        yield from self.line(+1)
        yield from self.visit_default(node)

    def visit_DEDENT(self, node: Node) -> Iterator[Line]:
        """Decrease indentation level, maybe yield a line."""
        # The current line might still wait for trailing comments. At DEDENT time
        # there won't be any (they would be prefixes on the preceding NEWLINE).
        # Emit the line then.
        yield from self.line()

        # While DEDENT has no value, its prefix may contain standalone comments
        # that belong to the current indentation level. Get 'em.
        yield from self.visit_default(node)

        # Finally, emit the dedent.
        yield from self.line(-1)

    def visit_stmt(
        self, node: Node, keywords: Set[str], parens: Set[str]
    ) -> Iterator[Line]:
        """Visit a statement.

        This implementation is shared for `if`, `while`, `for`, `try`, `except`,
        `def`, `with`, `class`, `assert` and assignments.

        The relevant Python language `keywords` for a given statement will be
        NAME leaves within it. This method puts those on a separate line.

        `parens` holds a set of string leaf values immediately after which
        invisible parens should be put.
        """
        normalize_invisible_parens(node, parens_after=parens)
        for child in node.children:
            if child.type == token.NAME and child.value in keywords:  # type: ignore
                yield from self.line()

            yield from self.visit(child)

    def visit_suite(self, node: Node) -> Iterator[Line]:
        """Visit a suite."""
        if self.is_pyi and is_stub_suite(node):
            yield from self.visit(node.children[2])
        else:
            yield from self.visit_default(node)

    def visit_simple_stmt(self, node: Node) -> Iterator[Line]:
        """Visit a statement without nested statements."""
        is_suite_like = node.parent and node.parent.type in STATEMENT
        if is_suite_like:
            if self.is_pyi and is_stub_body(node):
                yield from self.visit_default(node)
            else:
                yield from self.line(+1)
                yield from self.visit_default(node)
                yield from self.line(-1)

        else:
            if not self.is_pyi or not node.parent or not is_stub_suite(node.parent):
                yield from self.line()
            yield from self.visit_default(node)

    def visit_async_stmt(self, node: Node) -> Iterator[Line]:
        """Visit `async def`, `async for`, `async with`."""
        yield from self.line()

        children = iter(node.children)
        for child in children:
            yield from self.visit(child)

            if child.type == token.ASYNC:
                break

        internal_stmt = next(children)
        for child in internal_stmt.children:
            yield from self.visit(child)

    def visit_decorators(self, node: Node) -> Iterator[Line]:
        """Visit decorators."""
        for child in node.children:
            yield from self.line()
            yield from self.visit(child)

    def visit_SEMI(self, leaf: Leaf) -> Iterator[Line]:
        """Remove a semicolon and put the other statement on a separate line."""
        yield from self.line()

    def visit_ENDMARKER(self, leaf: Leaf) -> Iterator[Line]:
        """End of file. Process outstanding comments and end with a newline."""
        yield from self.visit_default(leaf)
        yield from self.line()

    def visit_STANDALONE_COMMENT(self, leaf: Leaf) -> Iterator[Line]:
        if not self.current_line.bracket_tracker.any_open_brackets():
            yield from self.line()
        yield from self.visit_default(leaf)

    def __attrs_post_init__(self) -> None:
        """You are in a twisty little maze of passages."""
        v = self.visit_stmt
        Ø: Set[str] = set()
        self.visit_assert_stmt = partial(v, keywords={"assert"}, parens={"assert", ","})
        self.visit_if_stmt = partial(
            v, keywords={"if", "else", "elif"}, parens={"if", "elif"}
        )
        self.visit_while_stmt = partial(v, keywords={"while", "else"}, parens={"while"})
        self.visit_for_stmt = partial(v, keywords={"for", "else"}, parens={"for", "in"})
        self.visit_try_stmt = partial(
            v, keywords={"try", "except", "else", "finally"}, parens=Ø
        )
        self.visit_except_clause = partial(v, keywords={"except"}, parens=Ø)
        self.visit_with_stmt = partial(v, keywords={"with"}, parens=Ø)
        self.visit_funcdef = partial(v, keywords={"def"}, parens=Ø)
        self.visit_classdef = partial(v, keywords={"class"}, parens=Ø)
        self.visit_expr_stmt = partial(v, keywords=Ø, parens=ASSIGNMENTS)
        self.visit_return_stmt = partial(v, keywords={"return"}, parens={"return"})
        self.visit_import_from = partial(v, keywords=Ø, parens={"import"})
        self.visit_del_stmt = partial(v, keywords=Ø, parens={"del"})
        self.visit_async_funcdef = self.visit_async_stmt
        self.visit_decorated = self.visit_decorators


IMPLICIT_TUPLE = {syms.testlist, syms.testlist_star_expr, syms.exprlist}
BRACKET = {token.LPAR: token.RPAR, token.LSQB: token.RSQB, token.LBRACE: token.RBRACE}
OPENING_BRACKETS = set(BRACKET.keys())
CLOSING_BRACKETS = set(BRACKET.values())
BRACKETS = OPENING_BRACKETS | CLOSING_BRACKETS
ALWAYS_NO_SPACE = CLOSING_BRACKETS | {token.COMMA, STANDALONE_COMMENT}


def whitespace(leaf: Leaf, *, complex_subscript: bool) -> str:  # noqa: C901
    """Return whitespace prefix if needed for the given `leaf`.

    `complex_subscript` signals whether the given leaf is part of a subscription
    which has non-trivial arguments, like arithmetic expressions or function calls.
    """
    NO = ""
    SPACE = " "
    DOUBLESPACE = "  "
    t = leaf.type
    p = leaf.parent
    v = leaf.value
    if t in ALWAYS_NO_SPACE:
        return NO

    if t == token.COMMENT:
        return DOUBLESPACE

    assert p is not None, f"INTERNAL ERROR: hand-made leaf without parent: {leaf!r}"
    if t == token.COLON and p.type not in {
        syms.subscript,
        syms.subscriptlist,
        syms.sliceop,
    }:
        return NO

    prev = leaf.prev_sibling
    if not prev:
        prevp = preceding_leaf(p)
        if not prevp or prevp.type in OPENING_BRACKETS:
            return NO

        if t == token.COLON:
            if prevp.type == token.COLON:
                return NO

            elif prevp.type != token.COMMA and not complex_subscript:
                return NO

            return SPACE

        if prevp.type == token.EQUAL:
            if prevp.parent:
                if prevp.parent.type in {
                    syms.arglist,
                    syms.argument,
                    syms.parameters,
                    syms.varargslist,
                }:
                    return NO

                elif prevp.parent.type == syms.typedargslist:
                    # A bit hacky: if the equal sign has whitespace, it means we
                    # previously found it's a typed argument. So, we're using
                    # that, too.
                    return prevp.prefix

        elif prevp.type in STARS:
            if is_vararg(prevp, within=VARARGS_PARENTS | UNPACKING_PARENTS):
                return NO

        elif prevp.type == token.COLON:
            if prevp.parent and prevp.parent.type in {syms.subscript, syms.sliceop}:
                return SPACE if complex_subscript else NO

        elif (
            prevp.parent
            and prevp.parent.type == syms.factor
            and prevp.type in MATH_OPERATORS
        ):
            return NO

        elif (
            prevp.type == token.RIGHTSHIFT
            and prevp.parent
            and prevp.parent.type == syms.shift_expr
            and prevp.prev_sibling
            and prevp.prev_sibling.type == token.NAME
            and prevp.prev_sibling.value == "print"  # type: ignore
        ):
            # Python 2 print chevron
            return NO

    elif prev.type in OPENING_BRACKETS:
        return NO

    if p.type in {syms.parameters, syms.arglist}:
        # untyped function signatures or calls
        if not prev or prev.type != token.COMMA:
            return NO

    elif p.type == syms.varargslist:
        # lambdas
        if prev and prev.type != token.COMMA:
            return NO

    elif p.type == syms.typedargslist:
        # typed function signatures
        if not prev:
            return NO

        if t == token.EQUAL:
            if prev.type != syms.tname:
                return NO

        elif prev.type == token.EQUAL:
            # A bit hacky: if the equal sign has whitespace, it means we
            # previously found it's a typed argument. So, we're using that, too.
            return prev.prefix

        elif prev.type != token.COMMA:
            return NO

    elif p.type == syms.tname:
        # type names
        if not prev:
            prevp = preceding_leaf(p)
            if not prevp or prevp.type != token.COMMA:
                return NO

    elif p.type == syms.trailer:
        # attributes and calls
        if t == token.LPAR or t == token.RPAR:
            return NO

        if not prev:
            if t == token.DOT:
                prevp = preceding_leaf(p)
                if not prevp or prevp.type != token.NUMBER:
                    return NO

            elif t == token.LSQB:
                return NO

        elif prev.type != token.COMMA:
            return NO

    elif p.type == syms.argument:
        # single argument
        if t == token.EQUAL:
            return NO

        if not prev:
            prevp = preceding_leaf(p)
            if not prevp or prevp.type == token.LPAR:
                return NO

        elif prev.type in {token.EQUAL} | STARS:
            return NO

    elif p.type == syms.decorator:
        # decorators
        return NO

    elif p.type == syms.dotted_name:
        if prev:
            return NO

        prevp = preceding_leaf(p)
        if not prevp or prevp.type == token.AT or prevp.type == token.DOT:
            return NO

    elif p.type == syms.classdef:
        if t == token.LPAR:
            return NO

        if prev and prev.type == token.LPAR:
            return NO

    elif p.type in {syms.subscript, syms.sliceop}:
        # indexing
        if not prev:
            assert p.parent is not None, "subscripts are always parented"
            if p.parent.type == syms.subscriptlist:
                return SPACE

            return NO

        elif not complex_subscript:
            return NO

    elif p.type == syms.atom:
        if prev and t == token.DOT:
            # dots, but not the first one.
            return NO

    elif p.type == syms.dictsetmaker:
        if prev and prev.type == token.DOUBLESTAR:
            return NO

    elif p.type in {syms.factor, syms.star_expr}:
        # unary ops
        if not prev:
            prevp = preceding_leaf(p)
            if not prevp or prevp.type in OPENING_BRACKETS:
                return NO

            prevp_parent = prevp.parent
            assert prevp_parent is not None
            if prevp.type == token.COLON and prevp_parent.type in {
                syms.subscript,
                syms.sliceop,
            }:
                return NO

            elif prevp.type == token.EQUAL and prevp_parent.type == syms.argument:
                return NO

        elif t in {token.NAME, token.NUMBER, token.STRING}:
            return NO

    elif p.type == syms.import_from:
        if t == token.DOT:
            if prev and prev.type == token.DOT:
                return NO

        elif t == token.NAME:
            if v == "import":
                return SPACE

            if prev and prev.type == token.DOT:
                return NO

    elif p.type == syms.sliceop:
        return NO

    return SPACE
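
# Illustrative results of the rules above: `ham[1:9]` keeps the colon bare
# (simple subscript), while `ham[lower + offset : upper]` is a complex
# subscript, so the colon gets surrounding spaces.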


def preceding_leaf(node: Optional[LN]) -> Optional[Leaf]:
    """Return the first leaf that precedes `node`, if any."""
    while node:
        res = node.prev_sibling
        if res:
            if isinstance(res, Leaf):
                return res

            try:
                return list(res.leaves())[-1]

            except IndexError:
                return None

        node = node.parent
    return None


def child_towards(ancestor: Node, descendant: LN) -> Optional[LN]:
    """Return the child of `ancestor` that contains `descendant`."""
    node: Optional[LN] = descendant
    while node and node.parent != ancestor:
        node = node.parent
    return node


def container_of(leaf: Leaf) -> LN:
    """Return `leaf` or one of its ancestors that is the topmost container of it.

    By "container" we mean a node where `leaf` is the very first child.
    """
    same_prefix = leaf.prefix
    container: LN = leaf
    while container:
        parent = container.parent
        if parent is None:
            break

        if parent.children[0].prefix != same_prefix:
            break

        if parent.type == syms.file_input:
            break

        if parent.prev_sibling is not None and parent.prev_sibling.type in BRACKETS:
            break

        container = parent
    return container


def is_split_after_delimiter(leaf: Leaf, previous: Optional[Leaf] = None) -> Priority:
    """Return the priority of the `leaf` delimiter, given a line break after it.

    The delimiter priorities returned here are from those delimiters that would
    cause a line break after themselves.

    Higher numbers are higher priority.
    """
    if leaf.type == token.COMMA:
        return COMMA_PRIORITY

    return 0


def is_split_before_delimiter(leaf: Leaf, previous: Optional[Leaf] = None) -> Priority:
    """Return the priority of the `leaf` delimiter, given a line break before it.

    The delimiter priorities returned here are from those delimiters that would
    cause a line break before themselves.

    Higher numbers are higher priority.
    """
    if is_vararg(leaf, within=VARARGS_PARENTS | UNPACKING_PARENTS):
        # * and ** might also be MATH_OPERATORS but in this case they are not.
        # Don't treat them as a delimiter.
        return 0

    if (
        leaf.type == token.DOT
        and leaf.parent
        and leaf.parent.type not in {syms.import_from, syms.dotted_name}
        and (previous is None or previous.type in CLOSING_BRACKETS)
    ):
        return DOT_PRIORITY

    if (
        leaf.type in MATH_OPERATORS
        and leaf.parent
        and leaf.parent.type not in {syms.factor, syms.star_expr}
    ):
        return MATH_PRIORITIES[leaf.type]

    if leaf.type in COMPARATORS:
        return COMPARATOR_PRIORITY

    if (
        leaf.type == token.STRING
        and previous is not None
        and previous.type == token.STRING
    ):
        return STRING_PRIORITY

    if leaf.type not in {token.NAME, token.ASYNC}:
        return 0

    if (
        leaf.value == "for"
        and leaf.parent
        and leaf.parent.type in {syms.comp_for, syms.old_comp_for}
        or leaf.type == token.ASYNC
    ):
        if (
            not isinstance(leaf.prev_sibling, Leaf)
            or leaf.prev_sibling.value != "async"
        ):
            return COMPREHENSION_PRIORITY

    if (
        leaf.value == "if"
        and leaf.parent
        and leaf.parent.type in {syms.comp_if, syms.old_comp_if}
    ):
        return COMPREHENSION_PRIORITY

    if leaf.value in {"if", "else"} and leaf.parent and leaf.parent.type == syms.test:
        return TERNARY_PRIORITY

    if leaf.value == "is":
        return COMPARATOR_PRIORITY

    if (
        leaf.value == "in"
        and leaf.parent
        and leaf.parent.type in {syms.comp_op, syms.comparison}
        and not (
            previous is not None
            and previous.type == token.NAME
            and previous.value == "not"
        )
    ):
        return COMPARATOR_PRIORITY

    if (
        leaf.value == "not"
        and leaf.parent
        and leaf.parent.type == syms.comp_op
        and not (
            previous is not None
            and previous.type == token.NAME
            and previous.value == "is"
        )
    ):
        return COMPARATOR_PRIORITY

    if leaf.value in LOGIC_OPERATORS and leaf.parent:
        return LOGIC_PRIORITY

    return 0


FMT_OFF = {"# fmt: off", "# fmt:off", "# yapf: disable"}
FMT_ON = {"# fmt: on", "# fmt:on", "# yapf: enable"}
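
# Illustrative usage of the markers above: formatting is suspended between
# them, so hand-aligned code survives reformatting:
#
#   # fmt: off
#   custom_matrix = [
#       1, 0,
#       0, 1,
#   ]
#   # fmt: on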


def generate_comments(leaf: LN) -> Iterator[Leaf]:
    """Clean the prefix of the `leaf` and generate comments from it, if any.

    Comments in lib2to3 are shoved into the whitespace prefix. This happens
    in `pgen2/driver.py:Driver.parse_tokens()`. This was a brilliant implementation
    move because it does away with modifying the grammar to include all the
    possible places in which comments can be placed.

    The sad consequence for us though is that comments don't "belong" anywhere.
    This is why this function generates simple parentless Leaf objects for
    comments. We simply don't know what the correct parent should be.

    No matter though, we can live without this. We really only need to
    differentiate between inline and standalone comments. The latter don't
    share the line with any code.

    Inline comments are emitted as regular token.COMMENT leaves. Standalone
    comments are emitted with a fake STANDALONE_COMMENT token identifier.
    """
    for pc in list_comments(leaf.prefix, is_endmarker=leaf.type == token.ENDMARKER):
        yield Leaf(pc.type, pc.value, prefix="\n" * pc.newlines)


@dataclass
class ProtoComment:
    """Describes a piece of syntax that is a comment.

    It's not a :class:`blib2to3.pytree.Leaf` so that:

    * it can be cached (`Leaf` objects should not be reused more than once as
      they store their lineno, column, prefix, and parent information);
    * `newlines` and `consumed` fields are kept separate from the `value`. This
      simplifies handling of special marker comments like ``# fmt: off/on``.
    """

    type: int  # token.COMMENT or STANDALONE_COMMENT
    value: str  # content of the comment
    newlines: int  # how many newlines before the comment
    consumed: int  # how many characters of the original leaf's prefix did we consume


@lru_cache(maxsize=4096)
def list_comments(prefix: str, *, is_endmarker: bool) -> List[ProtoComment]:
    """Return a list of :class:`ProtoComment` objects parsed from the given `prefix`."""
    result: List[ProtoComment] = []
    if not prefix or "#" not in prefix:
        return []

    consumed = 0
    nlines = 0
    ignored_lines = 0
    for index, line in enumerate(prefix.split("\n")):
        consumed += len(line) + 1  # adding the length of the split '\n'
        line = line.lstrip()
        if not line:
            nlines += 1
        if not line.startswith("#"):
            # Escaped newlines outside of a comment are not really newlines at
            # all. We treat a single-line comment following an escaped newline
            # as a simple trailing comment.
            if line.endswith("\\"):
                ignored_lines += 1
            continue

        if index == ignored_lines and not is_endmarker:
            comment_type = token.COMMENT  # simple trailing comment
        else:
            comment_type = STANDALONE_COMMENT
        comment = make_comment(line)
        result.append(
            ProtoComment(
                type=comment_type, value=comment, newlines=nlines, consumed=consumed
            )
        )
        nlines = 0
    return result


def make_comment(content: str) -> str:
    """Return a consistently formatted comment from the given `content` string.

    All comments (except for "##", "#!", "#:", "#'", "#%%") should have a single
    space between the hash sign and the content.

    If `content` didn't start with a hash sign, one is provided.
    """
    content = content.rstrip()
    if not content:
        return "#"

    if content[0] == "#":
        content = content[1:]
    if content and content[0] not in " !:#'%":
        content = " " + content
    return "#" + content
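
# Behaviour of the normalization above (illustrative):
#
#   >>> make_comment("comment")
#   '# comment'
#   >>> make_comment("#!shebang-style")  # "!" is exempt from the space rule
#   '#!shebang-style'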


def split_line(
    line: Line,
    line_length: int,
    inner: bool = False,
    features: Collection[Feature] = (),
) -> Iterator[Line]:
    """Split a `line` into potentially many lines.

    They should fit in the allotted `line_length` but might not be able to.
    `inner` signifies that there were a pair of brackets somewhere around the
    current `line`, possibly transitively. This means we can fall back to splitting
    by delimiters if the LHS/RHS don't yield any results.

    `features` are syntactical features that may be used in the output.
    """
    if line.is_comment:
        yield line
        return

    line_str = str(line).strip("\n")

    if (
        not line.contains_inner_type_comments()
        and not line.should_explode
        and is_line_short_enough(line, line_length=line_length, line_str=line_str)
    ):
        yield line
        return

    split_funcs: List[SplitFunc]
    if line.is_def:
        split_funcs = [left_hand_split]
    else:

        def rhs(line: Line, features: Collection[Feature]) -> Iterator[Line]:
            for omit in generate_trailers_to_omit(line, line_length):
                lines = list(right_hand_split(line, line_length, features, omit=omit))
                if is_line_short_enough(lines[0], line_length=line_length):
                    yield from lines
                    return

            # All splits failed, best effort split with no omits.
            # This mostly happens to multiline strings that are by definition
            # reported as not fitting a single line.
            yield from right_hand_split(line, line_length, features=features)

        if line.inside_brackets:
            split_funcs = [delimiter_split, standalone_comment_split, rhs]
        else:
            split_funcs = [rhs]
    for split_func in split_funcs:
        # We are accumulating lines in `result` because we might want to abort
        # mission and return the original line in the end, or attempt a different
        # split altogether.
        result: List[Line] = []
        try:
            for l in split_func(line, features):
                if str(l).strip("\n") == line_str:
                    raise CannotSplit("Split function returned an unchanged result")

                result.extend(
                    split_line(
                        l, line_length=line_length, inner=True, features=features
                    )
                )
        except CannotSplit:
            continue

        else:
            yield from result
            return

    yield line
2361 def left_hand_split(line: Line, features: Collection[Feature] = ()) -> Iterator[Line]:
2362 """Split line into many lines, starting with the first matching bracket pair.
2364 Note: this usually looks weird; only use this for function definitions.
2365 Prefer RHS otherwise. This is why this function is not symmetrical with
2366 :func:`right_hand_split` which also handles optional parentheses.
2368 tail_leaves: List[Leaf] = []
2369 body_leaves: List[Leaf] = []
2370 head_leaves: List[Leaf] = []
2371 current_leaves = head_leaves
2372 matching_bracket = None
2373 for leaf in line.leaves:
2375 current_leaves is body_leaves
2376 and leaf.type in CLOSING_BRACKETS
2377 and leaf.opening_bracket is matching_bracket
2379 current_leaves = tail_leaves if body_leaves else head_leaves
2380 current_leaves.append(leaf)
2381 if current_leaves is head_leaves:
2382 if leaf.type in OPENING_BRACKETS:
2383 matching_bracket = leaf
2384 current_leaves = body_leaves
2385 if not matching_bracket:
2386 raise CannotSplit("No brackets found")
2388 head = bracket_split_build_line(head_leaves, line, matching_bracket)
2389 body = bracket_split_build_line(body_leaves, line, matching_bracket, is_body=True)
2390 tail = bracket_split_build_line(tail_leaves, line, matching_bracket)
2391 bracket_split_succeeded_or_raise(head, body, tail)
2392 for result in (head, body, tail):
2397 def right_hand_split(
2400 features: Collection[Feature] = (),
2401 omit: Collection[LeafID] = (),
2402 ) -> Iterator[Line]:
2403 """Split line into many lines, starting with the last matching bracket pair.
2405 If the split was by optional parentheses, attempt splitting without them, too.
2406 `omit` is a collection of closing bracket IDs that shouldn't be considered for
2407 this split.
2409 Note: running this function modifies `bracket_depth` on the leaves of `line`.
2411 tail_leaves: List[Leaf] = []
2412 body_leaves: List[Leaf] = []
2413 head_leaves: List[Leaf] = []
2414 current_leaves = tail_leaves
2415 opening_bracket = None
2416 closing_bracket = None
2417 for leaf in reversed(line.leaves):
2418 if current_leaves is body_leaves:
2419 if leaf is opening_bracket:
2420 current_leaves = head_leaves if body_leaves else tail_leaves
2421 current_leaves.append(leaf)
2422 if current_leaves is tail_leaves:
2423 if leaf.type in CLOSING_BRACKETS and id(leaf) not in omit:
2424 opening_bracket = leaf.opening_bracket
2425 closing_bracket = leaf
2426 current_leaves = body_leaves
2427 if not (opening_bracket and closing_bracket and head_leaves):
2428 # If there is no `opening_bracket` or `closing_bracket`, the split failed and
2429 # all content is in the tail. Otherwise, if `head_leaves` is empty, it means
2430 # the matching `opening_bracket` wasn't available on `line` anymore.
2431 raise CannotSplit("No brackets found")
2433 tail_leaves.reverse()
2434 body_leaves.reverse()
2435 head_leaves.reverse()
2436 head = bracket_split_build_line(head_leaves, line, opening_bracket)
2437 body = bracket_split_build_line(body_leaves, line, opening_bracket, is_body=True)
2438 tail = bracket_split_build_line(tail_leaves, line, opening_bracket)
2439 bracket_split_succeeded_or_raise(head, body, tail)
2441 # the body shouldn't be exploded
2442 not body.should_explode
2443 # the opening bracket is an optional paren
2444 and opening_bracket.type == token.LPAR
2445 and not opening_bracket.value
2446 # the closing bracket is an optional paren
2447 and closing_bracket.type == token.RPAR
2448 and not closing_bracket.value
2449 # it's not an import (optional parens are the only thing we can split on
2450 # in this case; attempting a split without them is a waste of time)
2451 and not line.is_import
2452 # there are no standalone comments in the body
2453 and not body.contains_standalone_comments(0)
2454 # and we can actually remove the parens
2455 and can_omit_invisible_parens(body, line_length)
2457 omit = {id(closing_bracket), *omit}
2459 yield from right_hand_split(line, line_length, features=features, omit=omit)
2465 or is_line_short_enough(body, line_length=line_length)
2468 "Splitting failed, body is still too long and can't be split."
2471 elif head.contains_multiline_strings() or tail.contains_multiline_strings():
2473 "The current optional pair of parentheses is bound to fail to "
2474 "satisfy the splitting algorithm because the head or the tail "
2475 "contains multiline strings which by definition never fit one "
2479 ensure_visible(opening_bracket)
2480 ensure_visible(closing_bracket)
2481 for result in (head, body, tail):
2486 def bracket_split_succeeded_or_raise(head: Line, body: Line, tail: Line) -> None:
2487 """Raise :exc:`CannotSplit` if the last left- or right-hand split failed.
2489 Do nothing otherwise.
2491 A left- or right-hand split is based on a pair of brackets. Content before
2492 (and including) the opening bracket is left on one line, content inside the
2493 brackets is put on a separate line, and finally content starting with and
2494 following the closing bracket is put on a separate line.
2496 Those are called `head`, `body`, and `tail`, respectively. If the split
2497 produced the same line (all content in `head`) or ended up with an empty `body`
2498 and the `tail` is just the closing bracket, then it's considered failed.
2500 tail_len = len(str(tail).strip())
2503 raise CannotSplit("Splitting brackets produced the same line")
2507 f"Splitting brackets on an empty body to save "
2508 f"{tail_len} characters is not worth it"
2512 def bracket_split_build_line(
2513 leaves: List[Leaf], original: Line, opening_bracket: Leaf, *, is_body: bool = False
2515 """Return a new line with given `leaves` and respective comments from `original`.
2517 If `is_body` is True, the result line is one-indented inside brackets and as such
2518 has its first leaf's prefix normalized and a trailing comma added when expected.
2520 result = Line(depth=original.depth)
2522 result.inside_brackets = True
2525 # Since body is a new indent level, remove spurious leading whitespace.
2526 normalize_prefix(leaves[0], inside_brackets=True)
2527 # Ensure a trailing comma for imports and standalone function arguments, but
2528 # be careful not to add one after any comments.
2529 no_commas = original.is_def and not any(
2530 l.type == token.COMMA for l in leaves
2533 if original.is_import or no_commas:
2534 for i in range(len(leaves) - 1, -1, -1):
2535 if leaves[i].type == STANDALONE_COMMENT:
2537 elif leaves[i].type == token.COMMA:
2540 leaves.insert(i + 1, Leaf(token.COMMA, ","))
2544 result.append(leaf, preformatted=True)
2545 for comment_after in original.comments_after(leaf):
2546 result.append(comment_after, preformatted=True)
2548 result.should_explode = should_explode(result, opening_bracket)
2552 def dont_increase_indentation(split_func: SplitFunc) -> SplitFunc:
2553 """Normalize prefix of the first leaf in every line returned by `split_func`.
2555 This is a decorator over relevant split functions.
2559 def split_wrapper(line: Line, features: Collection[Feature] = ()) -> Iterator[Line]:
2560 for l in split_func(line, features):
2561 normalize_prefix(l.leaves[0], inside_brackets=True)
2564 return split_wrapper
2567 @dont_increase_indentation
2568 def delimiter_split(line: Line, features: Collection[Feature] = ()) -> Iterator[Line]:
2569 """Split according to delimiters of the highest priority.
2571 If the appropriate Features are given, the split will also add trailing
2572 commas in function signatures and calls that contain `*` and `**`.
2575 last_leaf = line.leaves[-1]
2577 raise CannotSplit("Line empty")
2579 bt = line.bracket_tracker
2581 delimiter_priority = bt.max_delimiter_priority(exclude={id(last_leaf)})
2583 raise CannotSplit("No delimiters found")
2585 if delimiter_priority == DOT_PRIORITY:
2586 if bt.delimiter_count_with_priority(delimiter_priority) == 1:
2587 raise CannotSplit("Splitting a single attribute from its owner looks wrong")
2589 current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
2590 lowest_depth = sys.maxsize
2591 trailing_comma_safe = True
2593 def append_to_line(leaf: Leaf) -> Iterator[Line]:
2594 """Append `leaf` to current line or to new line if appending impossible."""
2595 nonlocal current_line
2597 current_line.append_safe(leaf, preformatted=True)
2601 current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
2602 current_line.append(leaf)
2604 for leaf in line.leaves:
2605 yield from append_to_line(leaf)
2607 for comment_after in line.comments_after(leaf):
2608 yield from append_to_line(comment_after)
2610 lowest_depth = min(lowest_depth, leaf.bracket_depth)
2611 if leaf.bracket_depth == lowest_depth:
2612 if is_vararg(leaf, within={syms.typedargslist}):
2613 trailing_comma_safe = (
2614 trailing_comma_safe and Feature.TRAILING_COMMA_IN_DEF in features
2616 elif is_vararg(leaf, within={syms.arglist, syms.argument}):
2617 trailing_comma_safe = (
2618 trailing_comma_safe and Feature.TRAILING_COMMA_IN_CALL in features
2621 leaf_priority = bt.delimiters.get(id(leaf))
2622 if leaf_priority == delimiter_priority:
2625 current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
2629 and delimiter_priority == COMMA_PRIORITY
2630 and current_line.leaves[-1].type != token.COMMA
2631 and current_line.leaves[-1].type != STANDALONE_COMMENT
2633 current_line.append(Leaf(token.COMMA, ","))
2637 @dont_increase_indentation
2638 def standalone_comment_split(
2639 line: Line, features: Collection[Feature] = ()
2640 ) -> Iterator[Line]:
2641 """Split standalone comments from the rest of the line."""
2642 if not line.contains_standalone_comments(0):
2643 raise CannotSplit("Line does not have any standalone comments")
2645 current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
2647 def append_to_line(leaf: Leaf) -> Iterator[Line]:
2648 """Append `leaf` to current line or to new line if appending impossible."""
2649 nonlocal current_line
2651 current_line.append_safe(leaf, preformatted=True)
2655 current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
2656 current_line.append(leaf)
2658 for leaf in line.leaves:
2659 yield from append_to_line(leaf)
2661 for comment_after in line.comments_after(leaf):
2662 yield from append_to_line(comment_after)
2668 def is_import(leaf: Leaf) -> bool:
2669 """Return True if the given leaf starts an import statement."""
2676 (v == "import" and p and p.type == syms.import_name)
2677 or (v == "from" and p and p.type == syms.import_from)
2682 def is_type_comment(leaf: Leaf) -> bool:
2683 """Return True if the given leaf is a special comment.
2684 Only returns True for type comments for now."""
2685 t = leaf.type
2686 v = leaf.value
2687 return t in {token.COMMENT, STANDALONE_COMMENT} and v.startswith("# type:")
2690 def normalize_prefix(leaf: Leaf, *, inside_brackets: bool) -> None:
2691 """Leave existing extra newlines if not `inside_brackets`. Remove everything
2694 Note: don't use backslashes for formatting or you'll lose your voting rights.
2696 if not inside_brackets:
2697 spl = leaf.prefix.split("#")
2698 if "\\" not in spl[0]:
2699 nl_count = spl[-1].count("\n")
2702 leaf.prefix = "\n" * nl_count
2708 def normalize_string_prefix(leaf: Leaf, remove_u_prefix: bool = False) -> None:
2709 """Make all string prefixes lowercase.
2711 If remove_u_prefix is given, also removes any u prefix from the string.
2713 Note: Mutates its argument.
2715 match = re.match(r"^([furbFURB]*)(.*)$", leaf.value, re.DOTALL)
2716 assert match is not None, f"failed to match string {leaf.value!r}"
2717 orig_prefix = match.group(1)
2718 new_prefix = orig_prefix.lower()
2720 new_prefix = new_prefix.replace("u", "")
2721 leaf.value = f"{new_prefix}{match.group(2)}"
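# Doctest-style sketch (illustrative; Leaf comes from blib2to3.pytree):
#
#     >>> leaf = Leaf(token.STRING, 'F"hello"')
#     >>> normalize_string_prefix(leaf)
#     >>> leaf.value
#     'f"hello"'
#     >>> leaf = Leaf(token.STRING, 'U"hello"')
#     >>> normalize_string_prefix(leaf, remove_u_prefix=True)
#     >>> leaf.value
#     '"hello"'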
2724 def normalize_string_quotes(leaf: Leaf) -> None:
2725 """Prefer double quotes but only if it doesn't cause more escaping.
2727 Adds or removes backslashes as appropriate. Doesn't parse and fix
2728 strings nested in f-strings (yet).
2730 Note: Mutates its argument.
2732 value = leaf.value.lstrip("furbFURB")
2733 if value[:3] == '"""':
2736 elif value[:3] == "'''":
2739 elif value[0] == '"':
2745 first_quote_pos = leaf.value.find(orig_quote)
2746 if first_quote_pos == -1:
2747 return # There's an internal error
2749 prefix = leaf.value[:first_quote_pos]
2750 unescaped_new_quote = re.compile(rf"(([^\\]|^)(\\\\)*){new_quote}")
2751 escaped_new_quote = re.compile(rf"([^\\]|^)\\((?:\\\\)*){new_quote}")
2752 escaped_orig_quote = re.compile(rf"([^\\]|^)\\((?:\\\\)*){orig_quote}")
2753 body = leaf.value[first_quote_pos + len(orig_quote) : -len(orig_quote)]
2754 if "r" in prefix.casefold():
2755 if unescaped_new_quote.search(body):
2756 # There's at least one unescaped new_quote in this raw string
2757 # so converting is impossible
2760 # Do not introduce or remove backslashes in raw strings
2763 # remove unnecessary escapes
2764 new_body = sub_twice(escaped_new_quote, rf"\1\2{new_quote}", body)
2765 if body != new_body:
2766 # Consider the string without unnecessary escapes as the original
2768 leaf.value = f"{prefix}{orig_quote}{body}{orig_quote}"
2769 new_body = sub_twice(escaped_orig_quote, rf"\1\2{orig_quote}", new_body)
2770 new_body = sub_twice(unescaped_new_quote, rf"\1\\{new_quote}", new_body)
2771 if "f" in prefix.casefold():
2772 matches = re.findall(
2774 (?:[^{]|^)\{ # start of the string or a non-{ followed by a single {
2775 ([^{].*?) # contents of the brackets, except if it begins with {{
2776 \}(?:[^}]|$) # A } followed by end of the string or a non-}
2783 # Do not introduce backslashes in interpolated expressions
2785 if new_quote == '"""' and new_body[-1:] == '"':
2787 new_body = new_body[:-1] + '\\"'
2788 orig_escape_count = body.count("\\")
2789 new_escape_count = new_body.count("\\")
2790 if new_escape_count > orig_escape_count:
2791 return # Do not introduce more escaping
2793 if new_escape_count == orig_escape_count and orig_quote == '"':
2794 return # Prefer double quotes
2796 leaf.value = f"{prefix}{new_quote}{new_body}{new_quote}"
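# Doctest-style sketch (illustrative): plain single quotes become double
# quotes...
#
#     >>> leaf = Leaf(token.STRING, "'hello'")
#     >>> normalize_string_quotes(leaf)
#     >>> leaf.value
#     '"hello"'
#
# ...but a body that already contains double quotes keeps its single quotes,
# since converting would increase the escape count.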
2799 def normalize_numeric_literal(leaf: Leaf) -> None:
2800 """Normalizes numeric (float, int, and complex) literals.
2802 All letters used in the representation are normalized to lowercase (except
2803 in Python 2 long literals).
2805 text = leaf.value.lower()
2806 if text.startswith(("0o", "0b")):
2807 # Leave octal and binary literals alone.
2809 elif text.startswith("0x"):
2810 # Change hex literals to upper case.
2811 before, after = text[:2], text[2:]
2812 text = f"{before}{after.upper()}"
2814 before, after = text.split("e")
2816 if after.startswith("-"):
2819 elif after.startswith("+"):
2821 before = format_float_or_int_string(before)
2822 text = f"{before}e{sign}{after}"
2823 elif text.endswith(("j", "l")):
2826 # Capitalize in "2L" because "l" looks too similar to "1".
2829 text = f"{format_float_or_int_string(number)}{suffix}"
2831 text = format_float_or_int_string(text)
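# Sketch of the resulting normalizations (illustrative, not part of the
# original module):
#
#     "0XAB"  -> "0xAB"   (prefix lower-cased, hex digits upper-cased)
#     "1E5"   -> "1e5"
#     "1E+5"  -> "1e5"    (redundant plus sign dropped)
#     "10l"   -> "10L"    (Python 2 long; "L" reads less like "1")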
2835 def format_float_or_int_string(text: str) -> str:
2836 """Formats a float string like "1.0"."""
2840 before, after = text.split(".")
2841 return f"{before or 0}.{after or 0}"
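# Doctest-style sketch (illustrative):
#
#     >>> format_float_or_int_string("1.")
#     '1.0'
#     >>> format_float_or_int_string(".5")
#     '0.5'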
2844 def normalize_invisible_parens(node: Node, parens_after: Set[str]) -> None:
2845 """Make existing optional parentheses invisible or create new ones.
2847 `parens_after` is a set of string leaf values immediately after which parens
2848 should be put.
2850 Standardizes on visible parentheses for single-element tuples, and keeps
2851 existing visible parentheses for other tuples and generator expressions.
2853 for pc in list_comments(node.prefix, is_endmarker=False):
2854 if pc.value in FMT_OFF:
2855 # This `node` has a prefix with `# fmt: off`, don't mess with parens.
2859 for index, child in enumerate(list(node.children)):
2860 # Add parentheses around long tuple unpacking in assignments.
2863 and isinstance(child, Node)
2864 and child.type == syms.testlist_star_expr
2869 if is_walrus_assignment(child):
2871 if child.type == syms.atom:
2872 if maybe_make_parens_invisible_in_atom(child, parent=node):
2873 lpar = Leaf(token.LPAR, "")
2874 rpar = Leaf(token.RPAR, "")
2875 index = child.remove() or 0
2876 node.insert_child(index, Node(syms.atom, [lpar, child, rpar]))
2877 elif is_one_tuple(child):
2878 # wrap child in visible parentheses
2879 lpar = Leaf(token.LPAR, "(")
2880 rpar = Leaf(token.RPAR, ")")
2882 node.insert_child(index, Node(syms.atom, [lpar, child, rpar]))
2883 elif node.type == syms.import_from:
2884 # "import from" nodes store parentheses directly as part of
2886 if child.type == token.LPAR:
2887 # make parentheses invisible
2888 child.value = "" # type: ignore
2889 node.children[-1].value = "" # type: ignore
2890 elif child.type != token.STAR:
2891 # insert invisible parentheses
2892 node.insert_child(index, Leaf(token.LPAR, ""))
2893 node.append_child(Leaf(token.RPAR, ""))
2896 elif not (isinstance(child, Leaf) and is_multiline_string(child)):
2897 # wrap child in invisible parentheses
2898 lpar = Leaf(token.LPAR, "")
2899 rpar = Leaf(token.RPAR, "")
2900 index = child.remove() or 0
2901 prefix = child.prefix
2903 new_child = Node(syms.atom, [lpar, child, rpar])
2904 new_child.prefix = prefix
2905 node.insert_child(index, new_child)
2907 check_lpar = isinstance(child, Leaf) and child.value in parens_after
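# Illustration (not from the original module): assuming "return" is in
# `parens_after`,
#     return (x)   ->  return x      (redundant parens made invisible)
#     return 1,    ->  return (1,)   (one-tuples get visible parens)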
2910 def normalize_fmt_off(node: Node) -> None:
2911 """Convert content between `# fmt: off`/`# fmt: on` into standalone comments."""
2914 try_again = convert_one_fmt_off_pair(node)
2917 def convert_one_fmt_off_pair(node: Node) -> bool:
2918 """Convert content of a single `# fmt: off`/`# fmt: on` into a standalone comment.
2920 Returns True if a pair was converted.
2922 for leaf in node.leaves():
2923 previous_consumed = 0
2924 for comment in list_comments(leaf.prefix, is_endmarker=False):
2925 if comment.value in FMT_OFF:
2926 # We only want standalone comments. If there's no previous leaf or
2927 # the previous leaf is indentation, it's a standalone comment in
2928 # disguise.
2929 if comment.type != STANDALONE_COMMENT:
2930 prev = preceding_leaf(leaf)
2931 if prev and prev.type not in WHITESPACE:
2934 ignored_nodes = list(generate_ignored_nodes(leaf))
2935 if not ignored_nodes:
2938 first = ignored_nodes[0] # Can be a container node with the `leaf`.
2939 parent = first.parent
2940 prefix = first.prefix
2941 first.prefix = prefix[comment.consumed :]
2943 comment.value + "\n" + "".join(str(n) for n in ignored_nodes)
2945 if hidden_value.endswith("\n"):
2946 # That happens when one of the `ignored_nodes` ended with a NEWLINE
2947 # leaf (possibly followed by a DEDENT).
2948 hidden_value = hidden_value[:-1]
2950 for ignored in ignored_nodes:
2951 index = ignored.remove()
2952 if first_idx is None:
2954 assert parent is not None, "INTERNAL ERROR: fmt: on/off handling (1)"
2955 assert first_idx is not None, "INTERNAL ERROR: fmt: on/off handling (2)"
2956 parent.insert_child(
2961 prefix=prefix[:previous_consumed] + "\n" * comment.newlines,
2966 previous_consumed = comment.consumed
2971 def generate_ignored_nodes(leaf: Leaf) -> Iterator[LN]:
2972 """Starting from the container of `leaf`, generate all leaves until `# fmt: on`.
2974 Stops at the end of the block.
2976 container: Optional[LN] = container_of(leaf)
2977 while container is not None and container.type != token.ENDMARKER:
2978 for comment in list_comments(container.prefix, is_endmarker=False):
2979 if comment.value in FMT_ON:
2984 container = container.next_sibling
2987 def maybe_make_parens_invisible_in_atom(node: LN, parent: LN) -> bool:
2988 """If it's safe, make the parens in the atom `node` invisible, recursively.
2990 Returns whether the node should itself be wrapped in invisible parentheses.
2994 node.type != syms.atom
2995 or is_empty_tuple(node)
2996 or is_one_tuple(node)
2997 or (is_yield(node) and parent.type != syms.expr_stmt)
2998 or max_delimiter_priority_in_atom(node) >= COMMA_PRIORITY
3002 first = node.children[0]
3003 last = node.children[-1]
3004 if first.type == token.LPAR and last.type == token.RPAR:
3005 # make parentheses invisible
3006 first.value = "" # type: ignore
3007 last.value = "" # type: ignore
3008 if len(node.children) > 1:
3009 maybe_make_parens_invisible_in_atom(node.children[1], parent=parent)
3015 def is_empty_tuple(node: LN) -> bool:
3016 """Return True if `node` holds an empty tuple."""
3018 node.type == syms.atom
3019 and len(node.children) == 2
3020 and node.children[0].type == token.LPAR
3021 and node.children[1].type == token.RPAR
3025 def unwrap_singleton_parenthesis(node: LN) -> Optional[LN]:
3026 """Returns `wrapped` if `node` is of the shape ( wrapped ).
3028 Parenthesis can be optional. Returns None otherwise"""
3029 if len(node.children) != 3:
3031 lpar, wrapped, rpar = node.children
3032 if not (lpar.type == token.LPAR and rpar.type == token.RPAR):
3038 def is_one_tuple(node: LN) -> bool:
3039 """Return True if `node` holds a tuple with one element, with or without parens."""
3040 if node.type == syms.atom:
3041 gexp = unwrap_singleton_parenthesis(node)
3042 if gexp is None or gexp.type != syms.testlist_gexp:
3045 return len(gexp.children) == 2 and gexp.children[1].type == token.COMMA
3048 node.type in IMPLICIT_TUPLE
3049 and len(node.children) == 2
3050 and node.children[1].type == token.COMMA
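# Sketch (illustrative): both the parenthesized and the bare spelling count,
# so "(1,)" and "1," are one-tuples while "(1, 2)" and "()" are not.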
3054 def is_walrus_assignment(node: LN) -> bool:
3055 """Return True iff `node` is of the shape ( test := test )"""
3056 inner = unwrap_singleton_parenthesis(node)
3057 return inner is not None and inner.type == syms.namedexpr_test
3060 def is_yield(node: LN) -> bool:
3061 """Return True if `node` holds a `yield` or `yield from` expression."""
3062 if node.type == syms.yield_expr:
3065 if node.type == token.NAME and node.value == "yield": # type: ignore
3068 if node.type != syms.atom:
3071 if len(node.children) != 3:
3074 lpar, expr, rpar = node.children
3075 if lpar.type == token.LPAR and rpar.type == token.RPAR:
3076 return is_yield(expr)
3081 def is_vararg(leaf: Leaf, within: Set[NodeType]) -> bool:
3082 """Return True if `leaf` is a star or double star in a vararg or kwarg.
3084 If `within` includes VARARGS_PARENTS, this applies to function signatures.
3085 If `within` includes UNPACKING_PARENTS, it applies to right-hand side
3086 extended iterable unpacking (PEP 3132) and additional unpacking
3087 generalizations (PEP 448).
3089 if leaf.type not in STARS or not leaf.parent:
3093 if p.type == syms.star_expr:
3094 # Star expressions are also used as assignment targets in extended
3095 # iterable unpacking (PEP 3132). See what its parent is instead.
3101 return p.type in within
3104 def is_multiline_string(leaf: Leaf) -> bool:
3105 """Return True if `leaf` is a multiline string that actually spans many lines."""
3106 value = leaf.value.lstrip("furbFURB")
3107 return value[:3] in {'"""', "'''"} and "\n" in value
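# Sketch (illustrative): '"""a\nb"""' is a multiline string, but a
# triple-quoted string without a newline, like '"""ab"""', is not.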
3110 def is_stub_suite(node: Node) -> bool:
3111 """Return True if `node` is a suite with a stub body."""
3113 len(node.children) != 4
3114 or node.children[0].type != token.NEWLINE
3115 or node.children[1].type != token.INDENT
3116 or node.children[3].type != token.DEDENT
3120 return is_stub_body(node.children[2])
3123 def is_stub_body(node: LN) -> bool:
3124 """Return True if `node` is a simple statement containing an ellipsis."""
3125 if not isinstance(node, Node) or node.type != syms.simple_stmt:
3128 if len(node.children) != 2:
3131 child = node.children[0]
3133 child.type == syms.atom
3134 and len(child.children) == 3
3135 and all(leaf == Leaf(token.DOT, ".") for leaf in child.children)
3139 def max_delimiter_priority_in_atom(node: LN) -> Priority:
3140 """Return maximum delimiter priority inside `node`.
3142 This is specific to atoms with contents contained in a pair of parentheses.
3143 If `node` isn't an atom or there are no enclosing parentheses, returns 0.
3145 if node.type != syms.atom:
3148 first = node.children[0]
3149 last = node.children[-1]
3150 if not (first.type == token.LPAR and last.type == token.RPAR):
3153 bt = BracketTracker()
3154 for c in node.children[1:-1]:
3155 if isinstance(c, Leaf):
3158 for leaf in c.leaves():
3161 return bt.max_delimiter_priority()
3167 def ensure_visible(leaf: Leaf) -> None:
3168 """Make sure parentheses are visible.
3170 They could be invisible as part of some statements (see
3171 :func:`normalize_invisible_parens` and :func:`visit_import_from`).
3173 if leaf.type == token.LPAR:
3175 elif leaf.type == token.RPAR:
3179 def should_explode(line: Line, opening_bracket: Leaf) -> bool:
3180 """Should `line` immediately be split with `delimiter_split()` after RHS?"""
3183 opening_bracket.parent
3184 and opening_bracket.parent.type in {syms.atom, syms.import_from}
3185 and opening_bracket.value in "[{("
3190 last_leaf = line.leaves[-1]
3191 exclude = {id(last_leaf)} if last_leaf.type == token.COMMA else set()
3192 max_priority = line.bracket_tracker.max_delimiter_priority(exclude=exclude)
3193 except (IndexError, ValueError):
3196 return max_priority == COMMA_PRIORITY
3199 def get_features_used(node: Node) -> Set[Feature]:
3200 """Return a set of (relatively) new Python features used in this file.
3202 Currently looking for:
3203 - f-strings;
3204 - underscores in numeric literals;
3205 - trailing commas after * or ** in function signatures and calls;
3206 - assignment expressions.
3207 features: Set[Feature] = set()
3208 for n in node.pre_order():
3209 if n.type == token.STRING:
3210 value_head = n.value[:2] # type: ignore
3211 if value_head in {'f"', 'F"', "f'", "F'", "rf", "fr", "RF", "FR"}:
3212 features.add(Feature.F_STRINGS)
3214 elif n.type == token.NUMBER:
3215 if "_" in n.value: # type: ignore
3216 features.add(Feature.NUMERIC_UNDERSCORES)
3218 elif n.type == token.COLONEQUAL:
3219 features.add(Feature.ASSIGNMENT_EXPRESSIONS)
3222 n.type in {syms.typedargslist, syms.arglist}
3224 and n.children[-1].type == token.COMMA
3226 if n.type == syms.typedargslist:
3227 feature = Feature.TRAILING_COMMA_IN_DEF
3229 feature = Feature.TRAILING_COMMA_IN_CALL
3231 for ch in n.children:
3232 if ch.type in STARS:
3233 features.add(feature)
3235 if ch.type == syms.argument:
3236 for argch in ch.children:
3237 if argch.type in STARS:
3238 features.add(feature)
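# Sketch (illustrative, assumes `node` is a parsed module): for the source
#     x = 1_000
#     y = f"{x}"
# get_features_used(node) would return
#     {Feature.NUMERIC_UNDERSCORES, Feature.F_STRINGS}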
3243 def detect_target_versions(node: Node) -> Set[TargetVersion]:
3244 """Detect the version to target based on the nodes used."""
3245 features = get_features_used(node)
3247 version for version in TargetVersion if features <= VERSION_TO_FEATURES[version]
3251 def generate_trailers_to_omit(line: Line, line_length: int) -> Iterator[Set[LeafID]]:
3252 """Generate sets of closing bracket IDs that should be omitted in a RHS.
3254 Brackets can be omitted if the entire trailer up to and including
3255 a preceding closing bracket fits in one line.
3257 Yielded sets are cumulative (contain results of previous yields, too). First
3258 set is empty.
3261 omit: Set[LeafID] = set()
3264 length = 4 * line.depth
3265 opening_bracket = None
3266 closing_bracket = None
3267 inner_brackets: Set[LeafID] = set()
3268 for index, leaf, leaf_length in enumerate_with_length(line, reversed=True):
3269 length += leaf_length
3270 if length > line_length:
3273 has_inline_comment = leaf_length > len(leaf.value) + len(leaf.prefix)
3274 if leaf.type == STANDALONE_COMMENT or has_inline_comment:
3278 if leaf is opening_bracket:
3279 opening_bracket = None
3280 elif leaf.type in CLOSING_BRACKETS:
3281 inner_brackets.add(id(leaf))
3282 elif leaf.type in CLOSING_BRACKETS:
3283 if index > 0 and line.leaves[index - 1].type in OPENING_BRACKETS:
3284 # Empty brackets would fail a split so treat them as "inner"
3285 # brackets (e.g. only add them to the `omit` set if another
3286 # pair of brackets was good enough).
3287 inner_brackets.add(id(leaf))
3291 omit.add(id(closing_bracket))
3292 omit.update(inner_brackets)
3293 inner_brackets.clear()
3297 opening_bracket = leaf.opening_bracket
3298 closing_bracket = leaf
3301 def get_future_imports(node: Node) -> Set[str]:
3302 """Return a set of __future__ imports in the file."""
3303 imports: Set[str] = set()
3305 def get_imports_from_children(children: List[LN]) -> Generator[str, None, None]:
3306 for child in children:
3307 if isinstance(child, Leaf):
3308 if child.type == token.NAME:
3310 elif child.type == syms.import_as_name:
3311 orig_name = child.children[0]
3312 assert isinstance(orig_name, Leaf), "Invalid syntax parsing imports"
3313 assert orig_name.type == token.NAME, "Invalid syntax parsing imports"
3314 yield orig_name.value
3315 elif child.type == syms.import_as_names:
3316 yield from get_imports_from_children(child.children)
3318 raise AssertionError("Invalid syntax parsing imports")
3320 for child in node.children:
3321 if child.type != syms.simple_stmt:
3323 first_child = child.children[0]
3324 if isinstance(first_child, Leaf):
3325 # Continue looking if we see a docstring; otherwise stop.
3327 len(child.children) == 2
3328 and first_child.type == token.STRING
3329 and child.children[1].type == token.NEWLINE
3334 elif first_child.type == syms.import_from:
3335 module_name = first_child.children[1]
3336 if not isinstance(module_name, Leaf) or module_name.value != "__future__":
3338 imports |= set(get_imports_from_children(first_child.children[3:]))
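# Sketch (illustrative): for a module that starts with
#     """Docstring."""
#     from __future__ import division, print_function
# get_future_imports returns {"division", "print_function"}; scanning stops at
# the first statement that is neither a docstring nor a __future__ import.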
3344 def gen_python_files_in_dir(
3347 include: Pattern[str],
3348 exclude: Pattern[str],
3350 ) -> Iterator[Path]:
3351 """Generate all files under `path` whose paths are not excluded by the
3352 `exclude` regex, but are included by the `include` regex.
3354 Symbolic links pointing outside of the `root` directory are ignored.
3356 `report` is where output about exclusions goes.
3358 assert root.is_absolute(), f"INTERNAL ERROR: `root` must be absolute but is {root}"
3359 for child in path.iterdir():
3361 normalized_path = "/" + child.resolve().relative_to(root).as_posix()
3363 if child.is_symlink():
3364 report.path_ignored(
3365 child, f"is a symbolic link that points outside {root}"
3372 normalized_path += "/"
3373 exclude_match = exclude.search(normalized_path)
3374 if exclude_match and exclude_match.group(0):
3375 report.path_ignored(child, "matches the --exclude regular expression")
3379 yield from gen_python_files_in_dir(child, root, include, exclude, report)
3381 elif child.is_file():
3382 include_match = include.search(normalized_path)
3388 def find_project_root(srcs: Iterable[str]) -> Path:
3389 """Return a directory containing .git, .hg, or pyproject.toml.
3391 That directory can be one of the directories passed in `srcs` or their
3392 common parent.
3394 If no directory in the tree contains a marker that would specify it's the
3395 project root, the root of the file system is returned.
3398 return Path("/").resolve()
3400 common_base = min(Path(src).resolve() for src in srcs)
3401 if common_base.is_dir():
3402 # Append a fake file so `parents` below returns `common_base_dir`, too.
3403 common_base /= "fake-file"
3404 for directory in common_base.parents:
3405 if (directory / ".git").is_dir():
3408 if (directory / ".hg").is_dir():
3411 if (directory / "pyproject.toml").is_file():
3419 """Provides a reformatting counter. Can be rendered with `str(report)`."""
3423 verbose: bool = False
3424 change_count: int = 0
3426 failure_count: int = 0
3428 def done(self, src: Path, changed: Changed) -> None:
3429 """Increment the counter for successful reformatting. Write out a message."""
3430 if changed is Changed.YES:
3431 reformatted = "would reformat" if self.check else "reformatted"
3432 if self.verbose or not self.quiet:
3433 out(f"{reformatted} {src}")
3434 self.change_count += 1
3437 if changed is Changed.NO:
3438 msg = f"{src} already well formatted, good job."
3440 msg = f"{src} wasn't modified on disk since last run."
3441 out(msg, bold=False)
3442 self.same_count += 1
3444 def failed(self, src: Path, message: str) -> None:
3445 """Increment the counter for failed reformatting. Write out a message."""
3446 err(f"error: cannot format {src}: {message}")
3447 self.failure_count += 1
3449 def path_ignored(self, path: Path, message: str) -> None:
3451 out(f"{path} ignored: {message}", bold=False)
3454 def return_code(self) -> int:
3455 """Return the exit code that the app should use.
3457 This considers the current state of changed files and failures:
3458 - if there were any failures, return 123;
3459 - if any files were changed and --check is being used, return 1;
3460 - otherwise return 0.
3462 # According to http://tldp.org/LDP/abs/html/exitcodes.html starting with
3463 # 126 we have special return codes reserved by the shell.
3464 if self.failure_count:
3467 elif self.change_count and self.check:
3472 def __str__(self) -> str:
3473 """Render a color report of the current state.
3475 Use `click.unstyle` to remove colors.
3478 reformatted = "would be reformatted"
3479 unchanged = "would be left unchanged"
3480 failed = "would fail to reformat"
3482 reformatted = "reformatted"
3483 unchanged = "left unchanged"
3484 failed = "failed to reformat"
3486 if self.change_count:
3487 s = "s" if self.change_count > 1 else ""
3489 click.style(f"{self.change_count} file{s} {reformatted}", bold=True)
3492 s = "s" if self.same_count > 1 else ""
3493 report.append(f"{self.same_count} file{s} {unchanged}")
3494 if self.failure_count:
3495 s = "s" if self.failure_count > 1 else ""
3497 click.style(f"{self.failure_count} file{s} {failed}", fg="red")
3499 return ", ".join(report) + "."
3502 def parse_ast(src: str) -> Union[ast.AST, ast3.AST, ast27.AST]:
3503 filename = "<unknown>"
3504 if sys.version_info >= (3, 8):
3505 # TODO: support Python 4+ ;)
3506 for minor_version in range(sys.version_info[1], 4, -1):
3508 return ast.parse(src, filename, feature_version=(3, minor_version))
3512 for feature_version in (7, 6):
3514 return ast3.parse(src, filename, feature_version=feature_version)
3518 return ast27.parse(src)
3521 def _fixup_ast_constants(
3522 node: Union[ast.AST, ast3.AST, ast27.AST]
3523 ) -> Union[ast.AST, ast3.AST, ast27.AST]:
3524 """Map ast nodes deprecated in 3.8 to Constant."""
3525 # casts are required until this is released:
3526 # https://github.com/python/typeshed/pull/3142
3527 if isinstance(node, (ast.Str, ast3.Str, ast27.Str, ast.Bytes, ast3.Bytes)):
3528 return cast(ast.AST, ast.Constant(value=node.s))
3529 elif isinstance(node, (ast.Num, ast3.Num, ast27.Num)):
3530 return cast(ast.AST, ast.Constant(value=node.n))
3531 elif isinstance(node, (ast.NameConstant, ast3.NameConstant)):
3532 return cast(ast.AST, ast.Constant(value=node.value))
3536 def assert_equivalent(src: str, dst: str) -> None:
3537 """Raise AssertionError if `src` and `dst` aren't equivalent."""
3539 def _v(node: Union[ast.AST, ast3.AST, ast27.AST], depth: int = 0) -> Iterator[str]:
3540 """Simple visitor generating strings to compare ASTs by content."""
3542 node = _fixup_ast_constants(node)
3544 yield f"{' ' * depth}{node.__class__.__name__}("
3546 for field in sorted(node._fields):
3547 # TypeIgnore has only one field 'lineno' which breaks this comparison
3548 type_ignore_classes = (ast3.TypeIgnore, ast27.TypeIgnore)
3549 if sys.version_info >= (3, 8):
3550 type_ignore_classes += (ast.TypeIgnore,)
3551 if isinstance(node, type_ignore_classes):
3555 value = getattr(node, field)
3556 except AttributeError:
3559 yield f"{' ' * (depth+1)}{field}="
3561 if isinstance(value, list):
3563 # Ignore nested tuples within del statements, because we may insert
3564 # parentheses and they change the AST.
3567 and isinstance(node, (ast.Delete, ast3.Delete, ast27.Delete))
3568 and isinstance(item, (ast.Tuple, ast3.Tuple, ast27.Tuple))
3570 for item in item.elts:
3571 yield from _v(item, depth + 2)
3572 elif isinstance(item, (ast.AST, ast3.AST, ast27.AST)):
3573 yield from _v(item, depth + 2)
3575 elif isinstance(value, (ast.AST, ast3.AST, ast27.AST)):
3576 yield from _v(value, depth + 2)
3579 yield f"{' ' * (depth+2)}{value!r}, # {value.__class__.__name__}"
3581 yield f"{' ' * depth}) # /{node.__class__.__name__}"
3584 src_ast = parse_ast(src)
3585 except Exception as exc:
3586 raise AssertionError(
3587 f"cannot use --safe with this file; failed to parse source file. "
3588 f"AST error message: {exc}"
3592 dst_ast = parse_ast(dst)
3593 except Exception as exc:
3594 log = dump_to_file("".join(traceback.format_tb(exc.__traceback__)), dst)
3595 raise AssertionError(
3596 f"INTERNAL ERROR: Black produced invalid code: {exc}. "
3597 f"Please report a bug on https://github.com/psf/black/issues. "
3598 f"This invalid output might be helpful: {log}"
3601 src_ast_str = "\n".join(_v(src_ast))
3602 dst_ast_str = "\n".join(_v(dst_ast))
3603 if src_ast_str != dst_ast_str:
3604 log = dump_to_file(diff(src_ast_str, dst_ast_str, "src", "dst"))
3605 raise AssertionError(
3606 f"INTERNAL ERROR: Black produced code that is not equivalent to "
3608 f"Please report a bug on https://github.com/psf/black/issues. "
3609 f"This diff might be helpful: {log}"
3613 def assert_stable(src: str, dst: str, mode: FileMode) -> None:
3614 """Raise AssertionError if `dst` reformats differently the second time."""
3615 newdst = format_str(dst, mode=mode)
3618 diff(src, dst, "source", "first pass"),
3619 diff(dst, newdst, "first pass", "second pass"),
3621 raise AssertionError(
3622 f"INTERNAL ERROR: Black produced different code on the second pass "
3623 f"of the formatter. "
3624 f"Please report a bug on https://github.com/psf/black/issues. "
3625 f"This diff might be helpful: {log}"
3629 def dump_to_file(*output: str) -> str:
3630 """Dump `output` to a temporary file. Return path to the file."""
3631 with tempfile.NamedTemporaryFile(
3632 mode="w", prefix="blk_", suffix=".log", delete=False, encoding="utf8"
3634 for lines in output:
3636 if lines and lines[-1] != "\n":
3642 def nullcontext() -> Iterator[None]:
3643 """Return context manager that does nothing.
3644 Similar to `nullcontext` from python 3.7"""
3648 def diff(a: str, b: str, a_name: str, b_name: str) -> str:
3649 """Return a unified diff string between strings `a` and `b`."""
3652 a_lines = [line + "\n" for line in a.split("\n")]
3653 b_lines = [line + "\n" for line in b.split("\n")]
3655 difflib.unified_diff(a_lines, b_lines, fromfile=a_name, tofile=b_name, n=5)
3659 def cancel(tasks: Iterable[asyncio.Task]) -> None:
3660 """asyncio signal handler that cancels all `tasks` and reports to stderr."""
3666 def shutdown(loop: asyncio.AbstractEventLoop) -> None:
3667 """Cancel all pending tasks on `loop`, wait for them, and close the loop."""
3669 if sys.version_info[:2] >= (3, 7):
3670 all_tasks = asyncio.all_tasks
3672 all_tasks = asyncio.Task.all_tasks
3673 # This part is borrowed from asyncio/runners.py in Python 3.7b2.
3674 to_cancel = [task for task in all_tasks(loop) if not task.done()]
3678 for task in to_cancel:
3680 loop.run_until_complete(
3681 asyncio.gather(*to_cancel, loop=loop, return_exceptions=True)
3684 # `concurrent.futures.Future` objects cannot be cancelled once they
3685 # are already running. There might be some when the `shutdown()` happened.
3686 # Silence their logger's spew about the event loop being closed.
3687 cf_logger = logging.getLogger("concurrent.futures")
3688 cf_logger.setLevel(logging.CRITICAL)
3692 def sub_twice(regex: Pattern[str], replacement: str, original: str) -> str:
3693 """Replace `regex` with `replacement` twice on `original`.
3695 This is used by string normalization to perform replacements on
3696 overlapping matches.
3698 return regex.sub(replacement, regex.sub(replacement, original))
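# Illustration (not from the original module): with the unescaped-quote
# pattern from normalize_string_quotes, one .sub() pass over the body '""'
# escapes only one quote, because the second match would need the character
# already consumed by the first; the second pass picks it up:
#
#     >>> quote = re.compile(r'(([^\\]|^)(\\\\)*)"')
#     >>> quote.sub(r'\1\\"', '""')
#     '"\\"'
#     >>> sub_twice(quote, r'\1\\"', '""')
#     '\\"\\"'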
3701 def re_compile_maybe_verbose(regex: str) -> Pattern[str]:
3702 """Compile a regular expression string in `regex`.
3704 If it contains newlines, use verbose mode.
3707 regex = "(?x)" + regex
3708 return re.compile(regex)
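# Doctest-style sketch (illustrative): single-line patterns compile as-is,
# while multiline ones (e.g. loaded from pyproject.toml) get "(?x)" so their
# whitespace and "#" comments are ignored:
#
#     >>> re_compile_maybe_verbose(r"\.pyi?$").pattern
#     '\\.pyi?$'
#     >>> bool(re_compile_maybe_verbose("foo  # comment\n|bar").match("bar"))
#     True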
3711 def enumerate_reversed(sequence: Sequence[T]) -> Iterator[Tuple[Index, T]]:
3712 """Like `reversed(enumerate(sequence))` if that were possible."""
3713 index = len(sequence) - 1
3714 for element in reversed(sequence):
3715 yield (index, element)
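# Doctest-style sketch (illustrative):
#
#     >>> list(enumerate_reversed(["a", "b", "c"]))
#     [(2, 'c'), (1, 'b'), (0, 'a')]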
3719 def enumerate_with_length(
3720 line: Line, reversed: bool = False
3721 ) -> Iterator[Tuple[Index, Leaf, int]]:
3722 """Return an enumeration of leaves with their length.
3724 Stops prematurely on multiline strings and standalone comments.
3727 Callable[[Sequence[Leaf]], Iterator[Tuple[Index, Leaf]]],
3728 enumerate_reversed if reversed else enumerate,
3730 for index, leaf in op(line.leaves):
3731 length = len(leaf.prefix) + len(leaf.value)
3732 if "\n" in leaf.value:
3733 return # Multiline strings, we can't continue.
3735 for comment in line.comments_after(leaf):
3736 length += len(comment.value)
3738 yield index, leaf, length
3741 def is_line_short_enough(line: Line, *, line_length: int, line_str: str = "") -> bool:
3742 """Return True if `line` is no longer than `line_length`.
3744 Uses the provided `line_str` rendering, if any, otherwise computes a new one.
3747 line_str = str(line).strip("\n")
3749 len(line_str) <= line_length
3750 and "\n" not in line_str # multiline strings
3751 and not line.contains_standalone_comments()
3755 def can_be_split(line: Line) -> bool:
3756 """Return False if the line cannot be split *for sure*.
3758 This is not an exhaustive search but a cheap heuristic that we can use to
3759 avoid some unfortunate formattings (mostly around wrapping unsplittable code
3760 in unnecessary parentheses).
3762 leaves = line.leaves
3766 if leaves[0].type == token.STRING and leaves[1].type == token.DOT:
3770 for leaf in leaves[-2::-1]:
3771 if leaf.type in OPENING_BRACKETS:
3772 if next.type not in CLOSING_BRACKETS:
3776 elif leaf.type == token.DOT:
3778 elif leaf.type == token.NAME:
3779 if not (next.type == token.DOT or next.type in OPENING_BRACKETS):
3782 elif leaf.type not in CLOSING_BRACKETS:
3785 if dot_count > 1 and call_count > 1:
3791 def can_omit_invisible_parens(line: Line, line_length: int) -> bool:
3792 """Does `line` have a shape safe to reformat without optional parens around it?
3794 Returns True for only a subset of potentially nice looking formattings but
3795 the point is to not return false positives that end up producing lines that
3796 are too long.
3798 bt = line.bracket_tracker
3799 if not bt.delimiters:
3800 # Without delimiters the optional parentheses are useless.
3803 max_priority = bt.max_delimiter_priority()
3804 if bt.delimiter_count_with_priority(max_priority) > 1:
3805 # With more than one delimiter of a kind the optional parentheses read better.
3808 if max_priority == DOT_PRIORITY:
3809 # A single stranded method call doesn't require optional parentheses.
3812 assert len(line.leaves) >= 2, "Stranded delimiter"
3814 first = line.leaves[0]
3815 second = line.leaves[1]
3816 penultimate = line.leaves[-2]
3817 last = line.leaves[-1]
3819 # With a single delimiter, omit if the expression starts or ends with
3820 # a bracket.
3821 if first.type in OPENING_BRACKETS and second.type not in CLOSING_BRACKETS:
3823 length = 4 * line.depth
3824 for _index, leaf, leaf_length in enumerate_with_length(line):
3825 if leaf.type in CLOSING_BRACKETS and leaf.opening_bracket is first:
3828 length += leaf_length
3829 if length > line_length:
3832 if leaf.type in OPENING_BRACKETS:
3833 # There are brackets we can further split on.
3837 # checked the entire string and line length wasn't exceeded
3838 if len(line.leaves) == _index + 1:
3841 # Note: we are not returning False here because a line might have *both*
3842 # a leading opening bracket and a trailing closing bracket. If the
3843 # opening bracket doesn't match our rule, maybe the closing will.
3846 last.type == token.RPAR
3847 or last.type == token.RBRACE
3849 # don't use indexing for omitting optional parentheses;
3851 last.type == token.RSQB
3853 and last.parent.type != syms.trailer
3856 if penultimate.type in OPENING_BRACKETS:
3857 # Empty brackets don't help.
3860 if is_multiline_string(first):
3861 # Additional wrapping of a multiline string in this situation is
3862 # unnecessary.
3865 length = 4 * line.depth
3866 seen_other_brackets = False
3867 for _index, leaf, leaf_length in enumerate_with_length(line):
3868 length += leaf_length
3869 if leaf is last.opening_bracket:
3870 if seen_other_brackets or length <= line_length:
3873 elif leaf.type in OPENING_BRACKETS:
3874 # There are brackets we can further split on.
3875 seen_other_brackets = True
3880 def get_cache_file(mode: FileMode) -> Path:
3881 return CACHE_DIR / f"cache.{mode.get_cache_key()}.pickle"
3884 def read_cache(mode: FileMode) -> Cache:
3885 """Read the cache if it exists and is well formed.
3887 If it is not well formed, the call to write_cache later should resolve the issue.
3889 cache_file = get_cache_file(mode)
3890 if not cache_file.exists():
3893 with cache_file.open("rb") as fobj:
3895 cache: Cache = pickle.load(fobj)
3896 except pickle.UnpicklingError:
3902 def get_cache_info(path: Path) -> CacheInfo:
3903 """Return the information used to check if a file is already formatted or not."""
3905 return stat.st_mtime, stat.st_size
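# Sketch (illustrative; the path and numbers are hypothetical): a cache entry
# maps a resolved source path to the (st_mtime, st_size) pair above, e.g.
#     {Path("/project/src/app.py"): (1546300800.0, 4096)}
# so a file counts as unchanged only if both values still match.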
3908 def filter_cached(cache: Cache, sources: Iterable[Path]) -> Tuple[Set[Path], Set[Path]]:
3909 """Split an iterable of paths in `sources` into two sets.
3911 The first contains paths of files that were modified on disk or are not in the
3912 cache. The other contains paths to non-modified files.
3914 todo, done = set(), set()
3917 if cache.get(src) != get_cache_info(src):
3924 def write_cache(cache: Cache, sources: Iterable[Path], mode: FileMode) -> None:
3925 """Update the cache file."""
3926 cache_file = get_cache_file(mode)
3928 CACHE_DIR.mkdir(parents=True, exist_ok=True)
3929 new_cache = {**cache, **{src.resolve(): get_cache_info(src) for src in sources}}
3930 with tempfile.NamedTemporaryFile(dir=str(cache_file.parent), delete=False) as f:
3931 pickle.dump(new_cache, f, protocol=pickle.HIGHEST_PROTOCOL)
3932 os.replace(f.name, cache_file)
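# Design note: dumping into a NamedTemporaryFile in the cache directory and
# then os.replace()-ing it over the final name makes the update atomic on the
# same filesystem; readers see either the old pickle or the new one, never a
# torn write.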
3937 def patch_click() -> None:
3938 """Make Click not crash.
3940 On certain misconfigured environments, Python 3 selects the ASCII encoding as the
3941 default, which restricts paths that it can access during the lifetime of the
3942 application. Click refuses to work in this scenario by raising a RuntimeError.
3944 In the case of Black, the likelihood that non-ASCII characters are going to be used in
3945 file paths is minimal since it's Python source code. Moreover, this crash was
3946 spurious on Python 3.7 thanks to PEP 538 and PEP 540.
3949 from click import core
3950 from click import _unicodefun # type: ignore
3951 except ModuleNotFoundError:
3954 for module in (core, _unicodefun):
3955 if hasattr(module, "_verify_python3_env"):
3956 module._verify_python3_env = lambda: None
3959 def patched_main() -> None:
3965 if __name__ == "__main__":