import asyncio
import io
import itertools
import os
import re
import signal
import sys
import tokenize
from abc import ABC, abstractmethod
from collections import defaultdict
from concurrent.futures import Executor, ThreadPoolExecutor, ProcessPoolExecutor
from contextlib import contextmanager, nullcontext
from datetime import datetime
from enum import Enum
from functools import lru_cache, partial, wraps
from multiprocessing import Manager, freeze_support
from pathlib import Path
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Collection,
    Dict,
    Generic,
    Iterable,
    Iterator,
    List,
    Optional,
    Set,
    Sized,
    Tuple,
    TypeVar,
    Union,
)

from typing_extensions import Final
from mypy_extensions import mypyc_attr

import click
import toml
from appdirs import user_cache_dir
from dataclasses import dataclass, field, replace
from typed_ast import ast3, ast27
from pathspec import PathSpec

from blib2to3.pytree import Node, Leaf, type_repr
from blib2to3 import pygram, pytree
from blib2to3.pgen2 import driver, token
from blib2to3.pgen2.grammar import Grammar
from blib2to3.pgen2.parse import ParseError

from _black_version import version as __version__

if TYPE_CHECKING:
    import colorama  # noqa: F401

DEFAULT_LINE_LENGTH = 88
DEFAULT_EXCLUDES = r"/(\.eggs|\.git|\.hg|\.mypy_cache|\.nox|\.tox|\.venv|\.svn|_build|buck-out|build|dist)/"  # noqa: B950
DEFAULT_INCLUDES = r"\.pyi?$"
CACHE_DIR = Path(user_cache_dir("black", version=__version__))

STRING_PREFIX_CHARS: Final = "furbFURB"  # All possible string prefix characters.

# types
T = TypeVar("T")
FileContent = str
Encoding = str
NewLine = str
Depth = int
NodeType = int
LeafID = int
Priority = int
Index = int
Timestamp = float
FileSize = int

LN = Union[Leaf, Node]
Transformer = Callable[["Line", Collection["Feature"]], Iterator["Line"]]
CacheInfo = Tuple[Timestamp, FileSize]
Cache = Dict[Path, CacheInfo]
out = partial(click.secho, bold=True, err=True)
err = partial(click.secho, fg="red", err=True)

pygram.initialize(CACHE_DIR)
syms = pygram.python_symbols

class NothingChanged(UserWarning):
    """Raised when reformatted code is the same as source."""


class CannotTransform(Exception):
    """Base class for errors raised by Transformers."""


class CannotSplit(CannotTransform):
    """A readable split that fits the allotted line length is impossible."""


class InvalidInput(ValueError):
    """Raised when input source code fails all parse attempts."""

E = TypeVar("E", bound=Exception)


class Ok(Generic[T]):
    def __init__(self, value: T) -> None:
        self._value = value

    def ok(self) -> T:
        return self._value


class Err(Generic[E]):
    def __init__(self, e: E) -> None:
        self._e = e

    def err(self) -> E:
        return self._e

# The 'Result' return type is used to implement an error-handling model heavily
# influenced by that used by the Rust programming language
# (see https://doc.rust-lang.org/book/ch09-00-error-handling.html).
Result = Union[Ok[T], Err[E]]
TResult = Result[T, CannotTransform]  # (T)ransform Result
TMatchResult = TResult[Index]
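
# A minimal sketch of how these types compose (illustrative only, using the
# `ok()`/`err()` accessors defined above):
#
#   >>> def halve(i: int) -> TResult[int]:
#   ...     if i % 2:
#   ...         return Err(CannotTransform(f"{i} is odd"))
#   ...     return Ok(i // 2)
#   >>> res = halve(4)
#   >>> res.ok() if isinstance(res, Ok) else res.err()
#   2
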
class WriteBack(Enum):
    NO = 0
    YES = 1
    DIFF = 2
    CHECK = 3
    COLOR_DIFF = 4

    @classmethod
    def from_configuration(
        cls, *, check: bool, diff: bool, color: bool = False
    ) -> "WriteBack":
        if check and not diff:
            return cls.CHECK

        if diff and color:
            return cls.COLOR_DIFF

        return cls.DIFF if diff else cls.YES

class TargetVersion(Enum):
    PY27 = 2
    PY33 = 3
    PY34 = 4
    PY35 = 5
    PY36 = 6
    PY37 = 7
    PY38 = 8

    def is_python2(self) -> bool:
        return self is TargetVersion.PY27


PY36_VERSIONS = {TargetVersion.PY36, TargetVersion.PY37, TargetVersion.PY38}

class Feature(Enum):
    # All string literals are unicode
    UNICODE_LITERALS = 1
    F_STRINGS = 2
    NUMERIC_UNDERSCORES = 3
    TRAILING_COMMA_IN_CALL = 4
    TRAILING_COMMA_IN_DEF = 5
    # The following two feature-flags are mutually exclusive, and exactly one should be
    # set for every version of Python.
    ASYNC_IDENTIFIERS = 6
    ASYNC_KEYWORDS = 7
    ASSIGNMENT_EXPRESSIONS = 8
    POS_ONLY_ARGUMENTS = 9

VERSION_TO_FEATURES: Dict[TargetVersion, Set[Feature]] = {
    TargetVersion.PY27: {Feature.ASYNC_IDENTIFIERS},
    TargetVersion.PY33: {Feature.UNICODE_LITERALS, Feature.ASYNC_IDENTIFIERS},
    TargetVersion.PY34: {Feature.UNICODE_LITERALS, Feature.ASYNC_IDENTIFIERS},
    TargetVersion.PY35: {
        Feature.UNICODE_LITERALS,
        Feature.TRAILING_COMMA_IN_CALL,
        Feature.ASYNC_IDENTIFIERS,
    },
    TargetVersion.PY36: {
        Feature.UNICODE_LITERALS,
        Feature.F_STRINGS,
        Feature.NUMERIC_UNDERSCORES,
        Feature.TRAILING_COMMA_IN_CALL,
        Feature.TRAILING_COMMA_IN_DEF,
        Feature.ASYNC_IDENTIFIERS,
    },
    TargetVersion.PY37: {
        Feature.UNICODE_LITERALS,
        Feature.F_STRINGS,
        Feature.NUMERIC_UNDERSCORES,
        Feature.TRAILING_COMMA_IN_CALL,
        Feature.TRAILING_COMMA_IN_DEF,
        Feature.ASYNC_KEYWORDS,
    },
    TargetVersion.PY38: {
        Feature.UNICODE_LITERALS,
        Feature.F_STRINGS,
        Feature.NUMERIC_UNDERSCORES,
        Feature.TRAILING_COMMA_IN_CALL,
        Feature.TRAILING_COMMA_IN_DEF,
        Feature.ASYNC_KEYWORDS,
        Feature.ASSIGNMENT_EXPRESSIONS,
        Feature.POS_ONLY_ARGUMENTS,
    },
}

@dataclass
class Mode:
    target_versions: Set[TargetVersion] = field(default_factory=set)
    line_length: int = DEFAULT_LINE_LENGTH
    string_normalization: bool = True
    is_pyi: bool = False

    def get_cache_key(self) -> str:
        if self.target_versions:
            version_str = ",".join(
                str(version.value)
                for version in sorted(self.target_versions, key=lambda v: v.value)
            )
        else:
            version_str = "-"
        parts = [
            version_str,
            str(self.line_length),
            str(int(self.string_normalization)),
            str(int(self.is_pyi)),
        ]
        return ".".join(parts)

# Legacy name, left for integrations.
FileMode = Mode


def supports_feature(target_versions: Set[TargetVersion], feature: Feature) -> bool:
    return all(feature in VERSION_TO_FEATURES[version] for version in target_versions)
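
# For example: every entry in PY36_VERSIONS has Feature.NUMERIC_UNDERSCORES in
# VERSION_TO_FEATURES, but TargetVersion.PY27 does not, so adding it to the
# set flips the answer (a small illustrative check):
#
#   >>> supports_feature(PY36_VERSIONS, Feature.NUMERIC_UNDERSCORES)
#   True
#   >>> supports_feature({TargetVersion.PY27} | PY36_VERSIONS, Feature.NUMERIC_UNDERSCORES)
#   False
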
def find_pyproject_toml(path_search_start: Iterable[str]) -> Optional[str]:
    """Find the absolute filepath to a pyproject.toml if it exists"""
    path_project_root = find_project_root(path_search_start)
    path_pyproject_toml = path_project_root / "pyproject.toml"
    return str(path_pyproject_toml) if path_pyproject_toml.is_file() else None

def parse_pyproject_toml(path_config: str) -> Dict[str, Any]:
    """Parse a pyproject toml file, pulling out relevant parts for Black.

    If parsing fails, will raise a toml.TomlDecodeError.
    """
    pyproject_toml = toml.load(path_config)
    config = pyproject_toml.get("tool", {}).get("black", {})
    return {k.replace("--", "").replace("-", "_"): v for k, v in config.items()}
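
# Illustration of the key normalization above. Given a pyproject.toml with:
#
#   [tool.black]
#   line-length = 100
#   target-version = ["py36"]
#
# this function returns {"line_length": 100, "target_version": ["py36"]},
# so the keys line up with the Click parameter names used by `main` below.
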
def read_pyproject_toml(
    ctx: click.Context, param: click.Parameter, value: Optional[str]
) -> Optional[str]:
    """Inject Black configuration from "pyproject.toml" into defaults in `ctx`.

    Returns the path to a successfully found and read configuration file, None
    otherwise.
    """
    if not value:
        value = find_pyproject_toml(ctx.params.get("src", ()))
        if value is None:
            return None

    try:
        config = parse_pyproject_toml(value)
    except (toml.TomlDecodeError, OSError) as e:
        raise click.FileError(
            filename=value, hint=f"Error reading configuration file: {e}"
        )

    if not config:
        return value

    # Sanitize the values to be Click friendly. For more information please see:
    # https://github.com/psf/black/issues/1458
    # https://github.com/pallets/click/issues/1567
    config = {
        k: str(v) if not isinstance(v, (list, dict)) else v
        for k, v in config.items()
    }

    target_version = config.get("target_version")
    if target_version is not None and not isinstance(target_version, list):
        raise click.BadOptionUsage(
            "target-version", "Config key target-version must be a list"
        )

    default_map: Dict[str, Any] = {}
    if ctx.default_map:
        default_map.update(ctx.default_map)
    default_map.update(config)

    ctx.default_map = default_map
    return value

def target_version_option_callback(
    c: click.Context, p: Union[click.Option, click.Parameter], v: Tuple[str, ...]
) -> List[TargetVersion]:
    """Compute the target versions from a --target-version flag.

    This is its own function because mypy couldn't infer the type correctly
    when it was a lambda, causing mypyc trouble.
    """
    return [TargetVersion[val.upper()] for val in v]
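
# E.g. `black -t py36 -t py38 ...` arrives here as v == ("py36", "py38") and
# is mapped to [TargetVersion.PY36, TargetVersion.PY38].
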
@click.command(context_settings=dict(help_option_names=["-h", "--help"]))
@click.option("-c", "--code", type=str, help="Format the code passed in as a string.")
@click.option(
    "-l", "--line-length", type=int,
    default=DEFAULT_LINE_LENGTH,
    help="How many characters per line to allow.",
    show_default=True,
)
@click.option(
    "-t", "--target-version", multiple=True,
    type=click.Choice([v.name.lower() for v in TargetVersion]),
    callback=target_version_option_callback,
    help=(
        "Python versions that should be supported by Black's output. [default: per-file"
        " auto-detection]"
    ),
)
@click.option(
    "--pyi", is_flag=True,
    help=(
        "Format all input files like typing stubs regardless of file extension (useful"
        " when piping source on standard input)."
    ),
)
@click.option(
    "-S", "--skip-string-normalization", is_flag=True,
    help="Don't normalize string quotes or prefixes.",
)
@click.option(
    "--check", is_flag=True,
    help=(
        "Don't write the files back, just return the status. Return code 0 means"
        " nothing would change. Return code 1 means some files would be reformatted."
        " Return code 123 means there was an internal error."
    ),
)
@click.option(
    "--diff", is_flag=True,
    help="Don't write the files back, just output a diff for each file on stdout.",
)
@click.option(
    "--color/--no-color", is_flag=True,
    help="Show colored diff. Only applies when `--diff` is given.",
)
@click.option(
    "--fast/--safe", is_flag=True,
    help="If --fast given, skip temporary sanity checks. [default: --safe]",
)
@click.option(
    "--include", type=str, default=DEFAULT_INCLUDES, show_default=True,
    help=(
        "A regular expression that matches files and directories that should be"
        " included on recursive searches. An empty value means all files are included"
        " regardless of the name. Use forward slashes for directories on all platforms"
        " (Windows, too). Exclusions are calculated first, inclusions later."
    ),
)
@click.option(
    "--exclude", type=str, default=DEFAULT_EXCLUDES, show_default=True,
    help=(
        "A regular expression that matches files and directories that should be"
        " excluded on recursive searches. An empty value means no paths are excluded."
        " Use forward slashes for directories on all platforms (Windows, too)."
        " Exclusions are calculated first, inclusions later."
    ),
)
@click.option(
    "--force-exclude", type=str,
    help=(
        "Like --exclude, but files and directories matching this regex will be"
        " excluded even when they are passed explicitly as arguments."
    ),
)
@click.option(
    "-q", "--quiet", is_flag=True,
    help=(
        "Don't emit non-error messages to stderr. Errors are still emitted; silence"
        " those with 2>/dev/null."
    ),
)
@click.option(
    "-v", "--verbose", is_flag=True,
    help=(
        "Also emit messages to stderr about files that were not changed or were ignored"
        " due to --exclude=."
    ),
)
@click.version_option(version=__version__)
@click.argument(
    "src", nargs=-1, is_eager=True,
    type=click.Path(
        exists=True, file_okay=True, dir_okay=True, readable=True, allow_dash=True
    ),
)
@click.option(
    "--config",
    type=click.Path(
        exists=True, file_okay=True, dir_okay=False, readable=True, allow_dash=False
    ),
    is_eager=True,
    callback=read_pyproject_toml,
    help="Read configuration from FILE path.",
)
@click.pass_context
def main(
    ctx: click.Context,
    code: Optional[str],
    line_length: int,
    target_version: List[TargetVersion],
    check: bool,
    diff: bool,
    color: bool,
    fast: bool,
    pyi: bool,
    skip_string_normalization: bool,
    quiet: bool,
    verbose: bool,
    include: str,
    exclude: str,
    force_exclude: Optional[str],
    src: Tuple[str, ...],
    config: Optional[str],
) -> None:
    """The uncompromising code formatter."""
    write_back = WriteBack.from_configuration(check=check, diff=diff, color=color)
    if target_version:
        versions = set(target_version)
    else:
        # We'll autodetect later.
        versions = set()
    mode = Mode(
        target_versions=versions,
        line_length=line_length,
        is_pyi=pyi,
        string_normalization=not skip_string_normalization,
    )
    if config and verbose:
        out(f"Using configuration from {config}.", bold=False, fg="blue")
    if code is not None:
        print(format_str(code, mode=mode))
        ctx.exit(0)
    report = Report(check=check, diff=diff, quiet=quiet, verbose=verbose)
    sources = get_sources(
        ctx=ctx,
        src=src,
        quiet=quiet,
        verbose=verbose,
        include=include,
        exclude=exclude,
        force_exclude=force_exclude,
        report=report,
    )

    path_empty(
        sources,
        "No Python files are present to be formatted. Nothing to do 😴",
        quiet,
        verbose,
        ctx,
    )

    if len(sources) == 1:
        reformat_one(
            src=sources.pop(),
            fast=fast,
            write_back=write_back,
            mode=mode,
            report=report,
        )
    else:
        reformat_many(
            sources=sources, fast=fast, write_back=write_back, mode=mode, report=report
        )

    if verbose or not quiet:
        out("Oh no! 💥 💔 💥" if report.return_code else "All done! ✨ 🍰 ✨")
        click.secho(str(report), err=True)
    ctx.exit(report.return_code)

def get_sources(
    *,
    ctx: click.Context,
    src: Tuple[str, ...],
    quiet: bool,
    verbose: bool,
    include: str,
    exclude: str,
    force_exclude: Optional[str],
    report: "Report",
) -> Set[Path]:
    """Compute the set of files to be formatted."""
    try:
        include_regex = re_compile_maybe_verbose(include)
    except re.error:
        err(f"Invalid regular expression for include given: {include!r}")
        ctx.exit(2)
    try:
        exclude_regex = re_compile_maybe_verbose(exclude)
    except re.error:
        err(f"Invalid regular expression for exclude given: {exclude!r}")
        ctx.exit(2)
    try:
        force_exclude_regex = (
            re_compile_maybe_verbose(force_exclude) if force_exclude else None
        )
    except re.error:
        err(f"Invalid regular expression for force_exclude given: {force_exclude!r}")
        ctx.exit(2)

    root = find_project_root(src)
    sources: Set[Path] = set()
    path_empty(src, "No Path provided. Nothing to do 😴", quiet, verbose, ctx)
    exclude_regexes = [exclude_regex]
    if force_exclude_regex is not None:
        exclude_regexes.append(force_exclude_regex)

    for s in src:
        p = Path(s)
        if p.is_dir():
            sources.update(
                gen_python_files(
                    p.iterdir(),
                    root,
                    include_regex,
                    exclude_regexes,
                    report,
                    get_gitignore(root),
                )
            )
        elif s == "-":
            sources.add(p)
        elif p.is_file():
            sources.update(
                gen_python_files(
                    [p], root, None, exclude_regexes, report, get_gitignore(root)
                )
            )
        else:
            err(f"invalid path: {s}")
    return sources

def path_empty(
    src: Sized, msg: str, quiet: bool, verbose: bool, ctx: click.Context
) -> None:
    """
    Exit if there is no `src` provided for formatting.
    """
    if not src:
        if verbose or not quiet:
            out(msg)
            ctx.exit(0)

def reformat_one(
    src: Path, fast: bool, write_back: WriteBack, mode: Mode, report: "Report"
) -> None:
    """Reformat a single file under `src` without spawning child processes.

    `fast`, `write_back`, and `mode` options are passed to
    :func:`format_file_in_place` or :func:`format_stdin_to_stdout`.
    """
    try:
        changed = Changed.NO
        if not src.is_file() and str(src) == "-":
            if format_stdin_to_stdout(fast=fast, write_back=write_back, mode=mode):
                changed = Changed.YES
        else:
            cache: Cache = {}
            if write_back != WriteBack.DIFF:
                cache = read_cache(mode)
                res_src = src.resolve()
                if res_src in cache and cache[res_src] == get_cache_info(res_src):
                    changed = Changed.CACHED
            if changed is not Changed.CACHED and format_file_in_place(
                src, fast=fast, write_back=write_back, mode=mode
            ):
                changed = Changed.YES
            if (write_back is WriteBack.YES and changed is not Changed.CACHED) or (
                write_back is WriteBack.CHECK and changed is Changed.NO
            ):
                write_cache(cache, [src], mode)
        report.done(src, changed)
    except Exception as exc:
        report.failed(src, str(exc))

def reformat_many(
    sources: Set[Path], fast: bool, write_back: WriteBack, mode: Mode, report: "Report"
) -> None:
    """Reformat multiple files using a ProcessPoolExecutor."""
    executor: Executor
    loop = asyncio.get_event_loop()
    worker_count = os.cpu_count()
    if sys.platform == "win32":
        # Work around https://bugs.python.org/issue26903
        worker_count = min(worker_count, 61)
    try:
        executor = ProcessPoolExecutor(max_workers=worker_count)
    except (ImportError, OSError):
        # we arrive here if the underlying system does not support multi-processing
        # like in AWS Lambda or Termux, in which case we gracefully fall back to
        # a ThreadPoolExecutor with just a single worker (more workers would not do us
        # any good due to the Global Interpreter Lock)
        executor = ThreadPoolExecutor(max_workers=1)

    try:
        loop.run_until_complete(
            schedule_formatting(
                sources=sources,
                fast=fast,
                write_back=write_back,
                mode=mode,
                report=report,
                loop=loop,
                executor=executor,
            )
        )
    finally:
        shutdown(loop)
        if executor is not None:
            executor.shutdown()

async def schedule_formatting(
    sources: Set[Path],
    fast: bool,
    write_back: WriteBack,
    mode: Mode,
    report: "Report",
    loop: asyncio.AbstractEventLoop,
    executor: Executor,
) -> None:
    """Run formatting of `sources` in parallel using the provided `executor`.

    (Use ProcessPoolExecutors for actual parallelism.)

    `write_back`, `fast`, and `mode` options are passed to
    :func:`format_file_in_place`.
    """
    cache: Cache = {}
    if write_back != WriteBack.DIFF:
        cache = read_cache(mode)
        sources, cached = filter_cached(cache, sources)
        for src in sorted(cached):
            report.done(src, Changed.CACHED)
    if not sources:
        return

    cancelled = []
    sources_to_cache = []
    lock = None
    if write_back == WriteBack.DIFF:
        # For diff output, we need locks to ensure we don't interleave output
        # from different processes.
        manager = Manager()
        lock = manager.Lock()
    tasks = {
        asyncio.ensure_future(
            loop.run_in_executor(
                executor, format_file_in_place, src, fast, mode, write_back, lock
            )
        ): src
        for src in sorted(sources)
    }
    pending: Iterable["asyncio.Future[bool]"] = tasks.keys()
    try:
        loop.add_signal_handler(signal.SIGINT, cancel, pending)
        loop.add_signal_handler(signal.SIGTERM, cancel, pending)
    except NotImplementedError:
        # There are no good alternatives for these on Windows.
        pass
    while pending:
        done, _ = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            src = tasks.pop(task)
            if task.cancelled():
                cancelled.append(task)
            elif task.exception():
                report.failed(src, str(task.exception()))
            else:
                changed = Changed.YES if task.result() else Changed.NO
                # If the file was written back or was successfully checked as
                # well-formatted, store this information in the cache.
                if write_back is WriteBack.YES or (
                    write_back is WriteBack.CHECK and changed is Changed.NO
                ):
                    sources_to_cache.append(src)
                report.done(src, changed)
    if cancelled:
        await asyncio.gather(*cancelled, loop=loop, return_exceptions=True)
    if sources_to_cache:
        write_cache(cache, sources_to_cache, mode)

def format_file_in_place(
    src: Path,
    fast: bool,
    mode: Mode,
    write_back: WriteBack = WriteBack.NO,
    lock: Any = None,  # multiprocessing.Manager().Lock() is some crazy proxy
) -> bool:
    """Format file under `src` path. Return True if changed.

    If `write_back` is DIFF, write a diff to stdout. If it is YES, write reformatted
    code to the file.
    `mode` and `fast` options are passed to :func:`format_file_contents`.
    """
    if src.suffix == ".pyi":
        mode = replace(mode, is_pyi=True)

    then = datetime.utcfromtimestamp(src.stat().st_mtime)
    with open(src, "rb") as buf:
        src_contents, encoding, newline = decode_bytes(buf.read())
    try:
        dst_contents = format_file_contents(src_contents, fast=fast, mode=mode)
    except NothingChanged:
        return False

    if write_back == WriteBack.YES:
        with open(src, "w", encoding=encoding, newline=newline) as f:
            f.write(dst_contents)
    elif write_back in (WriteBack.DIFF, WriteBack.COLOR_DIFF):
        now = datetime.utcnow()
        src_name = f"{src}\t{then} +0000"
        dst_name = f"{src}\t{now} +0000"
        diff_contents = diff(src_contents, dst_contents, src_name, dst_name)

        if write_back == WriteBack.COLOR_DIFF:
            diff_contents = color_diff(diff_contents)

        with lock or nullcontext():
            f = io.TextIOWrapper(
                sys.stdout.buffer,
                encoding=encoding,
                newline=newline,
                write_through=True,
            )
            f = wrap_stream_for_windows(f)
            f.write(diff_contents)
            f.detach()

    return True

def color_diff(contents: str) -> str:
    """Inject ANSI color codes into the diff."""
    lines = contents.split("\n")
    for i, line in enumerate(lines):
        if line.startswith("+++") or line.startswith("---"):
            line = "\033[1;37m" + line + "\033[0m"  # bold white, reset
        if line.startswith("@@"):
            line = "\033[36m" + line + "\033[0m"  # cyan, reset
        if line.startswith("+"):
            line = "\033[32m" + line + "\033[0m"  # green, reset
        elif line.startswith("-"):
            line = "\033[31m" + line + "\033[0m"  # red, reset
        lines[i] = line
    return "\n".join(lines)
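
# For instance, a `+++ dst` header comes back as "\033[1;37m+++ dst\033[0m"
# (bold white) and an added line `+x = 1` as "\033[32m+x = 1\033[0m" (green).
# Note that a reassigned header line starts with the escape byte rather than
# "+"/"-", so the later branches don't re-wrap it.
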
def wrap_stream_for_windows(
    f: io.TextIOWrapper,
) -> Union[io.TextIOWrapper, "colorama.AnsiToWin32.AnsiToWin32"]:
    """
    Wrap the stream in colorama's wrap_stream so colors are shown on Windows.

    If `colorama` is not found, then no change is made. If `colorama` does
    exist, then it handles the logic to determine whether or not to change
    things.
    """
    try:
        from colorama import initialise

        # We set `strip=False` so that we don't have to modify
        # test_express_diff_with_color.
        f = initialise.wrap_stream(
            f, convert=None, strip=False, autoreset=False, wrap=True
        )

        # wrap_stream returns a `colorama.AnsiToWin32.AnsiToWin32` object
        # which does not have a `detach()` method. So we fake one.
        f.detach = lambda *args, **kwargs: None  # type: ignore
    except ImportError:
        pass

    return f

def format_stdin_to_stdout(
    fast: bool, *, write_back: WriteBack = WriteBack.NO, mode: Mode
) -> bool:
    """Format file on stdin. Return True if changed.

    If `write_back` is YES, write reformatted code back to stdout. If it is DIFF,
    write a diff to stdout. The `mode` argument is passed to
    :func:`format_file_contents`.
    """
    then = datetime.utcnow()
    src, encoding, newline = decode_bytes(sys.stdin.buffer.read())
    dst = src
    try:
        dst = format_file_contents(src, fast=fast, mode=mode)
        return True

    except NothingChanged:
        return False

    finally:
        f = io.TextIOWrapper(
            sys.stdout.buffer, encoding=encoding, newline=newline, write_through=True
        )
        if write_back == WriteBack.YES:
            f.write(dst)
        elif write_back in (WriteBack.DIFF, WriteBack.COLOR_DIFF):
            now = datetime.utcnow()
            src_name = f"STDIN\t{then} +0000"
            dst_name = f"STDOUT\t{now} +0000"
            d = diff(src, dst, src_name, dst_name)
            if write_back == WriteBack.COLOR_DIFF:
                d = color_diff(d)
                f = wrap_stream_for_windows(f)
            f.write(d)
        f.detach()

def format_file_contents(src_contents: str, *, fast: bool, mode: Mode) -> FileContent:
    """Reformat contents of a file and return new contents.

    If `fast` is False, additionally confirm that the reformatted code is
    valid by calling :func:`assert_equivalent` and :func:`assert_stable` on it.
    `mode` is passed to :func:`format_str`.
    """
    if src_contents.strip() == "":
        raise NothingChanged

    dst_contents = format_str(src_contents, mode=mode)
    if src_contents == dst_contents:
        raise NothingChanged

    if not fast:
        assert_equivalent(src_contents, dst_contents)
        assert_stable(src_contents, dst_contents, mode=mode)
    return dst_contents
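
# Typical round trip (illustrative; Mode defaults are defined above):
#
#   >>> format_file_contents("x=1", fast=True, mode=Mode())
#   'x = 1\n'
#
# Feeding the output back in raises NothingChanged, which callers such as
# format_file_in_place() treat as "no write needed".
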
def format_str(src_contents: str, *, mode: Mode) -> FileContent:
    """Reformat a string and return new contents.

    `mode` determines formatting options, such as how many characters per line are
    allowed. Example:

    >>> import black
    >>> print(black.format_str("def f(arg:str='')->None:...", mode=Mode()))
    def f(arg: str = "") -> None:
        ...

    A more complex example:
    >>> print(
    ...   black.format_str(
    ...     "def f(arg:str='')->None: hey",
    ...     mode=black.Mode(
    ...       target_versions={black.TargetVersion.PY36},
    ...       line_length=10,
    ...       string_normalization=False,
    ...       is_pyi=False,
    ...     ),
    ...   ),
    ... )
    def f(
        arg: str = '',
    ) -> None:
        hey

    """
    src_node = lib2to3_parse(src_contents.lstrip(), mode.target_versions)
    dst_contents = []
    future_imports = get_future_imports(src_node)
    if mode.target_versions:
        versions = mode.target_versions
    else:
        versions = detect_target_versions(src_node)
    normalize_fmt_off(src_node)
    lines = LineGenerator(
        remove_u_prefix="unicode_literals" in future_imports
        or supports_feature(versions, Feature.UNICODE_LITERALS),
        is_pyi=mode.is_pyi,
        normalize_strings=mode.string_normalization,
    )
    elt = EmptyLineTracker(is_pyi=mode.is_pyi)
    empty_line = Line()
    after = 0
    split_line_features = {
        feature
        for feature in {Feature.TRAILING_COMMA_IN_CALL, Feature.TRAILING_COMMA_IN_DEF}
        if supports_feature(versions, feature)
    }
    for current_line in lines.visit(src_node):
        dst_contents.append(str(empty_line) * after)
        before, after = elt.maybe_empty_lines(current_line)
        dst_contents.append(str(empty_line) * before)
        for line in transform_line(
            current_line,
            line_length=mode.line_length,
            normalize_strings=mode.string_normalization,
            features=split_line_features,
        ):
            dst_contents.append(str(line))
    return "".join(dst_contents)

def decode_bytes(src: bytes) -> Tuple[FileContent, Encoding, NewLine]:
    """Return a tuple of (decoded_contents, encoding, newline).

    `newline` is either CRLF or LF but `decoded_contents` is decoded with
    universal newlines (i.e. only contains LF).
    """
    srcbuf = io.BytesIO(src)
    encoding, lines = tokenize.detect_encoding(srcbuf.readline)
    if not lines:
        return "", encoding, "\n"

    newline = "\r\n" if b"\r\n" == lines[0][-2:] else "\n"
    srcbuf.seek(0)
    with io.TextIOWrapper(srcbuf, encoding) as tiow:
        return tiow.read(), encoding, newline
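
# Example: CRLF input decodes to LF, while the original newline is reported so
# writers can restore it on output:
#
#   >>> decode_bytes(b"print('hi')\r\n")
#   ("print('hi')\n", 'utf-8', '\r\n')
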
@lru_cache()
def get_grammars(target_versions: Set[TargetVersion]) -> List[Grammar]:
    if not target_versions:
        # No target_version specified, so try all grammars.
        return [
            # Python 3.7+
            pygram.python_grammar_no_print_statement_no_exec_statement_async_keywords,
            # Python 3.0-3.6
            pygram.python_grammar_no_print_statement_no_exec_statement,
            # Python 2.7 with future print_function import
            pygram.python_grammar_no_print_statement,
            # Python 2.7
            pygram.python_grammar,
        ]

    if all(version.is_python2() for version in target_versions):
        # Python 2-only code, so try Python 2 grammars.
        return [
            # Python 2.7 with future print_function import
            pygram.python_grammar_no_print_statement,
            # Python 2.7
            pygram.python_grammar,
        ]

    # Python 3-compatible code, so only try Python 3 grammar.
    grammars = []
    # If we have to parse both, try to parse async as a keyword first
    if not supports_feature(target_versions, Feature.ASYNC_IDENTIFIERS):
        # Python 3.7+
        grammars.append(
            pygram.python_grammar_no_print_statement_no_exec_statement_async_keywords
        )
    if not supports_feature(target_versions, Feature.ASYNC_KEYWORDS):
        # Python 3.0-3.6
        grammars.append(pygram.python_grammar_no_print_statement_no_exec_statement)
    # At least one of the above branches must have been taken, because every Python
    # version has exactly one of the two 'ASYNC_*' flags
    return grammars

def lib2to3_parse(src_txt: str, target_versions: Iterable[TargetVersion] = ()) -> Node:
    """Given a string with source, return the lib2to3 Node."""
    if src_txt[-1:] != "\n":
        src_txt += "\n"

    for grammar in get_grammars(set(target_versions)):
        drv = driver.Driver(grammar, pytree.convert)
        try:
            result = drv.parse_string(src_txt, True)
            break

        except ParseError as pe:
            lineno, column = pe.context[1]
            lines = src_txt.splitlines()
            try:
                faulty_line = lines[lineno - 1]
            except IndexError:
                faulty_line = "<line number missing in source>"
            exc = InvalidInput(f"Cannot parse: {lineno}:{column}: {faulty_line}")
    else:
        raise exc from None

    if isinstance(result, Leaf):
        result = Node(syms.file_input, [result])
    return result

def lib2to3_unparse(node: Node) -> str:
    """Given a lib2to3 node, return its string representation."""
    code = str(node)
    return code

class Visitor(Generic[T]):
    """Basic lib2to3 visitor that yields things of type `T` on `visit()`."""

    def visit(self, node: LN) -> Iterator[T]:
        """Main method to visit `node` and its children.

        It tries to find a `visit_*()` method for the given `node.type`, like
        `visit_simple_stmt` for Node objects or `visit_INDENT` for Leaf objects.
        If no dedicated `visit_*()` method is found, chooses `visit_default()`
        instead.

        Then yields objects of type `T` from the selected visitor.
        """
        if node.type < 256:
            name = token.tok_name[node.type]
        else:
            name = str(type_repr(node.type))
        # We explicitly branch on whether a visitor exists (instead of
        # using self.visit_default as the default arg to getattr) in order
        # to save needing to create a bound method object and so mypyc can
        # generate a native call to visit_default.
        visitf = getattr(self, f"visit_{name}", None)
        if visitf:
            yield from visitf(node)
        else:
            yield from self.visit_default(node)

    def visit_default(self, node: LN) -> Iterator[T]:
        """Default `visit_*()` implementation. Recurses to children of `node`."""
        if isinstance(node, Node):
            for child in node.children:
                yield from self.visit(child)
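
# A minimal concrete visitor (sketch): collect the value of every NAME leaf,
# relying on the `visit_NAME` dispatch described in `visit()` above.
#
#   class NameCollector(Visitor[str]):
#       def visit_NAME(self, leaf: Leaf) -> Iterator[str]:
#           yield leaf.value
#
#   >>> list(NameCollector().visit(lib2to3_parse("x = y\n")))
#   ['x', 'y']
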
@dataclass
class DebugVisitor(Visitor[T]):
    tree_depth: int = 0

    def visit_default(self, node: LN) -> Iterator[T]:
        indent = " " * (2 * self.tree_depth)
        if isinstance(node, Node):
            _type = type_repr(node.type)
            out(f"{indent}{_type}", fg="yellow")
            self.tree_depth += 1
            for child in node.children:
                yield from self.visit(child)

            self.tree_depth -= 1
            out(f"{indent}/{_type}", fg="yellow", bold=False)
        else:
            _type = token.tok_name.get(node.type, str(node.type))
            out(f"{indent}{_type}", fg="blue", nl=False)
            if node.prefix:
                # We don't have to handle prefixes for `Node` objects since
                # that delegates to the first child anyway.
                out(f" {node.prefix!r}", fg="green", bold=False, nl=False)
            out(f" {node.value!r}", fg="blue", bold=False)

    @classmethod
    def show(cls, code: Union[str, Leaf, Node]) -> None:
        """Pretty-print the lib2to3 AST of a given string of `code`.

        Convenience method for debugging.
        """
        v: DebugVisitor[None] = DebugVisitor()
        if isinstance(code, str):
            code = lib2to3_parse(code)
        v.visit(code)
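
# For instance, `DebugVisitor.show("pass")` prints (roughly) the nested tree,
# two spaces of indent per level:
#
#   file_input
#     simple_stmt
#       NAME 'pass'
#       NEWLINE '\n'
#     /simple_stmt
#     ENDMARKER ''
#   /file_input
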
WHITESPACE: Final = {token.DEDENT, token.INDENT, token.NEWLINE}
STATEMENT: Final = {
    syms.if_stmt,
    syms.while_stmt,
    syms.for_stmt,
    syms.try_stmt,
    syms.except_clause,
    syms.with_stmt,
    syms.funcdef,
    syms.classdef,
}
STANDALONE_COMMENT: Final = 153
token.tok_name[STANDALONE_COMMENT] = "STANDALONE_COMMENT"
LOGIC_OPERATORS: Final = {"and", "or"}
COMPARATORS: Final = {
    token.GREATER,
    token.GREATEREQUAL,
    token.LESS,
    token.LESSEQUAL,
    token.NOTEQUAL,
    token.EQEQUAL,
}
MATH_OPERATORS: Final = {
    token.VBAR,
    token.CIRCUMFLEX,
    token.AMPER,
    token.LEFTSHIFT,
    token.RIGHTSHIFT,
    token.PLUS,
    token.MINUS,
    token.STAR,
    token.SLASH,
    token.DOUBLESLASH,
    token.PERCENT,
    token.AT,
    token.TILDE,
    token.DOUBLESTAR,
}
STARS: Final = {token.STAR, token.DOUBLESTAR}
VARARGS_SPECIALS: Final = STARS | {token.SLASH}
VARARGS_PARENTS: Final = {
    syms.arglist,
    syms.argument,  # double star in arglist
    syms.trailer,  # single argument to call
    syms.typedargslist,
    syms.varargslist,  # lambdas
}
UNPACKING_PARENTS: Final = {
    syms.atom,  # single element of a list or set literal
    syms.dictsetmaker,
    syms.listmaker,
    syms.testlist_gexp,
    syms.testlist_star_expr,
}
TEST_DESCENDANTS: Final = {
    syms.test,
    syms.lambdef,
    syms.or_test,
    syms.and_test,
    syms.not_test,
    syms.comparison,
    syms.star_expr,
    syms.expr,
    syms.xor_expr,
    syms.and_expr,
    syms.shift_expr,
    syms.arith_expr,
    syms.trailer,
    syms.term,
    syms.power,
}
ASSIGNMENTS: Final = {
    "=",
    ":=",
    "+=",
    "-=",
    "*=",
    "@=",
    "/=",
    "%=",
    "**=",
    ">>=",
    "<<=",
    "&=",
    "^=",
    "|=",
    "//=",
}
COMPREHENSION_PRIORITY: Final = 20
COMMA_PRIORITY: Final = 18
TERNARY_PRIORITY: Final = 16
LOGIC_PRIORITY: Final = 14
STRING_PRIORITY: Final = 12
COMPARATOR_PRIORITY: Final = 10
MATH_PRIORITIES: Final = {
    token.VBAR: 9,
    token.CIRCUMFLEX: 8,
    token.AMPER: 7,
    token.LEFTSHIFT: 6,
    token.RIGHTSHIFT: 6,
    token.PLUS: 5,
    token.MINUS: 5,
    token.STAR: 4,
    token.SLASH: 4,
    token.DOUBLESLASH: 4,
    token.PERCENT: 4,
    token.AT: 4,
    token.TILDE: 4,
    token.DOUBLESTAR: 2,
}
DOT_PRIORITY: Final = 1

@dataclass
class BracketTracker:
    """Keeps track of brackets on a line."""

    depth: int = 0
    bracket_match: Dict[Tuple[Depth, NodeType], Leaf] = field(default_factory=dict)
    delimiters: Dict[LeafID, Priority] = field(default_factory=dict)
    previous: Optional[Leaf] = None
    _for_loop_depths: List[int] = field(default_factory=list)
    _lambda_argument_depths: List[int] = field(default_factory=list)

    def mark(self, leaf: Leaf) -> None:
        """Mark `leaf` with bracket-related metadata. Keep track of delimiters.

        All leaves receive an int `bracket_depth` field that stores how deep
        within brackets a given leaf is. 0 means there are no enclosing brackets
        that started on this line.

        If a leaf is itself a closing bracket, it receives an `opening_bracket`
        field that it forms a pair with. This is a one-directional link to
        avoid reference cycles.

        If a leaf is a delimiter (a token on which Black can split the line if
        needed) and it's on depth 0, its `id()` is stored in the tracker's
        `delimiters` field.
        """
        if leaf.type == token.COMMENT:
            return

        self.maybe_decrement_after_for_loop_variable(leaf)
        self.maybe_decrement_after_lambda_arguments(leaf)
        if leaf.type in CLOSING_BRACKETS:
            self.depth -= 1
            opening_bracket = self.bracket_match.pop((self.depth, leaf.type))
            leaf.opening_bracket = opening_bracket
        leaf.bracket_depth = self.depth
        if self.depth == 0:
            delim = is_split_before_delimiter(leaf, self.previous)
            if delim and self.previous is not None:
                self.delimiters[id(self.previous)] = delim

            delim = is_split_after_delimiter(leaf, self.previous)
            if delim:
                self.delimiters[id(leaf)] = delim
        if leaf.type in OPENING_BRACKETS:
            self.bracket_match[self.depth, BRACKET[leaf.type]] = leaf
            self.depth += 1
        self.previous = leaf
        self.maybe_increment_lambda_arguments(leaf)
        self.maybe_increment_for_loop_variable(leaf)

    def any_open_brackets(self) -> bool:
        """Return True if there is a yet unmatched open bracket on the line."""
        return bool(self.bracket_match)

    def max_delimiter_priority(self, exclude: Iterable[LeafID] = ()) -> Priority:
        """Return the highest priority of a delimiter found on the line.

        Values are consistent with what `is_split_*_delimiter()` return.
        Raises ValueError on no delimiters.
        """
        return max(v for k, v in self.delimiters.items() if k not in exclude)

    def delimiter_count_with_priority(self, priority: Priority = 0) -> int:
        """Return the number of delimiters with the given `priority`.

        If no `priority` is passed, defaults to max priority on the line.
        """
        if not self.delimiters:
            return 0

        priority = priority or self.max_delimiter_priority()
        return sum(1 for p in self.delimiters.values() if p == priority)

    def maybe_increment_for_loop_variable(self, leaf: Leaf) -> bool:
        """In a for loop or comprehension, the variables are often unpacked.

        To avoid splitting on the comma in this situation, increase the depth of
        tokens between `for` and `in`.
        """
        if leaf.type == token.NAME and leaf.value == "for":
            self.depth += 1
            self._for_loop_depths.append(self.depth)
            return True

        return False

    def maybe_decrement_after_for_loop_variable(self, leaf: Leaf) -> bool:
        """See `maybe_increment_for_loop_variable` above for explanation."""
        if (
            self._for_loop_depths
            and self._for_loop_depths[-1] == self.depth
            and leaf.type == token.NAME
            and leaf.value == "in"
        ):
            self.depth -= 1
            self._for_loop_depths.pop()
            return True

        return False

    def maybe_increment_lambda_arguments(self, leaf: Leaf) -> bool:
        """In a lambda expression, there might be more than one argument.

        To avoid splitting on the comma in this situation, increase the depth of
        tokens between `lambda` and `:`.
        """
        if leaf.type == token.NAME and leaf.value == "lambda":
            self.depth += 1
            self._lambda_argument_depths.append(self.depth)
            return True

        return False

    def maybe_decrement_after_lambda_arguments(self, leaf: Leaf) -> bool:
        """See `maybe_increment_lambda_arguments` above for explanation."""
        if (
            self._lambda_argument_depths
            and self._lambda_argument_depths[-1] == self.depth
            and leaf.type == token.COLON
        ):
            self.depth -= 1
            self._lambda_argument_depths.pop()
            return True

        return False

    def get_open_lsqb(self) -> Optional[Leaf]:
        """Return the most recent opening square bracket (if any)."""
        return self.bracket_match.get((self.depth - 1, token.RSQB))
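
# Illustration of the depth trick implemented above: while marking the leaves
# of `for x, y in pairs:`, `depth` is bumped right after `for` and restored at
# `in`, so the comma in `x, y` gets bracket_depth == 1 even though no real
# bracket is open, and it is not treated as a top-level split point. The same
# applies to commas between `lambda` and its `:`.
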
1390 """Holds leaves and comments. Can be printed with `str(line)`."""
1393 leaves: List[Leaf] = field(default_factory=list)
1394 # keys ordered like `leaves`
1395 comments: Dict[LeafID, List[Leaf]] = field(default_factory=dict)
1396 bracket_tracker: BracketTracker = field(default_factory=BracketTracker)
1397 inside_brackets: bool = False
1398 should_explode: bool = False
    def append(self, leaf: Leaf, preformatted: bool = False) -> None:
        """Add a new `leaf` to the end of the line.

        Unless `preformatted` is True, the `leaf` will receive a new consistent
        whitespace prefix and metadata applied by :class:`BracketTracker`.
        Trailing commas may be removed, and unpacked for-loop variables are
        demoted from being delimiters.

        Inline comments are put aside.
        """
        has_value = leaf.type in BRACKETS or bool(leaf.value.strip())
        if not has_value:
            return

        if token.COLON == leaf.type and self.is_class_paren_empty:
            del self.leaves[-2:]
        if self.leaves and not preformatted:
            # Note: at this point leaf.prefix should be empty except for
            # imports, for which we only preserve newlines.
            leaf.prefix += whitespace(
                leaf, complex_subscript=self.is_complex_subscript(leaf)
            )
        if self.inside_brackets or not preformatted:
            self.bracket_tracker.mark(leaf)
            self.maybe_remove_trailing_comma(leaf)
        if not self.append_comment(leaf):
            self.leaves.append(leaf)

    def append_safe(self, leaf: Leaf, preformatted: bool = False) -> None:
        """Like :func:`append()` but disallow invalid standalone comment structure.

        Raises ValueError when any `leaf` is appended after a standalone comment
        or when a standalone comment is not the first leaf on the line.
        """
        if self.bracket_tracker.depth == 0:
            if self.is_comment:
                raise ValueError("cannot append to standalone comments")

            if self.leaves and leaf.type == STANDALONE_COMMENT:
                raise ValueError(
                    "cannot append standalone comments to a populated line"
                )

        self.append(leaf, preformatted=preformatted)

    @property
    def is_comment(self) -> bool:
        """Is this line a standalone comment?"""
        return len(self.leaves) == 1 and self.leaves[0].type == STANDALONE_COMMENT

    @property
    def is_decorator(self) -> bool:
        """Is this line a decorator?"""
        return bool(self) and self.leaves[0].type == token.AT

    @property
    def is_import(self) -> bool:
        """Is this an import line?"""
        return bool(self) and is_import(self.leaves[0])

    @property
    def is_class(self) -> bool:
        """Is this line a class definition?"""
        return (
            bool(self)
            and self.leaves[0].type == token.NAME
            and self.leaves[0].value == "class"
        )

    @property
    def is_stub_class(self) -> bool:
        """Is this line a class definition with a body consisting only of "..."?"""
        return self.is_class and self.leaves[-3:] == [
            Leaf(token.DOT, ".") for _ in range(3)
        ]

    @property
    def is_collection_with_optional_trailing_comma(self) -> bool:
        """Is this line a collection literal with a trailing comma that's optional?

        Note that the trailing comma in a 1-tuple is not optional.
        """
        if not self.leaves or len(self.leaves) < 4:
            return False

        # Look for and address a trailing colon.
        if self.leaves[-1].type == token.COLON:
            closer = self.leaves[-2]
            close_index = -2
        else:
            closer = self.leaves[-1]
            close_index = -1
        if closer.type not in CLOSING_BRACKETS or self.inside_brackets:
            return False

        if closer.type == token.RPAR:
            # Tuples require an extra check, because if there's only
            # one element in the tuple removing the comma unmakes the
            # tuple.
            #
            # We also check for parens before looking for the trailing
            # comma because in some cases (eg assigning a dict
            # literal) the literal gets wrapped in temporary parens
            # during parsing. This case is covered by the
            # collections.py test data.
            opener = closer.opening_bracket
            for _open_index, leaf in enumerate(self.leaves):
                if leaf is opener:
                    break
            else:
                # Couldn't find the matching opening paren, play it safe.
                return False

            commas = 0
            comma_depth = self.leaves[close_index - 1].bracket_depth
            for leaf in self.leaves[_open_index + 1 : close_index]:
                if leaf.bracket_depth == comma_depth and leaf.type == token.COMMA:
                    commas += 1
            if commas > 1:
                # We haven't looked yet for the trailing comma because
                # we might also have caught noop parens.
                return self.leaves[close_index - 1].type == token.COMMA

            return False  # it's either a one-tuple or didn't have a trailing comma

        if self.leaves[close_index - 1].type in CLOSING_BRACKETS:
            close_index -= 1
            closer = self.leaves[close_index]
            if closer.type == token.RPAR:
                # TODO: this is a gut feeling. Will we ever see this?
                return False

        if self.leaves[close_index - 1].type != token.COMMA:
            return False

        return True
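
    # A few concrete cases for the property above (sketch):
    #   [1, 2, 3,]    -> True: the trailing comma is optional
    #   foo(a, b,)    -> True
    #   (1,)          -> False: removing the comma would unmake the one-tuple
    #   x = 1,        -> False: no closing bracket ends the line
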
    @property
    def is_def(self) -> bool:
        """Is this a function definition? (Also returns True for async defs.)"""
        try:
            first_leaf = self.leaves[0]
        except IndexError:
            return False

        try:
            second_leaf: Optional[Leaf] = self.leaves[1]
        except IndexError:
            second_leaf = None
        return (first_leaf.type == token.NAME and first_leaf.value == "def") or (
            first_leaf.type == token.ASYNC
            and second_leaf is not None
            and second_leaf.type == token.NAME
            and second_leaf.value == "def"
        )

    @property
    def is_class_paren_empty(self) -> bool:
        """Is this a class with no base classes but using parentheses?

        Those are unnecessary and should be removed.
        """
        return (
            bool(self)
            and len(self.leaves) == 4
            and self.is_class
            and self.leaves[2].type == token.LPAR
            and self.leaves[2].value == "("
            and self.leaves[3].type == token.RPAR
            and self.leaves[3].value == ")"
        )

    @property
    def is_triple_quoted_string(self) -> bool:
        """Is the line a triple quoted string?"""
        return (
            bool(self)
            and self.leaves[0].type == token.STRING
            and self.leaves[0].value.startswith(('"""', "'''"))
        )

    def contains_standalone_comments(self, depth_limit: int = sys.maxsize) -> bool:
        """If so, needs to be split before emitting."""
        for leaf in self.leaves:
            if leaf.type == STANDALONE_COMMENT and leaf.bracket_depth <= depth_limit:
                return True

        return False

    def contains_uncollapsable_type_comments(self) -> bool:
        ignored_ids = set()
        try:
            last_leaf = self.leaves[-1]
            ignored_ids.add(id(last_leaf))
            if last_leaf.type == token.COMMA or (
                last_leaf.type == token.RPAR and not last_leaf.value
            ):
                # When trailing commas or optional parens are inserted by Black for
                # consistency, comments after the previous last element are not moved
                # (they don't have to, rendering will still be correct). So we ignore
                # trailing commas and invisible parens.
                last_leaf = self.leaves[-2]
                ignored_ids.add(id(last_leaf))
        except IndexError:
            return False

        # A type comment is uncollapsable if it is attached to a leaf
        # that isn't at the end of the line (since that could cause it
        # to get associated to a different argument) or if there are
        # comments before it (since that could cause it to get hidden
        # behind a comment).
        comment_seen = False
        for leaf_id, comments in self.comments.items():
            for comment in comments:
                if is_type_comment(comment):
                    if comment_seen or (
                        not is_type_comment(comment, " ignore")
                        and leaf_id not in ignored_ids
                    ):
                        return True

                comment_seen = True

        return False

    def contains_unsplittable_type_ignore(self) -> bool:
        if not self.leaves:
            return False

        # If a 'type: ignore' is attached to the end of a line, we
        # can't split the line, because we can't know which of the
        # subexpressions the ignore was meant to apply to.
        #
        # We only want this to apply to actual physical lines from the
        # original source, though: we don't want the presence of a
        # 'type: ignore' at the end of a multiline expression to
        # justify pushing it all onto one line. Thus we
        # (unfortunately) need to check the actual source lines and
        # only report an unsplittable 'type: ignore' if this line was
        # one line in the original code.

        # Grab the first and last line numbers, skipping generated leaves
        first_line = next((leaf.lineno for leaf in self.leaves if leaf.lineno != 0), 0)
        last_line = next(
            (leaf.lineno for leaf in reversed(self.leaves) if leaf.lineno != 0), 0
        )

        if first_line == last_line:
            # We look at the last two leaves since a comma or an
            # invisible paren could have been added at the end of the
            # line.
            for node in self.leaves[-2:]:
                for comment in self.comments.get(id(node), []):
                    if is_type_comment(comment, " ignore"):
                        return True

        return False

    def contains_multiline_strings(self) -> bool:
        return any(is_multiline_string(leaf) for leaf in self.leaves)

    def maybe_remove_trailing_comma(self, closing: Leaf) -> bool:
        """Remove trailing comma if there is one and it's safe."""
        if not (self.leaves and self.leaves[-1].type == token.COMMA):
            return False

        # We remove trailing commas only in the case of importing a
        # single name from a module.
        if not (
            self.leaves
            and self.is_import
            and len(self.leaves) > 4
            and self.leaves[-1].type == token.COMMA
            and closing.type in CLOSING_BRACKETS
            and self.leaves[-4].type == token.NAME
            and (
                # regular `from foo import bar,`
                self.leaves[-4].value == "import"
                # `from foo import (bar as baz,)`
                or (
                    len(self.leaves) > 6
                    and self.leaves[-6].value == "import"
                    and self.leaves[-3].value == "as"
                )
                # `from foo import bar as baz,`
                or (
                    len(self.leaves) > 5
                    and self.leaves[-5].value == "import"
                    and self.leaves[-3].value == "as"
                )
            )
            and closing.type == token.RPAR
        ):
            return False

        self.remove_trailing_comma()
        return True

    def append_comment(self, comment: Leaf) -> bool:
        """Add an inline or standalone comment to the line."""
        if (
            comment.type == STANDALONE_COMMENT
            and self.bracket_tracker.any_open_brackets()
        ):
            comment.prefix = ""
            return False

        if comment.type != token.COMMENT:
            return False

        if not self.leaves:
            comment.type = STANDALONE_COMMENT
            comment.prefix = ""
            return False

        last_leaf = self.leaves[-1]
        if (
            last_leaf.type == token.RPAR
            and not last_leaf.value
            and last_leaf.parent
            and len(list(last_leaf.parent.leaves())) <= 3
            and not is_type_comment(comment)
        ):
            # Comments on an optional parens wrapping a single leaf should belong to
            # the wrapped node except if it's a type comment. Pinning the comment like
            # this avoids unstable formatting caused by comment migration.
            if len(self.leaves) < 2:
                comment.type = STANDALONE_COMMENT
                comment.prefix = ""
                return False

            last_leaf = self.leaves[-2]
        self.comments.setdefault(id(last_leaf), []).append(comment)
        return True

    def comments_after(self, leaf: Leaf) -> List[Leaf]:
        """Generate comments that should appear directly after `leaf`."""
        return self.comments.get(id(leaf), [])

    def remove_trailing_comma(self) -> None:
        """Remove the trailing comma and move the comments attached to it."""
        trailing_comma = self.leaves.pop()
        trailing_comma_comments = self.comments.pop(id(trailing_comma), [])
        self.comments.setdefault(id(self.leaves[-1]), []).extend(
            trailing_comma_comments
        )

    def is_complex_subscript(self, leaf: Leaf) -> bool:
        """Return True iff `leaf` is part of a slice with non-trivial exprs."""
        open_lsqb = self.bracket_tracker.get_open_lsqb()
        if open_lsqb is None:
            return False

        subscript_start = open_lsqb.next_sibling

        if isinstance(subscript_start, Node):
            if subscript_start.type == syms.listmaker:
                return False

            if subscript_start.type == syms.subscriptlist:
                subscript_start = child_towards(subscript_start, leaf)
        return subscript_start is not None and any(
            n.type in TEST_DESCENDANTS for n in subscript_start.pre_order()
        )

    def clone(self) -> "Line":
        return Line(
            depth=self.depth,
            inside_brackets=self.inside_brackets,
            should_explode=self.should_explode,
        )

    def __str__(self) -> str:
        """Render the line."""
        if not self:
            return "\n"

        indent = "    " * self.depth
        leaves = iter(self.leaves)
        first = next(leaves)
        res = f"{first.prefix}{indent}{first.value}"
        for leaf in leaves:
            res += str(leaf)
        for comment in itertools.chain.from_iterable(self.comments.values()):
            res += str(comment)
        return res + "\n"

    def __bool__(self) -> bool:
        """Return True if the line has leaves or comments."""
        return bool(self.leaves or self.comments)

@dataclass
class EmptyLineTracker:
    """Provides a stateful method that returns the number of potential extra
    empty lines needed before and after the currently processed line.

    Note: this tracker works on lines that haven't been split yet. It assumes
    the prefix of the first leaf consists of optional newlines. Those newlines
    are consumed by `maybe_empty_lines()` and included in the computation.
    """

    is_pyi: bool = False
    previous_line: Optional[Line] = None
    previous_after: int = 0
    previous_defs: List[int] = field(default_factory=list)

    def maybe_empty_lines(self, current_line: Line) -> Tuple[int, int]:
        """Return the number of extra empty lines before and after the `current_line`.

        This is for separating `def`, `async def` and `class` with extra empty
        lines (two on module-level).
        """
        before, after = self._maybe_empty_lines(current_line)
        before = (
            # Black should not insert empty lines at the beginning
            # of the file.
            0
            if self.previous_line is None
            else before - self.previous_after
        )
        self.previous_after = after
        self.previous_line = current_line
        return before, after
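
    # E.g. between two consecutive module-level `def`s this asks for two blank
    # lines (one in .pyi stubs); between methods nested one level deep it asks
    # for one, per _maybe_empty_lines() below.
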
    def _maybe_empty_lines(self, current_line: Line) -> Tuple[int, int]:
        max_allowed = 1
        if current_line.depth == 0:
            max_allowed = 1 if self.is_pyi else 2
        if current_line.leaves:
            # Consume the first leaf's extra newlines.
            first_leaf = current_line.leaves[0]
            before = first_leaf.prefix.count("\n")
            before = min(before, max_allowed)
            first_leaf.prefix = ""
        else:
            before = 0
        depth = current_line.depth
        while self.previous_defs and self.previous_defs[-1] >= depth:
            self.previous_defs.pop()
            if self.is_pyi:
                before = 0 if depth else 1
            else:
                before = 1 if depth else 2
        if current_line.is_decorator or current_line.is_def or current_line.is_class:
            return self._maybe_empty_lines_for_class_or_def(current_line, before)

        if (
            self.previous_line
            and self.previous_line.is_import
            and not current_line.is_import
            and depth == self.previous_line.depth
        ):
            return (before or 1), 0

        if (
            self.previous_line
            and self.previous_line.is_class
            and current_line.is_triple_quoted_string
        ):
            return before, 1

        return before, 0

    def _maybe_empty_lines_for_class_or_def(
        self, current_line: Line, before: int
    ) -> Tuple[int, int]:
        if not current_line.is_decorator:
            self.previous_defs.append(current_line.depth)
        if self.previous_line is None:
            # Don't insert empty lines before the first line in the file.
            return 0, 0

        if self.previous_line.is_decorator:
            return 0, 0

        if self.previous_line.depth < current_line.depth and (
            self.previous_line.is_class or self.previous_line.is_def
        ):
            return 0, 0

        if (
            self.previous_line.is_comment
            and self.previous_line.depth == current_line.depth
            and before == 0
        ):
            return 0, 0

        if self.is_pyi:
            if self.previous_line.depth > current_line.depth:
                newlines = 1
            elif current_line.is_class or self.previous_line.is_class:
                if current_line.is_stub_class and self.previous_line.is_stub_class:
                    # No blank line between classes with an empty body
                    newlines = 0
                else:
                    newlines = 1
            elif current_line.is_def and not self.previous_line.is_def:
                # Blank line between a block of functions and a block of non-functions
                newlines = 1
            else:
                newlines = 0
        else:
            newlines = 2
        if current_line.depth and newlines:
            newlines -= 1
        return 0, newlines

@dataclass
class LineGenerator(Visitor[Line]):
    """Generates reformatted Line objects. Empty lines are not emitted.

    Note: destroys the tree it's visiting by mutating prefixes of its leaves,
    so the tree will no longer stringify to valid Python code.
    """

    is_pyi: bool = False
    normalize_strings: bool = True
    current_line: Line = field(default_factory=Line)
    remove_u_prefix: bool = False

    def line(self, indent: int = 0) -> Iterator[Line]:
        """Generate a line.

        If the line is empty, only emit if it makes sense.
        If the line is too long, split it first and then generate.

        If any lines were generated, set up a new current_line.
        """
        if not self.current_line:
            self.current_line.depth += indent
            return  # Line is empty, don't emit. Creating a new one is unnecessary.

        complete_line = self.current_line
        self.current_line = Line(depth=complete_line.depth + indent)
        yield complete_line

    def visit_default(self, node: LN) -> Iterator[Line]:
        """Default `visit_*()` implementation. Recurses to children of `node`."""
        if isinstance(node, Leaf):
            any_open_brackets = self.current_line.bracket_tracker.any_open_brackets()
            for comment in generate_comments(node):
                if any_open_brackets:
                    # any comment within brackets is subject to splitting
                    self.current_line.append(comment)
                elif comment.type == token.COMMENT:
                    # regular trailing comment
                    self.current_line.append(comment)
                    yield from self.line()
                else:
                    # regular standalone comment
                    yield from self.line()

                    self.current_line.append(comment)
                    yield from self.line()

            normalize_prefix(node, inside_brackets=any_open_brackets)
            if self.normalize_strings and node.type == token.STRING:
                normalize_string_prefix(node, remove_u_prefix=self.remove_u_prefix)
                normalize_string_quotes(node)
            if node.type == token.NUMBER:
                normalize_numeric_literal(node)
            if node.type not in WHITESPACE:
                self.current_line.append(node)
        yield from super().visit_default(node)

    def visit_INDENT(self, node: Leaf) -> Iterator[Line]:
        """Increase indentation level, maybe yield a line."""
        # In blib2to3 INDENT never holds comments.
        yield from self.line(+1)
        yield from self.visit_default(node)

    def visit_DEDENT(self, node: Leaf) -> Iterator[Line]:
        """Decrease indentation level, maybe yield a line."""
        # The current line might still wait for trailing comments. At DEDENT time
        # there won't be any (they would be prefixes on the preceding NEWLINE).
        # Emit the line then.
        yield from self.line()

        # While DEDENT has no value, its prefix may contain standalone comments
        # that belong to the current indentation level. Get 'em.
        yield from self.visit_default(node)

        # Finally, emit the dedent.
        yield from self.line(-1)

    def visit_stmt(
        self, node: Node, keywords: Set[str], parens: Set[str]
    ) -> Iterator[Line]:
        """Visit a statement.

        This implementation is shared for `if`, `while`, `for`, `try`, `except`,
        `def`, `with`, `class`, `assert` and assignments.

        The relevant Python language `keywords` for a given statement will be
        NAME leaves within it. This method puts those on a separate line.

        `parens` holds a set of string leaf values immediately after which
        invisible parens should be put.
        """
        normalize_invisible_parens(node, parens_after=parens)
        for child in node.children:
            if child.type == token.NAME and child.value in keywords:  # type: ignore
                yield from self.line()

            yield from self.visit(child)

    def visit_suite(self, node: Node) -> Iterator[Line]:
        """Visit a suite."""
        if self.is_pyi and is_stub_suite(node):
            yield from self.visit(node.children[2])
        else:
            yield from self.visit_default(node)

    def visit_simple_stmt(self, node: Node) -> Iterator[Line]:
        """Visit a statement without nested statements."""
        is_suite_like = node.parent and node.parent.type in STATEMENT
        if is_suite_like:
            if self.is_pyi and is_stub_body(node):
                yield from self.visit_default(node)
            else:
                yield from self.line(+1)
                yield from self.visit_default(node)
                yield from self.line(-1)

        else:
            if not self.is_pyi or not node.parent or not is_stub_suite(node.parent):
                yield from self.line()
            yield from self.visit_default(node)

    def visit_async_stmt(self, node: Node) -> Iterator[Line]:
        """Visit `async def`, `async for`, `async with`."""
        yield from self.line()

        children = iter(node.children)
        for child in children:
            yield from self.visit(child)

            if child.type == token.ASYNC:
                break

        internal_stmt = next(children)
        for child in internal_stmt.children:
            yield from self.visit(child)

    def visit_decorators(self, node: Node) -> Iterator[Line]:
        """Visit decorators."""
        for child in node.children:
            yield from self.line()
            yield from self.visit(child)

    def visit_SEMI(self, leaf: Leaf) -> Iterator[Line]:
        """Remove a semicolon and put the other statement on a separate line."""
        yield from self.line()

    def visit_ENDMARKER(self, leaf: Leaf) -> Iterator[Line]:
        """End of file. Process outstanding comments and end with a newline."""
        yield from self.visit_default(leaf)
        yield from self.line()

    def visit_STANDALONE_COMMENT(self, leaf: Leaf) -> Iterator[Line]:
        if not self.current_line.bracket_tracker.any_open_brackets():
            yield from self.line()
        yield from self.visit_default(leaf)

    def visit_factor(self, node: Node) -> Iterator[Line]:
        """Force parentheses between a unary op and a binary power:

        -2 ** 8 -> -(2 ** 8)
        """
        _operator, operand = node.children
        if (
            operand.type == syms.power
            and len(operand.children) == 3
            and operand.children[1].type == token.DOUBLESTAR
        ):
            lpar = Leaf(token.LPAR, "(")
            rpar = Leaf(token.RPAR, ")")
            index = operand.remove() or 0
            node.insert_child(index, Node(syms.atom, [lpar, operand, rpar]))
        yield from self.visit_default(node)

    def visit_STRING(self, leaf: Leaf) -> Iterator[Line]:
        # Check if it's a docstring
        if prev_siblings_are(
            leaf.parent, [None, token.NEWLINE, token.INDENT, syms.simple_stmt]
        ) and is_multiline_string(leaf):
            prefix = "    " * self.current_line.depth
            docstring = fix_docstring(leaf.value[3:-3], prefix)
            leaf.value = leaf.value[0:3] + docstring + leaf.value[-3:]
            normalize_string_quotes(leaf)

        yield from self.visit_default(leaf)

    def __post_init__(self) -> None:
        """You are in a twisty little maze of passages."""
        v = self.visit_stmt
        Ø: Final = set()
        self.visit_assert_stmt = partial(v, keywords={"assert"}, parens={"assert", ","})
        self.visit_if_stmt = partial(
            v, keywords={"if", "else", "elif"}, parens={"if", "elif"}
        )
        self.visit_while_stmt = partial(v, keywords={"while", "else"}, parens={"while"})
        self.visit_for_stmt = partial(v, keywords={"for", "else"}, parens={"for", "in"})
        self.visit_try_stmt = partial(
            v, keywords={"try", "except", "else", "finally"}, parens=Ø
        )
        self.visit_except_clause = partial(v, keywords={"except"}, parens=Ø)
        self.visit_with_stmt = partial(v, keywords={"with"}, parens=Ø)
        self.visit_funcdef = partial(v, keywords={"def"}, parens=Ø)
        self.visit_classdef = partial(v, keywords={"class"}, parens=Ø)
        self.visit_expr_stmt = partial(v, keywords=Ø, parens=ASSIGNMENTS)
        self.visit_return_stmt = partial(v, keywords={"return"}, parens={"return"})
        self.visit_import_from = partial(v, keywords=Ø, parens={"import"})
        self.visit_del_stmt = partial(v, keywords=Ø, parens={"del"})
        self.visit_async_funcdef = self.visit_async_stmt
        self.visit_decorated = self.visit_decorators

IMPLICIT_TUPLE = {syms.testlist, syms.testlist_star_expr, syms.exprlist}
BRACKET = {token.LPAR: token.RPAR, token.LSQB: token.RSQB, token.LBRACE: token.RBRACE}
OPENING_BRACKETS = set(BRACKET.keys())
CLOSING_BRACKETS = set(BRACKET.values())
BRACKETS = OPENING_BRACKETS | CLOSING_BRACKETS
ALWAYS_NO_SPACE = CLOSING_BRACKETS | {token.COMMA, STANDALONE_COMMENT}

2132 def whitespace(leaf: Leaf, *, complex_subscript: bool) -> str: # noqa: C901
2133 """Return whitespace prefix if needed for the given `leaf`.
2135 `complex_subscript` signals whether the given leaf is part of a subscription
2136 which has non-trivial arguments, like arithmetic expressions or function calls.
2144 if t in ALWAYS_NO_SPACE:
2147 if t == token.COMMENT:
2150 assert p is not None, f"INTERNAL ERROR: hand-made leaf without parent: {leaf!r}"
2151 if t == token.COLON and p.type not in {
2158 prev = leaf.prev_sibling
2160 prevp = preceding_leaf(p)
2161 if not prevp or prevp.type in OPENING_BRACKETS:
2164 if t == token.COLON:
2165 if prevp.type == token.COLON:
2168 elif prevp.type != token.COMMA and not complex_subscript:
2173 if prevp.type == token.EQUAL:
2175 if prevp.parent.type in {
2183 elif prevp.parent.type == syms.typedargslist:
2184 # A bit hacky: if the equal sign has whitespace, it means we
2185 # previously found it's a typed argument. So, we're using
2189 elif prevp.type in VARARGS_SPECIALS:
2190 if is_vararg(prevp, within=VARARGS_PARENTS | UNPACKING_PARENTS):
2193 elif prevp.type == token.COLON:
2194 if prevp.parent and prevp.parent.type in {syms.subscript, syms.sliceop}:
2195 return SPACE if complex_subscript else NO
2199 and prevp.parent.type == syms.factor
2200 and prevp.type in MATH_OPERATORS
2205 prevp.type == token.RIGHTSHIFT
2207 and prevp.parent.type == syms.shift_expr
2208 and prevp.prev_sibling
2209 and prevp.prev_sibling.type == token.NAME
2210 and prevp.prev_sibling.value == "print" # type: ignore
2212 # Python 2 print chevron
2215 elif prev.type in OPENING_BRACKETS:
2218 if p.type in {syms.parameters, syms.arglist}:
2219 # untyped function signatures or calls
2220 if not prev or prev.type != token.COMMA:
2223 elif p.type == syms.varargslist:
2225 if prev and prev.type != token.COMMA:
2228 elif p.type == syms.typedargslist:
2229 # typed function signatures
2233 if t == token.EQUAL:
2234 if prev.type != syms.tname:
2237 elif prev.type == token.EQUAL:
2238 # A bit hacky: if the equal sign has whitespace, it means we
2239 # previously found it's a typed argument. So, we're using that, too.
2242 elif prev.type != token.COMMA:
2245 elif p.type == syms.tname:
2248 prevp = preceding_leaf(p)
2249 if not prevp or prevp.type != token.COMMA:
2252 elif p.type == syms.trailer:
2253 # attributes and calls
2254 if t == token.LPAR or t == token.RPAR:
2259 prevp = preceding_leaf(p)
2260 if not prevp or prevp.type != token.NUMBER:
2263 elif t == token.LSQB:
2266 elif prev.type != token.COMMA:
2269 elif p.type == syms.argument:
2271 if t == token.EQUAL:
2275 prevp = preceding_leaf(p)
2276 if not prevp or prevp.type == token.LPAR:
2279 elif prev.type in {token.EQUAL} | VARARGS_SPECIALS:
2282 elif p.type == syms.decorator:
2286 elif p.type == syms.dotted_name:
2290 prevp = preceding_leaf(p)
2291 if not prevp or prevp.type == token.AT or prevp.type == token.DOT:
2294 elif p.type == syms.classdef:
2298 if prev and prev.type == token.LPAR:
2301 elif p.type in {syms.subscript, syms.sliceop}:
2304 assert p.parent is not None, "subscripts are always parented"
2305 if p.parent.type == syms.subscriptlist:
2310 elif not complex_subscript:
2313 elif p.type == syms.atom:
2314 if prev and t == token.DOT:
2315 # dots, but not the first one.
2318 elif p.type == syms.dictsetmaker:
2320 if prev and prev.type == token.DOUBLESTAR:
2323 elif p.type in {syms.factor, syms.star_expr}:
2326 prevp = preceding_leaf(p)
2327 if not prevp or prevp.type in OPENING_BRACKETS:
2330 prevp_parent = prevp.parent
2331 assert prevp_parent is not None
2332 if prevp.type == token.COLON and prevp_parent.type in {
2338 elif prevp.type == token.EQUAL and prevp_parent.type == syms.argument:
2341 elif t in {token.NAME, token.NUMBER, token.STRING}:
2344 elif p.type == syms.import_from:
2346 if prev and prev.type == token.DOT:
2349 elif t == token.NAME:
2353 if prev and prev.type == token.DOT:
2356 elif p.type == syms.sliceop:
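For intuition, one rule implemented above is PEP 8's slice-spacing convention, driven by the `complex_subscript` flag. A minimal behavioral sketch (editor's illustration, not part of the module):

data = list(range(10))
a = data[1:9]                # simple subscript: no space around the colon
b = data[1 : len(data) - 1]  # complex subscript: symmetric spaces around the colon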
2362 def preceding_leaf(node: Optional[LN]) -> Optional[Leaf]:
2363 """Return the first leaf that precedes `node`, if any."""
2365 res = node.prev_sibling
2367 if isinstance(res, Leaf):
2371 return list(res.leaves())[-1]
2380 def prev_siblings_are(node: Optional[LN], tokens: List[Optional[NodeType]]) -> bool:
2381 """Return if the `node` and its previous siblings match types against the provided
2382 list of tokens; the provided `node` has its type matched against the last element in
2383 the list. `None` can be used as the first element to declare that the start of the
2384 list is anchored at the start of its parent's children."""
2387 if tokens[-1] is None:
2391 if node.type != tokens[-1]:
2393 return prev_siblings_are(node.prev_sibling, tokens[:-1])
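To make the anchored-suffix matching above concrete, here is a self-contained plain-list analogue (editor's sketch; `prev_are`, `seq`, and `pattern` are hypothetical names, not part of this module):

from typing import List, Optional

def prev_are(seq: List[str], idx: int, pattern: List[Optional[str]]) -> bool:
    # seq[idx] must match pattern[-1], seq[idx - 1] must match pattern[-2],
    # and so on; a leading None anchors the match at the start of seq.
    for offset, want in enumerate(reversed(pattern)):
        j = idx - offset
        if want is None:
            return j < 0  # matches only if we have run off the start of seq
        if j < 0 or seq[j] != want:
            return False
    return True

assert prev_are(["A", "B"], 1, [None, "A", "B"])           # anchored match
assert not prev_are(["X", "A", "B"], 2, [None, "A", "B"])  # extra leading sibling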
2396 def child_towards(ancestor: Node, descendant: LN) -> Optional[LN]:
2397 """Return the child of `ancestor` that contains `descendant`."""
2398 node: Optional[LN] = descendant
2399 while node and node.parent != ancestor:
2404 def container_of(leaf: Leaf) -> LN:
2405 """Return `leaf` or one of its ancestors that is the topmost container of it.
2407 By "container" we mean a node where `leaf` is the very first child.
2409 same_prefix = leaf.prefix
2410 container: LN = leaf
2412 parent = container.parent
2416 if parent.children[0].prefix != same_prefix:
2419 if parent.type == syms.file_input:
2422 if parent.prev_sibling is not None and parent.prev_sibling.type in BRACKETS:
2429 def is_split_after_delimiter(leaf: Leaf, previous: Optional[Leaf] = None) -> Priority:
2430 """Return the priority of the `leaf` delimiter, given a line break after it.
2432 The delimiter priorities returned here are from those delimiters that would
2433 cause a line break after themselves.
2435 Higher numbers are higher priority.
2437 if leaf.type == token.COMMA:
2438 return COMMA_PRIORITY
2443 def is_split_before_delimiter(leaf: Leaf, previous: Optional[Leaf] = None) -> Priority:
2444 """Return the priority of the `leaf` delimiter, given a line break before it.
2446 The delimiter priorities returned here are from those delimiters that would
2447 cause a line break before themselves.
2449 Higher numbers are higher priority.
2451 if is_vararg(leaf, within=VARARGS_PARENTS | UNPACKING_PARENTS):
2452 # * and ** might also be MATH_OPERATORS but in this case they are not.
2453 # Don't treat them as a delimiter.
2457 leaf.type == token.DOT
2459 and leaf.parent.type not in {syms.import_from, syms.dotted_name}
2460 and (previous is None or previous.type in CLOSING_BRACKETS)
2465 leaf.type in MATH_OPERATORS
2467 and leaf.parent.type not in {syms.factor, syms.star_expr}
2469 return MATH_PRIORITIES[leaf.type]
2471 if leaf.type in COMPARATORS:
2472 return COMPARATOR_PRIORITY
2475 leaf.type == token.STRING
2476 and previous is not None
2477 and previous.type == token.STRING
2479 return STRING_PRIORITY
2481 if leaf.type not in {token.NAME, token.ASYNC}:
2487 and leaf.parent.type in {syms.comp_for, syms.old_comp_for}
2488 or leaf.type == token.ASYNC
2491 not isinstance(leaf.prev_sibling, Leaf)
2492 or leaf.prev_sibling.value != "async"
2494 return COMPREHENSION_PRIORITY
2499 and leaf.parent.type in {syms.comp_if, syms.old_comp_if}
2501 return COMPREHENSION_PRIORITY
2503 if leaf.value in {"if", "else"} and leaf.parent and leaf.parent.type == syms.test:
2504 return TERNARY_PRIORITY
2506 if leaf.value == "is":
2507 return COMPARATOR_PRIORITY
2512 and leaf.parent.type in {syms.comp_op, syms.comparison}
2514 previous is not None
2515 and previous.type == token.NAME
2516 and previous.value == "not"
2519 return COMPARATOR_PRIORITY
2524 and leaf.parent.type == syms.comp_op
2526 previous is not None
2527 and previous.type == token.NAME
2528 and previous.value == "is"
2531 return COMPARATOR_PRIORITY
2533 if leaf.value in LOGIC_OPERATORS and leaf.parent:
2534 return LOGIC_PRIORITY
2539 FMT_OFF = {"# fmt: off", "# fmt:off", "# yapf: disable"}
2540 FMT_ON = {"# fmt: on", "# fmt:on", "# yapf: enable"}
2543 def generate_comments(leaf: LN) -> Iterator[Leaf]:
2544 """Clean the prefix of the `leaf` and generate comments from it, if any.
2546 Comments in lib2to3 are shoved into the whitespace prefix. This happens
2547 in `pgen2/driver.py:Driver.parse_tokens()`. This was a brilliant implementation
2548 move because it does away with modifying the grammar to include all the
2549 possible places in which comments can be placed.
2551 The sad consequence for us though is that comments don't "belong" anywhere.
2552 This is why this function generates simple parentless Leaf objects for
2553 comments. We simply don't know what the correct parent should be.
2555 No matter though, we can live without this. We really only need to
2556 differentiate between inline and standalone comments. The latter don't
2557 share the line with any code.
2559 Inline comments are emitted as regular token.COMMENT leaves. Standalone
2560 comments are emitted with a fake STANDALONE_COMMENT token identifier.
2562 for pc in list_comments(leaf.prefix, is_endmarker=leaf.type == token.ENDMARKER):
2563 yield Leaf(pc.type, pc.value, prefix="\n" * pc.newlines)
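A concrete example of the distinction drawn in the docstring (editor's illustration):

# Given this source:
#
#     x = 1  # an inline comment: shares the line with code
#     # a standalone comment: a line of its own
#     y = 2
#
# the first comment is yielded as a regular token.COMMENT leaf, while the
# second is yielded under the fake STANDALONE_COMMENT token identifier so
# that later passes keep it on its own line.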
2568 """Describes a piece of syntax that is a comment.
2570 It's not a :class:`blib2to3.pytree.Leaf` so that:
2572 * it can be cached (`Leaf` objects should not be reused more than once as
2573 they store their lineno, column, prefix, and parent information);
2574 * `newlines` and `consumed` fields are kept separate from the `value`. This
2575 simplifies handling of special marker comments like ``# fmt: off/on``.
2578 type: int # token.COMMENT or STANDALONE_COMMENT
2579 value: str # content of the comment
2580 newlines: int # how many newlines before the comment
2581 consumed: int # how many characters of the original leaf's prefix did we consume
2584 @lru_cache(maxsize=4096)
2585 def list_comments(prefix: str, *, is_endmarker: bool) -> List[ProtoComment]:
2586 """Return a list of :class:`ProtoComment` objects parsed from the given `prefix`."""
2587 result: List[ProtoComment] = []
2588 if not prefix or "#" not in prefix:
2594 for index, line in enumerate(prefix.split("\n")):
2595 consumed += len(line) + 1 # adding the length of the split '\n'
2596 line = line.lstrip()
2599 if not line.startswith("#"):
2600 # Escaped newlines outside of a comment are not really newlines at
2601 # all. We treat a single-line comment following an escaped newline
2602 # as a simple trailing comment.
2603 if line.endswith("\\"):
2607 if index == ignored_lines and not is_endmarker:
2608 comment_type = token.COMMENT # simple trailing comment
2610 comment_type = STANDALONE_COMMENT
2611 comment = make_comment(line)
2614 type=comment_type, value=comment, newlines=nlines, consumed=consumed
2621 def make_comment(content: str) -> str:
2622 """Return a consistently formatted comment from the given `content` string.
2624 All comments (except for "##", "#!", "#:", "#'", "#%%") should have a single
2625 space between the hash sign and the content.
2627 If `content` didn't start with a hash sign, one is provided.
2629 content = content.rstrip()
2633 if content[0] == "#":
2634 content = content[1:]
2635 if content and content[0] not in " !:#'%":
2636 content = " " + content
2637 return "#" + content
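Some examples of the normalization performed by `make_comment` (editor's sketch; an early return for empty content is elided from the listing above, so these assume it behaves per the docstring):

assert make_comment("#comment") == "# comment"   # a space is inserted
assert make_comment("plain") == "# plain"        # a hash sign is provided
assert make_comment("#!shebang") == "#!shebang"  # "#!" is left untouched
assert make_comment("#: marker") == "#: marker"  # "#:" is left untouched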
2643 normalize_strings: bool,
2644 features: Collection[Feature] = (),
2645 ) -> Iterator[Line]:
2646 """Transform a `line`, potentially splitting it into many lines.
2648 They should fit in the allotted `line_length` but might not be able to.
2650 `features` are syntactical features that may be used in the output.
2656 line_str = line_to_string(line)
2658 def init_st(ST: Type[StringTransformer]) -> StringTransformer:
2659 """Initialize StringTransformer"""
2660 return ST(line_length, normalize_strings)
2662 string_merge = init_st(StringMerger)
2663 string_paren_strip = init_st(StringParenStripper)
2664 string_split = init_st(StringSplitter)
2665 string_paren_wrap = init_st(StringParenWrapper)
2667 transformers: List[Transformer]
2669 not line.contains_uncollapsable_type_comments()
2670 and not line.should_explode
2671 and not line.is_collection_with_optional_trailing_comma
2673 is_line_short_enough(line, line_length=line_length, line_str=line_str)
2674 or line.contains_unsplittable_type_ignore()
2676 and not (line.contains_standalone_comments() and line.inside_brackets)
2678 # Only apply basic string preprocessing, since lines shouldn't be split here.
2679 transformers = [string_merge, string_paren_strip]
2681 transformers = [left_hand_split]
2684 def rhs(line: Line, features: Collection[Feature]) -> Iterator[Line]:
2685 for omit in generate_trailers_to_omit(line, line_length):
2686 lines = list(right_hand_split(line, line_length, features, omit=omit))
2687 if is_line_short_enough(lines[0], line_length=line_length):
2691 # All splits failed, best effort split with no omits.
2692 # This mostly happens to multiline strings that are by definition
2693 # reported as not fitting a single line.
2694 # line_length=1 here was historically a bug that somehow became a feature.
2695 # See #762 and #781 for the full story.
2696 yield from right_hand_split(line, line_length=1, features=features)
2698 if line.inside_brackets:
2703 standalone_comment_split,
2717 for transform in transformers:
2718 # We are accumulating lines in `result` because we might want to abort
2719 # mission and return the original line in the end, or attempt a different split altogether.
2721 result: List[Line] = []
2723 for transformed_line in transform(line, features):
2724 if str(transformed_line).strip("\n") == line_str:
2725 raise CannotTransform(
2726 "Line transformer returned an unchanged result"
2732 line_length=line_length,
2733 normalize_strings=normalize_strings,
2737 except CannotTransform:
2747 @dataclass # type: ignore
2748 class StringTransformer(ABC):
2750 An implementation of the Transformer protocol that relies on its
2751 subclasses overriding the template methods `do_match(...)` and
2752 `do_transform(...)`.
2754 This Transformer works exclusively on strings (for example, by merging
2757 The following sections can be found among the docstrings of each concrete
2758 StringTransformer subclass.
2760 Requirements:
2761 Which requirements must be met by the given Line for this
2762 StringTransformer to be applied?
2764 Transformations:
2765 If the given Line meets all of the above requirements, which string
2766 transformations can you expect to be applied to it by this
2767 StringTransformer?
2769 Collaborations:
2770 What contractual agreements does this StringTransformer have with other
2771 StringTransformers? Such collaborations should be eliminated/minimized
2772 as much as possible.
2776 normalize_strings: bool
2779 def do_match(self, line: Line) -> TMatchResult:
2782 * Ok(string_idx) such that `line.leaves[string_idx]` is our target
2783 string, if a match was able to be made.
2785 * Err(CannotTransform), if a match was not able to be made.
2789 def do_transform(self, line: Line, string_idx: int) -> Iterator[TResult[Line]]:
2792 * Ok(new_line) where new_line is the new transformed line.
2794 * Err(CannotTransform) if the transformation failed for some reason. The
2795 `do_match(...)` template method should usually be used to reject
2796 the form of the given Line, but in some cases it is difficult to
2797 know whether or not a Line meets the StringTransformer's
2798 requirements until the transformation is already midway.
2801 This method should NOT mutate @line directly, but it MAY mutate the
2802 Line's underlying Node structure. (WARNING: If the underlying Node
2803 structure IS altered, then this method should NOT be allowed to
2804 yield a CannotTransform after that point.)
2807 def __call__(self, line: Line, _features: Collection[Feature]) -> Iterator[Line]:
2809 StringTransformer instances have a call signature that mirrors that of
2810 the Transformer type.
2813 CannotTransform(...) if the concrete StringTransformer class is unable to transform @line.
2816 # Optimization to avoid calling `self.do_match(...)` when the line does
2817 # not contain any string.
2818 if not any(leaf.type == token.STRING for leaf in line.leaves):
2819 raise CannotTransform("There are no strings in this line.")
2821 match_result = self.do_match(line)
2823 if isinstance(match_result, Err):
2824 cant_transform = match_result.err()
2825 raise CannotTransform(
2826 f"The string transformer {self.__class__.__name__} does not recognize"
2827 " this line as one that it can transform."
2828 ) from cant_transform
2830 string_idx = match_result.ok()
2832 for line_result in self.do_transform(line, string_idx):
2833 if isinstance(line_result, Err):
2834 cant_transform = line_result.err()
2835 raise CannotTransform(
2836 "StringTransformer failed while attempting to transform string."
2837 ) from cant_transform
2838 line = line_result.ok()
2844 """A custom (i.e. manual) string split.
2846 A single CustomSplit instance represents a single substring.
2849 Consider the following string:
2856 This string will correspond to the following three CustomSplit instances:
2858 CustomSplit(False, 16)
2859 CustomSplit(False, 17)
2860 CustomSplit(True, 16)
2868 class CustomSplitMapMixin:
2870 This mixin class is used to map merged strings to a sequence of
2871 CustomSplits, which will then be used to re-split the strings iff none of
2872 the resultant substrings go over the configured max line length.
2875 _Key = Tuple[StringID, str]
2876 _CUSTOM_SPLIT_MAP: Dict[_Key, Tuple[CustomSplit, ...]] = defaultdict(tuple)
2879 def _get_key(string: str) -> "CustomSplitMapMixin._Key":
2882 A unique identifier that is used internally to map @string to a
2883 group of custom splits.
2885 return (id(string), string)
2887 def add_custom_splits(
2888 self, string: str, custom_splits: Iterable[CustomSplit]
2890 """Custom Split Map Setter Method
2893 Adds a mapping from @string to the custom splits @custom_splits.
2895 key = self._get_key(string)
2896 self._CUSTOM_SPLIT_MAP[key] = tuple(custom_splits)
2898 def pop_custom_splits(self, string: str) -> List[CustomSplit]:
2899 """Custom Split Map Getter Method
2902 * A list of the custom splits that are mapped to @string, if any exist.
2907 Side Effects:
2908 Deletes the mapping between @string and its associated custom
2909 splits (which are returned to the caller).
2911 key = self._get_key(string)
2913 custom_splits = self._CUSTOM_SPLIT_MAP[key]
2914 del self._CUSTOM_SPLIT_MAP[key]
2916 return list(custom_splits)
2918 def has_custom_splits(self, string: str) -> bool:
2921 True iff @string is associated with a set of custom splits.
2923 key = self._get_key(string)
2924 return key in self._CUSTOM_SPLIT_MAP
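A hypothetical usage sketch of the mixin (`_Demo` is the editor's invention; `CustomSplit` takes `has_prefix` and `break_idx`, per its docstring above):

class _Demo(CustomSplitMapMixin):
    pass

demo = _Demo()
merged = 'f"Hello there friend. How are you?"'
demo.add_custom_splits(merged, [CustomSplit(True, 16), CustomSplit(False, 14)])
assert demo.has_custom_splits(merged)

splits = demo.pop_custom_splits(merged)  # retrieval also deletes the mapping
assert len(splits) == 2
assert not demo.has_custom_splits(merged)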
2927 class StringMerger(CustomSplitMapMixin, StringTransformer):
2928 """StringTransformer that merges strings together.
2931 (A) The line contains adjacent strings such that at most one substring
2932 has inline comments AND none of those inline comments are pragmas AND
2933 the set of all substring prefixes is either of length 1 or equal to
2934 {"", "f"} AND none of the substrings are raw strings (i.e. are prefixed with "r").
2937 (B) The line contains a string which uses line continuation backslashes.
2940 Depending on which of the two requirements above were met, either:
2942 (A) The string group associated with the target string is merged.
2944 (B) All line-continuation backslashes are removed from the target string.
2947 StringMerger provides custom split information to StringSplitter.
2950 def do_match(self, line: Line) -> TMatchResult:
2953 is_valid_index = is_valid_index_factory(LL)
2955 for (i, leaf) in enumerate(LL):
2957 leaf.type == token.STRING
2958 and is_valid_index(i + 1)
2959 and LL[i + 1].type == token.STRING
2963 if leaf.type == token.STRING and "\\\n" in leaf.value:
2966 return TErr("This line has no strings that need merging.")
2968 def do_transform(self, line: Line, string_idx: int) -> Iterator[TResult[Line]]:
2970 rblc_result = self.__remove_backslash_line_continuation_chars(
2971 new_line, string_idx
2973 if isinstance(rblc_result, Ok):
2974 new_line = rblc_result.ok()
2976 msg_result = self.__merge_string_group(new_line, string_idx)
2977 if isinstance(msg_result, Ok):
2978 new_line = msg_result.ok()
2980 if isinstance(rblc_result, Err) and isinstance(msg_result, Err):
2981 msg_cant_transform = msg_result.err()
2982 rblc_cant_transform = rblc_result.err()
2983 cant_transform = CannotTransform(
2984 "StringMerger failed to merge any strings in this line."
2987 # Chain the errors together using `__cause__`.
2988 msg_cant_transform.__cause__ = rblc_cant_transform
2989 cant_transform.__cause__ = msg_cant_transform
2991 yield Err(cant_transform)
2996 def __remove_backslash_line_continuation_chars(
2997 line: Line, string_idx: int
3000 Merge strings that were split across multiple lines using
3001 line-continuation backslashes.
3004 Ok(new_line), if @line contains backslash line-continuation characters.
3007 Err(CannotTransform), otherwise.
3011 string_leaf = LL[string_idx]
3013 string_leaf.type == token.STRING
3014 and "\\\n" in string_leaf.value
3015 and not has_triple_quotes(string_leaf.value)
3018 f"String leaf {string_leaf} does not contain any backslash line"
3019 " continuation characters."
3022 new_line = line.clone()
3023 new_line.comments = line.comments
3024 append_leaves(new_line, line, LL)
3026 new_string_leaf = new_line.leaves[string_idx]
3027 new_string_leaf.value = new_string_leaf.value.replace("\\\n", "")
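The mechanics at the leaf level (editor's sketch; `leaf_value` is a stand-in for a `token.STRING` leaf's source text): removing the backslash-newline pair leaves a literal that denotes the same runtime string, which is why the AST is preserved.

leaf_value = '"split \\\nacross lines"'  # a string written with a line continuation
merged = leaf_value.replace("\\\n", "")  # the substitution performed above
assert merged == '"split across lines"'  # same denoted value, one physical line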
3031 def __merge_string_group(self, line: Line, string_idx: int) -> TResult[Line]:
3033 Merges string group (i.e. set of adjacent strings) where the first
3034 string in the group is `line.leaves[string_idx]`.
3037 Ok(new_line), if ALL of the validation checks found in
3038 __validate_msg(...) pass.
3040 Err(CannotTransform), otherwise.
3044 is_valid_index = is_valid_index_factory(LL)
3046 vresult = self.__validate_msg(line, string_idx)
3047 if isinstance(vresult, Err):
3050 # If the string group is wrapped inside an Atom node, we must make sure
3051 # to later replace that Atom with our new (merged) string leaf.
3052 atom_node = LL[string_idx].parent
3054 # We will place BREAK_MARK in between every two substrings that we
3055 # merge. We will then later go through our final result and use the
3056 # various instances of BREAK_MARK we find to add the right values to
3057 # the custom split map.
3058 BREAK_MARK = "@@@@@ BLACK BREAKPOINT MARKER @@@@@"
3060 QUOTE = LL[string_idx].value[-1]
3062 def make_naked(string: str, string_prefix: str) -> str:
3063 """Strip @string (i.e. make it a "naked" string)
3066 * assert_is_leaf_string(@string)
3069 A string that is identical to @string except that
3070 @string_prefix has been stripped, the surrounding QUOTE
3071 characters have been removed, and any remaining QUOTE
3072 characters have been escaped.
3074 assert_is_leaf_string(string)
3076 RE_EVEN_BACKSLASHES = r"(?:(?<!\\)(?:\\\\)*)"
3077 naked_string = string[len(string_prefix) + 1 : -1]
3078 naked_string = re.sub(
3079 "(" + RE_EVEN_BACKSLASHES + ")" + QUOTE, r"\1\\" + QUOTE, naked_string
3083 # Holds the CustomSplit objects that will later be added to the custom split map.
3087 # Temporary storage for the 'has_prefix' part of the CustomSplit objects.
3090 # Sets the 'prefix' variable. This is the prefix that the final merged string will have.
3092 next_str_idx = string_idx
3096 and is_valid_index(next_str_idx)
3097 and LL[next_str_idx].type == token.STRING
3099 prefix = get_string_prefix(LL[next_str_idx].value)
3102 # The next loop merges the string group. The final string will be contained in 'S'.
3105 # The following convenience variables are used:
3107 # S: string
3108 # NS: naked string
3109 # SS: next string
3110 # NSS: naked next string
3114 next_str_idx = string_idx
3115 while is_valid_index(next_str_idx) and LL[next_str_idx].type == token.STRING:
3118 SS = LL[next_str_idx].value
3119 next_prefix = get_string_prefix(SS)
3121 # If this is an f-string group but this substring is not prefixed with 'f'...
3123 if "f" in prefix and "f" not in next_prefix:
3124 # Then we must escape any braces contained in this substring.
3125 SS = re.subf(r"(\{|\})", "{1}{1}", SS)
3127 NSS = make_naked(SS, next_prefix)
3129 has_prefix = bool(next_prefix)
3130 prefix_tracker.append(has_prefix)
3132 S = prefix + QUOTE + NS + NSS + BREAK_MARK + QUOTE
3133 NS = make_naked(S, prefix)
3137 S_leaf = Leaf(token.STRING, S)
3138 if self.normalize_strings:
3139 normalize_string_quotes(S_leaf)
3141 # Fill the 'custom_splits' list with the appropriate CustomSplit objects.
3142 temp_string = S_leaf.value[len(prefix) + 1 : -1]
3143 for has_prefix in prefix_tracker:
3144 mark_idx = temp_string.find(BREAK_MARK)
3147 ), "Logic error while filling the custom string breakpoint cache."
3149 temp_string = temp_string[mark_idx + len(BREAK_MARK) :]
3150 breakpoint_idx = mark_idx + (len(prefix) if has_prefix else 0) + 1
3151 custom_splits.append(CustomSplit(has_prefix, breakpoint_idx))
3153 string_leaf = Leaf(token.STRING, S_leaf.value.replace(BREAK_MARK, ""))
3155 if atom_node is not None:
3156 replace_child(atom_node, string_leaf)
3158 # Build the final line ('new_line') that this method will later return.
3159 new_line = line.clone()
3160 for (i, leaf) in enumerate(LL):
3162 new_line.append(string_leaf)
3164 if string_idx <= i < string_idx + num_of_strings:
3165 for comment_leaf in line.comments_after(LL[i]):
3166 new_line.append(comment_leaf, preformatted=True)
3169 append_leaves(new_line, line, [leaf])
3171 self.add_custom_splits(string_leaf.value, custom_splits)
3175 def __validate_msg(line: Line, string_idx: int) -> TResult[None]:
3176 """Validate (M)erge (S)tring (G)roup
3178 Transform-time string validation logic for __merge_string_group(...).
3181 * Ok(None), if ALL validation checks (listed below) pass.
3183 * Err(CannotTransform), if any of the following are true:
3184 - The target string is not in a string group (i.e. it has no adjacent strings).
3186 - The string group has more than one inline comment.
3187 - The string group has an inline comment that appears to be a pragma.
3188 - The set of all string prefixes in the string group is of
3189 length greater than one and is not equal to {"", "f"}.
3190 - The string group consists of raw strings.
3192 num_of_inline_string_comments = 0
3193 set_of_prefixes = set()
3195 for leaf in line.leaves[string_idx:]:
3196 if leaf.type != token.STRING:
3197 # If the string group is trailed by a comma, we count the
3198 # comments trailing the comma to be one of the string group's comments.
3200 if leaf.type == token.COMMA and id(leaf) in line.comments:
3201 num_of_inline_string_comments += 1
3204 if has_triple_quotes(leaf.value):
3205 return TErr("StringMerger does NOT merge multiline strings.")
3208 prefix = get_string_prefix(leaf.value)
3209 if "r" in prefix:
3210 return TErr("StringMerger does NOT merge raw strings.")
3212 set_of_prefixes.add(prefix)
3214 if id(leaf) in line.comments:
3215 num_of_inline_string_comments += 1
3216 if contains_pragma_comment(line.comments[id(leaf)]):
3217 return TErr("Cannot merge strings which have pragma comments.")
3219 if num_of_strings < 2:
3221 f"Not enough strings to merge (num_of_strings={num_of_strings})."
3224 if num_of_inline_string_comments > 1:
3226 f"Too many inline string comments ({num_of_inline_string_comments})."
3229 if len(set_of_prefixes) > 1 and set_of_prefixes != {"", "f"}:
3230 return TErr(f"Too many different prefixes ({set_of_prefixes}).")
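An editor's before/after sketch of requirement (A) in action (values chosen to satisfy the checks above; the prefix set here is {"", "f"} and no string is raw):

count = 3
before = "Processed " f"{count} files " "successfully."
after = f"Processed {count} files successfully."  # the merged single leaf
assert before == after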
3235 class StringParenStripper(StringTransformer):
3236 """StringTransformer that strips surrounding parentheses from strings.
3239 The line contains a string which is surrounded by parentheses and:
3240 - The target string is NOT the only argument to a function call.
3241 - The RPAR is NOT followed by an attribute access (i.e. a dot).
3244 The parentheses mentioned in the 'Requirements' section are stripped.
3247 StringParenStripper has its own inherent usefulness, but it is also
3248 relied on to clean up the parentheses created by StringParenWrapper (in
3249 the event that they are no longer needed).
3252 def do_match(self, line: Line) -> TMatchResult:
3255 is_valid_index = is_valid_index_factory(LL)
3257 for (idx, leaf) in enumerate(LL):
3258 # Should be a string...
3259 if leaf.type != token.STRING:
3262 # Should be preceded by a non-empty LPAR...
3264 not is_valid_index(idx - 1)
3265 or LL[idx - 1].type != token.LPAR
3266 or is_empty_lpar(LL[idx - 1])
3270 # That LPAR should NOT be preceded by a function name or a closing
3271 # bracket (which could be a function which returns a function or a
3272 # list/dictionary that contains a function)...
3273 if is_valid_index(idx - 2) and (
3274 LL[idx - 2].type == token.NAME or LL[idx - 2].type in CLOSING_BRACKETS
3280 # Skip the string trailer, if one exists.
3281 string_parser = StringParser()
3282 next_idx = string_parser.parse(LL, string_idx)
3284 # Should be followed by a non-empty RPAR...
3286 is_valid_index(next_idx)
3287 and LL[next_idx].type == token.RPAR
3288 and not is_empty_rpar(LL[next_idx])
3290 # That RPAR should NOT be followed by a '.' symbol.
3291 if is_valid_index(next_idx + 1) and LL[next_idx + 1].type == token.DOT:
3294 return Ok(string_idx)
3296 return TErr("This line has no strings wrapped in parens.")
3298 def do_transform(self, line: Line, string_idx: int) -> Iterator[TResult[Line]]:
3301 string_parser = StringParser()
3302 rpar_idx = string_parser.parse(LL, string_idx)
3304 for leaf in (LL[string_idx - 1], LL[rpar_idx]):
3305 if line.comments_after(leaf):
3307 "Will not strip parentheses which have comments attached to them."
3310 new_line = line.clone()
3311 new_line.comments = line.comments.copy()
3313 append_leaves(new_line, line, LL[: string_idx - 1])
3315 string_leaf = Leaf(token.STRING, LL[string_idx].value)
3316 LL[string_idx - 1].remove()
3317 replace_child(LL[string_idx], string_leaf)
3318 new_line.append(string_leaf)
3321 new_line, line, LL[string_idx + 1 : rpar_idx] + LL[rpar_idx + 1 :],
3324 LL[rpar_idx].remove()
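A before/after sketch (editor's illustration; per the requirements above, the string is not a lone call argument and the RPAR is not followed by a dot):

wrapped = ("a needlessly parenthesized string")  # parens carry no meaning here
stripped = "a needlessly parenthesized string"   # ...so they are stripped
assert wrapped == stripped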
3329 class BaseStringSplitter(StringTransformer):
3331 Abstract class for StringTransformers which transform a Line's strings by splitting
3332 them or placing them on their own lines where necessary to avoid going over
3333 the configured line length.
3336 * The target string value is responsible for the line going over the
3337 line length limit. It follows that after all of black's other line
3338 split methods have been exhausted, this line (or one of the resulting
3339 lines after all line splits are performed) would still be over the
3340 line_length limit unless we split this string.
3342 * The target string is NOT a "pointless" string (i.e. a string that has
3343 no parent or siblings).
3345 * The target string is not followed by an inline comment that appears to be a pragma.
3348 * The target string is not a multiline (i.e. triple-quote) string.
3352 def do_splitter_match(self, line: Line) -> TMatchResult:
3354 BaseStringSplitter asks its clients to override this method instead of
3355 `StringTransformer.do_match(...)`.
3357 Follows the same protocol as `StringTransformer.do_match(...)`.
3359 Refer to `help(StringTransformer.do_match)` for more information.
3362 def do_match(self, line: Line) -> TMatchResult:
3363 match_result = self.do_splitter_match(line)
3364 if isinstance(match_result, Err):
3367 string_idx = match_result.ok()
3368 vresult = self.__validate(line, string_idx)
3369 if isinstance(vresult, Err):
3374 def __validate(self, line: Line, string_idx: int) -> TResult[None]:
3376 Checks that @line meets all of the requirements listed in this class's
3377 docstring. Refer to `help(BaseStringSplitter)` for a detailed
3378 description of those requirements.
3381 * Ok(None), if ALL of the requirements are met.
3383 * Err(CannotTransform), if ANY of the requirements are NOT met.
3387 string_leaf = LL[string_idx]
3389 max_string_length = self.__get_max_string_length(line, string_idx)
3390 if len(string_leaf.value) <= max_string_length:
3392 "The string itself is not what is causing this line to be too long."
3395 if not string_leaf.parent or [L.type for L in string_leaf.parent.children] == [
3400 f"This string ({string_leaf.value}) appears to be pointless (i.e. has"
3401 " no parent)."
3404 if id(line.leaves[string_idx]) in line.comments and contains_pragma_comment(
3405 line.comments[id(line.leaves[string_idx])]
3408 "Line appears to end with an inline pragma comment. Splitting the line"
3409 " could modify the pragma's behavior."
3412 if has_triple_quotes(string_leaf.value):
3413 return TErr("We cannot split multiline strings.")
3417 def __get_max_string_length(self, line: Line, string_idx: int) -> int:
3419 Calculates the max string length used when attempting to determine
3420 whether or not the target string is responsible for causing the line to
3421 go over the line length limit.
3423 WARNING: This method is tightly coupled to both StringSplitter and
3424 (especially) StringParenWrapper. There is probably a better way to
3425 accomplish what is being done here.
3428 max_string_length: such that `len(line.leaves[string_idx].value) >
3429 max_string_length` implies that the target string IS responsible
3430 for causing this line to exceed the line length limit.
3434 is_valid_index = is_valid_index_factory(LL)
3436 # We use the shorthand "WMA4" in comments to abbreviate "We must
3437 # account for". When giving examples, we use STRING to mean some/any valid string.
3440 # Finally, we use the following convenience variables:
3442 # P: The leaf that is before the target string leaf.
3443 # N: The leaf that is after the target string leaf.
3444 # NN: The leaf that is after N.
3446 # WMA4 the whitespace at the beginning of the line.
3447 offset = line.depth * 4
3449 if is_valid_index(string_idx - 1):
3450 p_idx = string_idx - 1
3452 LL[string_idx - 1].type == token.LPAR
3453 and LL[string_idx - 1].value == ""
3456 # If the previous leaf is an empty LPAR placeholder, we should skip it.
3460 if P.type == token.PLUS:
3461 # WMA4 a space and a '+' character (e.g. `+ STRING`).
3464 if P.type == token.COMMA:
3465 # WMA4 a space, a comma, and a closing bracket [e.g. `), STRING`].
3468 if P.type in [token.COLON, token.EQUAL, token.NAME]:
3469 # This conditional branch is meant to handle dictionary keys,
3470 # variable assignments, 'return STRING' statement lines, and
3471 # 'else STRING' ternary expression lines.
3473 # WMA4 a single space.
3476 # WMA4 the lengths of any leaves that came before that space.
3477 for leaf in LL[: p_idx + 1]:
3478 offset += len(str(leaf))
3480 if is_valid_index(string_idx + 1):
3481 N = LL[string_idx + 1]
3482 if N.type == token.RPAR and N.value == "" and len(LL) > string_idx + 2:
3483 # If the next leaf is an empty RPAR placeholder, we should skip it.
3484 N = LL[string_idx + 2]
3486 if N.type == token.COMMA:
3487 # WMA4 a single comma at the end of the string (e.g `STRING,`).
3490 if is_valid_index(string_idx + 2):
3491 NN = LL[string_idx + 2]
3493 if N.type == token.DOT and NN.type == token.NAME:
3494 # This conditional branch is meant to handle method calls invoked
3495 # off of a string literal up to and including the LPAR character.
3497 # WMA4 the '.' character.
3501 is_valid_index(string_idx + 3)
3502 and LL[string_idx + 3].type == token.LPAR
3504 # WMA4 the left parenthesis character.
3507 # WMA4 the length of the method's name.
3508 offset += len(NN.value)
3510 has_comments = False
3511 for comment_leaf in line.comments_after(LL[string_idx]):
3512 if not has_comments:
3514 # WMA4 two spaces before the '#' character.
3517 # WMA4 the length of the inline comment.
3518 offset += len(comment_leaf.value)
3520 max_string_length = self.line_length - offset
3521 return max_string_length
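A worked instance of the bookkeeping above, using only the branches shown in the listing (editor's arithmetic; Black's default line length of 88 is assumed):

# A string leaf at depth 1 with a trailing inline comment "# short":
depth, line_length = 1, 88
offset = depth * 4            # WMA4 the leading whitespace
offset += 2 + len("# short")  # WMA4 two spaces before '#' plus the comment
assert line_length - offset == 75  # longer strings are deemed "responsible"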
3524 class StringSplitter(CustomSplitMapMixin, BaseStringSplitter):
3526 StringTransformer that splits "atom" strings (i.e. strings which exist on
3527 lines by themselves).
3530 * The line consists ONLY of a single string (with the exception of a
3531 '+' symbol which MAY exist at the start of the line), MAYBE a string
3532 trailer, and MAYBE a trailing comma.
3534 * All of the requirements listed in BaseStringSplitter's docstring.
3537 The string mentioned in the 'Requirements' section is split into as
3538 many substrings as necessary to adhere to the configured line length.
3540 In the final set of substrings, no substring should be smaller than
3541 MIN_SUBSTR_SIZE characters.
3543 The string will ONLY be split on spaces (i.e. each new substring should
3544 start with a space).
3546 If the string is an f-string, it will NOT be split in the middle of an
3547 f-expression (e.g. in f"FooBar: {foo() if x else bar()}", {foo() if x
3548 else bar()} is an f-expression).
3550 If the string that is being split has an associated set of custom split
3551 records and those custom splits will NOT result in any line going over
3552 the configured line length, those custom splits are used. Otherwise the
3553 string is split as late as possible (from left-to-right) while still
3554 adhering to the transformation rules listed above.
3557 StringSplitter relies on StringMerger to construct the appropriate
3558 CustomSplit objects and add them to the custom split map.
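A before/after sketch of the splitter's output shape (editor's illustration; the 88-column default is assumed, and each new substring starts with a space, as required above):

x = "This is a simple string that, at the default line length, would be too long to leave alone."
# ...is re-emitted as an implicitly concatenated group, broken only on spaces:
x = (
    "This is a simple string that, at the default line length, would be too"
    " long to leave alone."
)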
3562 # Matches an "f-expression" (e.g. {var}) that might be found in an f-string.
3570 (?<!\})(?:\}\})*\}(?!\})
3573 def do_splitter_match(self, line: Line) -> TMatchResult:
3576 is_valid_index = is_valid_index_factory(LL)
3580 # The first leaf MAY be a '+' symbol...
3581 if is_valid_index(idx) and LL[idx].type == token.PLUS:
3584 # The next/first leaf MAY be an empty LPAR...
3585 if is_valid_index(idx) and is_empty_lpar(LL[idx]):
3588 # The next/first leaf MUST be a string...
3589 if not is_valid_index(idx) or LL[idx].type != token.STRING:
3590 return TErr("Line does not start with a string.")
3594 # Skip the string trailer, if one exists.
3595 string_parser = StringParser()
3596 idx = string_parser.parse(LL, string_idx)
3598 # That string MAY be followed by an empty RPAR...
3599 if is_valid_index(idx) and is_empty_rpar(LL[idx]):
3602 # That string / empty RPAR leaf MAY be followed by a comma...
3603 if is_valid_index(idx) and LL[idx].type == token.COMMA:
3606 # But no more leaves are allowed...
3607 if is_valid_index(idx):
3608 return TErr("This line does not end with a string.")
3610 return Ok(string_idx)
3612 def do_transform(self, line: Line, string_idx: int) -> Iterator[TResult[Line]]:
3615 QUOTE = LL[string_idx].value[-1]
3617 is_valid_index = is_valid_index_factory(LL)
3618 insert_str_child = insert_str_child_factory(LL[string_idx])
3620 prefix = get_string_prefix(LL[string_idx].value)
3622 # We MAY choose to drop the 'f' prefix from substrings that don't
3623 # contain any f-expressions, but ONLY if the original f-string
3624 # contains at least one f-expression. Otherwise, we will alter the AST of the program.
3626 drop_pointless_f_prefix = ("f" in prefix) and re.search(
3627 self.RE_FEXPR, LL[string_idx].value, re.VERBOSE
3630 first_string_line = True
3631 starts_with_plus = LL[0].type == token.PLUS
3633 def line_needs_plus() -> bool:
3634 return first_string_line and starts_with_plus
3636 def maybe_append_plus(new_line: Line) -> None:
3639 If @line starts with a plus and this is the first line we are
3640 constructing, this function appends a PLUS leaf to @new_line
3641 and replaces the old PLUS leaf in the node structure. Otherwise
3642 this function does nothing.
3644 if line_needs_plus():
3645 plus_leaf = Leaf(token.PLUS, "+")
3646 replace_child(LL[0], plus_leaf)
3647 new_line.append(plus_leaf)
3650 is_valid_index(string_idx + 1) and LL[string_idx + 1].type == token.COMMA
3653 def max_last_string() -> int:
3656 The max allowed length of the string value used for the last
3657 line we will construct.
3659 result = self.line_length
3660 result -= line.depth * 4
3661 result -= 1 if ends_with_comma else 0
3662 result -= 2 if line_needs_plus() else 0
3665 # --- Calculate Max Break Index (for string value)
3666 # We start with the line length limit
3667 max_break_idx = self.line_length
3668 # The last index of a string of length N is N-1.
3670 # Leading whitespace is not present in the string value (e.g. Leaf.value).
3671 max_break_idx -= line.depth * 4
3672 if max_break_idx < 0:
3674 f"Unable to split {LL[string_idx].value} at such a high line depth:"
3679 # Check if StringMerger registered any custom splits.
3680 custom_splits = self.pop_custom_splits(LL[string_idx].value)
3681 # We use them ONLY if none of them would produce lines that exceed the line length limit.
3683 use_custom_breakpoints = bool(
3685 and all(csplit.break_idx <= max_break_idx for csplit in custom_splits)
3688 # Temporary storage for the remaining chunk of the string line that
3689 # can't fit onto the line currently being constructed.
3690 rest_value = LL[string_idx].value
3692 def more_splits_should_be_made() -> bool:
3695 True iff `rest_value` (the remaining string value from the last
3696 split) should be split again.
3698 if use_custom_breakpoints:
3699 return len(custom_splits) > 1
3701 return len(rest_value) > max_last_string()
3703 string_line_results: List[Ok[Line]] = []
3704 while more_splits_should_be_made():
3705 if use_custom_breakpoints:
3706 # Custom User Split (manual)
3707 csplit = custom_splits.pop(0)
3708 break_idx = csplit.break_idx
3710 # Algorithmic Split (automatic)
3711 max_bidx = max_break_idx - 2 if line_needs_plus() else max_break_idx
3712 maybe_break_idx = self.__get_break_idx(rest_value, max_bidx)
3713 if maybe_break_idx is None:
3714 # If we are unable to algorithmically determine a good split
3715 # and this string has custom splits registered to it, we
3716 # fall back to using them--which means we have to start
3717 # over from the beginning.
3719 rest_value = LL[string_idx].value
3720 string_line_results = []
3721 first_string_line = True
3722 use_custom_breakpoints = True
3725 # Otherwise, we stop splitting here.
3728 break_idx = maybe_break_idx
3730 # --- Construct `next_value`
3731 next_value = rest_value[:break_idx] + QUOTE
3733 # Are we allowed to try to drop a pointless 'f' prefix?
3734 drop_pointless_f_prefix
3735 # If we are, will we be successful?
3736 and next_value != self.__normalize_f_string(next_value, prefix)
3738 # If the current custom split did NOT originally use a prefix,
3739 # then `csplit.break_idx` will be off by one after removing the 'f' prefix.
3743 if use_custom_breakpoints and not csplit.has_prefix
3746 next_value = rest_value[:break_idx] + QUOTE
3747 next_value = self.__normalize_f_string(next_value, prefix)
3749 # --- Construct `next_leaf`
3750 next_leaf = Leaf(token.STRING, next_value)
3751 insert_str_child(next_leaf)
3752 self.__maybe_normalize_string_quotes(next_leaf)
3754 # --- Construct `next_line`
3755 next_line = line.clone()
3756 maybe_append_plus(next_line)
3757 next_line.append(next_leaf)
3758 string_line_results.append(Ok(next_line))
3760 rest_value = prefix + QUOTE + rest_value[break_idx:]
3761 first_string_line = False
3763 yield from string_line_results
3765 if drop_pointless_f_prefix:
3766 rest_value = self.__normalize_f_string(rest_value, prefix)
3768 rest_leaf = Leaf(token.STRING, rest_value)
3769 insert_str_child(rest_leaf)
3771 # NOTE: I could not find a test case that verifies that the following
3772 # line is actually necessary, but it seems to be. Otherwise we risk
3773 # not normalizing the last substring, right?
3774 self.__maybe_normalize_string_quotes(rest_leaf)
3776 last_line = line.clone()
3777 maybe_append_plus(last_line)
3779 # If there are any leaves to the right of the target string...
3780 if is_valid_index(string_idx + 1):
3781 # We use `temp_value` here to determine how long the last line
3782 # would be if we were to append all the leaves to the right of the
3783 # target string to the last string line.
3784 temp_value = rest_value
3785 for leaf in LL[string_idx + 1 :]:
3786 temp_value += str(leaf)
3787 if leaf.type == token.LPAR:
3790 # Try to fit them all on the same line with the last substring...
3792 len(temp_value) <= max_last_string()
3793 or LL[string_idx + 1].type == token.COMMA
3795 last_line.append(rest_leaf)
3796 append_leaves(last_line, line, LL[string_idx + 1 :])
3798 # Otherwise, place the last substring on one line and everything
3799 # else on a line below that...
3801 last_line.append(rest_leaf)
3804 non_string_line = line.clone()
3805 append_leaves(non_string_line, line, LL[string_idx + 1 :])
3806 yield Ok(non_string_line)
3807 # Else the target string was the last leaf...
3809 last_line.append(rest_leaf)
3810 last_line.comments = line.comments.copy()
3813 def __get_break_idx(self, string: str, max_break_idx: int) -> Optional[int]:
3815 This method contains the algorithm that StringSplitter uses to
3816 determine which character to split each string at.
3819 @string: The substring that we are attempting to split.
3820 @max_break_idx: The ideal break index. We will return this value if it
3821 meets all the necessary conditions. In the likely event that it
3822 doesn't we will try to find the closest index BELOW @max_break_idx
3823 that does. If that fails, we will expand our search by also
3824 considering all valid indices ABOVE @max_break_idx.
3827 * assert_is_leaf_string(@string)
3828 * 0 <= @max_break_idx < len(@string)
3831 break_idx, if an index is able to be found that meets all of the
3832 conditions listed in the 'Transformations' section of this class's docstring, or None otherwise.
3837 is_valid_index = is_valid_index_factory(string)
3839 assert is_valid_index(max_break_idx)
3840 assert_is_leaf_string(string)
3842 _fexpr_slices: Optional[List[Tuple[Index, Index]]] = None
3844 def fexpr_slices() -> Iterator[Tuple[Index, Index]]:
3847 All ranges of @string which, if @string were to be split there,
3848 would result in the splitting of an f-expression (which is NOT allowed).
3851 nonlocal _fexpr_slices
3853 if _fexpr_slices is None:
3855 for match in re.finditer(self.RE_FEXPR, string, re.VERBOSE):
3856 _fexpr_slices.append(match.span())
3858 yield from _fexpr_slices
3860 is_fstring = "f" in get_string_prefix(string)
3862 def breaks_fstring_expression(i: Index) -> bool:
3865 True iff returning @i would result in the splitting of an
3866 f-expression (which is NOT allowed).
3871 for (start, end) in fexpr_slices():
3872 if start <= i < end:
3877 def passes_all_checks(i: Index) -> bool:
3880 True iff ALL of the conditions listed in the 'Transformations'
3881 section of this class's docstring would be met by returning @i.
3883 is_space = string[i] == " "
3885 len(string[i:]) >= self.MIN_SUBSTR_SIZE
3886 and len(string[:i]) >= self.MIN_SUBSTR_SIZE
3888 return is_space and is_big_enough and not breaks_fstring_expression(i)
3890 # First, we check all indices BELOW @max_break_idx.
3891 break_idx = max_break_idx
3892 while is_valid_index(break_idx - 1) and not passes_all_checks(break_idx):
3895 if not passes_all_checks(break_idx):
3896 # If that fails, we check all indices ABOVE @max_break_idx.
3898 # If we are able to find a valid index here, the next line is going
3899 # to be longer than the specified line length, but it's probably
3900 # better than doing nothing at all.
3901 break_idx = max_break_idx + 1
3902 while is_valid_index(break_idx + 1) and not passes_all_checks(break_idx):
3905 if not is_valid_index(break_idx) or not passes_all_checks(break_idx):
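The search order used above (scan down from `max_break_idx`, then up) can be mirrored on a plain string; a minimal standalone sketch (`find_break` and its parameters are the editor's; the real method additionally enforces MIN_SUBSTR_SIZE and f-expression safety):

from typing import Callable, Optional

def find_break(ok: Callable[[int], bool], max_idx: int, length: int) -> Optional[int]:
    i = max_idx
    while i - 1 >= 0 and not ok(i):  # first, try indices at or below max_idx
        i -= 1
    if ok(i):
        return i
    i = max_idx + 1  # failing that, search above (the line may overflow)
    while i + 1 < length and not ok(i):
        i += 1
    return i if 0 <= i < length and ok(i) else None

s = "hello world example"
assert find_break(lambda i: s[i] == " ", 8, len(s)) == 5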
3910 def __maybe_normalize_string_quotes(self, leaf: Leaf) -> None:
3911 if self.normalize_strings:
3912 normalize_string_quotes(leaf)
3914 def __normalize_f_string(self, string: str, prefix: str) -> str:
3917 * assert_is_leaf_string(@string)
3920 * If @string is an f-string that contains no f-expressions, we
3921 return a string identical to @string except that the 'f' prefix
3922 has been stripped and all double braces (i.e. '{{' or '}}') have
3923 been normalized (i.e. turned into '{' or '}').
3925 * Otherwise, we return @string.
3927 assert_is_leaf_string(string)
3929 if "f" in prefix and not re.search(self.RE_FEXPR, string, re.VERBOSE):
3930 new_prefix = prefix.replace("f", "")
3932 temp = string[len(prefix) :]
3933 temp = re.sub(r"\{\{", "{", temp)
3934 temp = re.sub(r"\}\}", "}", temp)
3937 return f"{new_prefix}{new_string}"
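The equivalence relied on here is easy to check directly (editor's sanity check): an f-string containing no f-expressions equals the plain string with its doubled braces collapsed.

assert f"literal braces: {{}} only" == "literal braces: {} only"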
3942 class StringParenWrapper(CustomSplitMapMixin, BaseStringSplitter):
3944 StringTransformer that splits non-"atom" strings (i.e. strings that do not
3945 exist on lines by themselves).
3948 All of the requirements listed in BaseStringSplitter's docstring in
3949 addition to the requirements listed below:
3951 * The line is a return/yield statement, which returns/yields a string.
3953 * The line is part of a ternary expression (e.g. `x = y if cond else
3954 z`) such that the line starts with `else <string>`, where <string> is some string.
3957 * The line is an assert statement, which ends with a string.
3959 * The line is an assignment statement (e.g. `x = <string>` or `x +=
3960 <string>`) such that the variable is being assigned the value of some string.
3963 * The line is a dictionary key assignment where some valid key is being
3964 assigned the value of some string.
3967 The chosen string is wrapped in parentheses and then split at the LPAR.
3969 We then have one line which ends with an LPAR and another line that
3970 starts with the chosen string. The latter line is then split again at
3971 the RPAR. This results in the RPAR (and possibly a trailing comma)
3972 being placed on its own line.
3974 NOTE: If any leaves exist to the right of the chosen string (except
3975 for a trailing comma, which would be placed after the RPAR), those
3976 leaves are placed inside the parentheses. In effect, the chosen
3977 string is not necessarily being "wrapped" by parentheses. We can,
3978 however, count on the LPAR being placed directly before the chosen string.
3981 In other words, StringParenWrapper creates "atom" strings. These
3982 can then be split again by StringSplitter, if necessary.
3985 In the event that a string line split by StringParenWrapper is
3986 changed such that it no longer needs to be given its own line,
3987 StringParenWrapper relies on StringParenStripper to clean up the
3988 parentheses it created.
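A before/after sketch for the return-statement case (editor's illustration; the default line length is assumed):

def report() -> str:
    return "A fairly long diagnostic message that pushes this return statement past the limit."

# After StringParenWrapper, the string is an "atom" that StringSplitter can split:
def report_wrapped() -> str:
    return (
        "A fairly long diagnostic message that pushes this return statement"
        " past the limit."
    )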
3991 def do_splitter_match(self, line: Line) -> TMatchResult:
3995 string_idx = string_idx or self._return_match(LL)
3996 string_idx = string_idx or self._else_match(LL)
3997 string_idx = string_idx or self._assert_match(LL)
3998 string_idx = string_idx or self._assign_match(LL)
3999 string_idx = string_idx or self._dict_match(LL)
4001 if string_idx is not None:
4002 string_value = line.leaves[string_idx].value
4003 # If the string has no spaces...
4004 if " " not in string_value:
4005 # And will still violate the line length limit when split...
4006 max_string_length = self.line_length - ((line.depth + 1) * 4)
4007 if len(string_value) > max_string_length:
4008 # And has no associated custom splits...
4009 if not self.has_custom_splits(string_value):
4010 # Then we should NOT put this string on its own line.
4012 "We do not wrap long strings in parentheses when the"
4013 " resultant line would still be over the specified line"
4014 " length and can't be split further by StringSplitter."
4016 return Ok(string_idx)
4018 return TErr("This line does not contain any non-atomic strings.")
4021 def _return_match(LL: List[Leaf]) -> Optional[int]:
4024 string_idx such that @LL[string_idx] is equal to our target (i.e.
4025 matched) string, if this line matches the return/yield statement
4026 requirements listed in the 'Requirements' section of this class's docstring, or None otherwise.
4031 # If this line is part of a return/yield statement and the first leaf
4032 # contains either the "return" or "yield" keywords...
4033 if parent_type(LL[0]) in [syms.return_stmt, syms.yield_expr] and LL[
4034 0
4035 ].value in ["return", "yield"]:
4036 is_valid_index = is_valid_index_factory(LL)
4038 idx = 2 if is_valid_index(1) and is_empty_par(LL[1]) else 1
4039 # The next visible leaf MUST contain a string...
4040 if is_valid_index(idx) and LL[idx].type == token.STRING:
4046 def _else_match(LL: List[Leaf]) -> Optional[int]:
4049 string_idx such that @LL[string_idx] is equal to our target (i.e.
4050 matched) string, if this line matches the ternary expression
4051 requirements listed in the 'Requirements' section of this class's docstring, or None otherwise.
4056 # If this line is part of a ternary expression and the first leaf
4057 # contains the "else" keyword...
4059 parent_type(LL[0]) == syms.test
4060 and LL[0].type == token.NAME
4061 and LL[0].value == "else"
4063 is_valid_index = is_valid_index_factory(LL)
4065 idx = 2 if is_valid_index(1) and is_empty_par(LL[1]) else 1
4066 # The next visible leaf MUST contain a string...
4067 if is_valid_index(idx) and LL[idx].type == token.STRING:
4073 def _assert_match(LL: List[Leaf]) -> Optional[int]:
4076 string_idx such that @LL[string_idx] is equal to our target (i.e.
4077 matched) string, if this line matches the assert statement
4078 requirements listed in the 'Requirements' section of this class's docstring, or None otherwise.
4083 # If this line is part of an assert statement and the first leaf
4084 # contains the "assert" keyword...
4085 if parent_type(LL[0]) == syms.assert_stmt and LL[0].value == "assert":
4086 is_valid_index = is_valid_index_factory(LL)
4088 for (i, leaf) in enumerate(LL):
4089 # We MUST find a comma...
4090 if leaf.type == token.COMMA:
4091 idx = i + 2 if is_empty_par(LL[i + 1]) else i + 1
4093 # That comma MUST be followed by a string...
4094 if is_valid_index(idx) and LL[idx].type == token.STRING:
4097 # Skip the string trailer, if one exists.
4098 string_parser = StringParser()
4099 idx = string_parser.parse(LL, string_idx)
4101 # But no more leaves are allowed...
4102 if not is_valid_index(idx):
4108 def _assign_match(LL: List[Leaf]) -> Optional[int]:
4111 string_idx such that @LL[string_idx] is equal to our target (i.e.
4112 matched) string, if this line matches the assignment statement
4113 requirements listed in the 'Requirements' section of this class's docstring, or None otherwise.
4118 # If this line is part of an expression statement or is a function
4119 # argument AND the first leaf contains a variable name...
4121 parent_type(LL[0]) in [syms.expr_stmt, syms.argument, syms.power]
4122 and LL[0].type == token.NAME
4124 is_valid_index = is_valid_index_factory(LL)
4126 for (i, leaf) in enumerate(LL):
4127 # We MUST find either an '=' or '+=' symbol...
4128 if leaf.type in [token.EQUAL, token.PLUSEQUAL]:
4129 idx = i + 2 if is_empty_par(LL[i + 1]) else i + 1
4131 # That symbol MUST be followed by a string...
4132 if is_valid_index(idx) and LL[idx].type == token.STRING:
4135 # Skip the string trailer, if one exists.
4136 string_parser = StringParser()
4137 idx = string_parser.parse(LL, string_idx)
4139 # The next leaf MAY be a comma iff this line is part
4140 # of a function argument...
4142 parent_type(LL[0]) == syms.argument
4143 and is_valid_index(idx)
4144 and LL[idx].type == token.COMMA
4148 # But no more leaves are allowed...
4149 if not is_valid_index(idx):
4155 def _dict_match(LL: List[Leaf]) -> Optional[int]:
4158 string_idx such that @LL[string_idx] is equal to our target (i.e.
4159 matched) string, if this line matches the dictionary key assignment
4160 statement requirements listed in the 'Requirements' section of this class's docstring, or None otherwise.
4165 # If this line is part of a dictionary key assignment...
4166 if syms.dictsetmaker in [parent_type(LL[0]), parent_type(LL[0].parent)]:
4167 is_valid_index = is_valid_index_factory(LL)
4169 for (i, leaf) in enumerate(LL):
4170 # We MUST find a colon...
4171 if leaf.type == token.COLON:
4172 idx = i + 2 if is_empty_par(LL[i + 1]) else i + 1
4174 # That colon MUST be followed by a string...
4175 if is_valid_index(idx) and LL[idx].type == token.STRING:
4178 # Skip the string trailer, if one exists.
4179 string_parser = StringParser()
4180 idx = string_parser.parse(LL, string_idx)
4182 # That string MAY be followed by a comma...
4183 if is_valid_index(idx) and LL[idx].type == token.COMMA:
4186 # But no more leaves are allowed...
4187 if not is_valid_index(idx):
4192 def do_transform(self, line: Line, string_idx: int) -> Iterator[TResult[Line]]:
4195 is_valid_index = is_valid_index_factory(LL)
4196 insert_str_child = insert_str_child_factory(LL[string_idx])
4198 comma_idx = len(LL) - 1
4199 ends_with_comma = False
4200 if LL[comma_idx].type == token.COMMA:
4201 ends_with_comma = True
4203 leaves_to_steal_comments_from = [LL[string_idx]]
4205 leaves_to_steal_comments_from.append(LL[comma_idx])
4208 first_line = line.clone()
4209 left_leaves = LL[:string_idx]
4211 # We have to remember to account for (possibly invisible) LPAR and RPAR
4212 # leaves that already wrapped the target string. If these leaves do
4213 # exist, we will replace them with our own LPAR and RPAR leaves.
4214 old_parens_exist = False
4215 if left_leaves and left_leaves[-1].type == token.LPAR:
4216 old_parens_exist = True
4217 leaves_to_steal_comments_from.append(left_leaves[-1])
4220 append_leaves(first_line, line, left_leaves)
4222 lpar_leaf = Leaf(token.LPAR, "(")
4223 if old_parens_exist:
4224 replace_child(LL[string_idx - 1], lpar_leaf)
4226 insert_str_child(lpar_leaf)
4227 first_line.append(lpar_leaf)
4229 # We throw inline comments that were originally to the right of the
4230 # target string to the top line. They will now be shown to the right of the LPAR.
4232 for leaf in leaves_to_steal_comments_from:
4233 for comment_leaf in line.comments_after(leaf):
4234 first_line.append(comment_leaf, preformatted=True)
4236 yield Ok(first_line)
4238 # --- Middle (String) Line
4239 # We only need to yield one (possibly too long) string line, since the
4240 # `StringSplitter` will break it down further if necessary.
4241 string_value = LL[string_idx].value
4243 depth=line.depth + 1,
4244 inside_brackets=True,
4245 should_explode=line.should_explode,
4247 string_leaf = Leaf(token.STRING, string_value)
4248 insert_str_child(string_leaf)
4249 string_line.append(string_leaf)
4251 old_rpar_leaf = None
4252 if is_valid_index(string_idx + 1):
4253 right_leaves = LL[string_idx + 1 :]
4257 if old_parens_exist:
4259 right_leaves and right_leaves[-1].type == token.RPAR
4260 ), "Apparently, old parentheses do NOT exist?!"
4261 old_rpar_leaf = right_leaves.pop()
4263 append_leaves(string_line, line, right_leaves)
4265 yield Ok(string_line)
4268 last_line = line.clone()
4269 last_line.bracket_tracker = first_line.bracket_tracker
4271 new_rpar_leaf = Leaf(token.RPAR, ")")
4272 if old_rpar_leaf is not None:
4273 replace_child(old_rpar_leaf, new_rpar_leaf)
4275 insert_str_child(new_rpar_leaf)
4276 last_line.append(new_rpar_leaf)
4278 # If the target string ended with a comma, we place this comma to the
4279 # right of the RPAR on the last line.
4281 comma_leaf = Leaf(token.COMMA, ",")
4282 replace_child(LL[comma_idx], comma_leaf)
4283 last_line.append(comma_leaf)
4290 A state machine that aids in parsing a string's "trailer", which can be
4291 either non-existent, an old-style formatting sequence (e.g. `% varX` or `%
4292 (varX, varY)`), or a method-call / attribute access (e.g. `.format(varX, varY)`).
4295 NOTE: A new StringParser object MUST be instantiated for each string
4296 trailer we need to parse.
4299 We shall assume that `line` equals the `Line` object that corresponds
4300 to the following line of python code:
4302 x = "Some {}.".format("String") + some_other_string
4305 Furthermore, we will assume that `string_idx` is some index such that:
4307 assert line.leaves[string_idx].value == "Some {}."
4310 The following code snippet then holds:
4312 string_parser = StringParser()
4313 idx = string_parser.parse(line.leaves, string_idx)
4314 assert line.leaves[idx].type == token.PLUS
4320 # String Parser States
4330 # Lookup Table for Next State
4331 _goto: Dict[Tuple[ParserState, NodeType], ParserState] = {
4332 # A string trailer may start with '.' OR '%'.
4333 (START, token.DOT): DOT,
4334 (START, token.PERCENT): PERCENT,
4335 (START, DEFAULT_TOKEN): DONE,
4336 # A '.' MUST be followed by an attribute or method name.
4337 (DOT, token.NAME): NAME,
4338 # A method name MUST be followed by an '(', whereas an attribute name
4339 # is the last symbol in the string trailer.
4340 (NAME, token.LPAR): LPAR,
4341 (NAME, DEFAULT_TOKEN): DONE,
4342 # A '%' symbol can be followed by an '(' or a single argument (e.g. a
4343 # string or variable name).
4344 (PERCENT, token.LPAR): LPAR,
4345 (PERCENT, DEFAULT_TOKEN): SINGLE_FMT_ARG,
4346 # If a '%' symbol is followed by a single argument, that argument is
4347 # the last leaf in the string trailer.
4348 (SINGLE_FMT_ARG, DEFAULT_TOKEN): DONE,
4349 # If present, a ')' symbol is the last symbol in a string trailer.
4350 # (NOTE: LPARS and nested RPARS are not included in this lookup table,
4351 # since they are treated as a special case by the parsing logic in this
4352 # class's implementation.)
4353 (RPAR, DEFAULT_TOKEN): DONE,
4354 }
4356 def __init__(self) -> None:
4357 self._state = self.START
4358 self._unmatched_lpars = 0
4360 def parse(self, leaves: List[Leaf], string_idx: int) -> int:
4361 """
4362 Pre-conditions:
4363 * @leaves[@string_idx].type == token.STRING
4365 Returns:
4366 The index directly after the last leaf which is part of the string
4367 trailer, if a "trailer" exists.
4368 OR
4369 @string_idx + 1, if no string "trailer" exists.
4370 """
4371 assert leaves[string_idx].type == token.STRING
4373 idx = string_idx + 1
4374 while idx < len(leaves) and self._next_state(leaves[idx]):
4375 idx += 1
4377 return idx
4378 def _next_state(self, leaf: Leaf) -> bool:
4381 * On the first call to this function, @leaf MUST be the leaf that
4382 was directly after the string leaf in question (e.g. if our target
4383 string is `line.leaves[i]` then the first call to this method must
4384 be `line.leaves[i + 1]`).
4385 * On the next call to this function, the leaf parameter passed in
4386 MUST be the leaf directly following @leaf.
4388 Returns:
4389 True iff @leaf is part of the string's trailer.
4390 """
4391 # We ignore empty LPAR or RPAR leaves.
4392 if is_empty_par(leaf):
4393 return True
4395 next_token = leaf.type
4396 if next_token == token.LPAR:
4397 self._unmatched_lpars += 1
4399 current_state = self._state
4401 # The LPAR parser state is a special case. We will return True until we
4402 # find the matching RPAR token.
4403 if current_state == self.LPAR:
4404 if next_token == token.RPAR:
4405 self._unmatched_lpars -= 1
4406 if self._unmatched_lpars == 0:
4407 self._state = self.RPAR
4408 # Otherwise, we use a lookup table to determine the next state.
4410 # If the lookup table matches the current state to the next
4411 # token, we use the lookup table.
4412 if (current_state, next_token) in self._goto:
4413 self._state = self._goto[current_state, next_token]
4415 # Otherwise, we check if the current state was assigned a
4416 # default.
4417 if (current_state, self.DEFAULT_TOKEN) in self._goto:
4418 self._state = self._goto[current_state, self.DEFAULT_TOKEN]
4419 # If no default has been assigned, then this parser has a logic
4420 # error.
4421 else:
4422 raise RuntimeError(f"{self.__class__.__name__} LOGIC ERROR!")
4424 if self._state == self.DONE:
4425 return False
4427 return True
4430 def TErr(err_msg: str) -> Err[CannotTransform]:
4431 """(T)ransform Err
4433 Convenience function used when working with the TResult type.
4434 """
4435 cant_transform = CannotTransform(err_msg)
4436 return Err(cant_transform)
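# A minimal usage sketch (`_string_length` is a hypothetical helper, not
# defined in this module): transformers signal failure by returning
# `TErr(...)` and success by returning `Ok(...)`, mirroring the Rust-style
# Result model described near the type definitions above.
#
#     def _string_length(leaf: Leaf) -> TResult[int]:
#         if leaf.type != token.STRING:
#             return TErr("This helper only accepts STRING leaves.")
#         return Ok(len(leaf.value))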
4439 def contains_pragma_comment(comment_list: List[Leaf]) -> bool:
4440 """
4441 Returns:
4442 True iff one of the comments in @comment_list is a pragma used by one
4443 of the more common static analysis tools for Python (e.g. mypy, flake8,
4444 pylint).
4445 """
4446 for comment in comment_list:
4447 if comment.value.startswith(("# type:", "# noqa", "# pylint:")):
4448 return True
4450 return False
4453 def insert_str_child_factory(string_leaf: Leaf) -> Callable[[LN], None]:
4455 Factory for a convenience function that is used to orphan @string_leaf
4456 and then insert multiple new leaves into the same part of the node
4457 structure that @string_leaf had originally occupied.
4460 Let `string_leaf = Leaf(token.STRING, '"foo"')` and `N =
4461 string_leaf.parent`. Assume the node `N` has the following
4468 Leaf(STRING, '"foo"'),
4472 We then run the code snippet shown below.
4474 insert_str_child = insert_str_child_factory(string_leaf)
4476 lpar = Leaf(token.LPAR, '(')
4477 insert_str_child(lpar)
4479 bar = Leaf(token.STRING, '"bar"')
4480 insert_str_child(bar)
4482 rpar = Leaf(token.RPAR, ')')
4483 insert_str_child(rpar)
4486 After which point, it follows that `string_leaf.parent is None` and
4487 the node `N` now has the following structure:
4494 Leaf(STRING, '"bar"'),
4499 string_parent = string_leaf.parent
4500 string_child_idx = string_leaf.remove()
4502 def insert_str_child(child: LN) -> None:
4503 nonlocal string_child_idx
4505 assert string_parent is not None
4506 assert string_child_idx is not None
4508 string_parent.insert_child(string_child_idx, child)
4509 string_child_idx += 1
4511 return insert_str_child
4514 def has_triple_quotes(string: str) -> bool:
4517 True iff @string starts with three quotation characters.
4519 raw_string = string.lstrip(STRING_PREFIX_CHARS)
4520 return raw_string[:3] in {'"""', "'''"}
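# Doctest-style sketch:
#     has_triple_quotes('"""docstring"""')  -> True
#     has_triple_quotes("r'''raw'''")       -> True   (the prefix is ignored)
#     has_triple_quotes('"short"')          -> False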
4523 def parent_type(node: Optional[LN]) -> Optional[NodeType]:
4526 @node.parent.type, if @node is not None and has a parent.
4530 if node is None or node.parent is None:
4531 return None
4533 return node.parent.type
4536 def is_empty_par(leaf: Leaf) -> bool:
4537 return is_empty_lpar(leaf) or is_empty_rpar(leaf)
4540 def is_empty_lpar(leaf: Leaf) -> bool:
4541 return leaf.type == token.LPAR and leaf.value == ""
4544 def is_empty_rpar(leaf: Leaf) -> bool:
4545 return leaf.type == token.RPAR and leaf.value == ""
4548 def is_valid_index_factory(seq: Sequence[Any]) -> Callable[[int], bool]:
4554 is_valid_index = is_valid_index_factory(my_list)
4556 assert is_valid_index(0)
4557 assert is_valid_index(2)
4559 assert not is_valid_index(3)
4560 assert not is_valid_index(-1)
4564 def is_valid_index(idx: int) -> bool:
4566 Returns:
4567 True iff @idx is non-negative AND seq[@idx] does NOT raise an
4568 IndexError.
4569 """
4570 return 0 <= idx < len(seq)
4572 return is_valid_index
4575 def line_to_string(line: Line) -> str:
4576 """Returns the string representation of @line.
4578 WARNING: This is known to be computationally expensive.
4580 return str(line).strip("\n")
4583 def append_leaves(new_line: Line, old_line: Line, leaves: List[Leaf]) -> None:
4585 Append leaves (taken from @old_line) to @new_line, making sure to fix the
4586 underlying Node structure where appropriate.
4588 All of the leaves in @leaves are duplicated. The duplicates are then
4589 appended to @new_line and used to replace their originals in the underlying
4590 Node structure. Any comments attached to the old leaves are reattached to
4594 set(@leaves) is a subset of set(@old_line.leaves).
4596 for old_leaf in leaves:
4597 assert old_leaf in old_line.leaves
4599 new_leaf = Leaf(old_leaf.type, old_leaf.value)
4600 replace_child(old_leaf, new_leaf)
4601 new_line.append(new_leaf)
4603 for comment_leaf in old_line.comments_after(old_leaf):
4604 new_line.append(comment_leaf, preformatted=True)
4607 def replace_child(old_child: LN, new_child: LN) -> None:
4610 * If @old_child.parent is set, replace @old_child with @new_child in
4611 @old_child's underlying Node structure.
4613 * Otherwise, this function does nothing.
4615 parent = old_child.parent
4616 if not parent:
4617 return
4619 child_idx = old_child.remove()
4620 if child_idx is not None:
4621 parent.insert_child(child_idx, new_child)
4624 def get_string_prefix(string: str) -> str:
4627 * assert_is_leaf_string(@string)
4630 @string's prefix (e.g. '', 'r', 'f', or 'rf').
4632 assert_is_leaf_string(string)
4634 prefix = ""
4635 prefix_idx = 0
4636 while string[prefix_idx] in STRING_PREFIX_CHARS:
4637 prefix += string[prefix_idx].lower()
4638 prefix_idx += 1
4640 return prefix
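# Doctest-style sketch (note that the returned prefix is lower-cased):
#     get_string_prefix('RB"data"')  -> "rb"
#     get_string_prefix("f'{x}'")    -> "f"
#     get_string_prefix("'plain'")   -> ""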
4643 def assert_is_leaf_string(string: str) -> None:
4645 Checks the pre-condition that @string has the format that you would expect
4646 of `leaf.value` where `leaf` is some Leaf such that `leaf.type ==
4647 token.STRING`. A more precise description of the pre-conditions that are
4648 checked are listed below.
4651 * @string starts with either ', ", <prefix>', or <prefix>" where
4652 `set(<prefix>)` is some subset of `set(STRING_PREFIX_CHARS)`.
4653 * @string ends with a quote character (' or ").
4656 AssertionError(...) if the pre-conditions listed above are not
4659 dquote_idx = string.find('"')
4660 squote_idx = string.find("'")
4661 if -1 in [dquote_idx, squote_idx]:
4662 quote_idx = max(dquote_idx, squote_idx)
4663 else:
4664 quote_idx = min(squote_idx, dquote_idx)
4666 assert (
4667 0 <= quote_idx < len(string) - 1
4668 ), f"{string!r} is missing a starting quote character (' or \")."
4669 assert string[-1] in (
4670 "'",
4671 '"',
4672 ), f"{string!r} is missing an ending quote character (' or \")."
4673 assert set(string[:quote_idx]).issubset(
4674 set(STRING_PREFIX_CHARS)
4675 ), f"{set(string[:quote_idx])} is NOT a subset of {set(STRING_PREFIX_CHARS)}."
4678 def left_hand_split(line: Line, _features: Collection[Feature] = ()) -> Iterator[Line]:
4679 """Split line into many lines, starting with the first matching bracket pair.
4681 Note: this usually looks weird, only use this for function definitions.
4682 Prefer RHS otherwise. This is why this function is not symmetrical with
4683 :func:`right_hand_split` which also handles optional parentheses.
4685 tail_leaves: List[Leaf] = []
4686 body_leaves: List[Leaf] = []
4687 head_leaves: List[Leaf] = []
4688 current_leaves = head_leaves
4689 matching_bracket: Optional[Leaf] = None
4690 for leaf in line.leaves:
4691 if (
4692 current_leaves is body_leaves
4693 and leaf.type in CLOSING_BRACKETS
4694 and leaf.opening_bracket is matching_bracket
4695 ):
4696 current_leaves = tail_leaves if body_leaves else head_leaves
4697 current_leaves.append(leaf)
4698 if current_leaves is head_leaves:
4699 if leaf.type in OPENING_BRACKETS:
4700 matching_bracket = leaf
4701 current_leaves = body_leaves
4702 if not matching_bracket:
4703 raise CannotSplit("No brackets found")
4705 head = bracket_split_build_line(head_leaves, line, matching_bracket)
4706 body = bracket_split_build_line(body_leaves, line, matching_bracket, is_body=True)
4707 tail = bracket_split_build_line(tail_leaves, line, matching_bracket)
4708 bracket_split_succeeded_or_raise(head, body, tail)
4709 for result in (head, body, tail):
4710 if result:
4711 yield result
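# Illustrative sketch: for an over-long function definition such as
#     def process(first_argument, second_argument, third_argument): ...
# the head ends at the first opening bracket, the body holds its contents,
# and the tail starts at the matching closing bracket:
#     def process(
#         first_argument, second_argument, third_argument
#     ): ...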
4714 def right_hand_split(
4715 line: Line,
4716 line_length: int,
4717 features: Collection[Feature] = (),
4718 omit: Collection[LeafID] = (),
4719 ) -> Iterator[Line]:
4720 """Split line into many lines, starting with the last matching bracket pair.
4722 If the split was by optional parentheses, attempt splitting without them, too.
4723 `omit` is a collection of closing bracket IDs that shouldn't be considered for
4724 this split.
4726 Note: running this function modifies `bracket_depth` on the leaves of `line`.
4728 tail_leaves: List[Leaf] = []
4729 body_leaves: List[Leaf] = []
4730 head_leaves: List[Leaf] = []
4731 current_leaves = tail_leaves
4732 opening_bracket: Optional[Leaf] = None
4733 closing_bracket: Optional[Leaf] = None
4734 for leaf in reversed(line.leaves):
4735 if current_leaves is body_leaves:
4736 if leaf is opening_bracket:
4737 current_leaves = head_leaves if body_leaves else tail_leaves
4738 current_leaves.append(leaf)
4739 if current_leaves is tail_leaves:
4740 if leaf.type in CLOSING_BRACKETS and id(leaf) not in omit:
4741 opening_bracket = leaf.opening_bracket
4742 closing_bracket = leaf
4743 current_leaves = body_leaves
4744 if not (opening_bracket and closing_bracket and head_leaves):
4745 # If there is no opening or closing_bracket that means the split failed and
4746 # all content is in the tail. Otherwise, if `head_leaves` are empty, it means
4747 # the matching `opening_bracket` wasn't available on `line` anymore.
4748 raise CannotSplit("No brackets found")
4750 tail_leaves.reverse()
4751 body_leaves.reverse()
4752 head_leaves.reverse()
4753 head = bracket_split_build_line(head_leaves, line, opening_bracket)
4754 body = bracket_split_build_line(body_leaves, line, opening_bracket, is_body=True)
4755 tail = bracket_split_build_line(tail_leaves, line, opening_bracket)
4756 bracket_split_succeeded_or_raise(head, body, tail)
4757 if (
4758 # the body shouldn't be exploded
4759 not body.should_explode
4760 # the opening bracket is an optional paren
4761 and opening_bracket.type == token.LPAR
4762 and not opening_bracket.value
4763 # the closing bracket is an optional paren
4764 and closing_bracket.type == token.RPAR
4765 and not closing_bracket.value
4766 # it's not an import (optional parens are the only thing we can split on
4767 # in this case; attempting a split without them is a waste of time)
4768 and not line.is_import
4769 # there are no standalone comments in the body
4770 and not body.contains_standalone_comments(0)
4771 # and we can actually remove the parens
4772 and can_omit_invisible_parens(body, line_length)
4773 ):
4774 omit = {id(closing_bracket), *omit}
4775 try:
4776 yield from right_hand_split(line, line_length, features=features, omit=omit)
4777 return
4779 except CannotSplit:
4780 if not (
4781 can_be_split(body)
4782 or is_line_short_enough(body, line_length=line_length)
4783 ):
4784 raise CannotSplit(
4785 "Splitting failed, body is still too long and can't be split."
4786 )
4788 elif head.contains_multiline_strings() or tail.contains_multiline_strings():
4789 raise CannotSplit(
4790 "The current optional pair of parentheses is bound to fail to"
4791 " satisfy the splitting algorithm because the head or the tail"
4792 " contains multiline strings which by definition never fit one"
4793 " line."
4794 )
4796 ensure_visible(opening_bracket)
4797 ensure_visible(closing_bracket)
4798 for result in (head, body, tail):
4799 if result:
4800 yield result
4803 def bracket_split_succeeded_or_raise(head: Line, body: Line, tail: Line) -> None:
4804 """Raise :exc:`CannotSplit` if the last left- or right-hand split failed.
4806 Do nothing otherwise.
4808 A left- or right-hand split is based on a pair of brackets. Content before
4809 (and including) the opening bracket is left on one line, content inside the
4810 brackets is put on a separate line, and finally content starting with and
4811 following the closing bracket is put on a separate line.
4813 Those are called `head`, `body`, and `tail`, respectively. If the split
4814 produced the same line (all content in `head`) or ended up with an empty `body`
4815 and the `tail` is just the closing bracket, then it's considered failed.
4817 tail_len = len(str(tail).strip())
4818 if not body:
4819 if tail_len == 0:
4820 raise CannotSplit("Splitting brackets produced the same line")
4822 if tail_len < 3:
4823 raise CannotSplit(
4824 f"Splitting brackets on an empty body to save {tail_len} characters is"
4825 " not worth it"
4826 )
4829 def bracket_split_build_line(
4830 leaves: List[Leaf], original: Line, opening_bracket: Leaf, *, is_body: bool = False
4831 ) -> Line:
4832 """Return a new line with given `leaves` and respective comments from `original`.
4834 If `is_body` is True, the result line is one-indented inside brackets and as such
4835 has its first leaf's prefix normalized and a trailing comma added when expected.
4837 result = Line(depth=original.depth)
4838 if is_body:
4839 result.inside_brackets = True
4840 result.depth += 1
4841 if leaves:
4842 # Since body is a new indent level, remove spurious leading whitespace.
4843 normalize_prefix(leaves[0], inside_brackets=True)
4844 # Ensure a trailing comma for imports and standalone function arguments, but
4845 # be careful not to add one after any comments or within type annotations.
4846 no_commas = (
4847 original.is_def
4848 and opening_bracket.value == "("
4849 and not any(leaf.type == token.COMMA for leaf in leaves)
4850 )
4852 if original.is_import or no_commas:
4853 for i in range(len(leaves) - 1, -1, -1):
4854 if leaves[i].type == STANDALONE_COMMENT:
4855 continue
4857 if leaves[i].type != token.COMMA:
4858 leaves.insert(i + 1, Leaf(token.COMMA, ","))
4859 break
4862 for leaf in leaves:
4863 result.append(leaf, preformatted=True)
4864 for comment_after in original.comments_after(leaf):
4865 result.append(comment_after, preformatted=True)
4866 if is_body:
4867 result.should_explode = should_explode(result, opening_bracket)
4868 return result
4871 def dont_increase_indentation(split_func: Transformer) -> Transformer:
4872 """Normalize prefix of the first leaf in every line returned by `split_func`.
4874 This is a decorator over relevant split functions.
4875 """
4877 @wraps(split_func)
4878 def split_wrapper(line: Line, features: Collection[Feature] = ()) -> Iterator[Line]:
4879 for line in split_func(line, features):
4880 normalize_prefix(line.leaves[0], inside_brackets=True)
4881 yield line
4883 return split_wrapper
4886 @dont_increase_indentation
4887 def delimiter_split(line: Line, features: Collection[Feature] = ()) -> Iterator[Line]:
4888 """Split according to delimiters of the highest priority.
4890 If the appropriate Features are given, the split will add trailing commas
4891 also in function signatures and calls that contain `*` and `**`.
4892 """
4893 try:
4894 last_leaf = line.leaves[-1]
4895 except IndexError:
4896 raise CannotSplit("Line empty")
4898 bt = line.bracket_tracker
4899 try:
4900 delimiter_priority = bt.max_delimiter_priority(exclude={id(last_leaf)})
4901 except ValueError:
4902 raise CannotSplit("No delimiters found")
4904 if delimiter_priority == DOT_PRIORITY:
4905 if bt.delimiter_count_with_priority(delimiter_priority) == 1:
4906 raise CannotSplit("Splitting a single attribute from its owner looks wrong")
4908 current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
4909 lowest_depth = sys.maxsize
4910 trailing_comma_safe = True
4912 def append_to_line(leaf: Leaf) -> Iterator[Line]:
4913 """Append `leaf` to current line or to new line if appending impossible."""
4914 nonlocal current_line
4915 try:
4916 current_line.append_safe(leaf, preformatted=True)
4917 except ValueError:
4918 yield current_line
4920 current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
4921 current_line.append(leaf)
4923 for leaf in line.leaves:
4924 yield from append_to_line(leaf)
4926 for comment_after in line.comments_after(leaf):
4927 yield from append_to_line(comment_after)
4929 lowest_depth = min(lowest_depth, leaf.bracket_depth)
4930 if leaf.bracket_depth == lowest_depth:
4931 if is_vararg(leaf, within={syms.typedargslist}):
4932 trailing_comma_safe = (
4933 trailing_comma_safe and Feature.TRAILING_COMMA_IN_DEF in features
4934 )
4935 elif is_vararg(leaf, within={syms.arglist, syms.argument}):
4936 trailing_comma_safe = (
4937 trailing_comma_safe and Feature.TRAILING_COMMA_IN_CALL in features
4938 )
4940 leaf_priority = bt.delimiters.get(id(leaf))
4941 if leaf_priority == delimiter_priority:
4942 yield current_line
4944 current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
4945 if current_line:
4946 if (
4947 trailing_comma_safe
4948 and delimiter_priority == COMMA_PRIORITY
4949 and current_line.leaves[-1].type != token.COMMA
4950 and current_line.leaves[-1].type != STANDALONE_COMMENT
4951 ):
4952 current_line.append(Leaf(token.COMMA, ","))
4953 yield current_line
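# Illustrative sketch: splitting a bracketed body on COMMA_PRIORITY yields
# one line per element and, when the relevant Features allow it, a trailing
# comma after the last element:
#     foo, bar, baz
# becomes
#     foo,
#     bar,
#     baz,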
4956 @dont_increase_indentation
4957 def standalone_comment_split(
4958 line: Line, features: Collection[Feature] = ()
4959 ) -> Iterator[Line]:
4960 """Split standalone comments from the rest of the line."""
4961 if not line.contains_standalone_comments(0):
4962 raise CannotSplit("Line does not have any standalone comments")
4964 current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
4966 def append_to_line(leaf: Leaf) -> Iterator[Line]:
4967 """Append `leaf` to current line or to new line if appending impossible."""
4968 nonlocal current_line
4969 try:
4970 current_line.append_safe(leaf, preformatted=True)
4971 except ValueError:
4972 yield current_line
4974 current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
4975 current_line.append(leaf)
4977 for leaf in line.leaves:
4978 yield from append_to_line(leaf)
4980 for comment_after in line.comments_after(leaf):
4981 yield from append_to_line(comment_after)
4983 if current_line:
4984 yield current_line
4987 def is_import(leaf: Leaf) -> bool:
4988 """Return True if the given leaf starts an import statement."""
4989 p = leaf.parent
4990 t = leaf.type
4991 v = leaf.value
4992 return bool(
4993 t == token.NAME
4994 and (
4995 (v == "import" and p and p.type == syms.import_name)
4996 or (v == "from" and p and p.type == syms.import_from)
4997 )
4998 )
5001 def is_type_comment(leaf: Leaf, suffix: str = "") -> bool:
5002 """Return True if the given leaf is a special comment.
5003 Only returns true for type comments for now."""
5004 t = leaf.type
5005 v = leaf.value
5006 return t in {token.COMMENT, STANDALONE_COMMENT} and v.startswith("# type:" + suffix)
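# Doctest-style sketch (for a COMMENT leaf `c`):
#     c.value == "# type: List[int]"  -> is_type_comment(c) is True
#     c.value == "# type: ignore"     -> is_type_comment(c, " ignore") is True
#     c.value == "# just a comment"   -> is_type_comment(c) is False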
5009 def normalize_prefix(leaf: Leaf, *, inside_brackets: bool) -> None:
5010 """Leave existing extra newlines if not `inside_brackets`. Remove everything
5013 Note: don't use backslashes for formatting or you'll lose your voting rights.
5015 if not inside_brackets:
5016 spl = leaf.prefix.split("#")
5017 if "\\" not in spl[0]:
5018 nl_count = spl[-1].count("\n")
5019 if len(spl) > 1:
5020 nl_count -= 1
5021 leaf.prefix = "\n" * nl_count
5022 return
5024 leaf.prefix = ""
5027 def normalize_string_prefix(leaf: Leaf, remove_u_prefix: bool = False) -> None:
5028 """Make all string prefixes lowercase.
5030 If remove_u_prefix is given, also removes any u prefix from the string.
5032 Note: Mutates its argument.
5034 match = re.match(r"^([" + STRING_PREFIX_CHARS + r"]*)(.*)$", leaf.value, re.DOTALL)
5035 assert match is not None, f"failed to match string {leaf.value!r}"
5036 orig_prefix = match.group(1)
5037 new_prefix = orig_prefix.replace("F", "f").replace("B", "b").replace("U", "u")
5038 if remove_u_prefix:
5039 new_prefix = new_prefix.replace("u", "")
5040 leaf.value = f"{new_prefix}{match.group(2)}"
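# Doctest-style sketch (leaf.value before -> after):
#     F"hello"  -> f"hello"
#     B"bytes"  -> b"bytes"
#     u"text"   -> "text"     (only with remove_u_prefix=True)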
5043 def normalize_string_quotes(leaf: Leaf) -> None:
5044 """Prefer double quotes but only if it doesn't cause more escaping.
5046 Adds or removes backslashes as appropriate. Doesn't parse and fix
5047 strings nested in f-strings (yet).
5049 Note: Mutates its argument.
5051 value = leaf.value.lstrip(STRING_PREFIX_CHARS)
5052 if value[:3] == '"""':
5055 elif value[:3] == "'''":
5058 elif value[0] == '"':
5064 first_quote_pos = leaf.value.find(orig_quote)
5065 if first_quote_pos == -1:
5066 return # There's an internal error
5068 prefix = leaf.value[:first_quote_pos]
5069 unescaped_new_quote = re.compile(rf"(([^\\]|^)(\\\\)*){new_quote}")
5070 escaped_new_quote = re.compile(rf"([^\\]|^)\\((?:\\\\)*){new_quote}")
5071 escaped_orig_quote = re.compile(rf"([^\\]|^)\\((?:\\\\)*){orig_quote}")
5072 body = leaf.value[first_quote_pos + len(orig_quote) : -len(orig_quote)]
5073 if "r" in prefix.casefold():
5074 if unescaped_new_quote.search(body):
5075 # There's at least one unescaped new_quote in this raw string
5076 # so converting is impossible
5077 return
5079 # Do not introduce or remove backslashes in raw strings
5080 new_body = body
5081 else:
5082 # remove unnecessary escapes
5083 new_body = sub_twice(escaped_new_quote, rf"\1\2{new_quote}", body)
5084 if body != new_body:
5085 # Consider the string without unnecessary escapes as the original
5086 body = new_body
5087 leaf.value = f"{prefix}{orig_quote}{body}{orig_quote}"
5088 new_body = sub_twice(escaped_orig_quote, rf"\1\2{orig_quote}", new_body)
5089 new_body = sub_twice(unescaped_new_quote, rf"\1\\{new_quote}", new_body)
5090 if "f" in prefix.casefold():
5091 matches = re.findall(
5092 r"""
5093 (?:[^{]|^)\{ # start of the string or a non-{ followed by a single {
5094 ([^{].*?) # contents of the brackets except if begins with {{
5095 \}(?:[^}]|$) # A } followed by end of the string or a non-}
5096 """,
5097 new_body,
5098 re.VERBOSE,
5099 )
5100 for m in matches:
5101 if "\\" in str(m):
5102 # Do not introduce backslashes in interpolated expressions
5103 return
5105 if new_quote == '"""' and new_body[-1:] == '"':
5106 # edge case:
5107 new_body = new_body[:-1] + '\\"'
5108 orig_escape_count = body.count("\\")
5109 new_escape_count = new_body.count("\\")
5110 if new_escape_count > orig_escape_count:
5111 return # Do not introduce more escaping
5113 if new_escape_count == orig_escape_count and orig_quote == '"':
5114 return # Prefer double quotes
5116 leaf.value = f"{prefix}{new_quote}{new_body}{new_quote}"
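# Doctest-style sketch (leaf.value before -> after):
#     'hello'      -> "hello"      (double quotes preferred)
#     'it\'s'      -> "it's"       (the escape becomes unnecessary)
#     'say "hi"'   -> 'say "hi"'   (unchanged: switching would add escapes)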
5119 def normalize_numeric_literal(leaf: Leaf) -> None:
5120 """Normalizes numeric (float, int, and complex) literals.
5122 All letters used in the representation are normalized to lowercase (except
5123 in Python 2 long literals).
5125 text = leaf.value.lower()
5126 if text.startswith(("0o", "0b")):
5127 # Leave octal and binary literals alone.
5128 pass
5129 elif text.startswith("0x"):
5130 # Change hex literals to upper case.
5131 before, after = text[:2], text[2:]
5132 text = f"{before}{after.upper()}"
5133 elif "e" in text:
5134 before, after = text.split("e")
5135 sign = ""
5136 if after.startswith("-"):
5137 after = after[1:]
5138 sign = "-"
5139 elif after.startswith("+"):
5140 after = after[1:]
5141 before = format_float_or_int_string(before)
5142 text = f"{before}e{sign}{after}"
5143 elif text.endswith(("j", "l")):
5144 number = text[:-1]
5145 suffix = text[-1]
5146 # Capitalize in "2L" because "l" looks too similar to "1".
5147 if suffix == "l":
5148 suffix = "L"
5149 text = f"{format_float_or_int_string(number)}{suffix}"
5150 else:
5151 text = format_float_or_int_string(text)
5152 leaf.value = text
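# Doctest-style sketch (leaf.value before -> after):
#     0XAB   -> 0xAB    (lowercase prefix, uppercase hex digits)
#     1E5    -> 1e5
#     10E+3  -> 10e3    (redundant plus sign dropped)
#     10l    -> 10L     ("L" capitalized: "l" looks too much like "1")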
5155 def format_float_or_int_string(text: str) -> str:
5156 """Formats a float string like "1.0"."""
5160 before, after = text.split(".")
5161 return f"{before or 0}.{after or 0}"
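# Doctest-style sketch:
#     format_float_or_int_string("1.")  -> "1.0"
#     format_float_or_int_string(".5")  -> "0.5"
#     format_float_or_int_string("7")   -> "7"    (integers pass through)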
5164 def normalize_invisible_parens(node: Node, parens_after: Set[str]) -> None:
5165 """Make existing optional parentheses invisible or create new ones.
5167 `parens_after` is a set of string leaf values immediately after which parens
5168 should be put.
5170 Standardizes on visible parentheses for single-element tuples, and keeps
5171 existing visible parentheses for other tuples and generator expressions.
5173 for pc in list_comments(node.prefix, is_endmarker=False):
5174 if pc.value in FMT_OFF:
5175 # This `node` has a prefix with `# fmt: off`, don't mess with parens.
5176 return
5177 check_lpar = False
5178 for index, child in enumerate(list(node.children)):
5179 # Fixes a bug where invisible parens are not properly stripped from
5180 # assignment statements that contain type annotations.
5181 if isinstance(child, Node) and child.type == syms.annassign:
5182 normalize_invisible_parens(child, parens_after=parens_after)
5184 # Add parentheses around long tuple unpacking in assignments.
5185 if (
5186 index == 0
5187 and isinstance(child, Node)
5188 and child.type == syms.testlist_star_expr
5189 ):
5190 check_lpar = True
5192 if check_lpar:
5193 if is_walrus_assignment(child):
5194 continue
5196 if child.type == syms.atom:
5197 if maybe_make_parens_invisible_in_atom(child, parent=node):
5198 wrap_in_parentheses(node, child, visible=False)
5199 elif is_one_tuple(child):
5200 wrap_in_parentheses(node, child, visible=True)
5201 elif node.type == syms.import_from:
5202 # "import from" nodes store parentheses directly as part of
5204 if child.type == token.LPAR:
5205 # make parentheses invisible
5206 child.value = "" # type: ignore
5207 node.children[-1].value = "" # type: ignore
5208 elif child.type != token.STAR:
5209 # insert invisible parentheses
5210 node.insert_child(index, Leaf(token.LPAR, ""))
5211 node.append_child(Leaf(token.RPAR, ""))
5213 break
5214 elif not (isinstance(child, Leaf) and is_multiline_string(child)):
5215 wrap_in_parentheses(node, child, visible=False)
5217 check_lpar = isinstance(child, Leaf) and child.value in parens_after
5220 def normalize_fmt_off(node: Node) -> None:
5221 """Convert content between `# fmt: off`/`# fmt: on` into standalone comments."""
5224 try_again = convert_one_fmt_off_pair(node)
5227 def convert_one_fmt_off_pair(node: Node) -> bool:
5228 """Convert content of a single `# fmt: off`/`# fmt: on` into a standalone comment.
5230 Returns True if a pair was converted.
5232 for leaf in node.leaves():
5233 previous_consumed = 0
5234 for comment in list_comments(leaf.prefix, is_endmarker=False):
5235 if comment.value in FMT_OFF:
5236 # We only want standalone comments. If there's no previous leaf or
5237 # the previous leaf is indentation, it's a standalone comment in
5238 # disguise.
5239 if comment.type != STANDALONE_COMMENT:
5240 prev = preceding_leaf(leaf)
5241 if prev and prev.type not in WHITESPACE:
5242 continue
5244 ignored_nodes = list(generate_ignored_nodes(leaf))
5245 if not ignored_nodes:
5246 continue
5248 first = ignored_nodes[0] # Can be a container node with the `leaf`.
5249 parent = first.parent
5250 prefix = first.prefix
5251 first.prefix = prefix[comment.consumed :]
5252 hidden_value = (
5253 comment.value + "\n" + "".join(str(n) for n in ignored_nodes)
5254 )
5255 if hidden_value.endswith("\n"):
5256 # That happens when one of the `ignored_nodes` ended with a NEWLINE
5257 # leaf (possibly followed by a DEDENT).
5258 hidden_value = hidden_value[:-1]
5259 first_idx: Optional[int] = None
5260 for ignored in ignored_nodes:
5261 index = ignored.remove()
5262 if first_idx is None:
5263 first_idx = index
5264 assert parent is not None, "INTERNAL ERROR: fmt: on/off handling (1)"
5265 assert first_idx is not None, "INTERNAL ERROR: fmt: on/off handling (2)"
5266 parent.insert_child(
5267 first_idx,
5268 Leaf(
5269 STANDALONE_COMMENT,
5270 hidden_value,
5271 prefix=prefix[:previous_consumed] + "\n" * comment.newlines,
5272 ),
5273 )
5274 return True
5276 previous_consumed = comment.consumed
5278 return False
5281 def generate_ignored_nodes(leaf: Leaf) -> Iterator[LN]:
5282 """Starting from the container of `leaf`, generate all leaves until `# fmt: on`.
5284 Stops at the end of the block.
5286 container: Optional[LN] = container_of(leaf)
5287 while container is not None and container.type != token.ENDMARKER:
5288 if is_fmt_on(container):
5289 return
5291 # fix for fmt: on in children
5292 if contains_fmt_on_at_column(container, leaf.column):
5293 for child in container.children:
5294 if contains_fmt_on_at_column(child, leaf.column):
5295 return
5296 yield child
5297 else:
5298 yield container
5299 container = container.next_sibling
5302 def is_fmt_on(container: LN) -> bool:
5303 """Determine whether formatting is switched on within a container.
5304 Determined by whether the last `# fmt:` comment is `on` or `off`.
5305 """
5306 fmt_on = False
5307 for comment in list_comments(container.prefix, is_endmarker=False):
5308 if comment.value in FMT_ON:
5309 fmt_on = True
5310 elif comment.value in FMT_OFF:
5311 fmt_on = False
5312 return fmt_on
5315 def contains_fmt_on_at_column(container: LN, column: int) -> bool:
5316 """Determine if children at a given column have formatting switched on."""
5317 for child in container.children:
5318 if (
5319 isinstance(child, Node)
5320 and first_leaf_column(child) == column
5321 or isinstance(child, Leaf)
5322 and child.column == column
5323 ):
5324 if is_fmt_on(child):
5325 return True
5327 return False
5330 def first_leaf_column(node: Node) -> Optional[int]:
5331 """Returns the column of the first leaf child of a node."""
5332 for child in node.children:
5333 if isinstance(child, Leaf):
5334 return child.column
5335 return None
5338 def maybe_make_parens_invisible_in_atom(node: LN, parent: LN) -> bool:
5339 """If it's safe, make the parens in the atom `node` invisible, recursively.
5340 Additionally, remove repeated, adjacent invisible parens from the atom `node`
5341 as they are redundant.
5343 Returns whether the node should itself be wrapped in invisible parentheses.
5345 """
5346 if (
5347 node.type != syms.atom
5348 or is_empty_tuple(node)
5349 or is_one_tuple(node)
5350 or (is_yield(node) and parent.type != syms.expr_stmt)
5351 or max_delimiter_priority_in_atom(node) >= COMMA_PRIORITY
5352 ):
5353 return False
5355 first = node.children[0]
5356 last = node.children[-1]
5357 if first.type == token.LPAR and last.type == token.RPAR:
5358 middle = node.children[1]
5359 # make parentheses invisible
5360 first.value = "" # type: ignore
5361 last.value = "" # type: ignore
5362 maybe_make_parens_invisible_in_atom(middle, parent=parent)
5364 if is_atom_with_invisible_parens(middle):
5365 # Strip the invisible parens from `middle` by replacing
5366 # it with the child in-between the invisible parens
5367 middle.replace(middle.children[1])
5369 return False
5371 return True
5374 def is_atom_with_invisible_parens(node: LN) -> bool:
5375 """Given a `LN`, determines whether it's an atom `node` with invisible
5376 parens. Useful in dedupe-ing and normalizing parens.
5378 if isinstance(node, Leaf) or node.type != syms.atom:
5379 return False
5381 first, last = node.children[0], node.children[-1]
5382 return (
5383 isinstance(first, Leaf)
5384 and first.type == token.LPAR
5385 and first.value == ""
5386 and isinstance(last, Leaf)
5387 and last.type == token.RPAR
5388 and last.value == ""
5392 def is_empty_tuple(node: LN) -> bool:
5393 """Return True if `node` holds an empty tuple."""
5394 return (
5395 node.type == syms.atom
5396 and len(node.children) == 2
5397 and node.children[0].type == token.LPAR
5398 and node.children[1].type == token.RPAR
5399 )
5402 def unwrap_singleton_parenthesis(node: LN) -> Optional[LN]:
5403 """Returns `wrapped` if `node` is of the shape ( wrapped ).
5405 Parenthesis can be optional. Returns None otherwise"""
5406 if len(node.children) != 3:
5407 return None
5409 lpar, wrapped, rpar = node.children
5410 if not (lpar.type == token.LPAR and rpar.type == token.RPAR):
5411 return None
5413 return wrapped
5416 def wrap_in_parentheses(parent: Node, child: LN, *, visible: bool = True) -> None:
5417 """Wrap `child` in parentheses.
5419 This replaces `child` with an atom holding the parentheses and the old
5420 child. That requires moving the prefix.
5422 If `visible` is False, the leaves will be valueless (and thus invisible).
5424 lpar = Leaf(token.LPAR, "(" if visible else "")
5425 rpar = Leaf(token.RPAR, ")" if visible else "")
5426 prefix = child.prefix
5427 child.prefix = ""
5428 index = child.remove() or 0
5429 new_child = Node(syms.atom, [lpar, child, rpar])
5430 new_child.prefix = prefix
5431 parent.insert_child(index, new_child)
5434 def is_one_tuple(node: LN) -> bool:
5435 """Return True if `node` holds a tuple with one element, with or without parens."""
5436 if node.type == syms.atom:
5437 gexp = unwrap_singleton_parenthesis(node)
5438 if gexp is None or gexp.type != syms.testlist_gexp:
5439 return False
5441 return len(gexp.children) == 2 and gexp.children[1].type == token.COMMA
5443 return (
5444 node.type in IMPLICIT_TUPLE
5445 and len(node.children) == 2
5446 and node.children[1].type == token.COMMA
5447 )
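# Doctest-style sketch (for the node corresponding to each expression):
#     (1,)   -> True    (parenthesized one-tuple)
#     1,     -> True    (bare one-tuple, e.g. on the RHS of an assignment)
#     (1)    -> False   (just a parenthesized expression)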
5450 def is_walrus_assignment(node: LN) -> bool:
5451 """Return True iff `node` is of the shape ( test := test )"""
5452 inner = unwrap_singleton_parenthesis(node)
5453 return inner is not None and inner.type == syms.namedexpr_test
5456 def is_yield(node: LN) -> bool:
5457 """Return True if `node` holds a `yield` or `yield from` expression."""
5458 if node.type == syms.yield_expr:
5459 return True
5461 if node.type == token.NAME and node.value == "yield": # type: ignore
5462 return True
5464 if node.type != syms.atom:
5465 return False
5467 if len(node.children) != 3:
5468 return False
5470 lpar, expr, rpar = node.children
5471 if lpar.type == token.LPAR and rpar.type == token.RPAR:
5472 return is_yield(expr)
5474 return False
5477 def is_vararg(leaf: Leaf, within: Set[NodeType]) -> bool:
5478 """Return True if `leaf` is a star or double star in a vararg or kwarg.
5480 If `within` includes VARARGS_PARENTS, this applies to function signatures.
5481 If `within` includes UNPACKING_PARENTS, it applies to right hand-side
5482 extended iterable unpacking (PEP 3132) and additional unpacking
5483 generalizations (PEP 448).
5485 if leaf.type not in VARARGS_SPECIALS or not leaf.parent:
5486 return False
5488 p = leaf.parent
5489 if p.type == syms.star_expr:
5490 # Star expressions are also used as assignment targets in extended
5491 # iterable unpacking (PEP 3132). See what its parent is instead.
5492 if not p.parent:
5493 return False
5495 p = p.parent
5497 return p.type in within
5500 def is_multiline_string(leaf: Leaf) -> bool:
5501 """Return True if `leaf` is a multiline string that actually spans many lines."""
5502 return has_triple_quotes(leaf.value) and "\n" in leaf.value
5505 def is_stub_suite(node: Node) -> bool:
5506 """Return True if `node` is a suite with a stub body."""
5507 if (
5508 len(node.children) != 4
5509 or node.children[0].type != token.NEWLINE
5510 or node.children[1].type != token.INDENT
5511 or node.children[3].type != token.DEDENT
5512 ):
5513 return False
5515 return is_stub_body(node.children[2])
5518 def is_stub_body(node: LN) -> bool:
5519 """Return True if `node` is a simple statement containing an ellipsis."""
5520 if not isinstance(node, Node) or node.type != syms.simple_stmt:
5521 return False
5523 if len(node.children) != 2:
5524 return False
5526 child = node.children[0]
5527 return (
5528 child.type == syms.atom
5529 and len(child.children) == 3
5530 and all(leaf == Leaf(token.DOT, ".") for leaf in child.children)
5531 )
5534 def max_delimiter_priority_in_atom(node: LN) -> Priority:
5535 """Return maximum delimiter priority inside `node`.
5537 This is specific to atoms with contents contained in a pair of parentheses.
5538 If `node` isn't an atom or there are no enclosing parentheses, returns 0.
5540 if node.type != syms.atom:
5541 return 0
5543 first = node.children[0]
5544 last = node.children[-1]
5545 if not (first.type == token.LPAR and last.type == token.RPAR):
5546 return 0
5548 bt = BracketTracker()
5549 for c in node.children[1:-1]:
5550 if isinstance(c, Leaf):
5551 bt.mark(c)
5552 else:
5553 for leaf in c.leaves():
5554 bt.mark(leaf)
5555 try:
5556 return bt.max_delimiter_priority()
5557 except ValueError:
5558 return 0
5562 def ensure_visible(leaf: Leaf) -> None:
5563 """Make sure parentheses are visible.
5565 They could be invisible as part of some statements (see
5566 :func:`normalize_invisible_parens` and :func:`visit_import_from`).
5568 if leaf.type == token.LPAR:
5569 leaf.value = "("
5570 elif leaf.type == token.RPAR:
5571 leaf.value = ")"
5574 def should_explode(line: Line, opening_bracket: Leaf) -> bool:
5575 """Should `line` immediately be split with `delimiter_split()` after RHS?"""
5577 if not (
5578 opening_bracket.parent
5579 and opening_bracket.parent.type in {syms.atom, syms.import_from}
5580 and opening_bracket.value in "[{("
5581 ):
5582 return False
5584 try:
5585 last_leaf = line.leaves[-1]
5586 exclude = {id(last_leaf)} if last_leaf.type == token.COMMA else set()
5587 max_priority = line.bracket_tracker.max_delimiter_priority(exclude=exclude)
5588 except (IndexError, ValueError):
5589 return False
5591 return max_priority == COMMA_PRIORITY
5594 def get_features_used(node: Node) -> Set[Feature]:
5595 """Return a set of (relatively) new Python features used in this file.
5597 Currently looking for:
5598 - f-strings;
5599 - underscores in numeric literals;
5600 - trailing commas after * or ** in function signatures and calls;
5601 - positional only arguments in function signatures and lambdas;
5603 features: Set[Feature] = set()
5604 for n in node.pre_order():
5605 if n.type == token.STRING:
5606 value_head = n.value[:2] # type: ignore
5607 if value_head in {'f"', 'F"', "f'", "F'", "rf", "fr", "RF", "FR"}:
5608 features.add(Feature.F_STRINGS)
5610 elif n.type == token.NUMBER:
5611 if "_" in n.value: # type: ignore
5612 features.add(Feature.NUMERIC_UNDERSCORES)
5614 elif n.type == token.SLASH:
5615 if n.parent and n.parent.type in {syms.typedargslist, syms.arglist}:
5616 features.add(Feature.POS_ONLY_ARGUMENTS)
5618 elif n.type == token.COLONEQUAL:
5619 features.add(Feature.ASSIGNMENT_EXPRESSIONS)
5621 elif (
5622 n.type in {syms.typedargslist, syms.arglist}
5623 and n.children
5624 and n.children[-1].type == token.COMMA
5625 ):
5626 if n.type == syms.typedargslist:
5627 feature = Feature.TRAILING_COMMA_IN_DEF
5628 else:
5629 feature = Feature.TRAILING_COMMA_IN_CALL
5631 for ch in n.children:
5632 if ch.type in STARS:
5633 features.add(feature)
5635 if ch.type == syms.argument:
5636 for argch in ch.children:
5637 if argch.type in STARS:
5638 features.add(feature)
5640 return features
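# Illustrative sketch: for a file containing
#     x = 1_000_000
#     y = f"{x}"
# this returns {Feature.NUMERIC_UNDERSCORES, Feature.F_STRINGS}, which
# detect_target_versions() below maps to the TargetVersions supporting both.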
5643 def detect_target_versions(node: Node) -> Set[TargetVersion]:
5644 """Detect the version to target based on the nodes used."""
5645 features = get_features_used(node)
5646 return {
5647 version for version in TargetVersion if features <= VERSION_TO_FEATURES[version]
5648 }
5651 def generate_trailers_to_omit(line: Line, line_length: int) -> Iterator[Set[LeafID]]:
5652 """Generate sets of closing bracket IDs that should be omitted in a RHS.
5654 Brackets can be omitted if the entire trailer up to and including
5655 a preceding closing bracket fits in one line.
5657 Yielded sets are cumulative (contain results of previous yields, too). First
5658 set is empty.
5659 """
5661 omit: Set[LeafID] = set()
5662 yield omit
5664 length = 4 * line.depth
5665 opening_bracket: Optional[Leaf] = None
5666 closing_bracket: Optional[Leaf] = None
5667 inner_brackets: Set[LeafID] = set()
5668 for index, leaf, leaf_length in enumerate_with_length(line, reversed=True):
5669 length += leaf_length
5670 if length > line_length:
5671 break
5673 has_inline_comment = leaf_length > len(leaf.value) + len(leaf.prefix)
5674 if leaf.type == STANDALONE_COMMENT or has_inline_comment:
5675 break
5677 if opening_bracket:
5678 if leaf is opening_bracket:
5679 opening_bracket = None
5680 elif leaf.type in CLOSING_BRACKETS:
5681 inner_brackets.add(id(leaf))
5682 elif leaf.type in CLOSING_BRACKETS:
5683 if index > 0 and line.leaves[index - 1].type in OPENING_BRACKETS:
5684 # Empty brackets would fail a split so treat them as "inner"
5685 # brackets (e.g. only add them to the `omit` set if another
5686 # pair of brackets was good enough).
5687 inner_brackets.add(id(leaf))
5688 continue
5690 if closing_bracket:
5691 omit.add(id(closing_bracket))
5692 omit.update(inner_brackets)
5693 inner_brackets.clear()
5694 yield omit
5696 if leaf.value:
5697 opening_bracket = leaf.opening_bracket
5698 closing_bracket = leaf
5701 def get_future_imports(node: Node) -> Set[str]:
5702 """Return a set of __future__ imports in the file."""
5703 imports: Set[str] = set()
5705 def get_imports_from_children(children: List[LN]) -> Generator[str, None, None]:
5706 for child in children:
5707 if isinstance(child, Leaf):
5708 if child.type == token.NAME:
5709 yield child.value
5711 elif child.type == syms.import_as_name:
5712 orig_name = child.children[0]
5713 assert isinstance(orig_name, Leaf), "Invalid syntax parsing imports"
5714 assert orig_name.type == token.NAME, "Invalid syntax parsing imports"
5715 yield orig_name.value
5717 elif child.type == syms.import_as_names:
5718 yield from get_imports_from_children(child.children)
5720 else:
5721 raise AssertionError("Invalid syntax parsing imports")
5723 for child in node.children:
5724 if child.type != syms.simple_stmt:
5725 break
5727 first_child = child.children[0]
5728 if isinstance(first_child, Leaf):
5729 # Continue looking if we see a docstring; otherwise stop.
5730 if (
5731 len(child.children) == 2
5732 and first_child.type == token.STRING
5733 and child.children[1].type == token.NEWLINE
5734 ):
5735 continue
5737 break
5739 elif first_child.type == syms.import_from:
5740 module_name = first_child.children[1]
5741 if not isinstance(module_name, Leaf) or module_name.value != "__future__":
5742 break
5744 imports |= set(get_imports_from_children(first_child.children[3:]))
5745 else:
5746 break
5748 return imports
5752 def get_gitignore(root: Path) -> PathSpec:
5753 """ Return a PathSpec matching gitignore content if present."""
5754 gitignore = root / ".gitignore"
5755 lines: List[str] = []
5756 if gitignore.is_file():
5757 with gitignore.open() as gf:
5758 lines = gf.readlines()
5759 return PathSpec.from_lines("gitwildmatch", lines)
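# Illustrative sketch (assuming the project's .gitignore contains "build/"):
#     spec = get_gitignore(Path("."))
#     spec.match_file("build/generated.py")  -> True
#     spec.match_file("src/app.py")          -> False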
5762 def gen_python_files(
5763 paths: Iterable[Path],
5764 root: Path,
5765 include: Optional[Pattern[str]],
5766 exclude_regexes: Iterable[Pattern[str]],
5767 report: "Report",
5768 gitignore: PathSpec,
5769 ) -> Iterator[Path]:
5770 """Generate all files under `path` whose paths are not excluded by the
5771 `exclude` regex, but are included by the `include` regex.
5773 Symbolic links pointing outside of the `root` directory are ignored.
5775 `report` is where output about exclusions goes.
5777 assert root.is_absolute(), f"INTERNAL ERROR: `root` must be absolute but is {root}"
5778 for child in paths:
5779 # Then ignore with `exclude` option.
5780 try:
5781 normalized_path = child.resolve().relative_to(root).as_posix()
5782 except OSError as e:
5783 report.path_ignored(child, f"cannot be read because {e}")
5784 continue
5785 except ValueError:
5786 if child.is_symlink():
5787 report.path_ignored(
5788 child, f"is a symbolic link that points outside {root}"
5789 )
5790 continue
5792 raise
5794 # First ignore files matching .gitignore
5795 if gitignore.match_file(normalized_path):
5796 report.path_ignored(child, "matches the .gitignore file content")
5799 normalized_path = "/" + normalized_path
5801 normalized_path += "/"
5804 for exclude in exclude_regexes:
5805 exclude_match = exclude.search(normalized_path) if exclude else None
5806 if exclude_match and exclude_match.group(0):
5807 report.path_ignored(child, "matches the --exclude regular expression")
5814 yield from gen_python_files(
5815 child.iterdir(), root, include, exclude_regexes, report, gitignore
5816 )
5818 elif child.is_file():
5819 include_match = include.search(normalized_path) if include else True
5820 if include_match:
5821 yield child
5825 def find_project_root(srcs: Iterable[str]) -> Path:
5826 """Return a directory containing .git, .hg, or pyproject.toml.
5828 That directory will be a common parent of all files and directories
5829 passed in `srcs`.
5831 If no directory in the tree contains a marker that would specify it's the
5832 project root, the root of the file system is returned.
5833 """
5834 if not srcs:
5835 return Path("/").resolve()
5837 path_srcs = [Path(src).resolve() for src in srcs]
5839 # A list of lists of parents for each 'src'. 'src' is included as a
5840 # "parent" of itself if it is a directory
5842 list(path.parents) + ([path] if path.is_dir() else []) for path in path_srcs
5846 set.intersection(*(set(parents) for parents in src_parents)),
5847 key=lambda path: path.parts,
5850 for directory in (common_base, *common_base.parents):
5851 if (directory / ".git").exists():
5854 if (directory / ".hg").is_dir():
5857 if (directory / "pyproject.toml").is_file():
5865 """Provides a reformatting counter. Can be rendered with `str(report)`."""
5870 verbose: bool = False
5871 change_count: int = 0
5873 failure_count: int = 0
5875 def done(self, src: Path, changed: Changed) -> None:
5876 """Increment the counter for successful reformatting. Write out a message."""
5877 if changed is Changed.YES:
5878 reformatted = "would reformat" if self.check or self.diff else "reformatted"
5879 if self.verbose or not self.quiet:
5880 out(f"{reformatted} {src}")
5881 self.change_count += 1
5882 else:
5883 if self.verbose:
5884 if changed is Changed.NO:
5885 msg = f"{src} already well formatted, good job."
5886 else:
5887 msg = f"{src} wasn't modified on disk since last run."
5888 out(msg, bold=False)
5889 self.same_count += 1
5891 def failed(self, src: Path, message: str) -> None:
5892 """Increment the counter for failed reformatting. Write out a message."""
5893 err(f"error: cannot format {src}: {message}")
5894 self.failure_count += 1
5896 def path_ignored(self, path: Path, message: str) -> None:
5898 out(f"{path} ignored: {message}", bold=False)
5901 def return_code(self) -> int:
5902 """Return the exit code that the app should use.
5904 This considers the current state of changed files and failures:
5905 - if there were any failures, return 123;
5906 - if any files were changed and --check is being used, return 1;
5907 - otherwise return 0.
5909 # According to http://tldp.org/LDP/abs/html/exitcodes.html starting with
5910 # 126 we have special return codes reserved by the shell.
5911 if self.failure_count:
5912 return 123
5914 elif self.change_count and self.check:
5915 return 1
5917 return 0
5919 def __str__(self) -> str:
5920 """Render a color report of the current state.
5922 Use `click.unstyle` to remove colors.
5924 if self.check or self.diff:
5925 reformatted = "would be reformatted"
5926 unchanged = "would be left unchanged"
5927 failed = "would fail to reformat"
5929 reformatted = "reformatted"
5930 unchanged = "left unchanged"
5931 failed = "failed to reformat"
5933 if self.change_count:
5934 s = "s" if self.change_count > 1 else ""
5936 click.style(f"{self.change_count} file{s} {reformatted}", bold=True)
5939 s = "s" if self.same_count > 1 else ""
5940 report.append(f"{self.same_count} file{s} {unchanged}")
5941 if self.failure_count:
5942 s = "s" if self.failure_count > 1 else ""
5944 click.style(f"{self.failure_count} file{s} {failed}", fg="red")
5946 return ", ".join(report) + "."
5949 def parse_ast(src: str) -> Union[ast.AST, ast3.AST, ast27.AST]:
5950 filename = "<unknown>"
5951 if sys.version_info >= (3, 8):
5952 # TODO: support Python 4+ ;)
5953 for minor_version in range(sys.version_info[1], 4, -1):
5954 try:
5955 return ast.parse(src, filename, feature_version=(3, minor_version))
5956 except SyntaxError:
5957 continue
5958 else:
5959 for feature_version in (7, 6):
5960 try:
5961 return ast3.parse(src, filename, feature_version=feature_version)
5962 except SyntaxError:
5963 continue
5965 return ast27.parse(src)
5968 def _fixup_ast_constants(
5969 node: Union[ast.AST, ast3.AST, ast27.AST]
5970 ) -> Union[ast.AST, ast3.AST, ast27.AST]:
5971 """Map ast nodes deprecated in 3.8 to Constant."""
5972 if isinstance(node, (ast.Str, ast3.Str, ast27.Str, ast.Bytes, ast3.Bytes)):
5973 return ast.Constant(value=node.s)
5975 if isinstance(node, (ast.Num, ast3.Num, ast27.Num)):
5976 return ast.Constant(value=node.n)
5978 if isinstance(node, (ast.NameConstant, ast3.NameConstant)):
5979 return ast.Constant(value=node.value)
5981 return node
5984 def _stringify_ast(
5985 node: Union[ast.AST, ast3.AST, ast27.AST], depth: int = 0
5986 ) -> Iterator[str]:
5987 """Simple visitor generating strings to compare ASTs by content."""
5989 node = _fixup_ast_constants(node)
5991 yield f"{' ' * depth}{node.__class__.__name__}("
5993 for field in sorted(node._fields): # noqa: F402
5994 # TypeIgnore has only one field 'lineno' which breaks this comparison
5995 type_ignore_classes = (ast3.TypeIgnore, ast27.TypeIgnore)
5996 if sys.version_info >= (3, 8):
5997 type_ignore_classes += (ast.TypeIgnore,)
5998 if isinstance(node, type_ignore_classes):
5999 break
6001 try:
6002 value = getattr(node, field)
6003 except AttributeError:
6004 continue
6006 yield f"{' ' * (depth+1)}{field}="
6008 if isinstance(value, list):
6009 for item in value:
6010 # Ignore nested tuples within del statements, because we may insert
6011 # parentheses and they change the AST.
6012 if (
6013 field == "targets"
6014 and isinstance(node, (ast.Delete, ast3.Delete, ast27.Delete))
6015 and isinstance(item, (ast.Tuple, ast3.Tuple, ast27.Tuple))
6016 ):
6017 for item in item.elts:
6018 yield from _stringify_ast(item, depth + 2)
6020 elif isinstance(item, (ast.AST, ast3.AST, ast27.AST)):
6021 yield from _stringify_ast(item, depth + 2)
6023 elif isinstance(value, (ast.AST, ast3.AST, ast27.AST)):
6024 yield from _stringify_ast(value, depth + 2)
6026 else:
6027 # Constant strings may be indented across newlines, if they are
6028 # docstrings; fold spaces after newlines when comparing. Similarly,
6029 # trailing and leading space may be removed.
6030 if (
6031 isinstance(node, ast.Constant)
6032 and field == "value"
6033 and isinstance(value, str)
6034 ):
6035 normalized = re.sub(r" *\n[ \t]+", "\n ", value).strip()
6036 else:
6037 normalized = value
6038 yield f"{' ' * (depth+2)}{normalized!r}, # {value.__class__.__name__}"
6040 yield f"{' ' * depth}) # /{node.__class__.__name__}"
6043 def assert_equivalent(src: str, dst: str) -> None:
6044 """Raise AssertionError if `src` and `dst` aren't equivalent."""
6045 try:
6046 src_ast = parse_ast(src)
6047 except Exception as exc:
6048 raise AssertionError(
6049 "cannot use --safe with this file; failed to parse source file. AST"
6050 f" error message: {exc}"
6054 dst_ast = parse_ast(dst)
6055 except Exception as exc:
6056 log = dump_to_file("".join(traceback.format_tb(exc.__traceback__)), dst)
6057 raise AssertionError(
6058 f"INTERNAL ERROR: Black produced invalid code: {exc}. Please report a bug"
6059 " on https://github.com/psf/black/issues. This invalid output might be"
6063 src_ast_str = "\n".join(_stringify_ast(src_ast))
6064 dst_ast_str = "\n".join(_stringify_ast(dst_ast))
6065 if src_ast_str != dst_ast_str:
6066 log = dump_to_file(diff(src_ast_str, dst_ast_str, "src", "dst"))
6067 raise AssertionError(
6068 "INTERNAL ERROR: Black produced code that is not equivalent to the"
6069 " source. Please report a bug on https://github.com/psf/black/issues. "
6070 f" This diff might be helpful: {log}"
6074 def assert_stable(src: str, dst: str, mode: Mode) -> None:
6075 """Raise AssertionError if `dst` reformats differently the second time."""
6076 newdst = format_str(dst, mode=mode)
6077 if dst != newdst:
6078 log = dump_to_file(
6079 diff(src, dst, "source", "first pass"),
6080 diff(dst, newdst, "first pass", "second pass"),
6081 )
6082 raise AssertionError(
6083 "INTERNAL ERROR: Black produced different code on the second pass of the"
6084 " formatter. Please report a bug on https://github.com/psf/black/issues."
6085 f" This diff might be helpful: {log}"
6089 @mypyc_attr(patchable=True)
6090 def dump_to_file(*output: str) -> str:
6091 """Dump `output` to a temporary file. Return path to the file."""
6092 with tempfile.NamedTemporaryFile(
6093 mode="w", prefix="blk_", suffix=".log", delete=False, encoding="utf8"
6095 for lines in output:
6097 if lines and lines[-1] != "\n":
6102 @contextmanager
6103 def nullcontext() -> Iterator[None]:
6104 """Return an empty context manager.
6106 To be used like `nullcontext` in Python 3.7.
6107 """
6108 yield
6111 def diff(a: str, b: str, a_name: str, b_name: str) -> str:
6112 """Return a unified diff string between strings `a` and `b`."""
6115 a_lines = [line + "\n" for line in a.splitlines()]
6116 b_lines = [line + "\n" for line in b.splitlines()]
6117 return "".join(
6118 difflib.unified_diff(a_lines, b_lines, fromfile=a_name, tofile=b_name, n=5)
6119 )
6122 def cancel(tasks: Iterable["asyncio.Task[Any]"]) -> None:
6123 """asyncio signal handler that cancels all `tasks` and reports to stderr."""
6129 def shutdown(loop: asyncio.AbstractEventLoop) -> None:
6130 """Cancel all pending tasks on `loop`, wait for them, and close the loop."""
6132 if sys.version_info[:2] >= (3, 7):
6133 all_tasks = asyncio.all_tasks
6135 all_tasks = asyncio.Task.all_tasks
6136 # This part is borrowed from asyncio/runners.py in Python 3.7b2.
6137 to_cancel = [task for task in all_tasks(loop) if not task.done()]
6141 for task in to_cancel:
6143 loop.run_until_complete(
6144 asyncio.gather(*to_cancel, loop=loop, return_exceptions=True)
6147 # `concurrent.futures.Future` objects cannot be cancelled once they
6148 # are already running. There might be some when the `shutdown()` happened.
6149 # Silence their logger's spew about the event loop being closed.
6150 cf_logger = logging.getLogger("concurrent.futures")
6151 cf_logger.setLevel(logging.CRITICAL)
6152 loop.close()
6155 def sub_twice(regex: Pattern[str], replacement: str, original: str) -> str:
6156 """Replace `regex` with `replacement` twice on `original`.
6158 This is used by string normalization to perform replaces on
6159 overlapping matches.
6161 return regex.sub(replacement, regex.sub(replacement, original))
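# Illustrative sketch: with a pattern like ([^\\]|^)\\' and replacement \1',
# the input \'\' needs two passes because the first match consumes the
# character that precedes the second escaped quote:
#     pass 1:  \'\'  ->  '\'
#     pass 2:  '\'   ->  ''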
6164 def re_compile_maybe_verbose(regex: str) -> Pattern[str]:
6165 """Compile a regular expression string in `regex`.
6167 If it contains newlines, use verbose mode.
6169 if "\n" in regex:
6170 regex = "(?x)" + regex
6171 compiled: Pattern[str] = re.compile(regex)
6172 return compiled
6175 def enumerate_reversed(sequence: Sequence[T]) -> Iterator[Tuple[Index, T]]:
6176 """Like `reversed(enumerate(sequence))` if that were possible."""
6177 index = len(sequence) - 1
6178 for element in reversed(sequence):
6179 yield (index, element)
6180 index -= 1
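# Doctest-style sketch:
#     list(enumerate_reversed(["a", "b", "c"]))
#     -> [(2, "c"), (1, "b"), (0, "a")]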
6183 def enumerate_with_length(
6184 line: Line, reversed: bool = False
6185 ) -> Iterator[Tuple[Index, Leaf, int]]:
6186 """Return an enumeration of leaves with their length.
6188 Stops prematurely on multiline strings and standalone comments.
6189 """
6190 op = cast(
6191 Callable[[Sequence[Leaf]], Iterator[Tuple[Index, Leaf]]],
6192 enumerate_reversed if reversed else enumerate,
6193 )
6194 for index, leaf in op(line.leaves):
6195 length = len(leaf.prefix) + len(leaf.value)
6196 if "\n" in leaf.value:
6197 return # Multiline strings, we can't continue.
6199 for comment in line.comments_after(leaf):
6200 length += len(comment.value)
6202 yield index, leaf, length
6205 def is_line_short_enough(line: Line, *, line_length: int, line_str: str = "") -> bool:
6206 """Return True if `line` is no longer than `line_length`.
6208 Uses the provided `line_str` rendering, if any, otherwise computes a new one.
6209 """
6210 if not line_str:
6211 line_str = line_to_string(line)
6212 return (
6213 len(line_str) <= line_length
6214 and "\n" not in line_str # multiline strings
6215 and not line.contains_standalone_comments()
6216 )
6219 def can_be_split(line: Line) -> bool:
6220 """Return False if the line cannot be split *for sure*.
6222 This is not an exhaustive search but a cheap heuristic that we can use to
6223 avoid some unfortunate formattings (mostly around wrapping unsplittable code
6224 in unnecessary parentheses).
6226 leaves = line.leaves
6227 if len(leaves) < 2:
6228 return False
6230 if leaves[0].type == token.STRING and leaves[1].type == token.DOT:
6231 call_count = 0
6232 dot_count = 0
6233 next = leaves[-1]
6234 for leaf in leaves[-2::-1]:
6235 if leaf.type in OPENING_BRACKETS:
6236 if next.type not in CLOSING_BRACKETS:
6237 return False
6239 call_count += 1
6240 elif leaf.type == token.DOT:
6241 dot_count += 1
6242 elif leaf.type == token.NAME:
6243 if not (next.type == token.DOT or next.type in OPENING_BRACKETS):
6244 return False
6246 elif leaf.type not in CLOSING_BRACKETS:
6247 return False
6249 if dot_count > 1 and call_count > 1:
6250 return False
6252 next = leaf
6254 return True
6255 def can_omit_invisible_parens(line: Line, line_length: int) -> bool:
6256 """Does `line` have a shape safe to reformat without optional parens around it?
6258 Returns True for only a subset of potentially nice looking formattings but
6259 the point is to not return false positives that end up producing lines that
6262 bt = line.bracket_tracker
6263 if not bt.delimiters:
6264 # Without delimiters the optional parentheses are useless.
6265 return True
6267 max_priority = bt.max_delimiter_priority()
6268 if bt.delimiter_count_with_priority(max_priority) > 1:
6269 # With more than one delimiter of a kind the optional parentheses read better.
6270 return False
6272 if max_priority == DOT_PRIORITY:
6273 # A single stranded method call doesn't require optional parentheses.
6274 return True
6276 assert len(line.leaves) >= 2, "Stranded delimiter"
6278 first = line.leaves[0]
6279 second = line.leaves[1]
6280 penultimate = line.leaves[-2]
6281 last = line.leaves[-1]
6283 # With a single delimiter, omit if the expression starts or ends with
6284 # a bracket.
6285 if first.type in OPENING_BRACKETS and second.type not in CLOSING_BRACKETS:
6286 _index = -1
6287 length = 4 * line.depth
6288 for _index, leaf, leaf_length in enumerate_with_length(line):
6289 if leaf.type in CLOSING_BRACKETS and leaf.opening_bracket is first:
6290 break
6292 length += leaf_length
6293 if length > line_length:
6294 break
6296 if leaf.type in OPENING_BRACKETS:
6297 # There are brackets we can further split on.
6298 break
6300 else:
6301 # checked the entire string and line length wasn't exceeded
6302 if len(line.leaves) == _index + 1:
6303 return True
6305 # Note: we are not returning False here because a line might have *both*
6306 # a leading opening bracket and a trailing closing bracket. If the
6307 # opening bracket doesn't match our rule, maybe the closing will.
6309 if (
6310 last.type == token.RPAR
6311 or last.type == token.RBRACE
6312 or (
6313 # don't use indexing for omitting optional parentheses;
6314 # it looks weird
6315 last.type == token.RSQB
6316 and last.parent
6317 and last.parent.type != syms.trailer # last bracket wasn't a function call
6318 )
6319 ):
6320 if penultimate.type in OPENING_BRACKETS:
6321 # Empty brackets don't help.
6322 return False
6324 if is_multiline_string(first):
6325 # Additional wrapping of a multiline string in this situation is
6326 # unnecessary.
6327 return False
6329 length = 4 * line.depth
6330 seen_other_brackets = False
6331 for _index, leaf, leaf_length in enumerate_with_length(line):
6332 length += leaf_length
6333 if leaf is last.opening_bracket:
6334 if seen_other_brackets or length <= line_length:
6335 return True
6337 elif leaf.type in OPENING_BRACKETS:
6338 # There are brackets we can further split on.
6339 seen_other_brackets = True
6341 return False
6344 def get_cache_file(mode: Mode) -> Path:
6345 return CACHE_DIR / f"cache.{mode.get_cache_key()}.pickle"
6348 def read_cache(mode: Mode) -> Cache:
6349 """Read the cache if it exists and is well formed.
6351 If it is not well formed, the call to write_cache later should resolve the issue.
6353 cache_file = get_cache_file(mode)
6354 if not cache_file.exists():
6355 return {}
6357 with cache_file.open("rb") as fobj:
6358 try:
6359 cache: Cache = pickle.load(fobj)
6360 except (pickle.UnpicklingError, ValueError):
6361 return {}
6363 return cache
6366 def get_cache_info(path: Path) -> CacheInfo:
6367 """Return the information used to check if a file is already formatted or not."""
6369 return stat.st_mtime, stat.st_size
6372 def filter_cached(cache: Cache, sources: Iterable[Path]) -> Tuple[Set[Path], Set[Path]]:
6373 """Split an iterable of paths in `sources` into two sets.
6375 The first contains paths of files that modified on disk or are not in the
6376 cache. The other contains paths to non-modified files.
6378 todo, done = set(), set()
6379 for src in sources:
6380 src = src.resolve()
6381 if cache.get(src) != get_cache_info(src):
6382 todo.add(src)
6383 else:
6384 done.add(src)
6385 return todo, done
6388 def write_cache(cache: Cache, sources: Iterable[Path], mode: Mode) -> None:
6389 """Update the cache file."""
6390 cache_file = get_cache_file(mode)
6391 try:
6392 CACHE_DIR.mkdir(parents=True, exist_ok=True)
6393 new_cache = {**cache, **{src.resolve(): get_cache_info(src) for src in sources}}
6394 with tempfile.NamedTemporaryFile(dir=str(cache_file.parent), delete=False) as f:
6395 pickle.dump(new_cache, f, protocol=4)
6396 os.replace(f.name, cache_file)
6397 except OSError:
6398 pass
6401 def patch_click() -> None:
6402 """Make Click not crash.
6404 On certain misconfigured environments, Python 3 selects the ASCII encoding as the
6405 default which restricts paths that it can access during the lifetime of the
6406 application. Click refuses to work in this scenario by raising a RuntimeError.
6408 In case of Black the likelihood that non-ASCII characters are going to be used in
6409 file paths is minimal since it's Python source code. Moreover, this crash was
6410 spurious on Python 3.7 thanks to PEP 538 and PEP 540.
6412 try:
6413 from click import core
6414 from click import _unicodefun # type: ignore
6415 except ModuleNotFoundError:
6416 return
6418 for module in (core, _unicodefun):
6419 if hasattr(module, "_verify_python3_env"):
6420 module._verify_python3_env = lambda: None
6423 def patched_main() -> None:
6424 freeze_support()
6425 patch_click()
6426 main()
6429 def fix_docstring(docstring: str, prefix: str) -> str:
6430 # https://www.python.org/dev/peps/pep-0257/#handling-docstring-indentation
6431 if not docstring:
6432 return docstring
6433 # Convert tabs to spaces (following the normal Python rules)
6434 # and split into a list of lines:
6435 lines = docstring.expandtabs().splitlines()
6436 # Determine minimum indentation (first line doesn't count):
6437 indent = sys.maxsize
6438 for line in lines[1:]:
6439 stripped = line.lstrip()
6440 if stripped:
6441 indent = min(indent, len(line) - len(stripped))
6442 # Remove indentation (first line is special):
6443 trimmed = [lines[0].strip()]
6444 if indent < sys.maxsize:
6445 last_line_idx = len(lines) - 2
6446 for i, line in enumerate(lines[1:]):
6447 stripped_line = line[indent:].rstrip()
6448 if stripped_line or i == last_line_idx:
6449 trimmed.append(prefix + stripped_line)
6450 else:
6451 trimmed.append("")
6452 # Return a single string:
6453 return "\n".join(trimmed)
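# Doctest-style sketch:
#     fix_docstring("First line.\n        Body.", "    ")
#     -> "First line.\n    Body."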
6456 if __name__ == "__main__":
6457 patched_main()