from abc import ABC, abstractmethod
from collections import defaultdict
from concurrent.futures import Executor, ThreadPoolExecutor, ProcessPoolExecutor
from contextlib import contextmanager
from datetime import datetime
from functools import lru_cache, partial, wraps
from multiprocessing import Manager, freeze_support
from pathlib import Path

from typing_extensions import Final
from mypy_extensions import mypyc_attr

from appdirs import user_cache_dir
from dataclasses import dataclass, field, replace
from typed_ast import ast3, ast27
from pathspec import PathSpec

from blib2to3.pytree import Node, Leaf, type_repr
from blib2to3 import pygram, pytree
from blib2to3.pgen2 import driver, token
from blib2to3.pgen2.grammar import Grammar
from blib2to3.pgen2.parse import ParseError

from _black_version import version as __version__

import colorama  # noqa: F401
DEFAULT_LINE_LENGTH = 88
DEFAULT_EXCLUDES = r"/(\.eggs|\.git|\.hg|\.mypy_cache|\.nox|\.tox|\.venv|\.svn|_build|buck-out|build|dist)/"  # noqa: B950
DEFAULT_INCLUDES = r"\.pyi?$"
CACHE_DIR = Path(user_cache_dir("black", version=__version__))

STRING_PREFIX_CHARS: Final = "furbFURB"  # All possible string prefix characters.

LN = Union[Leaf, Node]
Transformer = Callable[["Line", Collection["Feature"]], Iterator["Line"]]
CacheInfo = Tuple[Timestamp, FileSize]
Cache = Dict[Path, CacheInfo]
out = partial(click.secho, bold=True, err=True)
err = partial(click.secho, fg="red", err=True)

pygram.initialize(CACHE_DIR)
syms = pygram.python_symbols
class NothingChanged(UserWarning):
    """Raised when reformatted code is the same as source."""


class CannotTransform(Exception):
    """Base class for errors raised by Transformers."""


class CannotSplit(CannotTransform):
    """A readable split that fits the allotted line length is impossible."""


class InvalidInput(ValueError):
    """Raised when input source code fails all parse attempts."""
T = TypeVar("T")
E = TypeVar("E", bound=Exception)


class Ok(Generic[T]):
    def __init__(self, value: T) -> None:
        self._value = value


class Err(Generic[E]):
    def __init__(self, e: E) -> None:
        self._e = e


# The 'Result' return type is used to implement an error-handling model heavily
# influenced by that used by the Rust programming language
# (see https://doc.rust-lang.org/book/ch09-00-error-handling.html).
Result = Union[Ok[T], Err[E]]
TResult = Result[T, CannotTransform]  # (T)ransform Result
TMatchResult = TResult[Index]
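
# A minimal usage sketch of the Result model above (hypothetical helper, not
# part of black.py): a function reports success with Ok(value) and an expected
# failure with Err(exc), and the caller branches with isinstance() instead of
# relying on raised exceptions.
def _demo_first_index(haystack: str, needle: str) -> TResult[int]:
    pos = haystack.find(needle)
    if pos == -1:
        return Err(CannotTransform(f"{needle!r} not found"))
    return Ok(pos)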
class WriteBack(Enum):
    NO = 0
    YES = 1
    DIFF = 2
    CHECK = 3
    COLOR_DIFF = 4

    @classmethod
    def from_configuration(
        cls, *, check: bool, diff: bool, color: bool = False
    ) -> "WriteBack":
        if check and not diff:
            return cls.CHECK

        if diff and color:
            return cls.COLOR_DIFF

        return cls.DIFF if diff else cls.YES
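
# Illustrative flag-to-member mapping (a sketch, assuming the reconstructed
# CHECK branch above):
#   from_configuration(check=True,  diff=False)             -> WriteBack.CHECK
#   from_configuration(check=False, diff=True)              -> WriteBack.DIFF
#   from_configuration(check=False, diff=True, color=True)  -> WriteBack.COLOR_DIFF
#   from_configuration(check=False, diff=False)             -> WriteBack.YES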
class TargetVersion(Enum):
    PY27 = 2
    PY33 = 3
    PY34 = 4
    PY35 = 5
    PY36 = 6
    PY37 = 7
    PY38 = 8

    def is_python2(self) -> bool:
        return self is TargetVersion.PY27


PY36_VERSIONS = {TargetVersion.PY36, TargetVersion.PY37, TargetVersion.PY38}
class Feature(Enum):
    # All string literals are unicode
    UNICODE_LITERALS = 1
    F_STRINGS = 2
    NUMERIC_UNDERSCORES = 3
    TRAILING_COMMA_IN_CALL = 4
    TRAILING_COMMA_IN_DEF = 5
    # The following two feature flags are mutually exclusive, and exactly one should be
    # set for every version of Python.
    ASYNC_IDENTIFIERS = 6
    ASYNC_KEYWORDS = 7
    ASSIGNMENT_EXPRESSIONS = 8
    POS_ONLY_ARGUMENTS = 9
VERSION_TO_FEATURES: Dict[TargetVersion, Set[Feature]] = {
    TargetVersion.PY27: {Feature.ASYNC_IDENTIFIERS},
    TargetVersion.PY33: {Feature.UNICODE_LITERALS, Feature.ASYNC_IDENTIFIERS},
    TargetVersion.PY34: {Feature.UNICODE_LITERALS, Feature.ASYNC_IDENTIFIERS},
    TargetVersion.PY35: {
        Feature.UNICODE_LITERALS,
        Feature.TRAILING_COMMA_IN_CALL,
        Feature.ASYNC_IDENTIFIERS,
    },
    TargetVersion.PY36: {
        Feature.UNICODE_LITERALS,
        Feature.F_STRINGS,
        Feature.NUMERIC_UNDERSCORES,
        Feature.TRAILING_COMMA_IN_CALL,
        Feature.TRAILING_COMMA_IN_DEF,
        Feature.ASYNC_IDENTIFIERS,
    },
    TargetVersion.PY37: {
        Feature.UNICODE_LITERALS,
        Feature.F_STRINGS,
        Feature.NUMERIC_UNDERSCORES,
        Feature.TRAILING_COMMA_IN_CALL,
        Feature.TRAILING_COMMA_IN_DEF,
        Feature.ASYNC_KEYWORDS,
    },
    TargetVersion.PY38: {
        Feature.UNICODE_LITERALS,
        Feature.F_STRINGS,
        Feature.NUMERIC_UNDERSCORES,
        Feature.TRAILING_COMMA_IN_CALL,
        Feature.TRAILING_COMMA_IN_DEF,
        Feature.ASYNC_KEYWORDS,
        Feature.ASSIGNMENT_EXPRESSIONS,
        Feature.POS_ONLY_ARGUMENTS,
    },
}
@dataclass
class Mode:
    target_versions: Set[TargetVersion] = field(default_factory=set)
    line_length: int = DEFAULT_LINE_LENGTH
    string_normalization: bool = True
    is_pyi: bool = False

    def get_cache_key(self) -> str:
        if self.target_versions:
            version_str = ",".join(
                str(version.value)
                for version in sorted(self.target_versions, key=lambda v: v.value)
            )
        else:
            version_str = "-"
        parts = [
            version_str,
            str(self.line_length),
            str(int(self.string_normalization)),
            str(int(self.is_pyi)),
        ]
        return ".".join(parts)


# Legacy name, left for integrations.
FileMode = Mode
def supports_feature(target_versions: Set[TargetVersion], feature: Feature) -> bool:
    return all(feature in VERSION_TO_FEATURES[version] for version in target_versions)
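
# Note: with an empty `target_versions` set, all() above is vacuously True, so
# every feature is treated as supported until target versions are detected.
# Example (a sketch):
#   supports_feature({TargetVersion.PY27}, Feature.NUMERIC_UNDERSCORES)  # False
#   supports_feature(set(), Feature.NUMERIC_UNDERSCORES)                 # True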
def find_pyproject_toml(path_search_start: str) -> Optional[str]:
    """Find the absolute filepath to a pyproject.toml if it exists"""
    path_project_root = find_project_root(path_search_start)
    path_pyproject_toml = path_project_root / "pyproject.toml"
    return str(path_pyproject_toml) if path_pyproject_toml.is_file() else None
def parse_pyproject_toml(path_config: str) -> Dict[str, Any]:
    """Parse a pyproject toml file, pulling out relevant parts for Black.

    If parsing fails, will raise a toml.TomlDecodeError.
    """
    pyproject_toml = toml.load(path_config)
    config = pyproject_toml.get("tool", {}).get("black", {})
    return {k.replace("--", "").replace("-", "_"): v for k, v in config.items()}
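
# Example (a sketch): given a pyproject.toml containing
#
#   [tool.black]
#   line-length = 100
#   target-version = ["py37"]
#
# parse_pyproject_toml() returns {"line_length": 100, "target_version": ["py37"]},
# i.e. the keys are normalized to the snake_case parameter names Click expects.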
def read_pyproject_toml(
    ctx: click.Context, param: click.Parameter, value: Optional[str]
) -> Optional[str]:
    """Inject Black configuration from "pyproject.toml" into defaults in `ctx`.

    Returns the path to a successfully found and read configuration file, None
    otherwise.
    """
    if not value:
        value = find_pyproject_toml(ctx.params.get("src", ()))
        if value is None:
            return None

    try:
        config = parse_pyproject_toml(value)
    except (toml.TomlDecodeError, OSError) as e:
        raise click.FileError(
            filename=value, hint=f"Error reading configuration file: {e}"
        )

    if not config:
        return None

    target_version = config.get("target_version")
    if target_version is not None and not isinstance(target_version, list):
        raise click.BadOptionUsage(
            "target-version", "Config key target-version must be a list"
        )

    default_map: Dict[str, Any] = {}
    if ctx.default_map:
        default_map.update(ctx.default_map)
    default_map.update(config)

    ctx.default_map = default_map
    return value
def target_version_option_callback(
    c: click.Context, p: Union[click.Option, click.Parameter], v: Tuple[str, ...]
) -> List[TargetVersion]:
    """Compute the target versions from a --target-version flag.

    This is its own function because mypy couldn't infer the type correctly
    when it was a lambda, causing mypyc trouble.
    """
    return [TargetVersion[val.upper()] for val in v]
@click.command(context_settings=dict(help_option_names=["-h", "--help"]))
@click.option("-c", "--code", type=str, help="Format the code passed in as a string.")
@click.option(
    "-l",
    "--line-length",
    type=int,
    default=DEFAULT_LINE_LENGTH,
    help="How many characters per line to allow.",
    show_default=True,
)
@click.option(
    "-t",
    "--target-version",
    type=click.Choice([v.name.lower() for v in TargetVersion]),
    callback=target_version_option_callback,
    multiple=True,
    help=(
        "Python versions that should be supported by Black's output. [default: per-file"
        " auto-detection]"
    ),
)
@click.option(
    "--pyi",
    is_flag=True,
    help=(
        "Format all input files like typing stubs regardless of file extension (useful"
        " when piping source on standard input)."
    ),
)
@click.option(
    "-S",
    "--skip-string-normalization",
    is_flag=True,
    help="Don't normalize string quotes or prefixes.",
)
@click.option(
    "--check",
    is_flag=True,
    help=(
        "Don't write the files back, just return the status. Return code 0 means"
        " nothing would change. Return code 1 means some files would be reformatted."
        " Return code 123 means there was an internal error."
    ),
)
@click.option(
    "--diff",
    is_flag=True,
    help="Don't write the files back, just output a diff for each file on stdout.",
)
@click.option(
    "--color/--no-color",
    is_flag=True,
    help="Show colored diff. Only applies when `--diff` is given.",
)
@click.option(
    "--fast/--safe",
    is_flag=True,
    help="If --fast given, skip temporary sanity checks. [default: --safe]",
)
@click.option(
    "--include",
    type=str,
    default=DEFAULT_INCLUDES,
    help=(
        "A regular expression that matches files and directories that should be"
        " included on recursive searches. An empty value means all files are included"
        " regardless of the name. Use forward slashes for directories on all platforms"
        " (Windows, too). Exclusions are calculated first, inclusions later."
    ),
    show_default=True,
)
@click.option(
    "--exclude",
    type=str,
    default=DEFAULT_EXCLUDES,
    help=(
        "A regular expression that matches files and directories that should be"
        " excluded on recursive searches. An empty value means no paths are excluded."
        " Use forward slashes for directories on all platforms (Windows, too)."
        " Exclusions are calculated first, inclusions later."
    ),
    show_default=True,
)
@click.option(
    "--force-exclude",
    type=str,
    help=(
        "Like --exclude, but files and directories matching this regex will be"
        " excluded even when they are passed explicitly as arguments"
    ),
)
@click.option(
    "-q",
    "--quiet",
    is_flag=True,
    help=(
        "Don't emit non-error messages to stderr. Errors are still emitted; silence"
        " those with 2>/dev/null."
    ),
)
@click.option(
    "-v",
    "--verbose",
    is_flag=True,
    help=(
        "Also emit messages to stderr about files that were not changed or were ignored"
        " due to --exclude=."
    ),
)
@click.version_option(version=__version__)
@click.argument(
    "src",
    nargs=-1,
    type=click.Path(
        exists=True, file_okay=True, dir_okay=True, readable=True, allow_dash=True
    ),
    is_eager=True,
)
@click.option(
    "--config",
    type=click.Path(
        exists=True, file_okay=True, dir_okay=False, readable=True, allow_dash=False
    ),
    is_eager=True,
    callback=read_pyproject_toml,
    help="Read configuration from PATH.",
)
@click.pass_context
def main(
    ctx: click.Context,
    code: Optional[str],
    line_length: int,
    target_version: List[TargetVersion],
    check: bool,
    diff: bool,
    color: bool,
    fast: bool,
    pyi: bool,
    skip_string_normalization: bool,
    quiet: bool,
    verbose: bool,
    include: str,
    exclude: str,
    force_exclude: Optional[str],
    src: Tuple[str, ...],
    config: Optional[str],
) -> None:
    """The uncompromising code formatter."""
    write_back = WriteBack.from_configuration(check=check, diff=diff, color=color)
    if target_version:
        versions = set(target_version)
    else:
        # We'll autodetect later.
        versions = set()
    mode = Mode(
        target_versions=versions,
        line_length=line_length,
        is_pyi=pyi,
        string_normalization=not skip_string_normalization,
    )
    if config and verbose:
        out(f"Using configuration from {config}.", bold=False, fg="blue")
    if code is not None:
        print(format_str(code, mode=mode))
        ctx.exit(0)
    report = Report(check=check, diff=diff, quiet=quiet, verbose=verbose)
    sources = get_sources(
        ctx=ctx,
        src=src,
        quiet=quiet,
        verbose=verbose,
        include=include,
        exclude=exclude,
        force_exclude=force_exclude,
        report=report,
    )
    path_empty(
        sources,
        "No Python files are present to be formatted. Nothing to do 😴",
        quiet,
        verbose,
        ctx,
    )
    if len(sources) == 1:
        reformat_one(
            src=sources.pop(),
            fast=fast,
            write_back=write_back,
            mode=mode,
            report=report,
        )
    else:
        reformat_many(
            sources=sources, fast=fast, write_back=write_back, mode=mode, report=report
        )

    if verbose or not quiet:
        out("Oh no! 💥 💔 💥" if report.return_code else "All done! ✨ 🍰 ✨")
        click.secho(str(report), err=True)
    ctx.exit(report.return_code)
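
# Typical invocations of the command defined above (a sketch):
#   black src/                 # reformat files in place
#   black --check src/         # exit 1 if any file would be reformatted
#   black --diff --color src/  # print a colored diff, write nothing back
#   echo "x=1" | black -       # read from stdin, write to stdout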
def get_sources(
    *,
    ctx: click.Context,
    src: Tuple[str, ...],
    quiet: bool,
    verbose: bool,
    include: str,
    exclude: str,
    force_exclude: Optional[str],
    report: "Report",
) -> Set[Path]:
    """Compute the set of files to be formatted."""
    try:
        include_regex = re_compile_maybe_verbose(include)
    except re.error:
        err(f"Invalid regular expression for include given: {include!r}")
        ctx.exit(2)
    try:
        exclude_regex = re_compile_maybe_verbose(exclude)
    except re.error:
        err(f"Invalid regular expression for exclude given: {exclude!r}")
        ctx.exit(2)
    try:
        force_exclude_regex = (
            re_compile_maybe_verbose(force_exclude) if force_exclude else None
        )
    except re.error:
        err(f"Invalid regular expression for force_exclude given: {force_exclude!r}")
        ctx.exit(2)

    root = find_project_root(src)
    sources: Set[Path] = set()
    path_empty(src, "No Path provided. Nothing to do 😴", quiet, verbose, ctx)
    exclude_regexes = [exclude_regex]
    if force_exclude_regex is not None:
        exclude_regexes.append(force_exclude_regex)

    for s in src:
        p = Path(s)
        # ... (directory and stdin handling elided)
        if p.is_file():
            sources.update(
                gen_python_files(
                    [p], root, None, exclude_regexes, report, get_gitignore(root)
                )
            )
        else:
            err(f"invalid path: {s}")
    return sources
def path_empty(
    src: Sized, msg: str, quiet: bool, verbose: bool, ctx: click.Context
) -> None:
    """
    Exit if there is no `src` provided for formatting
    """
    if not src:
        if verbose or not quiet:
            out(msg)
        ctx.exit(0)
def reformat_one(
    src: Path, fast: bool, write_back: WriteBack, mode: Mode, report: "Report"
) -> None:
    """Reformat a single file under `src` without spawning child processes.

    `fast`, `write_back`, and `mode` options are passed to
    :func:`format_file_in_place` or :func:`format_stdin_to_stdout`.
    """
    try:
        changed = Changed.NO
        if not src.is_file() and str(src) == "-":
            if format_stdin_to_stdout(fast=fast, write_back=write_back, mode=mode):
                changed = Changed.YES
        else:
            cache: Cache = {}
            if write_back != WriteBack.DIFF:
                cache = read_cache(mode)
                res_src = src.resolve()
                if res_src in cache and cache[res_src] == get_cache_info(res_src):
                    changed = Changed.CACHED
            if changed is not Changed.CACHED and format_file_in_place(
                src, fast=fast, write_back=write_back, mode=mode
            ):
                changed = Changed.YES
            if (write_back is WriteBack.YES and changed is not Changed.CACHED) or (
                write_back is WriteBack.CHECK and changed is Changed.NO
            ):
                write_cache(cache, [src], mode)
        report.done(src, changed)
    except Exception as exc:
        report.failed(src, str(exc))
def reformat_many(
    sources: Set[Path], fast: bool, write_back: WriteBack, mode: Mode, report: "Report"
) -> None:
    """Reformat multiple files using a ProcessPoolExecutor."""
    executor: Executor
    loop = asyncio.get_event_loop()
    worker_count = os.cpu_count()
    if sys.platform == "win32":
        # Work around https://bugs.python.org/issue26903
        worker_count = min(worker_count, 61)
    try:
        executor = ProcessPoolExecutor(max_workers=worker_count)
    except (ImportError, OSError):
        # we arrive here if the underlying system does not support multi-processing
        # like in AWS Lambda or Termux, in which case we gracefully fall back to
        # a ThreadPoolExecutor with just a single worker (more workers would not do us
        # any good due to the Global Interpreter Lock)
        executor = ThreadPoolExecutor(max_workers=1)

    try:
        loop.run_until_complete(
            schedule_formatting(
                sources=sources,
                fast=fast,
                write_back=write_back,
                mode=mode,
                report=report,
                loop=loop,
                executor=executor,
            )
        )
    finally:
        shutdown(loop)
        if executor is not None:
            executor.shutdown()
async def schedule_formatting(
    sources: Set[Path],
    fast: bool,
    write_back: WriteBack,
    mode: Mode,
    report: "Report",
    loop: asyncio.AbstractEventLoop,
    executor: Executor,
) -> None:
    """Run formatting of `sources` in parallel using the provided `executor`.

    (Use ProcessPoolExecutors for actual parallelism.)

    `write_back`, `fast`, and `mode` options are passed to
    :func:`format_file_in_place`.
    """
    cache: Cache = {}
    if write_back != WriteBack.DIFF:
        cache = read_cache(mode)
        sources, cached = filter_cached(cache, sources)
        for src in sorted(cached):
            report.done(src, Changed.CACHED)
    if not sources:
        return

    cancelled = []
    sources_to_cache = []
    lock = None
    if write_back == WriteBack.DIFF:
        # For diff output, we need locks to ensure we don't interleave output
        # from different processes.
        manager = Manager()
        lock = manager.Lock()
    tasks = {
        asyncio.ensure_future(
            loop.run_in_executor(
                executor, format_file_in_place, src, fast, mode, write_back, lock
            )
        ): src
        for src in sorted(sources)
    }
    pending: Iterable["asyncio.Future[bool]"] = tasks.keys()
    try:
        loop.add_signal_handler(signal.SIGINT, cancel, pending)
        loop.add_signal_handler(signal.SIGTERM, cancel, pending)
    except NotImplementedError:
        # There are no good alternatives for these on Windows.
        pass
    while pending:
        done, _ = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            src = tasks.pop(task)
            if task.cancelled():
                cancelled.append(task)
            elif task.exception():
                report.failed(src, str(task.exception()))
            else:
                changed = Changed.YES if task.result() else Changed.NO
                # If the file was written back or was successfully checked as
                # well-formatted, store this information in the cache.
                if write_back is WriteBack.YES or (
                    write_back is WriteBack.CHECK and changed is Changed.NO
                ):
                    sources_to_cache.append(src)
                report.done(src, changed)
    if cancelled:
        await asyncio.gather(*cancelled, loop=loop, return_exceptions=True)
    if sources_to_cache:
        write_cache(cache, sources_to_cache, mode)
def format_file_in_place(
    src: Path,
    fast: bool,
    mode: Mode,
    write_back: WriteBack = WriteBack.NO,
    lock: Any = None,  # multiprocessing.Manager().Lock() is some crazy proxy
) -> bool:
    """Format file under `src` path. Return True if changed.

    If `write_back` is DIFF, write a diff to stdout. If it is YES, write reformatted
    code back to the file.
    `mode` and `fast` options are passed to :func:`format_file_contents`.
    """
    if src.suffix == ".pyi":
        mode = replace(mode, is_pyi=True)

    then = datetime.utcfromtimestamp(src.stat().st_mtime)
    with open(src, "rb") as buf:
        src_contents, encoding, newline = decode_bytes(buf.read())
    try:
        dst_contents = format_file_contents(src_contents, fast=fast, mode=mode)
    except NothingChanged:
        return False

    if write_back == WriteBack.YES:
        with open(src, "w", encoding=encoding, newline=newline) as f:
            f.write(dst_contents)
    elif write_back in (WriteBack.DIFF, WriteBack.COLOR_DIFF):
        now = datetime.utcnow()
        src_name = f"{src}\t{then} +0000"
        dst_name = f"{src}\t{now} +0000"
        diff_contents = diff(src_contents, dst_contents, src_name, dst_name)

        if write_back == WriteBack.COLOR_DIFF:
            diff_contents = color_diff(diff_contents)

        with lock or nullcontext():
            f = io.TextIOWrapper(
                sys.stdout.buffer,
                encoding=encoding,
                newline=newline,
                write_through=True,
            )
            f = wrap_stream_for_windows(f)
            f.write(diff_contents)
            f.detach()

    return True
def color_diff(contents: str) -> str:
    """Inject the ANSI color codes to the diff."""
    lines = contents.split("\n")
    for i, line in enumerate(lines):
        if line.startswith("+++") or line.startswith("---"):
            line = "\033[1;37m" + line + "\033[0m"  # bold white, reset
        if line.startswith("@@"):
            line = "\033[36m" + line + "\033[0m"  # cyan, reset
        if line.startswith("+"):
            line = "\033[32m" + line + "\033[0m"  # green, reset
        elif line.startswith("-"):
            line = "\033[31m" + line + "\033[0m"  # red, reset
        lines[i] = line
    return "\n".join(lines)
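
# Example (a sketch): color_diff("--- a.py\n+++ b.py\n+x = 1") wraps each line
# in the ANSI codes above, so the two header lines come back bold white and the
# added line becomes "\033[32m+x = 1\033[0m" (green, then reset).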
def wrap_stream_for_windows(
    f: io.TextIOWrapper,
) -> Union[io.TextIOWrapper, "colorama.AnsiToWin32.AnsiToWin32"]:
    """
    Wrap the stream in colorama's wrap_stream so colors are shown on Windows.

    If `colorama` is not found, then no change is made. If `colorama` does
    exist, then it handles the logic to determine whether or not to change
    things.
    """
    try:
        from colorama import initialise

        # We set `strip=False` so that we don't have to modify
        # test_express_diff_with_color.
        f = initialise.wrap_stream(
            f, convert=None, strip=False, autoreset=False, wrap=True
        )

        # wrap_stream returns a `colorama.AnsiToWin32.AnsiToWin32` object
        # which does not have a `detach()` method. So we fake one.
        f.detach = lambda *args, **kwargs: None  # type: ignore
    except ImportError:
        pass

    return f
def format_stdin_to_stdout(
    fast: bool, *, write_back: WriteBack = WriteBack.NO, mode: Mode
) -> bool:
    """Format file on stdin. Return True if changed.

    If `write_back` is YES, write reformatted code back to stdout. If it is DIFF,
    write a diff to stdout. The `mode` argument is passed to
    :func:`format_file_contents`.
    """
    then = datetime.utcnow()
    src, encoding, newline = decode_bytes(sys.stdin.buffer.read())
    dst = src
    try:
        dst = format_file_contents(src, fast=fast, mode=mode)
        return True

    except NothingChanged:
        return False

    finally:
        f = io.TextIOWrapper(
            sys.stdout.buffer, encoding=encoding, newline=newline, write_through=True
        )
        if write_back == WriteBack.YES:
            f.write(dst)
        elif write_back in (WriteBack.DIFF, WriteBack.COLOR_DIFF):
            now = datetime.utcnow()
            src_name = f"STDIN\t{then} +0000"
            dst_name = f"STDOUT\t{now} +0000"
            d = diff(src, dst, src_name, dst_name)
            if write_back == WriteBack.COLOR_DIFF:
                d = color_diff(d)
                f = wrap_stream_for_windows(f)
            f.write(d)
        f.detach()
def format_file_contents(src_contents: str, *, fast: bool, mode: Mode) -> FileContent:
    """Reformat contents of a file and return new contents.

    If `fast` is False, additionally confirm that the reformatted code is
    valid by calling :func:`assert_equivalent` and :func:`assert_stable` on it.
    `mode` is passed to :func:`format_str`.
    """
    if src_contents.strip() == "":
        raise NothingChanged

    dst_contents = format_str(src_contents, mode=mode)
    if src_contents == dst_contents:
        raise NothingChanged

    if not fast:
        assert_equivalent(src_contents, dst_contents)
        assert_stable(src_contents, dst_contents, mode=mode)
    return dst_contents
def format_str(src_contents: str, *, mode: Mode) -> FileContent:
    """Reformat a string and return new contents.

    `mode` determines formatting options, such as how many characters per line are
    allowed.  Example:

    >>> import black
    >>> print(black.format_str("def f(arg:str='')->None:...", mode=Mode()))
    def f(arg: str = "") -> None:
        ...

    A more complex example:
    >>> print(
    ...   black.format_str(
    ...     "def f(arg:str='')->None: hey",
    ...     mode=black.Mode(
    ...       target_versions={black.TargetVersion.PY36},
    ...       line_length=10,
    ...       string_normalization=False,
    ...       is_pyi=False,
    ...     ),
    ...   ),
    ... )
    def f(
        arg: str = '',
    ) -> None:
        hey

    """
    src_node = lib2to3_parse(src_contents.lstrip(), mode.target_versions)
    dst_contents = []
    future_imports = get_future_imports(src_node)
    if mode.target_versions:
        versions = mode.target_versions
    else:
        versions = detect_target_versions(src_node)
    normalize_fmt_off(src_node)
    lines = LineGenerator(
        remove_u_prefix="unicode_literals" in future_imports
        or supports_feature(versions, Feature.UNICODE_LITERALS),
        is_pyi=mode.is_pyi,
        normalize_strings=mode.string_normalization,
    )
    elt = EmptyLineTracker(is_pyi=mode.is_pyi)
    empty_line = Line()
    after = 0
    split_line_features = {
        feature
        for feature in {Feature.TRAILING_COMMA_IN_CALL, Feature.TRAILING_COMMA_IN_DEF}
        if supports_feature(versions, feature)
    }
    for current_line in lines.visit(src_node):
        dst_contents.append(str(empty_line) * after)
        before, after = elt.maybe_empty_lines(current_line)
        dst_contents.append(str(empty_line) * before)
        for line in transform_line(
            current_line,
            line_length=mode.line_length,
            normalize_strings=mode.string_normalization,
            features=split_line_features,
        ):
            dst_contents.append(str(line))
    return "".join(dst_contents)
def decode_bytes(src: bytes) -> Tuple[FileContent, Encoding, NewLine]:
    """Return a tuple of (decoded_contents, encoding, newline).

    `newline` is either CRLF or LF but `decoded_contents` is decoded with
    universal newlines (i.e. only contains LF).
    """
    srcbuf = io.BytesIO(src)
    encoding, lines = tokenize.detect_encoding(srcbuf.readline)
    if not lines:
        return "", encoding, "\n"

    newline = "\r\n" if b"\r\n" == lines[0][-2:] else "\n"
    srcbuf.seek(0)
    with io.TextIOWrapper(srcbuf, encoding) as tiow:
        return tiow.read(), encoding, newline
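
# Example (a sketch): decode_bytes(b"x = 1\r\n") returns ("x = 1\n", "utf-8", "\r\n"):
# detect_encoding() falls back to UTF-8, universal-newline decoding maps CRLF to
# LF in the contents, and the original newline style is reported separately so
# it can be preserved when the file is written back.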
def get_grammars(target_versions: Set[TargetVersion]) -> List[Grammar]:
    if not target_versions:
        # No target_version specified, so try all grammars.
        return [
            # Python 3.7+
            pygram.python_grammar_no_print_statement_no_exec_statement_async_keywords,
            # Python 3.0-3.6
            pygram.python_grammar_no_print_statement_no_exec_statement,
            # Python 2.7 with future print_function import
            pygram.python_grammar_no_print_statement,
            # Python 2.7
            pygram.python_grammar,
        ]

    if all(version.is_python2() for version in target_versions):
        # Python 2-only code, so try Python 2 grammars.
        return [
            # Python 2.7 with future print_function import
            pygram.python_grammar_no_print_statement,
            # Python 2.7
            pygram.python_grammar,
        ]

    # Python 3-compatible code, so only try Python 3 grammar.
    grammars = []
    # If we have to parse both, try to parse async as a keyword first
    if not supports_feature(target_versions, Feature.ASYNC_IDENTIFIERS):
        # Python 3.7+
        grammars.append(
            pygram.python_grammar_no_print_statement_no_exec_statement_async_keywords
        )
    if not supports_feature(target_versions, Feature.ASYNC_KEYWORDS):
        # Python 3.0-3.6
        grammars.append(pygram.python_grammar_no_print_statement_no_exec_statement)
    # At least one of the above branches must have been taken, because every Python
    # version has exactly one of the two 'ASYNC_*' flags
    return grammars
def lib2to3_parse(src_txt: str, target_versions: Iterable[TargetVersion] = ()) -> Node:
    """Given a string with source, return the lib2to3 Node."""
    if src_txt[-1:] != "\n":
        src_txt += "\n"

    for grammar in get_grammars(set(target_versions)):
        drv = driver.Driver(grammar, pytree.convert)
        try:
            result = drv.parse_string(src_txt, True)
            break

        except ParseError as pe:
            lineno, column = pe.context[1]
            lines = src_txt.splitlines()
            try:
                faulty_line = lines[lineno - 1]
            except IndexError:
                faulty_line = "<line number missing in source>"
            exc = InvalidInput(f"Cannot parse: {lineno}:{column}: {faulty_line}")
    else:
        raise exc from None

    if isinstance(result, Leaf):
        result = Node(syms.file_input, [result])
    return result
def lib2to3_unparse(node: Node) -> str:
    """Given a lib2to3 node, return its string representation."""
    code = str(node)
    return code
class Visitor(Generic[T]):
    """Basic lib2to3 visitor that yields things of type `T` on `visit()`."""

    def visit(self, node: LN) -> Iterator[T]:
        """Main method to visit `node` and its children.

        It tries to find a `visit_*()` method for the given `node.type`, like
        `visit_simple_stmt` for Node objects or `visit_INDENT` for Leaf objects.
        If no dedicated `visit_*()` method is found, chooses `visit_default()`
        instead.

        Then yields objects of type `T` from the selected visitor.
        """
        if node.type < 256:
            name = token.tok_name[node.type]
        else:
            name = str(type_repr(node.type))
        # We explicitly branch on whether a visitor exists (instead of
        # using self.visit_default as the default arg to getattr) in order
        # to save needing to create a bound method object and so mypyc can
        # generate a native call to visit_default.
        visitf = getattr(self, f"visit_{name}", None)
        if visitf:
            yield from visitf(node)
        else:
            yield from self.visit_default(node)

    def visit_default(self, node: LN) -> Iterator[T]:
        """Default `visit_*()` implementation. Recurses to children of `node`."""
        if isinstance(node, Node):
            for child in node.children:
                yield from self.visit(child)
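
# A minimal sketch (hypothetical, not part of black.py) of the dispatch above:
# a subclass only defines visit_* methods for the node types it cares about and
# inherits the recursive visit_default() for everything else.
class _NameCollector(Visitor[str]):
    def visit_NAME(self, leaf: Leaf) -> Iterator[str]:
        yield leaf.value  # emit every identifier encountered in the tree


# Usage (a sketch): list(_NameCollector().visit(lib2to3_parse("x = y\n")))
# yields ["x", "y"].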
@dataclass
class DebugVisitor(Visitor[T]):
    tree_depth: int = 0

    def visit_default(self, node: LN) -> Iterator[T]:
        indent = " " * (2 * self.tree_depth)
        if isinstance(node, Node):
            _type = type_repr(node.type)
            out(f"{indent}{_type}", fg="yellow")
            self.tree_depth += 1
            for child in node.children:
                yield from self.visit(child)

            self.tree_depth -= 1
            out(f"{indent}/{_type}", fg="yellow", bold=False)
        else:
            _type = token.tok_name.get(node.type, str(node.type))
            out(f"{indent}{_type}", fg="blue", nl=False)
            if node.prefix:
                # We don't have to handle prefixes for `Node` objects since
                # that delegates to the first child anyway.
                out(f" {node.prefix!r}", fg="green", bold=False, nl=False)
            out(f" {node.value!r}", fg="blue", bold=False)

    @classmethod
    def show(cls, code: Union[str, Leaf, Node]) -> None:
        """Pretty-print the lib2to3 AST of a given string of `code`.

        Convenience method for debugging.
        """
        v: DebugVisitor[None] = DebugVisitor()
        if isinstance(code, str):
            code = lib2to3_parse(code)
        v.visit(code)
WHITESPACE: Final = {token.DEDENT, token.INDENT, token.NEWLINE}
STATEMENT: Final = {
    # ...
}
STANDALONE_COMMENT: Final = 153
token.tok_name[STANDALONE_COMMENT] = "STANDALONE_COMMENT"
LOGIC_OPERATORS: Final = {"and", "or"}
COMPARATORS: Final = {
    # ...
}
MATH_OPERATORS: Final = {
    # ...
}
STARS: Final = {token.STAR, token.DOUBLESTAR}
VARARGS_SPECIALS: Final = STARS | {token.SLASH}
VARARGS_PARENTS: Final = {
    syms.arglist,
    syms.argument,  # double star in arglist
    syms.trailer,  # single argument to call
    syms.typedargslist,
    syms.varargslist,  # lambdas
}
UNPACKING_PARENTS: Final = {
    syms.atom,  # single element of a list or set literal
    syms.dictsetmaker,
    syms.listmaker,
    syms.testlist_gexp,
    syms.testlist_star_expr,
}
TEST_DESCENDANTS: Final = {
    # ...
}
ASSIGNMENTS: Final = {
    # ...
}

COMPREHENSION_PRIORITY: Final = 20
COMMA_PRIORITY: Final = 18
TERNARY_PRIORITY: Final = 16
LOGIC_PRIORITY: Final = 14
STRING_PRIORITY: Final = 12
COMPARATOR_PRIORITY: Final = 10
MATH_PRIORITIES: Final = {
    # ...
    token.CIRCUMFLEX: 8,
    # ...
    token.RIGHTSHIFT: 6,
    # ...
    token.DOUBLESLASH: 4,
    # ...
    token.DOUBLESTAR: 2,
}
DOT_PRIORITY: Final = 1
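
# The constants above rank split candidates: a comma (18) outranks boolean
# logic (14), comparisons (10), and the arithmetic tiers (8..2), with attribute
# dots lowest (1). When a line must be split, Black picks the highest-priority
# delimiter present, so e.g. `f(a + b, c)` splits at the comma before the `+`.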
@dataclass
class BracketTracker:
    """Keeps track of brackets on a line."""

    depth: int = 0
    bracket_match: Dict[Tuple[Depth, NodeType], Leaf] = field(default_factory=dict)
    delimiters: Dict[LeafID, Priority] = field(default_factory=dict)
    previous: Optional[Leaf] = None
    _for_loop_depths: List[int] = field(default_factory=list)
    _lambda_argument_depths: List[int] = field(default_factory=list)

    def mark(self, leaf: Leaf) -> None:
        """Mark `leaf` with bracket-related metadata. Keep track of delimiters.

        All leaves receive an int `bracket_depth` field that stores how deep
        within brackets a given leaf is. 0 means there are no enclosing brackets
        that started on this line.

        If a leaf is itself a closing bracket, it receives an `opening_bracket`
        field that it forms a pair with. This is a one-directional link to
        avoid reference cycles.

        If a leaf is a delimiter (a token on which Black can split the line if
        needed) and it's on depth 0, its `id()` is stored in the tracker's
        `delimiters` field.
        """
        if leaf.type == token.COMMENT:
            return

        self.maybe_decrement_after_for_loop_variable(leaf)
        self.maybe_decrement_after_lambda_arguments(leaf)
        if leaf.type in CLOSING_BRACKETS:
            self.depth -= 1
            opening_bracket = self.bracket_match.pop((self.depth, leaf.type))
            leaf.opening_bracket = opening_bracket
        leaf.bracket_depth = self.depth
        if self.depth == 0:
            delim = is_split_before_delimiter(leaf, self.previous)
            if delim and self.previous is not None:
                self.delimiters[id(self.previous)] = delim

            delim = is_split_after_delimiter(leaf, self.previous)
            if delim:
                self.delimiters[id(leaf)] = delim
        if leaf.type in OPENING_BRACKETS:
            self.bracket_match[self.depth, BRACKET[leaf.type]] = leaf
            self.depth += 1
        self.previous = leaf
        self.maybe_increment_lambda_arguments(leaf)
        self.maybe_increment_for_loop_variable(leaf)

    def any_open_brackets(self) -> bool:
        """Return True if there is a yet unmatched open bracket on the line."""
        return bool(self.bracket_match)

    def max_delimiter_priority(self, exclude: Iterable[LeafID] = ()) -> Priority:
        """Return the highest priority of a delimiter found on the line.

        Values are consistent with what `is_split_*_delimiter()` return.
        Raises ValueError on no delimiters.
        """
        return max(v for k, v in self.delimiters.items() if k not in exclude)

    def delimiter_count_with_priority(self, priority: Priority = 0) -> int:
        """Return the number of delimiters with the given `priority`.

        If no `priority` is passed, defaults to max priority on the line.
        """
        if not self.delimiters:
            return 0

        priority = priority or self.max_delimiter_priority()
        return sum(1 for p in self.delimiters.values() if p == priority)

    def maybe_increment_for_loop_variable(self, leaf: Leaf) -> bool:
        """In a for loop, or comprehension, the variables are often unpacks.

        To avoid splitting on the comma in this situation, increase the depth of
        tokens between `for` and `in`.
        """
        if leaf.type == token.NAME and leaf.value == "for":
            self.depth += 1
            self._for_loop_depths.append(self.depth)
            return True

        return False

    def maybe_decrement_after_for_loop_variable(self, leaf: Leaf) -> bool:
        """See `maybe_increment_for_loop_variable` above for explanation."""
        if (
            self._for_loop_depths
            and self._for_loop_depths[-1] == self.depth
            and leaf.type == token.NAME
            and leaf.value == "in"
        ):
            self.depth -= 1
            self._for_loop_depths.pop()
            return True

        return False

    def maybe_increment_lambda_arguments(self, leaf: Leaf) -> bool:
        """In a lambda expression, there might be more than one argument.

        To avoid splitting on the comma in this situation, increase the depth of
        tokens between `lambda` and `:`.
        """
        if leaf.type == token.NAME and leaf.value == "lambda":
            self.depth += 1
            self._lambda_argument_depths.append(self.depth)
            return True

        return False

    def maybe_decrement_after_lambda_arguments(self, leaf: Leaf) -> bool:
        """See `maybe_increment_lambda_arguments` above for explanation."""
        if (
            self._lambda_argument_depths
            and self._lambda_argument_depths[-1] == self.depth
            and leaf.type == token.COLON
        ):
            self.depth -= 1
            self._lambda_argument_depths.pop()
            return True

        return False

    def get_open_lsqb(self) -> Optional[Leaf]:
        """Return the most recent opening square bracket (if any)."""
        return self.bracket_match.get((self.depth - 1, token.RSQB))
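
# Usage sketch (hypothetical, not part of black.py): feed the leaves of one
# logical line to a tracker, then ask where the line may be split.
#
#   tracker = BracketTracker()
#   for leaf in leaves_of_line:
#       tracker.mark(leaf)
#   if tracker.delimiters:
#       priority = tracker.max_delimiter_priority()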
1382 """Holds leaves and comments. Can be printed with `str(line)`."""
1385 leaves: List[Leaf] = field(default_factory=list)
1386 # keys ordered like `leaves`
1387 comments: Dict[LeafID, List[Leaf]] = field(default_factory=dict)
1388 bracket_tracker: BracketTracker = field(default_factory=BracketTracker)
1389 inside_brackets: bool = False
1390 should_explode: bool = False
1392 def append(self, leaf: Leaf, preformatted: bool = False) -> None:
1393 """Add a new `leaf` to the end of the line.
1395 Unless `preformatted` is True, the `leaf` will receive a new consistent
1396 whitespace prefix and metadata applied by :class:`BracketTracker`.
1397 Trailing commas are maybe removed, unpacked for loop variables are
1398 demoted from being delimiters.
1400 Inline comments are put aside.
1402 has_value = leaf.type in BRACKETS or bool(leaf.value.strip())
1406 if token.COLON == leaf.type and self.is_class_paren_empty:
1407 del self.leaves[-2:]
1408 if self.leaves and not preformatted:
1409 # Note: at this point leaf.prefix should be empty except for
1410 # imports, for which we only preserve newlines.
1411 leaf.prefix += whitespace(
1412 leaf, complex_subscript=self.is_complex_subscript(leaf)
1414 if self.inside_brackets or not preformatted:
1415 self.bracket_tracker.mark(leaf)
1416 self.maybe_remove_trailing_comma(leaf)
1417 if not self.append_comment(leaf):
1418 self.leaves.append(leaf)
1420 def append_safe(self, leaf: Leaf, preformatted: bool = False) -> None:
1421 """Like :func:`append()` but disallow invalid standalone comment structure.
1423 Raises ValueError when any `leaf` is appended after a standalone comment
1424 or when a standalone comment is not the first leaf on the line.
1426 if self.bracket_tracker.depth == 0:
1428 raise ValueError("cannot append to standalone comments")
1430 if self.leaves and leaf.type == STANDALONE_COMMENT:
1432 "cannot append standalone comments to a populated line"
1435 self.append(leaf, preformatted=preformatted)
1438 def is_comment(self) -> bool:
1439 """Is this line a standalone comment?"""
1440 return len(self.leaves) == 1 and self.leaves[0].type == STANDALONE_COMMENT
1443 def is_decorator(self) -> bool:
1444 """Is this line a decorator?"""
1445 return bool(self) and self.leaves[0].type == token.AT
1448 def is_import(self) -> bool:
1449 """Is this an import line?"""
1450 return bool(self) and is_import(self.leaves[0])
1453 def is_class(self) -> bool:
1454 """Is this line a class definition?"""
1457 and self.leaves[0].type == token.NAME
1458 and self.leaves[0].value == "class"
1462 def is_stub_class(self) -> bool:
1463 """Is this line a class definition with a body consisting only of "..."?"""
1464 return self.is_class and self.leaves[-3:] == [
1465 Leaf(token.DOT, ".") for _ in range(3)
1469 def is_collection_with_optional_trailing_comma(self) -> bool:
1470 """Is this line a collection literal with a trailing comma that's optional?
1472 Note that the trailing comma in a 1-tuple is not optional.
1474 if not self.leaves or len(self.leaves) < 4:
1477 # Look for and address a trailing colon.
1478 if self.leaves[-1].type == token.COLON:
1479 closer = self.leaves[-2]
1482 closer = self.leaves[-1]
1484 if closer.type not in CLOSING_BRACKETS or self.inside_brackets:
1487 if closer.type == token.RPAR:
1488 # Tuples require an extra check, because if there's only
1489 # one element in the tuple removing the comma unmakes the
1492 # We also check for parens before looking for the trailing
1493 # comma because in some cases (eg assigning a dict
1494 # literal) the literal gets wrapped in temporary parens
1495 # during parsing. This case is covered by the
1496 # collections.py test data.
1497 opener = closer.opening_bracket
1498 for _open_index, leaf in enumerate(self.leaves):
1503 # Couldn't find the matching opening paren, play it safe.
1507 comma_depth = self.leaves[close_index - 1].bracket_depth
1508 for leaf in self.leaves[_open_index + 1 : close_index]:
1509 if leaf.bracket_depth == comma_depth and leaf.type == token.COMMA:
1512 # We haven't looked yet for the trailing comma because
1513 # we might also have caught noop parens.
1514 return self.leaves[close_index - 1].type == token.COMMA
1517 return False # it's either a one-tuple or didn't have a trailing comma
1519 if self.leaves[close_index - 1].type in CLOSING_BRACKETS:
1521 closer = self.leaves[close_index]
1522 if closer.type == token.RPAR:
1523 # TODO: this is a gut feeling. Will we ever see this?
1526 if self.leaves[close_index - 1].type != token.COMMA:
1532 def is_def(self) -> bool:
1533 """Is this a function definition? (Also returns True for async defs.)"""
1535 first_leaf = self.leaves[0]
1540 second_leaf: Optional[Leaf] = self.leaves[1]
1543 return (first_leaf.type == token.NAME and first_leaf.value == "def") or (
1544 first_leaf.type == token.ASYNC
1545 and second_leaf is not None
1546 and second_leaf.type == token.NAME
1547 and second_leaf.value == "def"
1551 def is_class_paren_empty(self) -> bool:
1552 """Is this a class with no base classes but using parentheses?
1554 Those are unnecessary and should be removed.
1558 and len(self.leaves) == 4
1560 and self.leaves[2].type == token.LPAR
1561 and self.leaves[2].value == "("
1562 and self.leaves[3].type == token.RPAR
1563 and self.leaves[3].value == ")"
1567 def is_triple_quoted_string(self) -> bool:
1568 """Is the line a triple quoted string?"""
1571 and self.leaves[0].type == token.STRING
1572 and self.leaves[0].value.startswith(('"""', "'''"))
1575 def contains_standalone_comments(self, depth_limit: int = sys.maxsize) -> bool:
1576 """If so, needs to be split before emitting."""
1577 for leaf in self.leaves:
1578 if leaf.type == STANDALONE_COMMENT and leaf.bracket_depth <= depth_limit:
1583 def contains_uncollapsable_type_comments(self) -> bool:
1586 last_leaf = self.leaves[-1]
1587 ignored_ids.add(id(last_leaf))
1588 if last_leaf.type == token.COMMA or (
1589 last_leaf.type == token.RPAR and not last_leaf.value
1591 # When trailing commas or optional parens are inserted by Black for
1592 # consistency, comments after the previous last element are not moved
1593 # (they don't have to, rendering will still be correct). So we ignore
1594 # trailing commas and invisible.
1595 last_leaf = self.leaves[-2]
1596 ignored_ids.add(id(last_leaf))
1600 # A type comment is uncollapsable if it is attached to a leaf
1601 # that isn't at the end of the line (since that could cause it
1602 # to get associated to a different argument) or if there are
1603 # comments before it (since that could cause it to get hidden
1605 comment_seen = False
1606 for leaf_id, comments in self.comments.items():
1607 for comment in comments:
1608 if is_type_comment(comment):
1609 if comment_seen or (
1610 not is_type_comment(comment, " ignore")
1611 and leaf_id not in ignored_ids
1619 def contains_unsplittable_type_ignore(self) -> bool:
1623 # If a 'type: ignore' is attached to the end of a line, we
1624 # can't split the line, because we can't know which of the
1625 # subexpressions the ignore was meant to apply to.
1627 # We only want this to apply to actual physical lines from the
1628 # original source, though: we don't want the presence of a
1629 # 'type: ignore' at the end of a multiline expression to
1630 # justify pushing it all onto one line. Thus we
1631 # (unfortunately) need to check the actual source lines and
1632 # only report an unsplittable 'type: ignore' if this line was
1633 # one line in the original code.
1635 # Grab the first and last line numbers, skipping generated leaves
1636 first_line = next((leaf.lineno for leaf in self.leaves if leaf.lineno != 0), 0)
1638 (leaf.lineno for leaf in reversed(self.leaves) if leaf.lineno != 0), 0
1641 if first_line == last_line:
1642 # We look at the last two leaves since a comma or an
1643 # invisible paren could have been added at the end of the
1645 for node in self.leaves[-2:]:
1646 for comment in self.comments.get(id(node), []):
1647 if is_type_comment(comment, " ignore"):
1652 def contains_multiline_strings(self) -> bool:
1653 return any(is_multiline_string(leaf) for leaf in self.leaves)
1655 def maybe_remove_trailing_comma(self, closing: Leaf) -> bool:
1656 """Remove trailing comma if there is one and it's safe."""
1657 if not (self.leaves and self.leaves[-1].type == token.COMMA):
1660 # We remove trailing commas only in the case of importing a
1661 # single name from a module.
1665 and len(self.leaves) > 4
1666 and self.leaves[-1].type == token.COMMA
1667 and closing.type in CLOSING_BRACKETS
1668 and self.leaves[-4].type == token.NAME
1670 # regular `from foo import bar,`
1671 self.leaves[-4].value == "import"
1672 # `from foo import (bar as baz,)
1674 len(self.leaves) > 6
1675 and self.leaves[-6].value == "import"
1676 and self.leaves[-3].value == "as"
1678 # `from foo import bar as baz,`
1680 len(self.leaves) > 5
1681 and self.leaves[-5].value == "import"
1682 and self.leaves[-3].value == "as"
1685 and closing.type == token.RPAR
1689 self.remove_trailing_comma()
1692 def append_comment(self, comment: Leaf) -> bool:
1693 """Add an inline or standalone comment to the line."""
1695 comment.type == STANDALONE_COMMENT
1696 and self.bracket_tracker.any_open_brackets()
1701 if comment.type != token.COMMENT:
1705 comment.type = STANDALONE_COMMENT
1709 last_leaf = self.leaves[-1]
1711 last_leaf.type == token.RPAR
1712 and not last_leaf.value
1713 and last_leaf.parent
1714 and len(list(last_leaf.parent.leaves())) <= 3
1715 and not is_type_comment(comment)
1717 # Comments on an optional parens wrapping a single leaf should belong to
1718 # the wrapped node except if it's a type comment. Pinning the comment like
1719 # this avoids unstable formatting caused by comment migration.
1720 if len(self.leaves) < 2:
1721 comment.type = STANDALONE_COMMENT
1725 last_leaf = self.leaves[-2]
1726 self.comments.setdefault(id(last_leaf), []).append(comment)
1729 def comments_after(self, leaf: Leaf) -> List[Leaf]:
1730 """Generate comments that should appear directly after `leaf`."""
1731 return self.comments.get(id(leaf), [])
1733 def remove_trailing_comma(self) -> None:
1734 """Remove the trailing comma and moves the comments attached to it."""
1735 trailing_comma = self.leaves.pop()
1736 trailing_comma_comments = self.comments.pop(id(trailing_comma), [])
1737 self.comments.setdefault(id(self.leaves[-1]), []).extend(
1738 trailing_comma_comments
1741 def is_complex_subscript(self, leaf: Leaf) -> bool:
1742 """Return True iff `leaf` is part of a slice with non-trivial exprs."""
1743 open_lsqb = self.bracket_tracker.get_open_lsqb()
1744 if open_lsqb is None:
1747 subscript_start = open_lsqb.next_sibling
1749 if isinstance(subscript_start, Node):
1750 if subscript_start.type == syms.listmaker:
1753 if subscript_start.type == syms.subscriptlist:
1754 subscript_start = child_towards(subscript_start, leaf)
1755 return subscript_start is not None and any(
1756 n.type in TEST_DESCENDANTS for n in subscript_start.pre_order()
1759 def clone(self) -> "Line":
1762 inside_brackets=self.inside_brackets,
1763 should_explode=self.should_explode,
1766 def __str__(self) -> str:
1767 """Render the line."""
1771 indent = " " * self.depth
1772 leaves = iter(self.leaves)
1773 first = next(leaves)
1774 res = f"{first.prefix}{indent}{first.value}"
1777 for comment in itertools.chain.from_iterable(self.comments.values()):
1782 def __bool__(self) -> bool:
1783 """Return True if the line has leaves or comments."""
1784 return bool(self.leaves or self.comments)
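
# Example (a sketch): after appending the leaves of `x = 1` to a Line with
# depth == 1, str(line) renders "    x = 1\n": four spaces of indent per depth
# level, then the leaves with their whitespace prefixes, then a newline.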
@dataclass
class EmptyLineTracker:
    """Provides a stateful method that returns the number of potential extra
    empty lines needed before and after the currently processed line.

    Note: this tracker works on lines that haven't been split yet. It assumes
    the prefix of the first leaf consists of optional newlines. Those newlines
    are consumed by `maybe_empty_lines()` and included in the computation.
    """

    is_pyi: bool = False
    previous_line: Optional[Line] = None
    previous_after: int = 0
    previous_defs: List[int] = field(default_factory=list)

    def maybe_empty_lines(self, current_line: Line) -> Tuple[int, int]:
        """Return the number of extra empty lines before and after the `current_line`.

        This is for separating `def`, `async def` and `class` with extra empty
        lines (two on module-level).
        """
        before, after = self._maybe_empty_lines(current_line)
        before = (
            # Black should not insert empty lines at the beginning
            # of the file.
            0
            if self.previous_line is None
            else before - self.previous_after
        )
        self.previous_after = after
        self.previous_line = current_line
        return before, after

    def _maybe_empty_lines(self, current_line: Line) -> Tuple[int, int]:
        max_allowed = 1
        if current_line.depth == 0:
            max_allowed = 1 if self.is_pyi else 2
        if current_line.leaves:
            # Consume the first leaf's extra newlines.
            first_leaf = current_line.leaves[0]
            before = first_leaf.prefix.count("\n")
            before = min(before, max_allowed)
            first_leaf.prefix = ""
        else:
            before = 0
        depth = current_line.depth
        while self.previous_defs and self.previous_defs[-1] >= depth:
            self.previous_defs.pop()
            if self.is_pyi:
                before = 0 if depth else 1
            else:
                before = 1 if depth else 2
        if current_line.is_decorator or current_line.is_def or current_line.is_class:
            return self._maybe_empty_lines_for_class_or_def(current_line, before)

        if (
            self.previous_line
            and self.previous_line.is_import
            and not current_line.is_import
            and depth == self.previous_line.depth
        ):
            return (before or 1), 0

        if (
            self.previous_line
            and self.previous_line.is_class
            and current_line.is_triple_quoted_string
        ):
            return before, 1

        return before, 0

    def _maybe_empty_lines_for_class_or_def(
        self, current_line: Line, before: int
    ) -> Tuple[int, int]:
        if not current_line.is_decorator:
            self.previous_defs.append(current_line.depth)
        if self.previous_line is None:
            # Don't insert empty lines before the first line in the file.
            return 0, 0

        if self.previous_line.is_decorator:
            return 0, 0

        if self.previous_line.depth < current_line.depth and (
            self.previous_line.is_class or self.previous_line.is_def
        ):
            return 0, 0

        if (
            self.previous_line.is_comment
            and self.previous_line.depth == current_line.depth
            and before == 0
        ):
            return 0, 0

        if self.is_pyi:
            if self.previous_line.depth > current_line.depth:
                newlines = 1
            elif current_line.is_class or self.previous_line.is_class:
                if current_line.is_stub_class and self.previous_line.is_stub_class:
                    # No blank line between classes with an empty body
                    newlines = 0
                else:
                    newlines = 1
            elif current_line.is_def and not self.previous_line.is_def:
                # Blank line between a block of functions and a block of non-functions
                newlines = 1
            else:
                newlines = 0
        else:
            newlines = 2
        if current_line.depth and newlines:
            newlines -= 1
        return newlines, 0
@dataclass
class LineGenerator(Visitor[Line]):
    """Generates reformatted Line objects. Empty lines are not emitted.

    Note: destroys the tree it's visiting by mutating prefixes of its leaves
    in ways that will no longer stringify to valid Python code on the tree.
    """

    is_pyi: bool = False
    normalize_strings: bool = True
    current_line: Line = field(default_factory=Line)
    remove_u_prefix: bool = False

    def line(self, indent: int = 0) -> Iterator[Line]:
        """Generate a line.

        If the line is empty, only emit if it makes sense.
        If the line is too long, split it first and then generate.

        If any lines were generated, set up a new current_line.
        """
        if not self.current_line:
            self.current_line.depth += indent
            return  # Line is empty, don't emit. Creating a new one unnecessary.

        complete_line = self.current_line
        self.current_line = Line(depth=complete_line.depth + indent)
        yield complete_line

    def visit_default(self, node: LN) -> Iterator[Line]:
        """Default `visit_*()` implementation. Recurses to children of `node`."""
        if isinstance(node, Leaf):
            any_open_brackets = self.current_line.bracket_tracker.any_open_brackets()
            for comment in generate_comments(node):
                if any_open_brackets:
                    # any comment within brackets is subject to splitting
                    self.current_line.append(comment)
                elif comment.type == token.COMMENT:
                    # regular trailing comment
                    self.current_line.append(comment)
                    yield from self.line()

                else:
                    # regular standalone comment
                    yield from self.line()

                    self.current_line.append(comment)
                    yield from self.line()

            normalize_prefix(node, inside_brackets=any_open_brackets)
            if self.normalize_strings and node.type == token.STRING:
                normalize_string_prefix(node, remove_u_prefix=self.remove_u_prefix)
                normalize_string_quotes(node)
            if node.type == token.NUMBER:
                normalize_numeric_literal(node)
            if node.type not in WHITESPACE:
                self.current_line.append(node)
        yield from super().visit_default(node)

    def visit_INDENT(self, node: Leaf) -> Iterator[Line]:
        """Increase indentation level, maybe yield a line."""
        # In blib2to3 INDENT never holds comments.
        yield from self.line(+1)
        yield from self.visit_default(node)

    def visit_DEDENT(self, node: Leaf) -> Iterator[Line]:
        """Decrease indentation level, maybe yield a line."""
        # The current line might still wait for trailing comments. At DEDENT time
        # there won't be any (they would be prefixes on the preceding NEWLINE).
        # Emit the line then.
        yield from self.line()

        # While DEDENT has no value, its prefix may contain standalone comments
        # that belong to the current indentation level. Get 'em.
        yield from self.visit_default(node)

        # Finally, emit the dedent.
        yield from self.line(-1)

    def visit_stmt(
        self, node: Node, keywords: Set[str], parens: Set[str]
    ) -> Iterator[Line]:
        """Visit a statement.

        This implementation is shared for `if`, `while`, `for`, `try`, `except`,
        `def`, `with`, `class`, `assert` and assignments.

        The relevant Python language `keywords` for a given statement will be
        NAME leaves within it. This method puts those on a separate line.

        `parens` holds a set of string leaf values immediately after which
        invisible parens should be put.
        """
        normalize_invisible_parens(node, parens_after=parens)
        for child in node.children:
            if child.type == token.NAME and child.value in keywords:  # type: ignore
                yield from self.line()

            yield from self.visit(child)

    def visit_suite(self, node: Node) -> Iterator[Line]:
        """Visit a suite."""
        if self.is_pyi and is_stub_suite(node):
            yield from self.visit(node.children[2])
        else:
            yield from self.visit_default(node)

    def visit_simple_stmt(self, node: Node) -> Iterator[Line]:
        """Visit a statement without nested statements."""
        is_suite_like = node.parent and node.parent.type in STATEMENT
        if is_suite_like:
            if self.is_pyi and is_stub_body(node):
                yield from self.visit_default(node)
            else:
                yield from self.line(+1)
                yield from self.visit_default(node)
                yield from self.line(-1)

        else:
            if not self.is_pyi or not node.parent or not is_stub_suite(node.parent):
                yield from self.line()
            yield from self.visit_default(node)

    def visit_async_stmt(self, node: Node) -> Iterator[Line]:
        """Visit `async def`, `async for`, `async with`."""
        yield from self.line()

        children = iter(node.children)
        for child in children:
            yield from self.visit(child)

            if child.type == token.ASYNC:
                break

        internal_stmt = next(children)
        for child in internal_stmt.children:
            yield from self.visit(child)

    def visit_decorators(self, node: Node) -> Iterator[Line]:
        """Visit decorators."""
        for child in node.children:
            yield from self.line()
            yield from self.visit(child)

    def visit_SEMI(self, leaf: Leaf) -> Iterator[Line]:
        """Remove a semicolon and put the other statement on a separate line."""
        yield from self.line()

    def visit_ENDMARKER(self, leaf: Leaf) -> Iterator[Line]:
        """End of file. Process outstanding comments and end with a newline."""
        yield from self.visit_default(leaf)
        yield from self.line()

    def visit_STANDALONE_COMMENT(self, leaf: Leaf) -> Iterator[Line]:
        if not self.current_line.bracket_tracker.any_open_brackets():
            yield from self.line()
        yield from self.visit_default(leaf)

    def visit_factor(self, node: Node) -> Iterator[Line]:
        """Force parentheses between a unary op and a binary power:

        -2 ** 8 -> -(2 ** 8)
        """
        _operator, operand = node.children
        if (
            operand.type == syms.power
            and len(operand.children) == 3
            and operand.children[1].type == token.DOUBLESTAR
        ):
            lpar = Leaf(token.LPAR, "(")
            rpar = Leaf(token.RPAR, ")")
            index = operand.remove() or 0
            node.insert_child(index, Node(syms.atom, [lpar, operand, rpar]))
        yield from self.visit_default(node)

    def visit_STRING(self, leaf: Leaf) -> Iterator[Line]:
        # Check if it's a docstring
        if prev_siblings_are(
            leaf.parent, [None, token.NEWLINE, token.INDENT, syms.simple_stmt]
        ) and is_multiline_string(leaf):
            prefix = "    " * self.current_line.depth
            docstring = fix_docstring(leaf.value[3:-3], prefix)
            leaf.value = leaf.value[0:3] + docstring + leaf.value[-3:]
            normalize_string_quotes(leaf)

        yield from self.visit_default(leaf)

    def __post_init__(self) -> None:
        """You are in a twisty little maze of passages."""
        v = self.visit_stmt
        Ø: Final = set()
        self.visit_assert_stmt = partial(v, keywords={"assert"}, parens={"assert", ","})
        self.visit_if_stmt = partial(
            v, keywords={"if", "else", "elif"}, parens={"if", "elif"}
        )
        self.visit_while_stmt = partial(v, keywords={"while", "else"}, parens={"while"})
        self.visit_for_stmt = partial(v, keywords={"for", "else"}, parens={"for", "in"})
        self.visit_try_stmt = partial(
            v, keywords={"try", "except", "else", "finally"}, parens=Ø
        )
        self.visit_except_clause = partial(v, keywords={"except"}, parens=Ø)
        self.visit_with_stmt = partial(v, keywords={"with"}, parens=Ø)
        self.visit_funcdef = partial(v, keywords={"def"}, parens=Ø)
        self.visit_classdef = partial(v, keywords={"class"}, parens=Ø)
        self.visit_expr_stmt = partial(v, keywords=Ø, parens=ASSIGNMENTS)
        self.visit_return_stmt = partial(v, keywords={"return"}, parens={"return"})
        self.visit_import_from = partial(v, keywords=Ø, parens={"import"})
        self.visit_del_stmt = partial(v, keywords=Ø, parens={"del"})
        self.visit_async_funcdef = self.visit_async_stmt
        self.visit_decorated = self.visit_decorators
IMPLICIT_TUPLE = {syms.testlist, syms.testlist_star_expr, syms.exprlist}
BRACKET = {token.LPAR: token.RPAR, token.LSQB: token.RSQB, token.LBRACE: token.RBRACE}
OPENING_BRACKETS = set(BRACKET.keys())
CLOSING_BRACKETS = set(BRACKET.values())
BRACKETS = OPENING_BRACKETS | CLOSING_BRACKETS
ALWAYS_NO_SPACE = CLOSING_BRACKETS | {token.COMMA, STANDALONE_COMMENT}
2124 def whitespace(leaf: Leaf, *, complex_subscript: bool) -> str: # noqa: C901
2125 """Return whitespace prefix if needed for the given `leaf`.
2127 `complex_subscript` signals whether the given leaf is part of a subscription
2128 which has non-trivial arguments, like arithmetic expressions or function calls.
2136 if t in ALWAYS_NO_SPACE:
2139 if t == token.COMMENT:
2142 assert p is not None, f"INTERNAL ERROR: hand-made leaf without parent: {leaf!r}"
2143 if t == token.COLON and p.type not in {
2150 prev = leaf.prev_sibling
2152 prevp = preceding_leaf(p)
2153 if not prevp or prevp.type in OPENING_BRACKETS:
2156 if t == token.COLON:
2157 if prevp.type == token.COLON:
2160 elif prevp.type != token.COMMA and not complex_subscript:
2165 if prevp.type == token.EQUAL:
2167 if prevp.parent.type in {
2175 elif prevp.parent.type == syms.typedargslist:
2176 # A bit hacky: if the equal sign has whitespace, it means we
2177 # previously found it's a typed argument. So, we're using
2181 elif prevp.type in VARARGS_SPECIALS:
2182 if is_vararg(prevp, within=VARARGS_PARENTS | UNPACKING_PARENTS):
2185 elif prevp.type == token.COLON:
2186 if prevp.parent and prevp.parent.type in {syms.subscript, syms.sliceop}:
2187 return SPACE if complex_subscript else NO
2191 and prevp.parent.type == syms.factor
2192 and prevp.type in MATH_OPERATORS
2197 prevp.type == token.RIGHTSHIFT
2199 and prevp.parent.type == syms.shift_expr
2200 and prevp.prev_sibling
2201 and prevp.prev_sibling.type == token.NAME
2202 and prevp.prev_sibling.value == "print" # type: ignore
2204 # Python 2 print chevron
2207 elif prev.type in OPENING_BRACKETS:
2210 if p.type in {syms.parameters, syms.arglist}:
2211 # untyped function signatures or calls
2212 if not prev or prev.type != token.COMMA:
2215 elif p.type == syms.varargslist:
2217 if prev and prev.type != token.COMMA:
2220 elif p.type == syms.typedargslist:
2221 # typed function signatures
2225 if t == token.EQUAL:
2226 if prev.type != syms.tname:
2229 elif prev.type == token.EQUAL:
2230 # A bit hacky: if the equal sign has whitespace, it means we
2231 # previously found it's a typed argument. So, we're using that, too.
2234 elif prev.type != token.COMMA:
2237 elif p.type == syms.tname:
2240 prevp = preceding_leaf(p)
2241 if not prevp or prevp.type != token.COMMA:
2244 elif p.type == syms.trailer:
2245 # attributes and calls
2246 if t == token.LPAR or t == token.RPAR:
2251 prevp = preceding_leaf(p)
2252 if not prevp or prevp.type != token.NUMBER:
2255 elif t == token.LSQB:
2258 elif prev.type != token.COMMA:
2261 elif p.type == syms.argument:
2263 if t == token.EQUAL:
2267 prevp = preceding_leaf(p)
2268 if not prevp or prevp.type == token.LPAR:
2271 elif prev.type in {token.EQUAL} | VARARGS_SPECIALS:
2274 elif p.type == syms.decorator:
2278 elif p.type == syms.dotted_name:
2282 prevp = preceding_leaf(p)
2283 if not prevp or prevp.type == token.AT or prevp.type == token.DOT:
2286 elif p.type == syms.classdef:
2290 if prev and prev.type == token.LPAR:
2293 elif p.type in {syms.subscript, syms.sliceop}:
2296 assert p.parent is not None, "subscripts are always parented"
2297 if p.parent.type == syms.subscriptlist:
2302 elif not complex_subscript:
2305 elif p.type == syms.atom:
2306 if prev and t == token.DOT:
2307 # dots, but not the first one.
2310 elif p.type == syms.dictsetmaker:
2312 if prev and prev.type == token.DOUBLESTAR:
2315 elif p.type in {syms.factor, syms.star_expr}:
2318 prevp = preceding_leaf(p)
2319 if not prevp or prevp.type in OPENING_BRACKETS:
2322 prevp_parent = prevp.parent
2323 assert prevp_parent is not None
2324 if prevp.type == token.COLON and prevp_parent.type in {
2330 elif prevp.type == token.EQUAL and prevp_parent.type == syms.argument:
2333 elif t in {token.NAME, token.NUMBER, token.STRING}:
2336 elif p.type == syms.import_from:
2338 if prev and prev.type == token.DOT:
2341 elif t == token.NAME:
2345 if prev and prev.type == token.DOT:
2348 elif p.type == syms.sliceop:
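# Editor's note (illustrative, not part of black.py): the `complex_subscript`
# flag above is what distinguishes
#
#     ham[lower:upper]                      # simple slice -> no spaces
#     ham[lower + offset : upper + offset]  # complex slice -> spaced colons
#
# The real decision is made leaf by leaf in `whitespace(...)`.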
2354 def preceding_leaf(node: Optional[LN]) -> Optional[Leaf]:
2355 """Return the first leaf that precedes `node`, if any."""
2357 res = node.prev_sibling
2359 if isinstance(res, Leaf):
2363 return list(res.leaves())[-1]
2372 def prev_siblings_are(node: Optional[LN], tokens: List[Optional[NodeType]]) -> bool:
2373 """Return if the `node` and its previous siblings match types against the provided
2374 list of tokens; the provided `node`has its type matched against the last element in
2375 the list. `None` can be used as the first element to declare that the start of the
2376 list is anchored at the start of its parent's children."""
2379 if tokens[-1] is None:
2383 if node.type != tokens[-1]:
2385 return prev_siblings_are(node.prev_sibling, tokens[:-1])
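# ---------------------------------------------------------------------------
# Editor's sketch (illustrative, not part of black.py): how
# `prev_siblings_are` consumes the token pattern from right to left. The
# `_FakeNode` class and the token ids below are hypothetical stand-ins for
# blib2to3 leaves; `Optional` comes from the module's typing imports.
# ---------------------------------------------------------------------------
class _FakeNode:
    def __init__(self, type_: int, prev_sibling: Optional["_FakeNode"] = None) -> None:
        self.type = type_
        self.prev_sibling = prev_sibling


def _demo_prev_siblings_are() -> None:
    NAME, EQUAL, STRING = 1, 22, 3  # hypothetical token ids
    name = _FakeNode(NAME)  # `x`
    equal = _FakeNode(EQUAL, prev_sibling=name)  # `=`
    string = _FakeNode(STRING, prev_sibling=equal)  # `"hi"`

    # Matches: STRING preceded by EQUAL preceded by NAME, with `None`
    # anchoring the pattern at the first child.
    assert prev_siblings_are(string, [None, NAME, EQUAL, STRING])
    # Fails: the pattern runs out before reaching the first child.
    assert not prev_siblings_are(string, [None, EQUAL, STRING])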
2388 def child_towards(ancestor: Node, descendant: LN) -> Optional[LN]:
2389 """Return the child of `ancestor` that contains `descendant`."""
2390 node: Optional[LN] = descendant
2391 while node and node.parent != ancestor:
2396 def container_of(leaf: Leaf) -> LN:
2397 """Return `leaf` or one of its ancestors that is the topmost container of it.
2399 By "container" we mean a node where `leaf` is the very first child.
2401 same_prefix = leaf.prefix
2402 container: LN = leaf
2404 parent = container.parent
2408 if parent.children[0].prefix != same_prefix:
2411 if parent.type == syms.file_input:
2414 if parent.prev_sibling is not None and parent.prev_sibling.type in BRACKETS:
2421 def is_split_after_delimiter(leaf: Leaf, previous: Optional[Leaf] = None) -> Priority:
2422 """Return the priority of the `leaf` delimiter, given a line break after it.
2424 The delimiter priorities returned here are from those delimiters that would
2425 cause a line break after themselves.
2427 Higher numbers are higher priority.
2429 if leaf.type == token.COMMA:
2430 return COMMA_PRIORITY
2435 def is_split_before_delimiter(leaf: Leaf, previous: Optional[Leaf] = None) -> Priority:
2436 """Return the priority of the `leaf` delimiter, given a line break before it.
2438 The delimiter priorities returned here are from those delimiters that would
2439 cause a line break before themselves.
2441 Higher numbers are higher priority.
2443 if is_vararg(leaf, within=VARARGS_PARENTS | UNPACKING_PARENTS):
2444 # * and ** might also be MATH_OPERATORS but in this case they are not.
2445 # Don't treat them as a delimiter.
2449 leaf.type == token.DOT
2451 and leaf.parent.type not in {syms.import_from, syms.dotted_name}
2452 and (previous is None or previous.type in CLOSING_BRACKETS)
2457 leaf.type in MATH_OPERATORS
2459 and leaf.parent.type not in {syms.factor, syms.star_expr}
2461 return MATH_PRIORITIES[leaf.type]
2463 if leaf.type in COMPARATORS:
2464 return COMPARATOR_PRIORITY
2467 leaf.type == token.STRING
2468 and previous is not None
2469 and previous.type == token.STRING
2471 return STRING_PRIORITY
2473 if leaf.type not in {token.NAME, token.ASYNC}:
2479 and leaf.parent.type in {syms.comp_for, syms.old_comp_for}
2480 or leaf.type == token.ASYNC
2483 not isinstance(leaf.prev_sibling, Leaf)
2484 or leaf.prev_sibling.value != "async"
2486 return COMPREHENSION_PRIORITY
2491 and leaf.parent.type in {syms.comp_if, syms.old_comp_if}
2493 return COMPREHENSION_PRIORITY
2495 if leaf.value in {"if", "else"} and leaf.parent and leaf.parent.type == syms.test:
2496 return TERNARY_PRIORITY
2498 if leaf.value == "is":
2499 return COMPARATOR_PRIORITY
2504 and leaf.parent.type in {syms.comp_op, syms.comparison}
2506 previous is not None
2507 and previous.type == token.NAME
2508 and previous.value == "not"
2511 return COMPARATOR_PRIORITY
2516 and leaf.parent.type == syms.comp_op
2518 previous is not None
2519 and previous.type == token.NAME
2520 and previous.value == "is"
2523 return COMPARATOR_PRIORITY
2525 if leaf.value in LOGIC_OPERATORS and leaf.parent:
2526 return LOGIC_PRIORITY
2531 FMT_OFF = {"# fmt: off", "# fmt:off", "# yapf: disable"}
2532 FMT_ON = {"# fmt: on", "# fmt:on", "# yapf: enable"}
2535 def generate_comments(leaf: LN) -> Iterator[Leaf]:
2536 """Clean the prefix of the `leaf` and generate comments from it, if any.
2538 Comments in lib2to3 are shoved into the whitespace prefix. This happens
2539 in `pgen2/driver.py:Driver.parse_tokens()`. This was a brilliant implementation
2540 move because it does away with modifying the grammar to include all the
2541 possible places in which comments can be placed.
2543 The sad consequence for us though is that comments don't "belong" anywhere.
2544 This is why this function generates simple parentless Leaf objects for
2545 comments. We simply don't know what the correct parent should be.
2547 No matter though, we can live without this. We really only need to
2548 differentiate between inline and standalone comments. The latter don't
2549 share the line with any code.
2551 Inline comments are emitted as regular token.COMMENT leaves. Standalone
2552 comments are emitted with a fake STANDALONE_COMMENT token identifier.
2554 for pc in list_comments(leaf.prefix, is_endmarker=leaf.type == token.ENDMARKER):
2555 yield Leaf(pc.type, pc.value, prefix="\n" * pc.newlines)
2560 """Describes a piece of syntax that is a comment.
2562 It's not a :class:`blib2to3.pytree.Leaf` so that:
2564 * it can be cached (`Leaf` objects should not be reused more than once as
2565 they store their lineno, column, prefix, and parent information);
2566 * `newlines` and `consumed` fields are kept separate from the `value`. This
2567 simplifies handling of special marker comments like ``# fmt: off/on``.
2570 type: int # token.COMMENT or STANDALONE_COMMENT
2571 value: str # content of the comment
2572 newlines: int # how many newlines before the comment
2573 consumed: int # how many characters of the original leaf's prefix did we consume
2576 @lru_cache(maxsize=4096)
2577 def list_comments(prefix: str, *, is_endmarker: bool) -> List[ProtoComment]:
2578 """Return a list of :class:`ProtoComment` objects parsed from the given `prefix`."""
2579 result: List[ProtoComment] = []
2580 if not prefix or "#" not in prefix:
2586 for index, line in enumerate(prefix.split("\n")):
2587 consumed += len(line) + 1 # adding the length of the split '\n'
2588 line = line.lstrip()
2591 if not line.startswith("#"):
2592 # Escaped newlines outside of a comment are not really newlines at
2593 # all. We treat a single-line comment following an escaped newline
2594 # as a simple trailing comment.
2595 if line.endswith("\\"):
2599 if index == ignored_lines and not is_endmarker:
2600 comment_type = token.COMMENT # simple trailing comment
2602 comment_type = STANDALONE_COMMENT
2603 comment = make_comment(line)
2606 type=comment_type, value=comment, newlines=nlines, consumed=consumed
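# Editor's note (illustrative, not part of black.py): for a prefix holding
# one trailing and one standalone comment,
#
#     list_comments("  # trailing\n# standalone\n", is_endmarker=False)
#
# yields roughly
#
#     [ProtoComment(type=token.COMMENT, value="# trailing", ...),
#      ProtoComment(type=STANDALONE_COMMENT, value="# standalone", ...)]
#
# The first comment shares its line with code (index == ignored_lines), so it
# stays a regular token.COMMENT; later comments in the prefix get the fake
# STANDALONE_COMMENT token. The `newlines`/`consumed` values are omitted here
# because they depend on the exact whitespace consumed.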
2613 def make_comment(content: str) -> str:
2614 """Return a consistently formatted comment from the given `content` string.
2616 All comments (except for "##", "#!", "#:", "#'", "#%%") should have a single
2617 space between the hash sign and the content.
2619 If `content` didn't start with a hash sign, one is provided.
2621 content = content.rstrip()
2625 if content[0] == "#":
2626 content = content[1:]
2627 if content and content[0] not in " !:#'%":
2628 content = " " + content
2629 return "#" + content
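# Editor's demo (illustrative, not part of black.py): concrete input/output
# pairs for `make_comment`, derived from the rules above.
def _demo_make_comment() -> None:
    assert make_comment("comment") == "# comment"    # missing hash sign is provided
    assert make_comment("#comment") == "# comment"   # a single space is inserted
    assert make_comment("#!shebang") == "#!shebang"  # "#!" is one of the exceptions
    assert make_comment("# fine ") == "# fine"       # trailing whitespace is stripped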
2635 normalize_strings: bool,
2636 features: Collection[Feature] = (),
2637 ) -> Iterator[Line]:
2638 """Transform a `line`, potentially splitting it into many lines.
2640 They should fit in the allotted `line_length` but might not be able to.
2642 `features` are syntactical features that may be used in the output.
2648 line_str = line_to_string(line)
2650 def init_st(ST: Type[StringTransformer]) -> StringTransformer:
2651 """Initialize StringTransformer"""
2652 return ST(line_length, normalize_strings)
2654 string_merge = init_st(StringMerger)
2655 string_paren_strip = init_st(StringParenStripper)
2656 string_split = init_st(StringSplitter)
2657 string_paren_wrap = init_st(StringParenWrapper)
2659 transformers: List[Transformer]
2661 not line.contains_uncollapsable_type_comments()
2662 and not line.should_explode
2663 and not line.is_collection_with_optional_trailing_comma
2665 is_line_short_enough(line, line_length=line_length, line_str=line_str)
2666 or line.contains_unsplittable_type_ignore()
2668 and not (line.contains_standalone_comments() and line.inside_brackets)
2670 # Only apply basic string preprocessing, since lines shouldn't be split here.
2671 transformers = [string_merge, string_paren_strip]
2673 transformers = [left_hand_split]
2676 def rhs(line: Line, features: Collection[Feature]) -> Iterator[Line]:
2677 for omit in generate_trailers_to_omit(line, line_length):
2678 lines = list(right_hand_split(line, line_length, features, omit=omit))
2679 if is_line_short_enough(lines[0], line_length=line_length):
2683 # All splits failed, best effort split with no omits.
2684 # This mostly happens to multiline strings that are by definition
2685 # reported as not fitting a single line.
2686 # line_length=1 here was historically a bug that somehow became a feature.
2687 # See #762 and #781 for the full story.
2688 yield from right_hand_split(line, line_length=1, features=features)
2690 if line.inside_brackets:
2695 standalone_comment_split,
2709 for transform in transformers:
2710 # We are accumulating lines in `result` because we might want to abort
2711 # mission and return the original line in the end, or attempt a different split altogether.
2713 result: List[Line] = []
2715 for transformed_line in transform(line, features):
2716 if str(transformed_line).strip("\n") == line_str:
2717 raise CannotTransform(
2718 "Line transformer returned an unchanged result"
2724 line_length=line_length,
2725 normalize_strings=normalize_strings,
2729 except CannotTransform:
2739 @dataclass # type: ignore
2740 class StringTransformer(ABC):
2742 An implementation of the Transformer protocol that relies on its
2743 subclasses overriding the template methods `do_match(...)` and
2744 `do_transform(...)`.
2746 This Transformer works exclusively on strings (for example, by merging adjacent strings).
2749 The following sections can be found among the docstrings of each concrete
2750 StringTransformer subclass.
2753 Which requirements must the given Line meet for this
2754 StringTransformer to be applied?
2757 If the given Line meets all of the above requirements, which string
2758 transformations can you expect to be applied to it by this StringTransformer?
2762 What contractual agreements does this StringTransformer have with other
2763 StringTransformers? Such collaborations should be eliminated/minimized
2764 as much as possible.
2768 normalize_strings: bool
2771 def do_match(self, line: Line) -> TMatchResult:
2774 * Ok(string_idx) such that `line.leaves[string_idx]` is our target
2775 string, if a match could be made.
2777 * Err(CannotTransform), if no match could be made.
2781 def do_transform(self, line: Line, string_idx: int) -> Iterator[TResult[Line]]:
2784 * Ok(new_line) where new_line is the new transformed line.
2786 * Err(CannotTransform) if the transformation failed for some reason. The
2787 `do_match(...)` template method should usually be used to reject
2788 the form of the given Line, but in some cases it is difficult to
2789 know whether or not a Line meets the StringTransformer's
2790 requirements until the transformation is already midway.
2793 This method should NOT mutate @line directly, but it MAY mutate the
2794 Line's underlying Node structure. (WARNING: If the underlying Node
2795 structure IS altered, then this method should NOT be allowed to
2796 yield a CannotTransform after that point.)
2799 def __call__(self, line: Line, _features: Collection[Feature]) -> Iterator[Line]:
2801 StringTransformer instances have a call signature that mirrors that of
2802 the Transformer type.
2805 CannotTransform(...) if the concrete StringTransformer class is unable to transform @line.
2808 # Optimization to avoid calling `self.do_match(...)` when the line does
2809 # not contain any string.
2810 if not any(leaf.type == token.STRING for leaf in line.leaves):
2811 raise CannotTransform("There are no strings in this line.")
2813 match_result = self.do_match(line)
2815 if isinstance(match_result, Err):
2816 cant_transform = match_result.err()
2817 raise CannotTransform(
2818 f"The string transformer {self.__class__.__name__} does not recognize"
2819 " this line as one that it can transform."
2820 ) from cant_transform
2822 string_idx = match_result.ok()
2824 for line_result in self.do_transform(line, string_idx):
2825 if isinstance(line_result, Err):
2826 cant_transform = line_result.err()
2827 raise CannotTransform(
2828 "StringTransformer failed while attempting to transform string."
2829 ) from cant_transform
2830 line = line_result.ok()
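# ---------------------------------------------------------------------------
# Editor's sketch (illustrative, not part of black.py): the smallest
# hypothetical StringTransformer subclass, showing how `do_match` and
# `do_transform` plug into the Ok/Err machinery above. Note that a real
# transformer must actually change the line: `transform_line(...)` raises
# CannotTransform when a transformer yields its input unchanged.
# ---------------------------------------------------------------------------
class _IdentityStringTransformer(StringTransformer):
    def do_match(self, line: Line) -> TMatchResult:
        for idx, leaf in enumerate(line.leaves):
            if leaf.type == token.STRING:
                return Ok(idx)
        return TErr("This line contains no strings.")

    def do_transform(self, line: Line, string_idx: int) -> Iterator[TResult[Line]]:
        new_line = line.clone()
        append_leaves(new_line, line, line.leaves)
        yield Ok(new_line)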
2836 """A custom (i.e. manual) string split.
2838 A single CustomSplit instance represents a single substring.
2841 Consider the following string: "Hi there friend." " This is a custom" f" string {split}."
2848 This string will correspond to the following three CustomSplit instances:
2850 CustomSplit(False, 16)
2851 CustomSplit(False, 17)
2852 CustomSplit(True, 16)
2860 class CustomSplitMapMixin:
2862 This mixin class is used to map merged strings to a sequence of
2863 CustomSplits, which will then be used to re-split the strings iff none of
2864 the resultant substrings go over the configured max line length.
2867 _Key = Tuple[StringID, str]
2868 _CUSTOM_SPLIT_MAP: Dict[_Key, Tuple[CustomSplit, ...]] = defaultdict(tuple)
2871 def _get_key(string: str) -> "CustomSplitMapMixin._Key":
2874 A unique identifier that is used internally to map @string to a
2875 group of custom splits.
2877 return (id(string), string)
2879 def add_custom_splits(
2880 self, string: str, custom_splits: Iterable[CustomSplit]
2882 """Custom Split Map Setter Method
2885 Adds a mapping from @string to the custom splits @custom_splits.
2887 key = self._get_key(string)
2888 self._CUSTOM_SPLIT_MAP[key] = tuple(custom_splits)
2890 def pop_custom_splits(self, string: str) -> List[CustomSplit]:
2891 """Custom Split Map Getter Method
2894 * A list of the custom splits that are mapped to @string, if any
2900 Deletes the mapping between @string and its associated custom
2901 splits (which are returned to the caller).
2903 key = self._get_key(string)
2905 custom_splits = self._CUSTOM_SPLIT_MAP[key]
2906 del self._CUSTOM_SPLIT_MAP[key]
2908 return list(custom_splits)
2910 def has_custom_splits(self, string: str) -> bool:
2913 True iff @string is associated with a set of custom splits.
2915 key = self._get_key(string)
2916 return key in self._CUSTOM_SPLIT_MAP
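# Editor's sketch (illustrative, not part of black.py): round-tripping the
# custom split map. `_DemoMap` is a hypothetical throwaway subclass; in the
# real code the mixin is combined with StringMerger/StringSplitter below.
# Note that the key includes `id(string)`, so the *same* string object must
# be used for the lookup.
class _DemoMap(CustomSplitMapMixin):
    pass


def _demo_custom_split_map() -> None:
    demo = _DemoMap()
    merged = '"Hello, world!"'
    demo.add_custom_splits(merged, [CustomSplit(False, 7)])
    assert demo.has_custom_splits(merged)
    # pop_custom_splits returns the recorded splits and deletes the mapping.
    assert demo.pop_custom_splits(merged) == [CustomSplit(False, 7)]
    assert not demo.has_custom_splits(merged)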
2919 class StringMerger(CustomSplitMapMixin, StringTransformer):
2920 """StringTransformer that merges strings together.
2923 (A) The line contains adjacent strings such that at most one substring
2924 has inline comments AND none of those inline comments are pragmas AND
2925 the set of all substring prefixes is either of length 1 or equal to
2926 {"", "f"} AND none of the substrings are raw strings (i.e. are prefixed
2929 (B) The line contains a string which uses line continuation backslashes.
2932 Depending on which of the two requirements above were met, either:
2934 (A) The string group associated with the target string is merged.
2936 (B) All line-continuation backslashes are removed from the target string.
2939 StringMerger provides custom split information to StringSplitter.
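# Editor's note (illustrative, not part of black.py): given enough room,
# case (A) turns e.g.
#
#     x = "Hello" " world" f" {name}!"   -->   x = f"Hello world {name}!"
#
# while case (B) strips backslash line continuations inside the string.
# Exact quoting also depends on normalize_string_quotes.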
2942 def do_match(self, line: Line) -> TMatchResult:
2945 is_valid_index = is_valid_index_factory(LL)
2947 for (i, leaf) in enumerate(LL):
2949 leaf.type == token.STRING
2950 and is_valid_index(i + 1)
2951 and LL[i + 1].type == token.STRING
2955 if leaf.type == token.STRING and "\\\n" in leaf.value:
2958 return TErr("This line has no strings that need merging.")
2960 def do_transform(self, line: Line, string_idx: int) -> Iterator[TResult[Line]]:
2962 rblc_result = self.__remove_backslash_line_continuation_chars(
2963 new_line, string_idx
2965 if isinstance(rblc_result, Ok):
2966 new_line = rblc_result.ok()
2968 msg_result = self.__merge_string_group(new_line, string_idx)
2969 if isinstance(msg_result, Ok):
2970 new_line = msg_result.ok()
2972 if isinstance(rblc_result, Err) and isinstance(msg_result, Err):
2973 msg_cant_transform = msg_result.err()
2974 rblc_cant_transform = rblc_result.err()
2975 cant_transform = CannotTransform(
2976 "StringMerger failed to merge any strings in this line."
2979 # Chain the errors together using `__cause__`.
2980 msg_cant_transform.__cause__ = rblc_cant_transform
2981 cant_transform.__cause__ = msg_cant_transform
2983 yield Err(cant_transform)
2988 def __remove_backslash_line_continuation_chars(
2989 line: Line, string_idx: int
2992 Merge strings that were split across multiple lines using
2993 line-continuation backslashes.
2996 Ok(new_line), if @line contains backslash line-continuation characters.
2999 Err(CannotTransform), otherwise.
3003 string_leaf = LL[string_idx]
3005 string_leaf.type == token.STRING
3006 and "\\\n" in string_leaf.value
3007 and not has_triple_quotes(string_leaf.value)
3010 f"String leaf {string_leaf} does not contain any backslash line"
3011 " continuation characters."
3014 new_line = line.clone()
3015 new_line.comments = line.comments
3016 append_leaves(new_line, line, LL)
3018 new_string_leaf = new_line.leaves[string_idx]
3019 new_string_leaf.value = new_string_leaf.value.replace("\\\n", "")
3023 def __merge_string_group(self, line: Line, string_idx: int) -> TResult[Line]:
3025 Merges the string group (i.e. set of adjacent strings) where the first
3026 string in the group is `line.leaves[string_idx]`.
3029 Ok(new_line), if ALL of the validation checks found in
3030 __validate_msg(...) pass.
3032 Err(CannotTransform), otherwise.
3036 is_valid_index = is_valid_index_factory(LL)
3038 vresult = self.__validate_msg(line, string_idx)
3039 if isinstance(vresult, Err):
3042 # If the string group is wrapped inside an Atom node, we must make sure
3043 # to later replace that Atom with our new (merged) string leaf.
3044 atom_node = LL[string_idx].parent
3046 # We will place BREAK_MARK in between every two substrings that we
3047 # merge. We will then later go through our final result and use the
3048 # various instances of BREAK_MARK we find to add the right values to
3049 # the custom split map.
3050 BREAK_MARK = "@@@@@ BLACK BREAKPOINT MARKER @@@@@"
3052 QUOTE = LL[string_idx].value[-1]
3054 def make_naked(string: str, string_prefix: str) -> str:
3055 """Strip @string (i.e. make it a "naked" string)
3058 * assert_is_leaf_string(@string)
3061 A string that is identical to @string except that
3062 @string_prefix has been stripped, the surrounding QUOTE
3063 characters have been removed, and any remaining QUOTE
3064 characters have been escaped.
3066 assert_is_leaf_string(string)
3068 RE_EVEN_BACKSLASHES = r"(?:(?<!\\)(?:\\\\)*)"
3069 naked_string = string[len(string_prefix) + 1 : -1]
3070 naked_string = re.sub(
3071 "(" + RE_EVEN_BACKSLASHES + ")" + QUOTE, r"\1\\" + QUOTE, naked_string
3075 # Holds the CustomSplit objects that will later be added to the custom split map.
3079 # Temporary storage for the 'has_prefix' part of the CustomSplit objects.
3082 # Sets the 'prefix' variable. This is the prefix that the final merged string will have.
3084 next_str_idx = string_idx
3088 and is_valid_index(next_str_idx)
3089 and LL[next_str_idx].type == token.STRING
3091 prefix = get_string_prefix(LL[next_str_idx].value)
3094 # The next loop merges the string group. The final string will be contained in 'S'.
3097 # The following convenience variables are used:
3102 # NSS: naked next string
3106 next_str_idx = string_idx
3107 while is_valid_index(next_str_idx) and LL[next_str_idx].type == token.STRING:
3110 SS = LL[next_str_idx].value
3111 next_prefix = get_string_prefix(SS)
3113 # If this is an f-string group but this substring is not prefixed with 'f'...
3115 if "f" in prefix and "f" not in next_prefix:
3116 # Then we must escape any braces contained in this substring.
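# (Editor's note: black imports the third-party `regex` package as `re`, so
# `re.subf(...)` below is valid; the stdlib `re` module has no `subf`.)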
3117 SS = re.subf(r"(\{|\})", "{1}{1}", SS)
3119 NSS = make_naked(SS, next_prefix)
3121 has_prefix = bool(next_prefix)
3122 prefix_tracker.append(has_prefix)
3124 S = prefix + QUOTE + NS + NSS + BREAK_MARK + QUOTE
3125 NS = make_naked(S, prefix)
3129 S_leaf = Leaf(token.STRING, S)
3130 if self.normalize_strings:
3131 normalize_string_quotes(S_leaf)
3133 # Fill the 'custom_splits' list with the appropriate CustomSplit objects.
3134 temp_string = S_leaf.value[len(prefix) + 1 : -1]
3135 for has_prefix in prefix_tracker:
3136 mark_idx = temp_string.find(BREAK_MARK)
3139 ), "Logic error while filling the custom string breakpoint cache."
3141 temp_string = temp_string[mark_idx + len(BREAK_MARK) :]
3142 breakpoint_idx = mark_idx + (len(prefix) if has_prefix else 0) + 1
3143 custom_splits.append(CustomSplit(has_prefix, breakpoint_idx))
3145 string_leaf = Leaf(token.STRING, S_leaf.value.replace(BREAK_MARK, ""))
3147 if atom_node is not None:
3148 replace_child(atom_node, string_leaf)
3150 # Build the final line ('new_line') that this method will later return.
3151 new_line = line.clone()
3152 for (i, leaf) in enumerate(LL):
3154 new_line.append(string_leaf)
3156 if string_idx <= i < string_idx + num_of_strings:
3157 for comment_leaf in line.comments_after(LL[i]):
3158 new_line.append(comment_leaf, preformatted=True)
3161 append_leaves(new_line, line, [leaf])
3163 self.add_custom_splits(string_leaf.value, custom_splits)
3167 def __validate_msg(line: Line, string_idx: int) -> TResult[None]:
3168 """Validate (M)erge (S)tring (G)roup
3170 Transform-time string validation logic for __merge_string_group(...).
3173 * Ok(None), if ALL validation checks (listed below) pass.
3175 * Err(CannotTransform), if any of the following are true:
3176 - The target string is not in a string group (i.e. it has no adjacent strings).
3178 - The string group has more than one inline comment.
3179 - The string group has an inline comment that appears to be a pragma.
3180 - The set of all string prefixes in the string group is of
3181 length greater than one and is not equal to {"", "f"}.
3182 - The string group consists of raw strings.
3184 num_of_inline_string_comments = 0
3185 set_of_prefixes = set()
3187 for leaf in line.leaves[string_idx:]:
3188 if leaf.type != token.STRING:
3189 # If the string group is trailed by a comma, we count the
3190 # comments trailing the comma to be one of the string group's comments.
3192 if leaf.type == token.COMMA and id(leaf) in line.comments:
3193 num_of_inline_string_comments += 1
3196 if has_triple_quotes(leaf.value):
3197 return TErr("StringMerger does NOT merge multiline strings.")
3200 prefix = get_string_prefix(leaf.value)
3202 return TErr("StringMerger does NOT merge raw strings.")
3204 set_of_prefixes.add(prefix)
3206 if id(leaf) in line.comments:
3207 num_of_inline_string_comments += 1
3208 if contains_pragma_comment(line.comments[id(leaf)]):
3209 return TErr("Cannot merge strings which have pragma comments.")
3211 if num_of_strings < 2:
3213 f"Not enough strings to merge (num_of_strings={num_of_strings})."
3216 if num_of_inline_string_comments > 1:
3218 f"Too many inline string comments ({num_of_inline_string_comments})."
3221 if len(set_of_prefixes) > 1 and set_of_prefixes != {"", "f"}:
3222 return TErr(f"Too many different prefixes ({set_of_prefixes}).")
3227 class StringParenStripper(StringTransformer):
3228 """StringTransformer that strips surrounding parentheses from strings.
3231 The line contains a string which is surrounded by parentheses and:
3232 - The target string is NOT the only argument to a function call.
3233 - The RPAR is NOT followed by an attribute access (i.e. a dot).
3236 The parentheses mentioned in the 'Requirements' section are stripped.
3239 StringParenStripper has its own inherent usefulness, but it is also
3240 relied on to clean up the parentheses created by StringParenWrapper (in
3241 the event that they are no longer needed).
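# Editor's note (illustrative, not part of black.py): a typical strip, e.g.
#
#     value = ("this string does not need parentheses")
#     -->
#     value = "this string does not need parentheses"
#
# whereas foo("bar") is left alone, since its LPAR is preceded by a function
# name (see the do_match checks below).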
3244 def do_match(self, line: Line) -> TMatchResult:
3247 is_valid_index = is_valid_index_factory(LL)
3249 for (idx, leaf) in enumerate(LL):
3250 # Should be a string...
3251 if leaf.type != token.STRING:
3254 # Should be preceded by a non-empty LPAR...
3256 not is_valid_index(idx - 1)
3257 or LL[idx - 1].type != token.LPAR
3258 or is_empty_lpar(LL[idx - 1])
3262 # That LPAR should NOT be preceded by a function name or a closing
3263 # bracket (which could be a function which returns a function or a
3264 # list/dictionary that contains a function)...
3265 if is_valid_index(idx - 2) and (
3266 LL[idx - 2].type == token.NAME or LL[idx - 2].type in CLOSING_BRACKETS
3272 # Skip the string trailer, if one exists.
3273 string_parser = StringParser()
3274 next_idx = string_parser.parse(LL, string_idx)
3276 # Should be followed by a non-empty RPAR...
3278 is_valid_index(next_idx)
3279 and LL[next_idx].type == token.RPAR
3280 and not is_empty_rpar(LL[next_idx])
3282 # That RPAR should NOT be followed by a '.' symbol.
3283 if is_valid_index(next_idx + 1) and LL[next_idx + 1].type == token.DOT:
3286 return Ok(string_idx)
3288 return TErr("This line has no strings wrapped in parens.")
3290 def do_transform(self, line: Line, string_idx: int) -> Iterator[TResult[Line]]:
3293 string_parser = StringParser()
3294 rpar_idx = string_parser.parse(LL, string_idx)
3296 for leaf in (LL[string_idx - 1], LL[rpar_idx]):
3297 if line.comments_after(leaf):
3299 "Will not strip parentheses which have comments attached to them."
3302 new_line = line.clone()
3303 new_line.comments = line.comments.copy()
3305 append_leaves(new_line, line, LL[: string_idx - 1])
3307 string_leaf = Leaf(token.STRING, LL[string_idx].value)
3308 LL[string_idx - 1].remove()
3309 replace_child(LL[string_idx], string_leaf)
3310 new_line.append(string_leaf)
3313 new_line, line, LL[string_idx + 1 : rpar_idx] + LL[rpar_idx + 1 :],
3316 LL[rpar_idx].remove()
3321 class BaseStringSplitter(StringTransformer):
3323 Abstract class for StringTransformers which transform a Line's strings by splitting
3324 them or placing them on their own lines where necessary to avoid going over
3325 the configured line length.
3328 * The target string value is responsible for the line going over the
3329 line length limit. It follows that after all of black's other line
3330 split methods have been exhausted, this line (or one of the resulting
3331 lines after all line splits are performed) would still be over the
3332 line_length limit unless we split this string.
3334 * The target string is NOT a "pointless" string (i.e. a string that has
3335 no parent or siblings).
3337 * The target string is not followed by an inline comment that appears to be a pragma.
3340 * The target string is not a multiline (i.e. triple-quote) string.
3344 def do_splitter_match(self, line: Line) -> TMatchResult:
3346 BaseStringSplitter asks its clients to override this method instead of
3347 `StringTransformer.do_match(...)`.
3349 Follows the same protocol as `StringTransformer.do_match(...)`.
3351 Refer to `help(StringTransformer.do_match)` for more information.
3354 def do_match(self, line: Line) -> TMatchResult:
3355 match_result = self.do_splitter_match(line)
3356 if isinstance(match_result, Err):
3359 string_idx = match_result.ok()
3360 vresult = self.__validate(line, string_idx)
3361 if isinstance(vresult, Err):
3366 def __validate(self, line: Line, string_idx: int) -> TResult[None]:
3368 Checks that @line meets all of the requirements listed in this class's
3369 docstring. Refer to `help(BaseStringSplitter)` for a detailed
3370 description of those requirements.
3373 * Ok(None), if ALL of the requirements are met.
3375 * Err(CannotTransform), if ANY of the requirements are NOT met.
3379 string_leaf = LL[string_idx]
3381 max_string_length = self.__get_max_string_length(line, string_idx)
3382 if len(string_leaf.value) <= max_string_length:
3384 "The string itself is not what is causing this line to be too long."
3387 if not string_leaf.parent or [L.type for L in string_leaf.parent.children] == [
3392 f"This string ({string_leaf.value}) appears to be pointless (i.e. has"
3396 if id(line.leaves[string_idx]) in line.comments and contains_pragma_comment(
3397 line.comments[id(line.leaves[string_idx])]
3400 "Line appears to end with an inline pragma comment. Splitting the line"
3401 " could modify the pragma's behavior."
3404 if has_triple_quotes(string_leaf.value):
3405 return TErr("We cannot split multiline strings.")
3409 def __get_max_string_length(self, line: Line, string_idx: int) -> int:
3411 Calculates the max string length used when attempting to determine
3412 whether or not the target string is responsible for causing the line to
3413 go over the line length limit.
3415 WARNING: This method is tightly coupled to both StringSplitter and
3416 (especially) StringParenWrapper. There is probably a better way to
3417 accomplish what is being done here.
3420 max_string_length: such that `len(line.leaves[string_idx].value) >
3421 max_string_length` implies that the target string IS responsible
3422 for causing this line to exceed the line length limit.
3426 is_valid_index = is_valid_index_factory(LL)
3428 # We use the shorthand "WMA4" in comments to abbreviate "We must
3429 # account for". When giving examples, we use STRING to mean some/any
3432 # Finally, we use the following convenience variables:
3434 # P: The leaf that is before the target string leaf.
3435 # N: The leaf that is after the target string leaf.
3436 # NN: The leaf that is after N.
3438 # WMA4 the whitespace at the beginning of the line.
3439 offset = line.depth * 4
3441 if is_valid_index(string_idx - 1):
3442 p_idx = string_idx - 1
3444 LL[string_idx - 1].type == token.LPAR
3445 and LL[string_idx - 1].value == ""
3448 # If the previous leaf is an empty LPAR placeholder, we should skip it.
3452 if P.type == token.PLUS:
3453 # WMA4 a space and a '+' character (e.g. `+ STRING`).
3456 if P.type == token.COMMA:
3457 # WMA4 a space, a comma, and a closing bracket [e.g. `), STRING`].
3460 if P.type in [token.COLON, token.EQUAL, token.NAME]:
3461 # This conditional branch is meant to handle dictionary keys,
3462 # variable assignments, 'return STRING' statement lines, and
3463 # 'else STRING' ternary expression lines.
3465 # WMA4 a single space.
3468 # WMA4 the lengths of any leaves that came before that space.
3469 for leaf in LL[: p_idx + 1]:
3470 offset += len(str(leaf))
3472 if is_valid_index(string_idx + 1):
3473 N = LL[string_idx + 1]
3474 if N.type == token.RPAR and N.value == "" and len(LL) > string_idx + 2:
3475 # If the next leaf is an empty RPAR placeholder, we should skip it.
3476 N = LL[string_idx + 2]
3478 if N.type == token.COMMA:
3479 # WMA4 a single comma at the end of the string (e.g `STRING,`).
3482 if is_valid_index(string_idx + 2):
3483 NN = LL[string_idx + 2]
3485 if N.type == token.DOT and NN.type == token.NAME:
3486 # This conditional branch is meant to handle method calls invoked
3487 # off of a string literal up to and including the LPAR character.
3489 # WMA4 the '.' character.
3493 is_valid_index(string_idx + 3)
3494 and LL[string_idx + 3].type == token.LPAR
3496 # WMA4 the left parenthesis character.
3499 # WMA4 the length of the method's name.
3500 offset += len(NN.value)
3502 has_comments = False
3503 for comment_leaf in line.comments_after(LL[string_idx]):
3504 if not has_comments:
3506 # WMA4 two spaces before the '#' character.
3509 # WMA4 the length of the inline comment.
3510 offset += len(comment_leaf.value)
3512 max_string_length = self.line_length - offset
3513 return max_string_length
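# Editor's worked example (hypothetical numbers, not part of black.py): for
# a line like
#
#     return "STRING"
#
# at depth 1 with line_length == 88, the offset is roughly
#
#       4   (WMA4 the indentation: depth * 4)
#     + 1   (WMA4 the space after `return`)
#     + 6   (WMA4 the `return` keyword itself)
#     = 11  ==> max_string_length == 88 - 11 == 77
#
# so the string is only considered "responsible" for the overlong line when
# len('"STRING"') > 77. The exact value can shift if leaves carry prefixes.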
3516 class StringSplitter(CustomSplitMapMixin, BaseStringSplitter):
3518 StringTransformer that splits "atom" strings (i.e. strings which exist on
3519 lines by themselves).
3522 * The line consists ONLY of a single string (with the exception of a
3523 '+' symbol which MAY exist at the start of the line), MAYBE a string
3524 trailer, and MAYBE a trailing comma.
3526 * All of the requirements listed in BaseStringSplitter's docstring.
3529 The string mentioned in the 'Requirements' section is split into as
3530 many substrings as necessary to adhere to the configured line length.
3532 In the final set of substrings, no substring should be smaller than
3533 MIN_SUBSTR_SIZE characters.
3535 The string will ONLY be split on spaces (i.e. each new substring should
3536 start with a space).
3538 If the string is an f-string, it will NOT be split in the middle of an
3539 f-expression (e.g. in f"FooBar: {foo() if x else bar()}", {foo() if x
3540 else bar()} is an f-expression).
3542 If the string that is being split has an associated set of custom split
3543 records and those custom splits will NOT result in any line going over
3544 the configured line length, those custom splits are used. Otherwise the
3545 string is split as late as possible (from left-to-right) while still
3546 adhering to the transformation rules listed above.
3549 StringSplitter relies on StringMerger to construct the appropriate
3550 CustomSplit objects and add them to the custom split map.
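# Editor's note (illustrative, not part of black.py): once StringParenWrapper
# has given a long string its own line, StringSplitter breaks the string leaf
# itself into adjacent substrings, e.g. (with a hypothetical line_length):
#
#     "This is a really long string that does not fit"
#     -->
#     "This is a really long string"
#     " that does not fit"
#
# Each continuation substring starts with a space, per the rules above.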
3554 # Matches an "f-expression" (e.g. {var}) that might be found in an f-string.
3555 RE_FEXPR = r"""
3556 (?<!\{)\{
3557     (?:
3558         [^\{\}]
3559         | \{\{
3560         | \}\}
3561     )+?
3562 (?<!\})(?:\}\})*\}(?!\})
3563 """
3565 def do_splitter_match(self, line: Line) -> TMatchResult:
3568 is_valid_index = is_valid_index_factory(LL)
3572 # The first leaf MAY be a '+' symbol...
3573 if is_valid_index(idx) and LL[idx].type == token.PLUS:
3576 # The next/first leaf MAY be an empty LPAR...
3577 if is_valid_index(idx) and is_empty_lpar(LL[idx]):
3580 # The next/first leaf MUST be a string...
3581 if not is_valid_index(idx) or LL[idx].type != token.STRING:
3582 return TErr("Line does not start with a string.")
3586 # Skip the string trailer, if one exists.
3587 string_parser = StringParser()
3588 idx = string_parser.parse(LL, string_idx)
3590 # That string MAY be followed by an empty RPAR...
3591 if is_valid_index(idx) and is_empty_rpar(LL[idx]):
3594 # That string / empty RPAR leaf MAY be followed by a comma...
3595 if is_valid_index(idx) and LL[idx].type == token.COMMA:
3598 # But no more leaves are allowed...
3599 if is_valid_index(idx):
3600 return TErr("This line does not end with a string.")
3602 return Ok(string_idx)
3604 def do_transform(self, line: Line, string_idx: int) -> Iterator[TResult[Line]]:
3607 QUOTE = LL[string_idx].value[-1]
3609 is_valid_index = is_valid_index_factory(LL)
3610 insert_str_child = insert_str_child_factory(LL[string_idx])
3612 prefix = get_string_prefix(LL[string_idx].value)
3614 # We MAY choose to drop the 'f' prefix from substrings that don't
3615 # contain any f-expressions, but ONLY if the original f-string
3616 # contains at least one f-expression. Otherwise, we will alter the AST of the program.
3618 drop_pointless_f_prefix = ("f" in prefix) and re.search(
3619 self.RE_FEXPR, LL[string_idx].value, re.VERBOSE
3622 first_string_line = True
3623 starts_with_plus = LL[0].type == token.PLUS
3625 def line_needs_plus() -> bool:
3626 return first_string_line and starts_with_plus
3628 def maybe_append_plus(new_line: Line) -> None:
3631 If @line starts with a plus and this is the first line we are
3632 constructing, this function appends a PLUS leaf to @new_line
3633 and replaces the old PLUS leaf in the node structure. Otherwise
3634 this function does nothing.
3636 if line_needs_plus():
3637 plus_leaf = Leaf(token.PLUS, "+")
3638 replace_child(LL[0], plus_leaf)
3639 new_line.append(plus_leaf)
3642 is_valid_index(string_idx + 1) and LL[string_idx + 1].type == token.COMMA
3645 def max_last_string() -> int:
3648 The max allowed length of the string value used for the last
3649 line we will construct.
3651 result = self.line_length
3652 result -= line.depth * 4
3653 result -= 1 if ends_with_comma else 0
3654 result -= 2 if line_needs_plus() else 0
3657 # --- Calculate Max Break Index (for string value)
3658 # We start with the line length limit
3659 max_break_idx = self.line_length
3660 # The last index of a string of length N is N-1.
3662 # Leading whitespace is not present in the string value (e.g. Leaf.value).
3663 max_break_idx -= line.depth * 4
3664 if max_break_idx < 0:
3666 f"Unable to split {LL[string_idx].value} at such high of a line depth:"
3671 # Check if StringMerger registered any custom splits.
3672 custom_splits = self.pop_custom_splits(LL[string_idx].value)
3673 # We use them ONLY if none of them would produce lines that exceed the
3675 use_custom_breakpoints = bool(
3677 and all(csplit.break_idx <= max_break_idx for csplit in custom_splits)
3680 # Temporary storage for the remaining chunk of the string line that
3681 # can't fit onto the line currently being constructed.
3682 rest_value = LL[string_idx].value
3684 def more_splits_should_be_made() -> bool:
3687 True iff `rest_value` (the remaining string value from the last
3688 split) should be split again.
3690 if use_custom_breakpoints:
3691 return len(custom_splits) > 1
3693 return len(rest_value) > max_last_string()
3695 string_line_results: List[Ok[Line]] = []
3696 while more_splits_should_be_made():
3697 if use_custom_breakpoints:
3698 # Custom User Split (manual)
3699 csplit = custom_splits.pop(0)
3700 break_idx = csplit.break_idx
3702 # Algorithmic Split (automatic)
3703 max_bidx = max_break_idx - 2 if line_needs_plus() else max_break_idx
3704 maybe_break_idx = self.__get_break_idx(rest_value, max_bidx)
3705 if maybe_break_idx is None:
3706 # If we are unable to algorithmically determine a good split
3707 # and this string has custom splits registered to it, we
3708 # fall back to using them--which means we have to start
3709 # over from the beginning.
3711 rest_value = LL[string_idx].value
3712 string_line_results = []
3713 first_string_line = True
3714 use_custom_breakpoints = True
3717 # Otherwise, we stop splitting here.
3720 break_idx = maybe_break_idx
3722 # --- Construct `next_value`
3723 next_value = rest_value[:break_idx] + QUOTE
3725 # Are we allowed to try to drop a pointless 'f' prefix?
3726 drop_pointless_f_prefix
3727 # If we are, will we be successful?
3728 and next_value != self.__normalize_f_string(next_value, prefix)
3730 # If the current custom split did NOT originally use a prefix,
3731 # then `csplit.break_idx` will be off by one after removing the 'f' prefix.
3735 if use_custom_breakpoints and not csplit.has_prefix
3738 next_value = rest_value[:break_idx] + QUOTE
3739 next_value = self.__normalize_f_string(next_value, prefix)
3741 # --- Construct `next_leaf`
3742 next_leaf = Leaf(token.STRING, next_value)
3743 insert_str_child(next_leaf)
3744 self.__maybe_normalize_string_quotes(next_leaf)
3746 # --- Construct `next_line`
3747 next_line = line.clone()
3748 maybe_append_plus(next_line)
3749 next_line.append(next_leaf)
3750 string_line_results.append(Ok(next_line))
3752 rest_value = prefix + QUOTE + rest_value[break_idx:]
3753 first_string_line = False
3755 yield from string_line_results
3757 if drop_pointless_f_prefix:
3758 rest_value = self.__normalize_f_string(rest_value, prefix)
3760 rest_leaf = Leaf(token.STRING, rest_value)
3761 insert_str_child(rest_leaf)
3763 # NOTE: I could not find a test case that verifies that the following
3764 # line is actually necessary, but it seems to be. Otherwise we risk
3765 # not normalizing the last substring, right?
3766 self.__maybe_normalize_string_quotes(rest_leaf)
3768 last_line = line.clone()
3769 maybe_append_plus(last_line)
3771 # If there are any leaves to the right of the target string...
3772 if is_valid_index(string_idx + 1):
3773 # We use `temp_value` here to determine how long the last line
3774 # would be if we were to append all the leaves to the right of the
3775 # target string to the last string line.
3776 temp_value = rest_value
3777 for leaf in LL[string_idx + 1 :]:
3778 temp_value += str(leaf)
3779 if leaf.type == token.LPAR:
3782 # Try to fit them all on the same line with the last substring...
3784 len(temp_value) <= max_last_string()
3785 or LL[string_idx + 1].type == token.COMMA
3787 last_line.append(rest_leaf)
3788 append_leaves(last_line, line, LL[string_idx + 1 :])
3790 # Otherwise, place the last substring on one line and everything
3791 # else on a line below that...
3793 last_line.append(rest_leaf)
3796 non_string_line = line.clone()
3797 append_leaves(non_string_line, line, LL[string_idx + 1 :])
3798 yield Ok(non_string_line)
3799 # Else the target string was the last leaf...
3801 last_line.append(rest_leaf)
3802 last_line.comments = line.comments.copy()
3805 def __get_break_idx(self, string: str, max_break_idx: int) -> Optional[int]:
3807 This method contains the algorithm that StringSplitter uses to
3808 determine which character to split each string at.
3811 @string: The substring that we are attempting to split.
3812 @max_break_idx: The ideal break index. We will return this value if it
3813 meets all the necessary conditions. In the likely event that it
3814 doesn't, we will try to find the closest index BELOW @max_break_idx
3815 that does. If that fails, we will expand our search by also
3816 considering all valid indices ABOVE @max_break_idx.
3819 * assert_is_leaf_string(@string)
3820 * 0 <= @max_break_idx < len(@string)
3823 break_idx, if an index can be found that meets all of the
3824 conditions listed in the 'Transformations' section of this class's
3829 is_valid_index = is_valid_index_factory(string)
3831 assert is_valid_index(max_break_idx)
3832 assert_is_leaf_string(string)
3834 _fexpr_slices: Optional[List[Tuple[Index, Index]]] = None
3836 def fexpr_slices() -> Iterator[Tuple[Index, Index]]:
3839 All ranges of @string which, if @string were to be split there,
3840 would result in the splitting of an f-expression (which is NOT allowed).
3843 nonlocal _fexpr_slices
3845 if _fexpr_slices is None:
3847 for match in re.finditer(self.RE_FEXPR, string, re.VERBOSE):
3848 _fexpr_slices.append(match.span())
3850 yield from _fexpr_slices
3852 is_fstring = "f" in get_string_prefix(string)
3854 def breaks_fstring_expression(i: Index) -> bool:
3857 True iff returning @i would result in the splitting of an
3858 f-expression (which is NOT allowed).
3863 for (start, end) in fexpr_slices():
3864 if start <= i < end:
3869 def passes_all_checks(i: Index) -> bool:
3872 True iff ALL of the conditions listed in the 'Transformations'
3873 section of this class's docstring would be met by returning @i.
3875 is_space = string[i] == " "
3877 len(string[i:]) >= self.MIN_SUBSTR_SIZE
3878 and len(string[:i]) >= self.MIN_SUBSTR_SIZE
3880 return is_space and is_big_enough and not breaks_fstring_expression(i)
3882 # First, we check all indices BELOW @max_break_idx.
3883 break_idx = max_break_idx
3884 while is_valid_index(break_idx - 1) and not passes_all_checks(break_idx):
3887 if not passes_all_checks(break_idx):
3888 # If that fails, we check all indices ABOVE @max_break_idx.
3890 # If we are able to find a valid index here, the next line is going
3891 # to be longer than the specified line length, but it's probably
3892 # better than doing nothing at all.
3893 break_idx = max_break_idx + 1
3894 while is_valid_index(break_idx + 1) and not passes_all_checks(break_idx):
3897 if not is_valid_index(break_idx) or not passes_all_checks(break_idx):
3902 def __maybe_normalize_string_quotes(self, leaf: Leaf) -> None:
3903 if self.normalize_strings:
3904 normalize_string_quotes(leaf)
3906 def __normalize_f_string(self, string: str, prefix: str) -> str:
3909 * assert_is_leaf_string(@string)
3912 * If @string is an f-string that contains no f-expressions, we
3913 return a string identical to @string except that the 'f' prefix
3914 has been stripped and all double braces (i.e. '{{' or '}}') have
3915 been normalized (i.e. turned into '{' or '}').
3917 * Otherwise, we return @string.
3919 assert_is_leaf_string(string)
3921 if "f" in prefix and not re.search(self.RE_FEXPR, string, re.VERBOSE):
3922 new_prefix = prefix.replace("f", "")
3924 temp = string[len(prefix) :]
3925 temp = re.sub(r"\{\{", "{", temp)
3926 temp = re.sub(r"\}\}", "}", temp)
3929 return f"{new_prefix}{new_string}"
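# Editor's note (illustrative, not part of black.py): two quick examples of
# the f-prefix normalization above:
#
#     'f"{{escaped}}"'  ->  '"{escaped}"'   # no real f-expression: drop 'f',
#                                           # un-double the braces
#     'f"{expr}"'       ->  'f"{expr}"'     # contains an f-expression: kept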
3934 class StringParenWrapper(CustomSplitMapMixin, BaseStringSplitter):
3936 StringTransformer that splits non-"atom" strings (i.e. strings that do not
3937 exist on lines by themselves).
3940 All of the requirements listed in BaseStringSplitter's docstring in
3941 addition to the requirements listed below:
3943 * The line is a return/yield statement, which returns/yields a string.
3945 * The line is part of a ternary expression (e.g. `x = y if cond else
3946 z`) such that the line starts with `else <string>`, where <string> is
3949 * The line is an assert statement, which ends with a string.
3951 * The line is an assignment statement (e.g. `x = <string>` or `x +=
3952 <string>`) such that the variable is being assigned the value of some
3955 * The line is a dictionary key assignment where some valid key is being
3956 assigned the value of some string.
3959 The chosen string is wrapped in parentheses and then split at the LPAR.
3961 We then have one line which ends with an LPAR and another line that
3962 starts with the chosen string. The latter line is then split again at
3963 the RPAR. This results in the RPAR (and possibly a trailing comma)
3964 being placed on its own line.
3966 NOTE: If any leaves exist to the right of the chosen string (except
3967 for a trailing comma, which would be placed after the RPAR), those
3968 leaves are placed inside the parentheses. In effect, the chosen
3969 string is not necessarily being "wrapped" by parentheses. We can,
3970 however, count on the LPAR being placed directly before the chosen string.
3973 In other words, StringParenWrapper creates "atom" strings. These
3974 can then be split again by StringSplitter, if necessary.
3977 In the event that a string line split by StringParenWrapper is
3978 changed such that it no longer needs to be given its own line,
3979 StringParenWrapper relies on StringParenStripper to clean up the
3980 parentheses it created.
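# Editor's note (illustrative, not part of black.py): a typical
# StringParenWrapper transformation (line_length here is hypothetical):
#
#     return "some string that pushes this line well past the limit"
#     -->
#     return (
#         "some string that pushes this line well past the limit"
#     )
#
# The middle line then becomes an "atom" string that StringSplitter can
# break further if it is still too long.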
3983 def do_splitter_match(self, line: Line) -> TMatchResult:
3987 string_idx = string_idx or self._return_match(LL)
3988 string_idx = string_idx or self._else_match(LL)
3989 string_idx = string_idx or self._assert_match(LL)
3990 string_idx = string_idx or self._assign_match(LL)
3991 string_idx = string_idx or self._dict_match(LL)
3993 if string_idx is not None:
3994 string_value = line.leaves[string_idx].value
3995 # If the string has no spaces...
3996 if " " not in string_value:
3997 # And will still violate the line length limit when split...
3998 max_string_length = self.line_length - ((line.depth + 1) * 4)
3999 if len(string_value) > max_string_length:
4000 # And has no associated custom splits...
4001 if not self.has_custom_splits(string_value):
4002 # Then we should NOT put this string on its own line.
4004 "We do not wrap long strings in parentheses when the"
4005 " resultant line would still be over the specified line"
4006 " length and can't be split further by StringSplitter."
4008 return Ok(string_idx)
4010 return TErr("This line does not contain any non-atomic strings.")
4013 def _return_match(LL: List[Leaf]) -> Optional[int]:
4016 string_idx such that @LL[string_idx] is equal to our target (i.e.
4017 matched) string, if this line matches the return/yield statement
4018 requirements listed in the 'Requirements' section of this class's
4023 # If this line is part of a return/yield statement and the first leaf
4024 # contains either the "return" or the "yield" keyword...
4025 if parent_type(LL[0]) in [syms.return_stmt, syms.yield_expr] and LL[
4027 ].value in ["return", "yield"]:
4028 is_valid_index = is_valid_index_factory(LL)
4030 idx = 2 if is_valid_index(1) and is_empty_par(LL[1]) else 1
4031 # The next visible leaf MUST contain a string...
4032 if is_valid_index(idx) and LL[idx].type == token.STRING:
4038 def _else_match(LL: List[Leaf]) -> Optional[int]:
4041 string_idx such that @LL[string_idx] is equal to our target (i.e.
4042 matched) string, if this line matches the ternary expression
4043 requirements listed in the 'Requirements' section of this class's
4048 # If this line is part of a ternary expression and the first leaf
4049 # contains the "else" keyword...
4051 parent_type(LL[0]) == syms.test
4052 and LL[0].type == token.NAME
4053 and LL[0].value == "else"
4055 is_valid_index = is_valid_index_factory(LL)
4057 idx = 2 if is_valid_index(1) and is_empty_par(LL[1]) else 1
4058 # The next visible leaf MUST contain a string...
4059 if is_valid_index(idx) and LL[idx].type == token.STRING:
4065 def _assert_match(LL: List[Leaf]) -> Optional[int]:
4068 string_idx such that @LL[string_idx] is equal to our target (i.e.
4069 matched) string, if this line matches the assert statement
4070 requirements listed in the 'Requirements' section of this class's
4075 # If this line is part of an assert statement and the first leaf
4076 # contains the "assert" keyword...
4077 if parent_type(LL[0]) == syms.assert_stmt and LL[0].value == "assert":
4078 is_valid_index = is_valid_index_factory(LL)
4080 for (i, leaf) in enumerate(LL):
4081 # We MUST find a comma...
4082 if leaf.type == token.COMMA:
4083 idx = i + 2 if is_empty_par(LL[i + 1]) else i + 1
4085 # That comma MUST be followed by a string...
4086 if is_valid_index(idx) and LL[idx].type == token.STRING:
4089 # Skip the string trailer, if one exists.
4090 string_parser = StringParser()
4091 idx = string_parser.parse(LL, string_idx)
4093 # But no more leaves are allowed...
4094 if not is_valid_index(idx):
4100 def _assign_match(LL: List[Leaf]) -> Optional[int]:
4103 string_idx such that @LL[string_idx] is equal to our target (i.e.
4104 matched) string, if this line matches the assignment statement
4105 requirements listed in the 'Requirements' section of this class's
4110 # If this line is part of an expression statement or is a function
4111 # argument AND the first leaf contains a variable name...
4113 parent_type(LL[0]) in [syms.expr_stmt, syms.argument, syms.power]
4114 and LL[0].type == token.NAME
4116 is_valid_index = is_valid_index_factory(LL)
4118 for (i, leaf) in enumerate(LL):
4119 # We MUST find either an '=' or '+=' symbol...
4120 if leaf.type in [token.EQUAL, token.PLUSEQUAL]:
4121 idx = i + 2 if is_empty_par(LL[i + 1]) else i + 1
4123 # That symbol MUST be followed by a string...
4124 if is_valid_index(idx) and LL[idx].type == token.STRING:
4127 # Skip the string trailer, if one exists.
4128 string_parser = StringParser()
4129 idx = string_parser.parse(LL, string_idx)
4131 # The next leaf MAY be a comma iff this line is part
4132 # of a function argument...
4134 parent_type(LL[0]) == syms.argument
4135 and is_valid_index(idx)
4136 and LL[idx].type == token.COMMA
4140 # But no more leaves are allowed...
4141 if not is_valid_index(idx):
4147 def _dict_match(LL: List[Leaf]) -> Optional[int]:
4150 string_idx such that @LL[string_idx] is equal to our target (i.e.
4151 matched) string, if this line matches the dictionary key assignment
4152 statement requirements listed in the 'Requirements' section of this
4157 # If this line is part of a dictionary key assignment...
4158 if syms.dictsetmaker in [parent_type(LL[0]), parent_type(LL[0].parent)]:
4159 is_valid_index = is_valid_index_factory(LL)
4161 for (i, leaf) in enumerate(LL):
4162 # We MUST find a colon...
4163 if leaf.type == token.COLON:
4164 idx = i + 2 if is_empty_par(LL[i + 1]) else i + 1
4166 # That colon MUST be followed by a string...
4167 if is_valid_index(idx) and LL[idx].type == token.STRING:
4170 # Skip the string trailer, if one exists.
4171 string_parser = StringParser()
4172 idx = string_parser.parse(LL, string_idx)
4174 # That string MAY be followed by a comma...
4175 if is_valid_index(idx) and LL[idx].type == token.COMMA:
4178 # But no more leaves are allowed...
4179 if not is_valid_index(idx):
4184 def do_transform(self, line: Line, string_idx: int) -> Iterator[TResult[Line]]:
4187 is_valid_index = is_valid_index_factory(LL)
4188 insert_str_child = insert_str_child_factory(LL[string_idx])
4190 comma_idx = len(LL) - 1
4191 ends_with_comma = False
4192 if LL[comma_idx].type == token.COMMA:
4193 ends_with_comma = True
4195 leaves_to_steal_comments_from = [LL[string_idx]]
4197 leaves_to_steal_comments_from.append(LL[comma_idx])
4200 first_line = line.clone()
4201 left_leaves = LL[:string_idx]
4203 # We have to remember to account for (possibly invisible) LPAR and RPAR
4204 # leaves that already wrapped the target string. If these leaves do
4205 # exist, we will replace them with our own LPAR and RPAR leaves.
4206 old_parens_exist = False
4207 if left_leaves and left_leaves[-1].type == token.LPAR:
4208 old_parens_exist = True
4209 leaves_to_steal_comments_from.append(left_leaves[-1])
4212 append_leaves(first_line, line, left_leaves)
4214 lpar_leaf = Leaf(token.LPAR, "(")
4215 if old_parens_exist:
4216 replace_child(LL[string_idx - 1], lpar_leaf)
4218 insert_str_child(lpar_leaf)
4219 first_line.append(lpar_leaf)
4221 # We throw inline comments that were originally to the right of the
4222 # target string to the top line. They will now be shown to the right of the LPAR.
4224 for leaf in leaves_to_steal_comments_from:
4225 for comment_leaf in line.comments_after(leaf):
4226 first_line.append(comment_leaf, preformatted=True)
4228 yield Ok(first_line)
4230 # --- Middle (String) Line
4231 # We only need to yield one (possibly too long) string line, since the
4232 # `StringSplitter` will break it down further if necessary.
4233 string_value = LL[string_idx].value
4235 depth=line.depth + 1,
4236 inside_brackets=True,
4237 should_explode=line.should_explode,
4239 string_leaf = Leaf(token.STRING, string_value)
4240 insert_str_child(string_leaf)
4241 string_line.append(string_leaf)
4243 old_rpar_leaf = None
4244 if is_valid_index(string_idx + 1):
4245 right_leaves = LL[string_idx + 1 :]
4249 if old_parens_exist:
4251 right_leaves and right_leaves[-1].type == token.RPAR
4252 ), "Apparently, old parentheses do NOT exist?!"
4253 old_rpar_leaf = right_leaves.pop()
4255 append_leaves(string_line, line, right_leaves)
4257 yield Ok(string_line)
4260 last_line = line.clone()
4261 last_line.bracket_tracker = first_line.bracket_tracker
4263 new_rpar_leaf = Leaf(token.RPAR, ")")
4264 if old_rpar_leaf is not None:
4265 replace_child(old_rpar_leaf, new_rpar_leaf)
4267 insert_str_child(new_rpar_leaf)
4268 last_line.append(new_rpar_leaf)
4270 # If the target string ended with a comma, we place this comma to the
4271 # right of the RPAR on the last line.
4273 comma_leaf = Leaf(token.COMMA, ",")
4274 replace_child(LL[comma_idx], comma_leaf)
4275 last_line.append(comma_leaf)
class StringParser:
    """
    A state machine that aids in parsing a string's "trailer", which can be
    either non-existent, an old-style formatting sequence (e.g. `% varX` or `%
    (varX, varY)`), or a method-call / attribute access (e.g. `.format(varX,
    varY)`).

    NOTE: A new StringParser object MUST be instantiated for each string
    trailer we need to parse.

    Examples:
        We shall assume that `line` equals the `Line` object that corresponds
        to the following line of python code:
        ```
        x = "Some {}.".format("String") + some_other_string
        ```

        Furthermore, we will assume that `string_idx` is some index such that:
        ```
        assert line.leaves[string_idx].value == "Some {}."
        ```

        The following code snippet then holds:
        ```
        string_parser = StringParser()
        idx = string_parser.parse(line.leaves, string_idx)
        assert line.leaves[idx].type == token.PLUS
        ```
    """

    DEFAULT_TOKEN = -1

    # String Parser States
    START = 1
    DOT = 2
    NAME = 3
    PERCENT = 4
    SINGLE_FMT_ARG = 5
    LPAR = 6
    RPAR = 7
    DONE = 8

    # Lookup Table for Next State
    _goto: Dict[Tuple[ParserState, NodeType], ParserState] = {
        # A string trailer may start with '.' OR '%'.
        (START, token.DOT): DOT,
        (START, token.PERCENT): PERCENT,
        (START, DEFAULT_TOKEN): DONE,
        # A '.' MUST be followed by an attribute or method name.
        (DOT, token.NAME): NAME,
        # A method name MUST be followed by an '(', whereas an attribute name
        # is the last symbol in the string trailer.
        (NAME, token.LPAR): LPAR,
        (NAME, DEFAULT_TOKEN): DONE,
        # A '%' symbol can be followed by an '(' or a single argument (e.g. a
        # string or variable name).
        (PERCENT, token.LPAR): LPAR,
        (PERCENT, DEFAULT_TOKEN): SINGLE_FMT_ARG,
        # If a '%' symbol is followed by a single argument, that argument is
        # the last leaf in the string trailer.
        (SINGLE_FMT_ARG, DEFAULT_TOKEN): DONE,
        # If present, a ')' symbol is the last symbol in a string trailer.
        # (NOTE: LPARS and nested RPARS are not included in this lookup table,
        # since they are treated as a special case by the parsing logic in this
        # class's implementation.)
        (RPAR, DEFAULT_TOKEN): DONE,
    }

    def __init__(self) -> None:
        self._state = self.START
        self._unmatched_lpars = 0

    def parse(self, leaves: List[Leaf], string_idx: int) -> int:
        """
        Pre-conditions:
            * @leaves[@string_idx].type == token.STRING

        Returns:
            The index directly after the last leaf which is a part of the
            string trailer, if a "trailer" exists.
                OR
            @string_idx + 1, if no string "trailer" exists.
        """
        assert leaves[string_idx].type == token.STRING

        idx = string_idx + 1
        while idx < len(leaves) and self._next_state(leaves[idx]):
            idx += 1

        return idx

    def _next_state(self, leaf: Leaf) -> bool:
        """
        Pre-conditions:
            * On the first call to this function, @leaf MUST be the leaf that
            was directly after the string leaf in question (e.g. if our target
            string is `line.leaves[i]` then the first call to this method must
            be `line.leaves[i + 1]`).
            * On the next call to this function, the leaf parameter passed in
            MUST be the leaf directly following @leaf.

        Returns:
            True iff @leaf is a part of the string's trailer.
        """
        # We ignore empty LPAR or RPAR leaves.
        if is_empty_par(leaf):
            return True

        next_token = leaf.type
        if next_token == token.LPAR:
            self._unmatched_lpars += 1

        current_state = self._state

        # The LPAR parser state is a special case. We will return True until we
        # find the matching RPAR token.
        if current_state == self.LPAR:
            if next_token == token.RPAR:
                self._unmatched_lpars -= 1
                if self._unmatched_lpars == 0:
                    self._state = self.RPAR
        # Otherwise, we use a lookup table to determine the next state.
        else:
            # If the lookup table matches the current state to the next
            # token, we use the lookup table.
            if (current_state, next_token) in self._goto:
                self._state = self._goto[current_state, next_token]
            else:
                # Otherwise, we check if the current state was assigned a
                # default.
                if (current_state, self.DEFAULT_TOKEN) in self._goto:
                    self._state = self._goto[current_state, self.DEFAULT_TOKEN]
                # If no default has been assigned, then this parser has a logic
                # error.
                else:
                    raise RuntimeError(f"{self.__class__.__name__} LOGIC ERROR!")

            if self._state == self.DONE:
                return False

        return True

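
# Editorial sketch, not part of black: hand-built leaves standing in for
# `"{}".format(x) + y`; the parser stops right after the trailer.
def _demo_string_parser() -> None:
    leaves = [
        Leaf(token.STRING, '"{}"'),
        Leaf(token.DOT, "."),
        Leaf(token.NAME, "format"),
        Leaf(token.LPAR, "("),
        Leaf(token.NAME, "x"),
        Leaf(token.RPAR, ")"),
        Leaf(token.PLUS, "+"),
        Leaf(token.NAME, "y"),
    ]
    idx = StringParser().parse(leaves, string_idx=0)
    assert leaves[idx].type == token.PLUS
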
def TErr(err_msg: str) -> Err[CannotTransform]:
    """(T)ransform Err

    Convenience function used when working with the TResult type.
    """
    cant_transform = CannotTransform(err_msg)
    return Err(cant_transform)

def contains_pragma_comment(comment_list: List[Leaf]) -> bool:
    """
    Returns:
        True iff one of the comments in @comment_list is a pragma used by one
        of the more common static analysis tools for python (e.g. mypy, flake8,
        pylint).
    """
    for comment in comment_list:
        if comment.value.startswith(("# type:", "# noqa", "# pylint:")):
            return True

    return False

def insert_str_child_factory(string_leaf: Leaf) -> Callable[[LN], None]:
    """
    Factory for a convenience function that is used to orphan @string_leaf
    and then insert multiple new leaves into the same part of the node
    structure that @string_leaf had originally occupied.

    Examples:
        Let `string_leaf = Leaf(token.STRING, '"foo"')` and `N =
        string_leaf.parent`. Assume the node `N` has the following
        original structure:

        Node(
            expr_stmt, [
                Leaf(NAME, 'x'),
                Leaf(EQUAL, '='),
                Leaf(STRING, '"foo"'),
            ]
        )

        We then run the code snippet shown below.
        ```
        insert_str_child = insert_str_child_factory(string_leaf)

        lpar = Leaf(token.LPAR, '(')
        insert_str_child(lpar)

        bar = Leaf(token.STRING, '"bar"')
        insert_str_child(bar)

        rpar = Leaf(token.RPAR, ')')
        insert_str_child(rpar)
        ```

        After which point, it follows that `string_leaf.parent is None` and
        the node `N` now has the following structure:

        Node(
            expr_stmt, [
                Leaf(NAME, 'x'),
                Leaf(EQUAL, '='),
                Leaf(LPAR, '('),
                Leaf(STRING, '"bar"'),
                Leaf(RPAR, ')'),
            ]
        )
    """
    string_parent = string_leaf.parent
    string_child_idx = string_leaf.remove()

    def insert_str_child(child: LN) -> None:
        nonlocal string_child_idx

        assert string_parent is not None
        assert string_child_idx is not None

        string_parent.insert_child(string_child_idx, child)
        string_child_idx += 1

    return insert_str_child

def has_triple_quotes(string: str) -> bool:
    """
    Returns:
        True iff @string starts with three quotation characters.
    """
    raw_string = string.lstrip(STRING_PREFIX_CHARS)
    return raw_string[:3] in {'"""', "'''"}

def parent_type(node: Optional[LN]) -> Optional[NodeType]:
    """
    Returns:
        @node.parent.type, if @node is not None and has a parent.
            OR
        None, otherwise.
    """
    if node is None or node.parent is None:
        return None

    return node.parent.type

def is_empty_par(leaf: Leaf) -> bool:
    return is_empty_lpar(leaf) or is_empty_rpar(leaf)


def is_empty_lpar(leaf: Leaf) -> bool:
    return leaf.type == token.LPAR and leaf.value == ""


def is_empty_rpar(leaf: Leaf) -> bool:
    return leaf.type == token.RPAR and leaf.value == ""

def is_valid_index_factory(seq: Sequence[Any]) -> Callable[[int], bool]:
    """
    Examples:
        ```
        my_list = [1, 2, 3]

        is_valid_index = is_valid_index_factory(my_list)

        assert is_valid_index(0)
        assert is_valid_index(2)

        assert not is_valid_index(3)
        assert not is_valid_index(-1)
        ```
    """

    def is_valid_index(idx: int) -> bool:
        """
        Returns:
            True iff @idx is positive AND seq[@idx] does NOT raise an
            IndexError.
        """
        return 0 <= idx < len(seq)

    return is_valid_index

def line_to_string(line: Line) -> str:
    """Returns the string representation of @line.

    WARNING: This is known to be computationally expensive.
    """
    return str(line).strip("\n")

def append_leaves(new_line: Line, old_line: Line, leaves: List[Leaf]) -> None:
    """
    Append leaves (taken from @old_line) to @new_line, making sure to fix the
    underlying Node structure where appropriate.

    All of the leaves in @leaves are duplicated. The duplicates are then
    appended to @new_line and used to replace their originals in the underlying
    Node structure. Any comments attached to the old leaves are reattached to
    the new leaves.

    Pre-conditions:
        set(@leaves) is a subset of set(@old_line.leaves).
    """
    for old_leaf in leaves:
        assert old_leaf in old_line.leaves

        new_leaf = Leaf(old_leaf.type, old_leaf.value)
        replace_child(old_leaf, new_leaf)
        new_line.append(new_leaf)

        for comment_leaf in old_line.comments_after(old_leaf):
            new_line.append(comment_leaf, preformatted=True)

def replace_child(old_child: LN, new_child: LN) -> None:
    """
    Side Effects:
        * If @old_child.parent is set, replace @old_child with @new_child in
        @old_child's underlying Node structure.
            OR
        * Otherwise, this function does nothing.
    """
    parent = old_child.parent
    if not parent:
        return

    child_idx = old_child.remove()
    if child_idx is not None:
        parent.insert_child(child_idx, new_child)

def get_string_prefix(string: str) -> str:
    """
    Pre-conditions:
        * assert_is_leaf_string(@string)

    Returns:
        @string's prefix (e.g. '', 'r', 'f', or 'rf').
    """
    assert_is_leaf_string(string)

    prefix = ""
    prefix_idx = 0
    while string[prefix_idx] in STRING_PREFIX_CHARS:
        prefix += string[prefix_idx].lower()
        prefix_idx += 1

    return prefix

def assert_is_leaf_string(string: str) -> None:
    """
    Checks the pre-condition that @string has the format that you would expect
    of `leaf.value` where `leaf` is some Leaf such that `leaf.type ==
    token.STRING`. A more precise description of the pre-conditions that are
    checked are listed below.

    Pre-conditions:
        * @string starts with either ', ", <prefix>', or <prefix>" where
        `set(<prefix>)` is some subset of `set(STRING_PREFIX_CHARS)`.
        * @string ends with a quote character (' or ").

    Raises:
        AssertionError(...) if the pre-conditions listed above are not
        satisfied.
    """
    dquote_idx = string.find('"')
    squote_idx = string.find("'")
    if -1 in [dquote_idx, squote_idx]:
        quote_idx = max(dquote_idx, squote_idx)
    else:
        quote_idx = min(squote_idx, dquote_idx)

    assert (
        0 <= quote_idx < len(string) - 1
    ), f"{string!r} is missing a starting quote character (' or \")."
    assert string[-1] in (
        "'",
        '"',
    ), f"{string!r} is missing an ending quote character (' or \")."
    assert set(string[:quote_idx]).issubset(
        set(STRING_PREFIX_CHARS)
    ), f"{set(string[:quote_idx])} is NOT a subset of {set(STRING_PREFIX_CHARS)}."

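
# Editorial sketch, not part of black: the prefix comes back lowercased, which
# is what the string transformers above compare against.
def _demo_get_string_prefix() -> None:
    assert get_string_prefix('"foo"') == ""
    assert get_string_prefix("RB'foo'") == "rb"
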
def left_hand_split(line: Line, _features: Collection[Feature] = ()) -> Iterator[Line]:
    """Split line into many lines, starting with the first matching bracket pair.

    Note: this usually looks weird, only use this for function definitions.
    Prefer RHS otherwise. This is why this function is not symmetrical with
    :func:`right_hand_split` which also handles optional parentheses.
    """
    tail_leaves: List[Leaf] = []
    body_leaves: List[Leaf] = []
    head_leaves: List[Leaf] = []
    current_leaves = head_leaves
    matching_bracket: Optional[Leaf] = None
    for leaf in line.leaves:
        if (
            current_leaves is body_leaves
            and leaf.type in CLOSING_BRACKETS
            and leaf.opening_bracket is matching_bracket
        ):
            current_leaves = tail_leaves if body_leaves else head_leaves
        current_leaves.append(leaf)
        if current_leaves is head_leaves:
            if leaf.type in OPENING_BRACKETS:
                matching_bracket = leaf
                current_leaves = body_leaves
    if not matching_bracket:
        raise CannotSplit("No brackets found")

    head = bracket_split_build_line(head_leaves, line, matching_bracket)
    body = bracket_split_build_line(body_leaves, line, matching_bracket, is_body=True)
    tail = bracket_split_build_line(tail_leaves, line, matching_bracket)
    bracket_split_succeeded_or_raise(head, body, tail)
    for result in (head, body, tail):
        if result:
            yield result

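
# Editorial sketch, not part of black: observing left_hand_split() through the
# public API, assuming the default 88-character line length of Mode().
def _demo_left_hand_split() -> None:
    src = (
        "def frobnicate(argument_one, argument_two, argument_three,"
        " argument_four, argument_five):\n"
        "    pass\n"
    )
    result = format_str(src, mode=Mode())
    first, second, third = result.splitlines()[:3]
    assert first == "def frobnicate("  # head: up to and including the bracket
    assert second.startswith("    argument_one,")  # body: one indent deeper
    assert third == "):"  # tail: closing bracket onwards
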
def right_hand_split(
    line: Line,
    line_length: int,
    features: Collection[Feature] = (),
    omit: Collection[LeafID] = (),
) -> Iterator[Line]:
    """Split line into many lines, starting with the last matching bracket pair.

    If the split was by optional parentheses, attempt splitting without them, too.
    `omit` is a collection of closing bracket IDs that shouldn't be considered for
    this split.

    Note: running this function modifies `bracket_depth` on the leaves of `line`.
    """
    tail_leaves: List[Leaf] = []
    body_leaves: List[Leaf] = []
    head_leaves: List[Leaf] = []
    current_leaves = tail_leaves
    opening_bracket: Optional[Leaf] = None
    closing_bracket: Optional[Leaf] = None
    for leaf in reversed(line.leaves):
        if current_leaves is body_leaves:
            if leaf is opening_bracket:
                current_leaves = head_leaves if body_leaves else tail_leaves
        current_leaves.append(leaf)
        if current_leaves is tail_leaves:
            if leaf.type in CLOSING_BRACKETS and id(leaf) not in omit:
                opening_bracket = leaf.opening_bracket
                closing_bracket = leaf
                current_leaves = body_leaves
    if not (opening_bracket and closing_bracket and head_leaves):
        # If there is no opening or closing_bracket that means the split failed and
        # all content is in the tail. Otherwise, if `head_leaves` are empty, it means
        # the matching `opening_bracket` wasn't available on `line` anymore.
        raise CannotSplit("No brackets found")

    tail_leaves.reverse()
    body_leaves.reverse()
    head_leaves.reverse()
    head = bracket_split_build_line(head_leaves, line, opening_bracket)
    body = bracket_split_build_line(body_leaves, line, opening_bracket, is_body=True)
    tail = bracket_split_build_line(tail_leaves, line, opening_bracket)
    bracket_split_succeeded_or_raise(head, body, tail)
    if (
        # the body shouldn't be exploded
        not body.should_explode
        # the opening bracket is an optional paren
        and opening_bracket.type == token.LPAR
        and not opening_bracket.value
        # the closing bracket is an optional paren
        and closing_bracket.type == token.RPAR
        and not closing_bracket.value
        # it's not an import (optional parens are the only thing we can split on
        # in this case; attempting a split without them is a waste of time)
        and not line.is_import
        # there are no standalone comments in the body
        and not body.contains_standalone_comments(0)
        # and we can actually remove the parens
        and can_omit_invisible_parens(body, line_length)
    ):
        omit = {id(closing_bracket), *omit}
        try:
            yield from right_hand_split(line, line_length, features=features, omit=omit)
            return

        except CannotSplit:
            if not (
                can_be_split(body)
                or is_line_short_enough(body, line_length=line_length)
            ):
                raise CannotSplit(
                    "Splitting failed, body is still too long and can't be split."
                )

            elif head.contains_multiline_strings() or tail.contains_multiline_strings():
                raise CannotSplit(
                    "The current optional pair of parentheses is bound to fail to"
                    " satisfy the splitting algorithm because the head or the tail"
                    " contains multiline strings which by definition never fit one"
                    " line."
                )

    ensure_visible(opening_bracket)
    ensure_visible(closing_bracket)
    for result in (head, body, tail):
        if result:
            yield result

def bracket_split_succeeded_or_raise(head: Line, body: Line, tail: Line) -> None:
    """Raise :exc:`CannotSplit` if the last left- or right-hand split failed.

    Do nothing otherwise.

    A left- or right-hand split is based on a pair of brackets. Content before
    (and including) the opening bracket is left on one line, content inside the
    brackets is put on a separate line, and finally content starting with and
    following the closing bracket is put on a separate line.

    Those are called `head`, `body`, and `tail`, respectively. If the split
    produced the same line (all content in `head`) or ended up with an empty `body`
    and the `tail` is just the closing bracket, then it's considered failed.
    """
    tail_len = len(str(tail).strip())
    if not body:
        if tail_len == 0:
            raise CannotSplit("Splitting brackets produced the same line")

        if tail_len < 3:
            raise CannotSplit(
                f"Splitting brackets on an empty body to save {tail_len} characters is"
                " not worth it"
            )

def bracket_split_build_line(
    leaves: List[Leaf], original: Line, opening_bracket: Leaf, *, is_body: bool = False
) -> Line:
    """Return a new line with given `leaves` and respective comments from `original`.

    If `is_body` is True, the result line is one-indented inside brackets and as such
    has its first leaf's prefix normalized and a trailing comma added when expected.
    """
    result = Line(depth=original.depth)
    if is_body:
        result.inside_brackets = True
        result.depth += 1
        if leaves:
            # Since body is a new indent level, remove spurious leading whitespace.
            normalize_prefix(leaves[0], inside_brackets=True)
            # Ensure a trailing comma for imports and standalone function arguments, but
            # be careful not to add one after any comments or within type annotations.
            no_commas = (
                original.is_def
                and opening_bracket.value == "("
                and not any(leaf.type == token.COMMA for leaf in leaves)
            )

            if original.is_import or no_commas:
                for i in range(len(leaves) - 1, -1, -1):
                    if leaves[i].type == STANDALONE_COMMENT:
                        continue

                    if leaves[i].type != token.COMMA:
                        leaves.insert(i + 1, Leaf(token.COMMA, ","))
                    break

    # Populate the line
    for leaf in leaves:
        result.append(leaf, preformatted=True)
        for comment_after in original.comments_after(leaf):
            result.append(comment_after, preformatted=True)
    if is_body:
        result.should_explode = should_explode(result, opening_bracket)
    return result

def dont_increase_indentation(split_func: Transformer) -> Transformer:
    """Normalize prefix of the first leaf in every line returned by `split_func`.

    This is a decorator over relevant split functions.
    """

    @wraps(split_func)
    def split_wrapper(line: Line, features: Collection[Feature] = ()) -> Iterator[Line]:
        for line in split_func(line, features):
            normalize_prefix(line.leaves[0], inside_brackets=True)
            yield line

    return split_wrapper

@dont_increase_indentation
def delimiter_split(line: Line, features: Collection[Feature] = ()) -> Iterator[Line]:
    """Split according to delimiters of the highest priority.

    If the appropriate Features are given, the split will add trailing commas
    also in function signatures and calls that contain `*` and `**`.
    """
    try:
        last_leaf = line.leaves[-1]
    except IndexError:
        raise CannotSplit("Line empty")

    bt = line.bracket_tracker
    try:
        delimiter_priority = bt.max_delimiter_priority(exclude={id(last_leaf)})
    except ValueError:
        raise CannotSplit("No delimiters found")

    if delimiter_priority == DOT_PRIORITY:
        if bt.delimiter_count_with_priority(delimiter_priority) == 1:
            raise CannotSplit("Splitting a single attribute from its owner looks wrong")

    current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
    lowest_depth = sys.maxsize
    trailing_comma_safe = True

    def append_to_line(leaf: Leaf) -> Iterator[Line]:
        """Append `leaf` to current line or to new line if appending impossible."""
        nonlocal current_line
        try:
            current_line.append_safe(leaf, preformatted=True)
        except ValueError:
            yield current_line

            current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
            current_line.append(leaf)

    for leaf in line.leaves:
        yield from append_to_line(leaf)

        for comment_after in line.comments_after(leaf):
            yield from append_to_line(comment_after)

        lowest_depth = min(lowest_depth, leaf.bracket_depth)
        if leaf.bracket_depth == lowest_depth:
            if is_vararg(leaf, within={syms.typedargslist}):
                trailing_comma_safe = (
                    trailing_comma_safe and Feature.TRAILING_COMMA_IN_DEF in features
                )
            elif is_vararg(leaf, within={syms.arglist, syms.argument}):
                trailing_comma_safe = (
                    trailing_comma_safe and Feature.TRAILING_COMMA_IN_CALL in features
                )

        leaf_priority = bt.delimiters.get(id(leaf))
        if leaf_priority == delimiter_priority:
            yield current_line

            current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
    if current_line:
        if (
            trailing_comma_safe
            and delimiter_priority == COMMA_PRIORITY
            and current_line.leaves[-1].type != token.COMMA
            and current_line.leaves[-1].type != STANDALONE_COMMENT
        ):
            current_line.append(Leaf(token.COMMA, ","))
        yield current_line

@dont_increase_indentation
def standalone_comment_split(
    line: Line, features: Collection[Feature] = ()
) -> Iterator[Line]:
    """Split standalone comments from the rest of the line."""
    if not line.contains_standalone_comments(0):
        raise CannotSplit("Line does not have any standalone comments")

    current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)

    def append_to_line(leaf: Leaf) -> Iterator[Line]:
        """Append `leaf` to current line or to new line if appending impossible."""
        nonlocal current_line
        try:
            current_line.append_safe(leaf, preformatted=True)
        except ValueError:
            yield current_line

            current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
            current_line.append(leaf)

    for leaf in line.leaves:
        yield from append_to_line(leaf)

        for comment_after in line.comments_after(leaf):
            yield from append_to_line(comment_after)

    if current_line:
        yield current_line

def is_import(leaf: Leaf) -> bool:
    """Return True if the given leaf starts an import statement."""
    p = leaf.parent
    t = leaf.type
    v = leaf.value
    return bool(
        t == token.NAME
        and (
            (v == "import" and p and p.type == syms.import_name)
            or (v == "from" and p and p.type == syms.import_from)
        )
    )

def is_type_comment(leaf: Leaf, suffix: str = "") -> bool:
    """Return True if the given leaf is a special comment.
    Only returns true for type comments for now."""
    t = leaf.type
    v = leaf.value
    return t in {token.COMMENT, STANDALONE_COMMENT} and v.startswith("# type:" + suffix)

def normalize_prefix(leaf: Leaf, *, inside_brackets: bool) -> None:
    """Leave existing extra newlines if not `inside_brackets`. Remove everything
    else.

    Note: don't use backslashes for formatting or you'll lose your voting rights.
    """
    if not inside_brackets:
        spl = leaf.prefix.split("#")
        if "\\" not in spl[0]:
            nl_count = spl[-1].count("\n")
            if len(spl) > 1:
                nl_count -= 1
            leaf.prefix = "\n" * nl_count
            return

    leaf.prefix = ""

def normalize_string_prefix(leaf: Leaf, remove_u_prefix: bool = False) -> None:
    """Make all string prefixes lowercase.

    If remove_u_prefix is given, also removes any u prefix from the string.

    Note: Mutates its argument.
    """
    match = re.match(r"^([" + STRING_PREFIX_CHARS + r"]*)(.*)$", leaf.value, re.DOTALL)
    assert match is not None, f"failed to match string {leaf.value!r}"
    orig_prefix = match.group(1)
    new_prefix = orig_prefix.replace("F", "f").replace("B", "b").replace("U", "u")
    if remove_u_prefix:
        new_prefix = new_prefix.replace("u", "")
    leaf.value = f"{new_prefix}{match.group(2)}"

def normalize_string_quotes(leaf: Leaf) -> None:
    """Prefer double quotes but only if it doesn't cause more escaping.

    Adds or removes backslashes as appropriate. Doesn't parse and fix
    strings nested in f-strings (yet).

    Note: Mutates its argument.
    """
    value = leaf.value.lstrip(STRING_PREFIX_CHARS)
    if value[:3] == '"""':
        return

    elif value[:3] == "'''":
        orig_quote = "'''"
        new_quote = '"""'
    elif value[0] == '"':
        orig_quote = '"'
        new_quote = "'"
    else:
        orig_quote = "'"
        new_quote = '"'
    first_quote_pos = leaf.value.find(orig_quote)
    if first_quote_pos == -1:
        return  # There's an internal error

    prefix = leaf.value[:first_quote_pos]
    unescaped_new_quote = re.compile(rf"(([^\\]|^)(\\\\)*){new_quote}")
    escaped_new_quote = re.compile(rf"([^\\]|^)\\((?:\\\\)*){new_quote}")
    escaped_orig_quote = re.compile(rf"([^\\]|^)\\((?:\\\\)*){orig_quote}")
    body = leaf.value[first_quote_pos + len(orig_quote) : -len(orig_quote)]
    if "r" in prefix.casefold():
        if unescaped_new_quote.search(body):
            # There's at least one unescaped new_quote in this raw string
            # so converting is impossible
            return

        # Do not introduce or remove backslashes in raw strings
        new_body = body
    else:
        # remove unnecessary escapes
        new_body = sub_twice(escaped_new_quote, rf"\1\2{new_quote}", body)
        if body != new_body:
            # Consider the string without unnecessary escapes as the original
            body = new_body
            leaf.value = f"{prefix}{orig_quote}{body}{orig_quote}"
        new_body = sub_twice(escaped_orig_quote, rf"\1\2{orig_quote}", new_body)
        new_body = sub_twice(unescaped_new_quote, rf"\1\\{new_quote}", new_body)
    if "f" in prefix.casefold():
        matches = re.findall(
            r"""
            (?:[^{]|^)\{  # start of the string or a non-{ followed by a single {
                ([^{].*?)  # contents of the brackets except if begins with {{
            \}(?:[^}]|$)  # A } followed by end of the string or a non-}
            """,
            new_body,
            re.VERBOSE,
        )
        for m in matches:
            if "\\" in str(m):
                # Do not introduce backslashes in interpolated expressions
                return

    if new_quote == '"""' and new_body[-1:] == '"':
        # edge case:
        new_body = new_body[:-1] + '\\"'
    orig_escape_count = body.count("\\")
    new_escape_count = new_body.count("\\")
    if new_escape_count > orig_escape_count:
        return  # Do not introduce more escaping

    if new_escape_count == orig_escape_count and orig_quote == '"':
        return  # Prefer double quotes

    leaf.value = f"{prefix}{new_quote}{new_body}{new_quote}"

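
# Editorial sketch, not part of black: quote normalization prefers double
# quotes unless that would add escapes (the leaf values are hypothetical).
def _demo_normalize_string_quotes() -> None:
    leaf = Leaf(token.STRING, "'hello'")
    normalize_string_quotes(leaf)
    assert leaf.value == '"hello"'

    leaf = Leaf(token.STRING, "'say \"hi\"'")
    normalize_string_quotes(leaf)
    assert leaf.value == "'say \"hi\"'"  # unchanged: converting would add escapes
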
def normalize_numeric_literal(leaf: Leaf) -> None:
    """Normalizes numeric (float, int, and complex) literals.

    All letters used in the representation are normalized to lowercase (except
    in Python 2 long literals).
    """
    text = leaf.value.lower()
    if text.startswith(("0o", "0b")):
        # Leave octal and binary literals alone.
        pass
    elif text.startswith("0x"):
        # Change hex literals to upper case.
        before, after = text[:2], text[2:]
        text = f"{before}{after.upper()}"
    elif "e" in text:
        before, after = text.split("e")
        sign = ""
        if after.startswith("-"):
            after = after[1:]
            sign = "-"
        elif after.startswith("+"):
            after = after[1:]
        before = format_float_or_int_string(before)
        text = f"{before}e{sign}{after}"
    elif text.endswith(("j", "l")):
        number = text[:-1]
        suffix = text[-1]
        # Capitalize in "2L" because "l" looks too similar to "1".
        if suffix == "l":
            suffix = suffix.upper()
        text = f"{format_float_or_int_string(number)}{suffix}"
    else:
        text = format_float_or_int_string(text)
    leaf.value = text

def format_float_or_int_string(text: str) -> str:
    """Formats a float string like "1.0"."""
    if "." not in text:
        return text

    before, after = text.split(".")
    return f"{before or 0}.{after or 0}"

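
# Editorial sketch, not part of black: numeric literals are lowercased except
# for hex digits, and bare floats gain their missing digits.
def _demo_normalize_numeric_literal() -> None:
    for before, after in [("0XAB", "0xAB"), ("10E3", "10e3"), ("1.", "1.0")]:
        leaf = Leaf(token.NUMBER, before)
        normalize_numeric_literal(leaf)
        assert leaf.value == after
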
def normalize_invisible_parens(node: Node, parens_after: Set[str]) -> None:
    """Make existing optional parentheses invisible or create new ones.

    `parens_after` is a set of string leaf values immediately after which parens
    should be put.

    Standardizes on visible parentheses for single-element tuples, and keeps
    existing visible parentheses for other tuples and generator expressions.
    """
    for pc in list_comments(node.prefix, is_endmarker=False):
        if pc.value in FMT_OFF:
            # This `node` has a prefix with `# fmt: off`, don't mess with parens.
            return
    check_lpar = False
    for index, child in enumerate(list(node.children)):
        # Fixes a bug where invisible parens are not properly stripped from
        # assignment statements that contain type annotations.
        if isinstance(child, Node) and child.type == syms.annassign:
            normalize_invisible_parens(child, parens_after=parens_after)

        # Add parentheses around long tuple unpacking in assignments.
        if (
            index == 0
            and isinstance(child, Node)
            and child.type == syms.testlist_star_expr
        ):
            check_lpar = True

        if check_lpar:
            if is_walrus_assignment(child):
                continue

            if child.type == syms.atom:
                if maybe_make_parens_invisible_in_atom(child, parent=node):
                    wrap_in_parentheses(node, child, visible=False)
            elif is_one_tuple(child):
                wrap_in_parentheses(node, child, visible=True)
            elif node.type == syms.import_from:
                # "import from" nodes store parentheses directly as part of
                # their children
                if child.type == token.LPAR:
                    # make parentheses invisible
                    child.value = ""  # type: ignore
                    node.children[-1].value = ""  # type: ignore
                elif child.type != token.STAR:
                    # insert invisible parentheses
                    node.insert_child(index, Leaf(token.LPAR, ""))
                    node.append_child(Leaf(token.RPAR, ""))
                break

            elif not (isinstance(child, Leaf) and is_multiline_string(child)):
                wrap_in_parentheses(node, child, visible=False)

        check_lpar = isinstance(child, Leaf) and child.value in parens_after

def normalize_fmt_off(node: Node) -> None:
    """Convert content between `# fmt: off`/`# fmt: on` into standalone comments."""
    try_again = True
    while try_again:
        try_again = convert_one_fmt_off_pair(node)

def convert_one_fmt_off_pair(node: Node) -> bool:
    """Convert content of a single `# fmt: off`/`# fmt: on` into a standalone comment.

    Returns True if a pair was converted.
    """
    for leaf in node.leaves():
        previous_consumed = 0
        for comment in list_comments(leaf.prefix, is_endmarker=False):
            if comment.value in FMT_OFF:
                # We only want standalone comments. If there's no previous leaf or
                # the previous leaf is indentation, it's a standalone comment in
                # disguise.
                if comment.type != STANDALONE_COMMENT:
                    prev = preceding_leaf(leaf)
                    if prev and prev.type not in WHITESPACE:
                        continue

                ignored_nodes = list(generate_ignored_nodes(leaf))
                if not ignored_nodes:
                    continue

                first = ignored_nodes[0]  # Can be a container node with the `leaf`.
                parent = first.parent
                prefix = first.prefix
                first.prefix = prefix[comment.consumed :]
                hidden_value = (
                    comment.value + "\n" + "".join(str(n) for n in ignored_nodes)
                )
                if hidden_value.endswith("\n"):
                    # That happens when one of the `ignored_nodes` ended with a NEWLINE
                    # leaf (possibly followed by a DEDENT).
                    hidden_value = hidden_value[:-1]
                first_idx: Optional[int] = None
                for ignored in ignored_nodes:
                    index = ignored.remove()
                    if first_idx is None:
                        first_idx = index
                assert parent is not None, "INTERNAL ERROR: fmt: on/off handling (1)"
                assert first_idx is not None, "INTERNAL ERROR: fmt: on/off handling (2)"
                parent.insert_child(
                    first_idx,
                    Leaf(
                        STANDALONE_COMMENT,
                        hidden_value,
                        prefix=prefix[:previous_consumed] + "\n" * comment.newlines,
                    ),
                )
                return True

            previous_consumed = comment.consumed

    return False

def generate_ignored_nodes(leaf: Leaf) -> Iterator[LN]:
    """Starting from the container of `leaf`, generate all leaves until `# fmt: on`.

    Stops at the end of the block.
    """
    container: Optional[LN] = container_of(leaf)
    while container is not None and container.type != token.ENDMARKER:
        if is_fmt_on(container):
            return

        # fix for fmt: on in children
        if contains_fmt_on_at_column(container, leaf.column):
            for child in container.children:
                if contains_fmt_on_at_column(child, leaf.column):
                    return

                yield child
        else:
            yield container
            container = container.next_sibling

def is_fmt_on(container: LN) -> bool:
    """Determine whether formatting is switched on within a container.
    Determined by whether the last `# fmt:` comment is `on` or `off`.
    """
    fmt_on = False
    for comment in list_comments(container.prefix, is_endmarker=False):
        if comment.value in FMT_ON:
            fmt_on = True
        elif comment.value in FMT_OFF:
            fmt_on = False
    return fmt_on

def contains_fmt_on_at_column(container: LN, column: int) -> bool:
    """Determine if children at a given column have formatting switched on."""
    for child in container.children:
        if (
            isinstance(child, Node)
            and first_leaf_column(child) == column
            or isinstance(child, Leaf)
            and child.column == column
        ):
            if is_fmt_on(child):
                return True

    return False

def first_leaf_column(node: Node) -> Optional[int]:
    """Returns the column of the first leaf child of a node."""
    for child in node.children:
        if isinstance(child, Leaf):
            return child.column
    return None

def maybe_make_parens_invisible_in_atom(node: LN, parent: LN) -> bool:
    """If it's safe, make the parens in the atom `node` invisible, recursively.
    Additionally, remove repeated, adjacent invisible parens from the atom `node`
    as they are redundant.

    Returns whether the node should itself be wrapped in invisible parentheses.
    """
    if (
        node.type != syms.atom
        or is_empty_tuple(node)
        or is_one_tuple(node)
        or (is_yield(node) and parent.type != syms.expr_stmt)
        or max_delimiter_priority_in_atom(node) >= COMMA_PRIORITY
    ):
        return False

    first = node.children[0]
    last = node.children[-1]
    if first.type == token.LPAR and last.type == token.RPAR:
        middle = node.children[1]
        # make parentheses invisible
        first.value = ""  # type: ignore
        last.value = ""  # type: ignore
        maybe_make_parens_invisible_in_atom(middle, parent=parent)

        if is_atom_with_invisible_parens(middle):
            # Strip the invisible parens from `middle` by replacing
            # it with the child in-between the invisible parens
            middle.replace(middle.children[1])

        return False

    return True

def is_atom_with_invisible_parens(node: LN) -> bool:
    """Given a `LN`, determines whether it's an atom `node` with invisible
    parens. Useful in dedupe-ing and normalizing parens.
    """
    if isinstance(node, Leaf) or node.type != syms.atom:
        return False

    first, last = node.children[0], node.children[-1]
    return (
        isinstance(first, Leaf)
        and first.type == token.LPAR
        and first.value == ""
        and isinstance(last, Leaf)
        and last.type == token.RPAR
        and last.value == ""
    )

def is_empty_tuple(node: LN) -> bool:
    """Return True if `node` holds an empty tuple."""
    return (
        node.type == syms.atom
        and len(node.children) == 2
        and node.children[0].type == token.LPAR
        and node.children[1].type == token.RPAR
    )

def unwrap_singleton_parenthesis(node: LN) -> Optional[LN]:
    """Returns `wrapped` if `node` is of the shape ( wrapped ).

    Parenthesis can be optional. Returns None otherwise"""
    if len(node.children) != 3:
        return None

    lpar, wrapped, rpar = node.children
    if not (lpar.type == token.LPAR and rpar.type == token.RPAR):
        return None
    return wrapped

def wrap_in_parentheses(parent: Node, child: LN, *, visible: bool = True) -> None:
    """Wrap `child` in parentheses.

    This replaces `child` with an atom holding the parentheses and the old
    child. That requires moving the prefix.

    If `visible` is False, the leaves will be valueless (and thus invisible).
    """
    lpar = Leaf(token.LPAR, "(" if visible else "")
    rpar = Leaf(token.RPAR, ")" if visible else "")
    prefix = child.prefix
    child.prefix = ""
    index = child.remove() or 0
    new_child = Node(syms.atom, [lpar, child, rpar])
    new_child.prefix = prefix
    parent.insert_child(index, new_child)

def is_one_tuple(node: LN) -> bool:
    """Return True if `node` holds a tuple with one element, with or without parens."""
    if node.type == syms.atom:
        gexp = unwrap_singleton_parenthesis(node)
        if gexp is None or gexp.type != syms.testlist_gexp:
            return False

        return len(gexp.children) == 2 and gexp.children[1].type == token.COMMA

    return (
        node.type in IMPLICIT_TUPLE
        and len(node.children) == 2
        and node.children[1].type == token.COMMA
    )

def is_walrus_assignment(node: LN) -> bool:
    """Return True iff `node` is of the shape ( test := test )"""
    inner = unwrap_singleton_parenthesis(node)
    return inner is not None and inner.type == syms.namedexpr_test

def is_yield(node: LN) -> bool:
    """Return True if `node` holds a `yield` or `yield from` expression."""
    if node.type == syms.yield_expr:
        return True

    if node.type == token.NAME and node.value == "yield":  # type: ignore
        return True

    if node.type != syms.atom:
        return False

    if len(node.children) != 3:
        return False

    lpar, expr, rpar = node.children
    if lpar.type == token.LPAR and rpar.type == token.RPAR:
        return is_yield(expr)

    return False

def is_vararg(leaf: Leaf, within: Set[NodeType]) -> bool:
    """Return True if `leaf` is a star or double star in a vararg or kwarg.

    If `within` includes VARARGS_PARENTS, this applies to function signatures.
    If `within` includes UNPACKING_PARENTS, it applies to right hand-side
    extended iterable unpacking (PEP 3132) and additional unpacking
    generalizations (PEP 448).
    """
    if leaf.type not in VARARGS_SPECIALS or not leaf.parent:
        return False

    p = leaf.parent
    if p.type == syms.star_expr:
        # Star expressions are also used as assignment targets in extended
        # iterable unpacking (PEP 3132). See what its parent is instead.
        if not p.parent:
            return False

        p = p.parent

    return p.type in within

def is_multiline_string(leaf: Leaf) -> bool:
    """Return True if `leaf` is a multiline string that actually spans many lines."""
    return has_triple_quotes(leaf.value) and "\n" in leaf.value

def is_stub_suite(node: Node) -> bool:
    """Return True if `node` is a suite with a stub body."""
    if (
        len(node.children) != 4
        or node.children[0].type != token.NEWLINE
        or node.children[1].type != token.INDENT
        or node.children[3].type != token.DEDENT
    ):
        return False

    return is_stub_body(node.children[2])

def is_stub_body(node: LN) -> bool:
    """Return True if `node` is a simple statement containing an ellipsis."""
    if not isinstance(node, Node) or node.type != syms.simple_stmt:
        return False

    if len(node.children) != 2:
        return False

    child = node.children[0]
    return (
        child.type == syms.atom
        and len(child.children) == 3
        and all(leaf == Leaf(token.DOT, ".") for leaf in child.children)
    )

def max_delimiter_priority_in_atom(node: LN) -> Priority:
    """Return maximum delimiter priority inside `node`.

    This is specific to atoms with contents contained in a pair of parentheses.
    If `node` isn't an atom or there are no enclosing parentheses, returns 0.
    """
    if node.type != syms.atom:
        return 0

    first = node.children[0]
    last = node.children[-1]
    if not (first.type == token.LPAR and last.type == token.RPAR):
        return 0

    bt = BracketTracker()
    for c in node.children[1:-1]:
        if isinstance(c, Leaf):
            bt.mark(c)
        else:
            for leaf in c.leaves():
                bt.mark(leaf)
    try:
        return bt.max_delimiter_priority()

    except ValueError:
        return 0

def ensure_visible(leaf: Leaf) -> None:
    """Make sure parentheses are visible.

    They could be invisible as part of some statements (see
    :func:`normalize_invisible_parens` and :func:`visit_import_from`).
    """
    if leaf.type == token.LPAR:
        leaf.value = "("
    elif leaf.type == token.RPAR:
        leaf.value = ")"

def should_explode(line: Line, opening_bracket: Leaf) -> bool:
    """Should `line` immediately be split with `delimiter_split()` after RHS?"""
    if not (
        opening_bracket.parent
        and opening_bracket.parent.type in {syms.atom, syms.import_from}
        and opening_bracket.value in "[{("
    ):
        return False

    try:
        last_leaf = line.leaves[-1]
        exclude = {id(last_leaf)} if last_leaf.type == token.COMMA else set()
        max_priority = line.bracket_tracker.max_delimiter_priority(exclude=exclude)
    except (IndexError, ValueError):
        return False

    return max_priority == COMMA_PRIORITY

def get_features_used(node: Node) -> Set[Feature]:
    """Return a set of (relatively) new Python features used in this file.

    Currently looking for:
    - f-strings;
    - underscores in numeric literals;
    - trailing commas after * or ** in function signatures and calls;
    - positional only arguments in function signatures and lambdas;
    """
    features: Set[Feature] = set()
    for n in node.pre_order():
        if n.type == token.STRING:
            value_head = n.value[:2]  # type: ignore
            if value_head in {'f"', 'F"', "f'", "F'", "rf", "fr", "RF", "FR"}:
                features.add(Feature.F_STRINGS)

        elif n.type == token.NUMBER:
            if "_" in n.value:  # type: ignore
                features.add(Feature.NUMERIC_UNDERSCORES)

        elif n.type == token.SLASH:
            if n.parent and n.parent.type in {syms.typedargslist, syms.arglist}:
                features.add(Feature.POS_ONLY_ARGUMENTS)

        elif n.type == token.COLONEQUAL:
            features.add(Feature.ASSIGNMENT_EXPRESSIONS)

        elif (
            n.type in {syms.typedargslist, syms.arglist}
            and n.children
            and n.children[-1].type == token.COMMA
        ):
            if n.type == syms.typedargslist:
                feature = Feature.TRAILING_COMMA_IN_DEF
            else:
                feature = Feature.TRAILING_COMMA_IN_CALL

            for ch in n.children:
                if ch.type in STARS:
                    features.add(feature)

                if ch.type == syms.argument:
                    for argch in ch.children:
                        if argch.type in STARS:
                            features.add(feature)

    return features

def detect_target_versions(node: Node) -> Set[TargetVersion]:
    """Detect the version to target based on the nodes used."""
    features = get_features_used(node)
    return {
        version for version in TargetVersion if features <= VERSION_TO_FEATURES[version]
    }

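
# Editorial sketch, not part of black: how feature detection feeds target-
# version autodetection. `lib2to3_parse` is defined earlier in this module;
# the source snippet is hypothetical.
def _demo_feature_detection() -> None:
    node = lib2to3_parse("result = f'{value!r}'\n")
    assert Feature.F_STRINGS in get_features_used(node)
    # Only versions whose feature set includes F_STRINGS remain.
    assert TargetVersion.PY27 not in detect_target_versions(node)
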
def generate_trailers_to_omit(line: Line, line_length: int) -> Iterator[Set[LeafID]]:
    """Generate sets of closing bracket IDs that should be omitted in a RHS.

    Brackets can be omitted if the entire trailer up to and including
    a preceding closing bracket fits in one line.

    Yielded sets are cumulative (contain results of previous yields, too). First
    set is empty, unless the line should explode.
    """
    omit: Set[LeafID] = set()
    if not line.should_explode:
        yield omit

    length = 4 * line.depth
    opening_bracket: Optional[Leaf] = None
    closing_bracket: Optional[Leaf] = None
    inner_brackets: Set[LeafID] = set()
    for index, leaf, leaf_length in enumerate_with_length(line, reversed=True):
        length += leaf_length
        if length > line_length:
            break

        has_inline_comment = leaf_length > len(leaf.value) + len(leaf.prefix)
        if leaf.type == STANDALONE_COMMENT or has_inline_comment:
            break

        if opening_bracket:
            if leaf is opening_bracket:
                opening_bracket = None
            elif leaf.type in CLOSING_BRACKETS:
                inner_brackets.add(id(leaf))
        elif leaf.type in CLOSING_BRACKETS:
            if index > 0 and line.leaves[index - 1].type in OPENING_BRACKETS:
                # Empty brackets would fail a split so treat them as "inner"
                # brackets (e.g. only add them to the `omit` set if another
                # pair of brackets was good enough.
                inner_brackets.add(id(leaf))
                continue

            if closing_bracket:
                omit.add(id(closing_bracket))
                omit.update(inner_brackets)
                inner_brackets.clear()
                yield omit

            if leaf.value:
                opening_bracket = leaf.opening_bracket
                closing_bracket = leaf

def get_future_imports(node: Node) -> Set[str]:
    """Return a set of __future__ imports in the file."""
    imports: Set[str] = set()

    def get_imports_from_children(children: List[LN]) -> Generator[str, None, None]:
        for child in children:
            if isinstance(child, Leaf):
                if child.type == token.NAME:
                    yield child.value

            elif child.type == syms.import_as_name:
                orig_name = child.children[0]
                assert isinstance(orig_name, Leaf), "Invalid syntax parsing imports"
                assert orig_name.type == token.NAME, "Invalid syntax parsing imports"
                yield orig_name.value

            elif child.type == syms.import_as_names:
                yield from get_imports_from_children(child.children)

            else:
                raise AssertionError("Invalid syntax parsing imports")

    for child in node.children:
        if child.type != syms.simple_stmt:
            break

        first_child = child.children[0]
        if isinstance(first_child, Leaf):
            # Continue looking if we see a docstring; otherwise stop.
            if (
                len(child.children) == 2
                and first_child.type == token.STRING
                and child.children[1].type == token.NEWLINE
            ):
                continue

            break

        elif first_child.type == syms.import_from:
            module_name = first_child.children[1]
            if not isinstance(module_name, Leaf) or module_name.value != "__future__":
                break

            imports |= set(get_imports_from_children(first_child.children[3:]))
        else:
            break

    return imports

def get_gitignore(root: Path) -> PathSpec:
    """Return a PathSpec matching gitignore content if present."""
    gitignore = root / ".gitignore"
    lines: List[str] = []
    if gitignore.is_file():
        with gitignore.open() as gf:
            lines = gf.readlines()
    return PathSpec.from_lines("gitwildmatch", lines)

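
# Editorial sketch, not part of black: `get_gitignore` builds a PathSpec in
# "gitwildmatch" mode. `project_dir` is hypothetical and assumed to contain a
# .gitignore with the single pattern "build/".
def _demo_get_gitignore(project_dir: Path) -> None:
    spec = get_gitignore(project_dir)
    assert spec.match_file("build/generated.py")
    assert not spec.match_file("src/app.py")
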
def gen_python_files(
    paths: Iterable[Path],
    root: Path,
    include: Optional[Pattern[str]],
    exclude_regexes: Iterable[Pattern[str]],
    report: "Report",
    gitignore: PathSpec,
) -> Iterator[Path]:
    """Generate all files under `path` whose paths are not excluded by the
    `exclude` regex, but are included by the `include` regex.

    Symbolic links pointing outside of the `root` directory are ignored.

    `report` is where output about exclusions goes.
    """
    assert root.is_absolute(), f"INTERNAL ERROR: `root` must be absolute but is {root}"
    for child in paths:
        # Then ignore with `exclude` option.
        try:
            normalized_path = child.resolve().relative_to(root).as_posix()
        except OSError as e:
            report.path_ignored(child, f"cannot be read because {e}")
            continue
        except ValueError:
            if child.is_symlink():
                report.path_ignored(
                    child, f"is a symbolic link that points outside {root}"
                )
                continue

            raise

        # First ignore files matching .gitignore
        if gitignore.match_file(normalized_path):
            report.path_ignored(child, "matches the .gitignore file content")
            continue

        normalized_path = "/" + normalized_path
        if child.is_dir():
            normalized_path += "/"

        is_excluded = False
        for exclude in exclude_regexes:
            exclude_match = exclude.search(normalized_path) if exclude else None
            if exclude_match and exclude_match.group(0):
                report.path_ignored(child, "matches the --exclude regular expression")
                is_excluded = True
                break
        if is_excluded:
            continue

        if child.is_dir():
            yield from gen_python_files(
                child.iterdir(), root, include, exclude_regexes, report, gitignore
            )

        elif child.is_file():
            include_match = include.search(normalized_path) if include else True
            if include_match:
                yield child

def find_project_root(srcs: Iterable[str]) -> Path:
    """Return a directory containing .git, .hg, or pyproject.toml.

    That directory can be one of the directories passed in `srcs` or their
    common parent.

    If no directory in the tree contains a marker that would specify it's the
    project root, the root of the file system is returned.
    """
    if not srcs:
        return Path("/").resolve()

    common_base = min(Path(src).resolve() for src in srcs)
    if common_base.is_dir():
        # Append a fake file so `parents` below returns `common_base_dir`, too.
        common_base /= "fake-file"
    for directory in common_base.parents:
        if (directory / ".git").exists():
            return directory

        if (directory / ".hg").is_dir():
            return directory

        if (directory / "pyproject.toml").is_file():
            return directory

    return directory

5848 """Provides a reformatting counter. Can be rendered with `str(report)`."""
5853 verbose: bool = False
5854 change_count: int = 0
5856 failure_count: int = 0
5858 def done(self, src: Path, changed: Changed) -> None:
5859 """Increment the counter for successful reformatting. Write out a message."""
5860 if changed is Changed.YES:
5861 reformatted = "would reformat" if self.check or self.diff else "reformatted"
5862 if self.verbose or not self.quiet:
5863 out(f"{reformatted} {src}")
5864 self.change_count += 1
5867 if changed is Changed.NO:
5868 msg = f"{src} already well formatted, good job."
5870 msg = f"{src} wasn't modified on disk since last run."
5871 out(msg, bold=False)
5872 self.same_count += 1
5874 def failed(self, src: Path, message: str) -> None:
5875 """Increment the counter for failed reformatting. Write out a message."""
5876 err(f"error: cannot format {src}: {message}")
5877 self.failure_count += 1
5879 def path_ignored(self, path: Path, message: str) -> None:
5881 out(f"{path} ignored: {message}", bold=False)
5884 def return_code(self) -> int:
5885 """Return the exit code that the app should use.
5887 This considers the current state of changed files and failures:
5888 - if there were any failures, return 123;
5889 - if any files were changed and --check is being used, return 1;
5890 - otherwise return 0.
5892 # According to http://tldp.org/LDP/abs/html/exitcodes.html starting with
5893 # 126 we have special return codes reserved by the shell.
5894 if self.failure_count:
5897 elif self.change_count and self.check:
5902 def __str__(self) -> str:
5903 """Render a color report of the current state.
5905 Use `click.unstyle` to remove colors.
5907 if self.check or self.diff:
5908 reformatted = "would be reformatted"
5909 unchanged = "would be left unchanged"
5910 failed = "would fail to reformat"
5912 reformatted = "reformatted"
5913 unchanged = "left unchanged"
5914 failed = "failed to reformat"
5916 if self.change_count:
5917 s = "s" if self.change_count > 1 else ""
5919 click.style(f"{self.change_count} file{s} {reformatted}", bold=True)
5922 s = "s" if self.same_count > 1 else ""
5923 report.append(f"{self.same_count} file{s} {unchanged}")
5924 if self.failure_count:
5925 s = "s" if self.failure_count > 1 else ""
5927 click.style(f"{self.failure_count} file{s} {failed}", fg="red")
5929 return ", ".join(report) + "."
def parse_ast(src: str) -> Union[ast.AST, ast3.AST, ast27.AST]:
    filename = "<unknown>"
    if sys.version_info >= (3, 8):
        # TODO: support Python 4+ ;)
        for minor_version in range(sys.version_info[1], 4, -1):
            try:
                return ast.parse(src, filename, feature_version=(3, minor_version))
            except SyntaxError:
                continue
    else:
        for feature_version in (7, 6):
            try:
                return ast3.parse(src, filename, feature_version=feature_version)
            except SyntaxError:
                continue

    return ast27.parse(src)

def _fixup_ast_constants(
    node: Union[ast.AST, ast3.AST, ast27.AST]
) -> Union[ast.AST, ast3.AST, ast27.AST]:
    """Map ast nodes deprecated in 3.8 to Constant."""
    if isinstance(node, (ast.Str, ast3.Str, ast27.Str, ast.Bytes, ast3.Bytes)):
        return ast.Constant(value=node.s)

    if isinstance(node, (ast.Num, ast3.Num, ast27.Num)):
        return ast.Constant(value=node.n)

    if isinstance(node, (ast.NameConstant, ast3.NameConstant)):
        return ast.Constant(value=node.value)

    return node


def _stringify_ast(
    node: Union[ast.AST, ast3.AST, ast27.AST], depth: int = 0
) -> Iterator[str]:
    """Simple visitor generating strings to compare ASTs by content."""

    node = _fixup_ast_constants(node)

    yield f"{'  ' * depth}{node.__class__.__name__}("

    for field in sorted(node._fields):  # noqa: F402
        # TypeIgnore has only one field 'lineno' which breaks this comparison
        type_ignore_classes = (ast3.TypeIgnore, ast27.TypeIgnore)
        if sys.version_info >= (3, 8):
            type_ignore_classes += (ast.TypeIgnore,)
        if isinstance(node, type_ignore_classes):
            break

        try:
            value = getattr(node, field)
        except AttributeError:
            continue

        yield f"{'  ' * (depth+1)}{field}="

        if isinstance(value, list):
            for item in value:
                # Ignore nested tuples within del statements, because we may insert
                # parentheses and they change the AST.
                if (
                    field == "targets"
                    and isinstance(node, (ast.Delete, ast3.Delete, ast27.Delete))
                    and isinstance(item, (ast.Tuple, ast3.Tuple, ast27.Tuple))
                ):
                    for item in item.elts:
                        yield from _stringify_ast(item, depth + 2)

                elif isinstance(item, (ast.AST, ast3.AST, ast27.AST)):
                    yield from _stringify_ast(item, depth + 2)

        elif isinstance(value, (ast.AST, ast3.AST, ast27.AST)):
            yield from _stringify_ast(value, depth + 2)

        else:
            # Constant strings may be indented across newlines, if they are
            # docstrings; fold spaces after newlines when comparing. Similarly,
            # trailing and leading space may be removed.
            if (
                isinstance(node, ast.Constant)
                and field == "value"
                and isinstance(value, str)
            ):
                normalized = re.sub(r" *\n[ \t]+", "\n ", value).strip()
            else:
                normalized = value

            yield f"{'  ' * (depth+2)}{normalized!r},  # {value.__class__.__name__}"

    yield f"{'  ' * depth})  # /{node.__class__.__name__}"

def assert_equivalent(src: str, dst: str) -> None:
    """Raise AssertionError if `src` and `dst` aren't equivalent."""
    try:
        src_ast = parse_ast(src)
    except Exception as exc:
        raise AssertionError(
            "cannot use --safe with this file; failed to parse source file. AST"
            f" error message: {exc}"
        )

    try:
        dst_ast = parse_ast(dst)
    except Exception as exc:
        log = dump_to_file("".join(traceback.format_tb(exc.__traceback__)), dst)
        raise AssertionError(
            f"INTERNAL ERROR: Black produced invalid code: {exc}. Please report a bug"
            " on https://github.com/psf/black/issues. This invalid output might be"
            f" helpful: {log}"
        ) from None

    src_ast_str = "\n".join(_stringify_ast(src_ast))
    dst_ast_str = "\n".join(_stringify_ast(dst_ast))
    if src_ast_str != dst_ast_str:
        log = dump_to_file(diff(src_ast_str, dst_ast_str, "src", "dst"))
        raise AssertionError(
            "INTERNAL ERROR: Black produced code that is not equivalent to the"
            " source. Please report a bug on https://github.com/psf/black/issues. "
            f" This diff might be helpful: {log}"
        ) from None

def assert_stable(src: str, dst: str, mode: Mode) -> None:
    """Raise AssertionError if `dst` reformats differently the second time."""
    newdst = format_str(dst, mode=mode)
    if dst != newdst:
        log = dump_to_file(
            diff(src, dst, "source", "first pass"),
            diff(dst, newdst, "first pass", "second pass"),
        )
        raise AssertionError(
            "INTERNAL ERROR: Black produced different code on the second pass of the"
            " formatter. Please report a bug on https://github.com/psf/black/issues."
            f" This diff might be helpful: {log}"
        ) from None

@mypyc_attr(patchable=True)
def dump_to_file(*output: str) -> str:
    """Dump `output` to a temporary file. Return path to the file."""
    with tempfile.NamedTemporaryFile(
        mode="w", prefix="blk_", suffix=".log", delete=False, encoding="utf8"
    ) as f:
        for lines in output:
            f.write(lines)
            if lines and lines[-1] != "\n":
                f.write("\n")
    return f.name

@contextmanager
def nullcontext() -> Iterator[None]:
    """Return an empty context manager.

    To be used like `nullcontext` in Python 3.7.
    """
    yield

def diff(a: str, b: str, a_name: str, b_name: str) -> str:
    """Return a unified diff string between strings `a` and `b`."""
    import difflib

    a_lines = [line + "\n" for line in a.splitlines()]
    b_lines = [line + "\n" for line in b.splitlines()]
    return "".join(
        difflib.unified_diff(a_lines, b_lines, fromfile=a_name, tofile=b_name, n=5)
    )

def cancel(tasks: Iterable["asyncio.Task[Any]"]) -> None:
    """asyncio signal handler that cancels all `tasks` and reports to stderr."""
    err("Aborted!")
    for task in tasks:
        task.cancel()

def shutdown(loop: asyncio.AbstractEventLoop) -> None:
    """Cancel all pending tasks on `loop`, wait for them, and close the loop."""
    try:
        if sys.version_info[:2] >= (3, 7):
            all_tasks = asyncio.all_tasks
        else:
            all_tasks = asyncio.Task.all_tasks
        # This part is borrowed from asyncio/runners.py in Python 3.7b2.
        to_cancel = [task for task in all_tasks(loop) if not task.done()]
        if not to_cancel:
            return

        for task in to_cancel:
            task.cancel()
        loop.run_until_complete(
            asyncio.gather(*to_cancel, loop=loop, return_exceptions=True)
        )
    finally:
        # `concurrent.futures.Future` objects cannot be cancelled once they
        # are already running. There might be some when the `shutdown()` happened.
        # Silence their logger's spew about the event loop being closed.
        cf_logger = logging.getLogger("concurrent.futures")
        cf_logger.setLevel(logging.CRITICAL)
        loop.close()

def sub_twice(regex: Pattern[str], replacement: str, original: str) -> str:
    """Replace `regex` with `replacement` twice on `original`.

    This is used by string normalization to perform replaces on
    overlapping matches.
    """
    return regex.sub(replacement, regex.sub(replacement, original))

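
# Editorial sketch, not part of black: re.sub leaves overlapping matches
# behind, which is why string normalization runs every substitution twice.
def _demo_sub_twice() -> None:
    pattern = re.compile(r"aa")
    assert pattern.sub("a", "aaaa") == "aa"  # one pass leaves a pair behind
    assert sub_twice(pattern, "a", "aaaa") == "a"  # second pass finishes the job
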
def re_compile_maybe_verbose(regex: str) -> Pattern[str]:
    """Compile a regular expression string in `regex`.

    If it contains newlines, use verbose mode.
    """
    if "\n" in regex:
        regex = "(?x)" + regex
    compiled: Pattern[str] = re.compile(regex)
    return compiled

def enumerate_reversed(sequence: Sequence[T]) -> Iterator[Tuple[Index, T]]:
    """Like `reversed(enumerate(sequence))` if that were possible."""
    index = len(sequence) - 1
    for element in reversed(sequence):
        yield (index, element)
        index -= 1

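
# Editorial sketch, not part of black: original indices are kept while walking
# from the end, equivalent to reversed(list(enumerate(seq))) but without
# materializing the intermediate list.
def _demo_enumerate_reversed() -> None:
    assert list(enumerate_reversed("abc")) == [(2, "c"), (1, "b"), (0, "a")]
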
def enumerate_with_length(
    line: Line, reversed: bool = False
) -> Iterator[Tuple[Index, Leaf, int]]:
    """Return an enumeration of leaves with their length.

    Stops prematurely on multiline strings and standalone comments.
    """
    op = cast(
        Callable[[Sequence[Leaf]], Iterator[Tuple[Index, Leaf]]],
        enumerate_reversed if reversed else enumerate,
    )
    for index, leaf in op(line.leaves):
        length = len(leaf.prefix) + len(leaf.value)
        if "\n" in leaf.value:
            return  # Multiline strings, we can't continue.

        for comment in line.comments_after(leaf):
            length += len(comment.value)

        yield index, leaf, length

def is_line_short_enough(line: Line, *, line_length: int, line_str: str = "") -> bool:
    """Return True if `line` is no longer than `line_length`.

    Uses the provided `line_str` rendering, if any, otherwise computes a new one.
    """
    if not line_str:
        line_str = line_to_string(line)
    return (
        len(line_str) <= line_length
        and "\n" not in line_str  # multiline strings
        and not line.contains_standalone_comments()
    )

def can_be_split(line: Line) -> bool:
    """Return False if the line cannot be split *for sure*.

    This is not an exhaustive search but a cheap heuristic that we can use to
    avoid some unfortunate formattings (mostly around wrapping unsplittable code
    in unnecessary parentheses).
    """
    leaves = line.leaves
    if len(leaves) < 2:
        return False

    if leaves[0].type == token.STRING and leaves[1].type == token.DOT:
        call_count = 0
        dot_count = 0
        next = leaves[-1]
        for leaf in leaves[-2::-1]:
            if leaf.type in OPENING_BRACKETS:
                if next.type not in CLOSING_BRACKETS:
                    return False

                call_count += 1
            elif leaf.type == token.DOT:
                dot_count += 1
            elif leaf.type == token.NAME:
                if not (next.type == token.DOT or next.type in OPENING_BRACKETS):
                    return False

            elif leaf.type not in CLOSING_BRACKETS:
                return False

            if dot_count > 1 and call_count > 1:
                return False

            next = leaf

    return True

def can_omit_invisible_parens(line: Line, line_length: int) -> bool:
    """Does `line` have a shape safe to reformat without optional parens around it?

    Returns True for only a subset of potentially nice looking formattings but
    the point is to not return false positives that end up producing lines that
    are too long.
    """
    bt = line.bracket_tracker
    if not bt.delimiters:
        # Without delimiters the optional parentheses are useless.
        return True

    max_priority = bt.max_delimiter_priority()
    if bt.delimiter_count_with_priority(max_priority) > 1:
        # With more than one delimiter of a kind the optional parentheses read better.
        return False

    if max_priority == DOT_PRIORITY:
        # A single stranded method call doesn't require optional parentheses.
        return True

    assert len(line.leaves) >= 2, "Stranded delimiter"

    first = line.leaves[0]
    second = line.leaves[1]
    penultimate = line.leaves[-2]
    last = line.leaves[-1]

    # With a single delimiter, omit if the expression starts or ends with
    # a bracket.
    if first.type in OPENING_BRACKETS and second.type not in CLOSING_BRACKETS:
        remainder = False
        length = 4 * line.depth
        for _index, leaf, leaf_length in enumerate_with_length(line):
            if leaf.type in CLOSING_BRACKETS and leaf.opening_bracket is first:
                remainder = True
            if remainder:
                length += leaf_length
                if length > line_length:
                    break

                if leaf.type in OPENING_BRACKETS:
                    # There are brackets we can further split on.
                    remainder = False
                    break

        # checked the entire string and line length wasn't exceeded
        if len(line.leaves) == _index + 1:
            return True

    # Note: we are not returning False here because a line might have *both*
    # a leading opening bracket and a trailing closing bracket. If the
    # opening bracket doesn't match our rule, maybe the closing will.

    if (
        last.type == token.RPAR
        or last.type == token.RBRACE
        or (
            # don't use indexing for omitting optional parentheses;
            # it looks weird
            last.type == token.RSQB
            and last.parent
            and last.parent.type != syms.trailer
        )
    ):
        if penultimate.type in OPENING_BRACKETS:
            # Empty brackets don't help.
            return False

        if is_multiline_string(first):
            # Additional wrapping of a multiline string in this situation is
            # unnecessary.
            return True

        length = 4 * line.depth
        seen_other_brackets = False
        for _index, leaf, leaf_length in enumerate_with_length(line):
            length += leaf_length
            if leaf is last.opening_bracket:
                if seen_other_brackets or length <= line_length:
                    return True

            elif leaf.type in OPENING_BRACKETS:
                # There are brackets we can further split on.
                seen_other_brackets = True

    return False

def get_cache_file(mode: Mode) -> Path:
    return CACHE_DIR / f"cache.{mode.get_cache_key()}.pickle"

def read_cache(mode: Mode) -> Cache:
    """Read the cache if it exists and is well formed.

    If it is not well formed, the call to write_cache later should resolve the issue.
    """
    cache_file = get_cache_file(mode)
    if not cache_file.exists():
        return {}

    with cache_file.open("rb") as fobj:
        try:
            cache: Cache = pickle.load(fobj)
        except (pickle.UnpicklingError, ValueError):
            return {}

    return cache

def get_cache_info(path: Path) -> CacheInfo:
    """Return the information used to check if a file is already formatted or not."""
    stat = path.stat()
    return stat.st_mtime, stat.st_size

def filter_cached(cache: Cache, sources: Iterable[Path]) -> Tuple[Set[Path], Set[Path]]:
    """Split an iterable of paths in `sources` into two sets.

    The first contains paths of files that were modified on disk or are not in the
    cache. The other contains paths to non-modified files.
    """
    todo, done = set(), set()
    for src in sources:
        src = src.resolve()
        if cache.get(src) != get_cache_info(src):
            todo.add(src)
        else:
            done.add(src)
    return todo, done

def write_cache(cache: Cache, sources: Iterable[Path], mode: Mode) -> None:
    """Update the cache file."""
    cache_file = get_cache_file(mode)
    try:
        CACHE_DIR.mkdir(parents=True, exist_ok=True)
        new_cache = {**cache, **{src.resolve(): get_cache_info(src) for src in sources}}
        with tempfile.NamedTemporaryFile(dir=str(cache_file.parent), delete=False) as f:
            pickle.dump(new_cache, f, protocol=4)
        os.replace(f.name, cache_file)
    except OSError:
        pass

def patch_click() -> None:
    """Make Click not crash.

    On certain misconfigured environments, Python 3 selects the ASCII encoding as the
    default which restricts paths that it can access during the lifetime of the
    application. Click refuses to work in this scenario by raising a RuntimeError.

    In case of Black the likelihood that non-ASCII characters are going to be used in
    file paths is minimal since it's Python source code. Moreover, this crash was
    spurious on Python 3.7 thanks to PEP 538 and PEP 540.
    """
    try:
        from click import core
        from click import _unicodefun  # type: ignore
    except ModuleNotFoundError:
        return

    for module in (core, _unicodefun):
        if hasattr(module, "_verify_python3_env"):
            module._verify_python3_env = lambda: None

def patched_main() -> None:
    freeze_support()
    patch_click()
    main()

def fix_docstring(docstring: str, prefix: str) -> str:
    # https://www.python.org/dev/peps/pep-0257/#handling-docstring-indentation
    if not docstring:
        return docstring
    # Convert tabs to spaces (following the normal Python rules)
    # and split into a list of lines:
    lines = docstring.expandtabs().splitlines()
    # Determine minimum indentation (first line doesn't count):
    indent = sys.maxsize
    for line in lines[1:]:
        stripped = line.lstrip()
        if stripped:
            indent = min(indent, len(line) - len(stripped))
    # Remove indentation (first line is special):
    trimmed = [lines[0].strip()]
    if indent < sys.maxsize:
        last_line_idx = len(lines) - 2
        for i, line in enumerate(lines[1:]):
            stripped_line = line[indent:].rstrip()
            if stripped_line or i == last_line_idx:
                trimmed.append(prefix + stripped_line)
            else:
                trimmed.append("")
    # Return a single string:
    return "\n".join(trimmed)

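
# Editorial sketch, not part of black: fix_docstring() strips the common
# indentation and re-applies `prefix` uniformly (the sample strings are
# hypothetical).
def _demo_fix_docstring() -> None:
    raw = "Summary line.\n        Details, over-indented.\n    "
    expected = "Summary line.\n    Details, over-indented.\n    "
    assert fix_docstring(raw, "    ") == expected
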
if __name__ == "__main__":
    patched_main()