]> git.madduck.net Git - etc/vim.git/blob - black.py

madduck's git repository

Every one of the projects in this repository is available at the canonical URL git://git.madduck.net/madduck/pub/<projectpath> — see each project's metadata for the exact URL.

All patches and comments are welcome. Please squash your changes to logical commits before using git-format-patch and git-send-email to patches@git.madduck.net. If you'd read over the Git project's submission guidelines and adhered to them, I'd be especially grateful.

SSH access, as well as push access can be individually arranged.

If you use my repositories frequently, consider adding the following snippet to ~/.gitconfig and using the third clone URL listed for each project:

[url "git://git.madduck.net/madduck/"]
  insteadOf = madduck:

fix doc generation
[etc/vim.git] / black.py
1 import ast
2 import asyncio
3 from concurrent.futures import Executor, ProcessPoolExecutor
4 from contextlib import contextmanager
5 from datetime import datetime
6 from enum import Enum
7 from functools import lru_cache, partial, wraps
8 import io
9 import itertools
10 import logging
11 from multiprocessing import Manager, freeze_support
12 import os
13 from pathlib import Path
14 import pickle
15 import re
16 import signal
17 import sys
18 import tempfile
19 import tokenize
20 import traceback
21 from typing import (
22     Any,
23     Callable,
24     Collection,
25     Dict,
26     Generator,
27     Generic,
28     Iterable,
29     Iterator,
30     List,
31     Optional,
32     Pattern,
33     Sequence,
34     Set,
35     Tuple,
36     TypeVar,
37     Union,
38     cast,
39 )
40
41 from appdirs import user_cache_dir
42 from attr import dataclass, evolve, Factory
43 import click
44 import toml
45 from typed_ast import ast3, ast27
46
47 # lib2to3 fork
48 from blib2to3.pytree import Node, Leaf, type_repr
49 from blib2to3 import pygram, pytree
50 from blib2to3.pgen2 import driver, token
51 from blib2to3.pgen2.grammar import Grammar
52 from blib2to3.pgen2.parse import ParseError
53
54 from _version import version as __version__
55
56 DEFAULT_LINE_LENGTH = 88
57 DEFAULT_EXCLUDES = (
58     r"/(\.eggs|\.git|\.hg|\.mypy_cache|\.nox|\.tox|\.venv|_build|buck-out|build|dist)/"
59 )
60 DEFAULT_INCLUDES = r"\.pyi?$"
61 CACHE_DIR = Path(user_cache_dir("black", version=__version__))
62
63
64 # types
65 FileContent = str
66 Encoding = str
67 NewLine = str
68 Depth = int
69 NodeType = int
70 LeafID = int
71 Priority = int
72 Index = int
73 LN = Union[Leaf, Node]
74 SplitFunc = Callable[["Line", Collection["Feature"]], Iterator["Line"]]
75 Timestamp = float
76 FileSize = int
77 CacheInfo = Tuple[Timestamp, FileSize]
78 Cache = Dict[Path, CacheInfo]
79 out = partial(click.secho, bold=True, err=True)
80 err = partial(click.secho, fg="red", err=True)
81
82 pygram.initialize(CACHE_DIR)
83 syms = pygram.python_symbols
84
85
86 class NothingChanged(UserWarning):
87     """Raised when reformatted code is the same as source."""
88
89
90 class CannotSplit(Exception):
91     """A readable split that fits the allotted line length is impossible."""
92
93
94 class InvalidInput(ValueError):
95     """Raised when input source code fails all parse attempts."""
96
97
98 class WriteBack(Enum):
99     NO = 0
100     YES = 1
101     DIFF = 2
102     CHECK = 3
103
104     @classmethod
105     def from_configuration(cls, *, check: bool, diff: bool) -> "WriteBack":
106         if check and not diff:
107             return cls.CHECK
108
109         return cls.DIFF if diff else cls.YES
110
111
112 class Changed(Enum):
113     NO = 0
114     CACHED = 1
115     YES = 2
116
117
118 class TargetVersion(Enum):
119     PY27 = 2
120     PY33 = 3
121     PY34 = 4
122     PY35 = 5
123     PY36 = 6
124     PY37 = 7
125     PY38 = 8
126
127     def is_python2(self) -> bool:
128         return self is TargetVersion.PY27
129
130
131 PY36_VERSIONS = {TargetVersion.PY36, TargetVersion.PY37, TargetVersion.PY38}
132
133
134 class Feature(Enum):
135     # All string literals are unicode
136     UNICODE_LITERALS = 1
137     F_STRINGS = 2
138     NUMERIC_UNDERSCORES = 3
139     TRAILING_COMMA_IN_CALL = 4
140     TRAILING_COMMA_IN_DEF = 5
141     # The following two feature-flags are mutually exclusive, and exactly one should be
142     # set for every version of python.
143     ASYNC_IDENTIFIERS = 6
144     ASYNC_KEYWORDS = 7
145     ASSIGNMENT_EXPRESSIONS = 8
146     POS_ONLY_ARGUMENTS = 9
147
148
149 VERSION_TO_FEATURES: Dict[TargetVersion, Set[Feature]] = {
150     TargetVersion.PY27: {Feature.ASYNC_IDENTIFIERS},
151     TargetVersion.PY33: {Feature.UNICODE_LITERALS, Feature.ASYNC_IDENTIFIERS},
152     TargetVersion.PY34: {Feature.UNICODE_LITERALS, Feature.ASYNC_IDENTIFIERS},
153     TargetVersion.PY35: {
154         Feature.UNICODE_LITERALS,
155         Feature.TRAILING_COMMA_IN_CALL,
156         Feature.ASYNC_IDENTIFIERS,
157     },
158     TargetVersion.PY36: {
159         Feature.UNICODE_LITERALS,
160         Feature.F_STRINGS,
161         Feature.NUMERIC_UNDERSCORES,
162         Feature.TRAILING_COMMA_IN_CALL,
163         Feature.TRAILING_COMMA_IN_DEF,
164         Feature.ASYNC_IDENTIFIERS,
165     },
166     TargetVersion.PY37: {
167         Feature.UNICODE_LITERALS,
168         Feature.F_STRINGS,
169         Feature.NUMERIC_UNDERSCORES,
170         Feature.TRAILING_COMMA_IN_CALL,
171         Feature.TRAILING_COMMA_IN_DEF,
172         Feature.ASYNC_KEYWORDS,
173     },
174     TargetVersion.PY38: {
175         Feature.UNICODE_LITERALS,
176         Feature.F_STRINGS,
177         Feature.NUMERIC_UNDERSCORES,
178         Feature.TRAILING_COMMA_IN_CALL,
179         Feature.TRAILING_COMMA_IN_DEF,
180         Feature.ASYNC_KEYWORDS,
181         Feature.ASSIGNMENT_EXPRESSIONS,
182         Feature.POS_ONLY_ARGUMENTS,
183     },
184 }
185
186
187 @dataclass
188 class FileMode:
189     target_versions: Set[TargetVersion] = Factory(set)
190     line_length: int = DEFAULT_LINE_LENGTH
191     string_normalization: bool = True
192     is_pyi: bool = False
193
194     def get_cache_key(self) -> str:
195         if self.target_versions:
196             version_str = ",".join(
197                 str(version.value)
198                 for version in sorted(self.target_versions, key=lambda v: v.value)
199             )
200         else:
201             version_str = "-"
202         parts = [
203             version_str,
204             str(self.line_length),
205             str(int(self.string_normalization)),
206             str(int(self.is_pyi)),
207         ]
208         return ".".join(parts)
209
210
211 def supports_feature(target_versions: Set[TargetVersion], feature: Feature) -> bool:
212     return all(feature in VERSION_TO_FEATURES[version] for version in target_versions)
213
214
215 def read_pyproject_toml(
216     ctx: click.Context, param: click.Parameter, value: Union[str, int, bool, None]
217 ) -> Optional[str]:
218     """Inject Black configuration from "pyproject.toml" into defaults in `ctx`.
219
220     Returns the path to a successfully found and read configuration file, None
221     otherwise.
222     """
223     assert not isinstance(value, (int, bool)), "Invalid parameter type passed"
224     if not value:
225         root = find_project_root(ctx.params.get("src", ()))
226         path = root / "pyproject.toml"
227         if path.is_file():
228             value = str(path)
229         else:
230             return None
231
232     try:
233         pyproject_toml = toml.load(value)
234         config = pyproject_toml.get("tool", {}).get("black", {})
235     except (toml.TomlDecodeError, OSError) as e:
236         raise click.FileError(
237             filename=value, hint=f"Error reading configuration file: {e}"
238         )
239
240     if not config:
241         return None
242
243     if ctx.default_map is None:
244         ctx.default_map = {}
245     ctx.default_map.update(  # type: ignore  # bad types in .pyi
246         {k.replace("--", "").replace("-", "_"): v for k, v in config.items()}
247     )
248     return value
249
250
251 @click.command(context_settings=dict(help_option_names=["-h", "--help"]))
252 @click.option("-c", "--code", type=str, help="Format the code passed in as a string.")
253 @click.option(
254     "-l",
255     "--line-length",
256     type=int,
257     default=DEFAULT_LINE_LENGTH,
258     help="How many characters per line to allow.",
259     show_default=True,
260 )
261 @click.option(
262     "-t",
263     "--target-version",
264     type=click.Choice([v.name.lower() for v in TargetVersion]),
265     callback=lambda c, p, v: [TargetVersion[val.upper()] for val in v],
266     multiple=True,
267     help=(
268         "Python versions that should be supported by Black's output. [default: "
269         "per-file auto-detection]"
270     ),
271 )
272 @click.option(
273     "--py36",
274     is_flag=True,
275     help=(
276         "Allow using Python 3.6-only syntax on all input files.  This will put "
277         "trailing commas in function signatures and calls also after *args and "
278         "**kwargs. Deprecated; use --target-version instead. "
279         "[default: per-file auto-detection]"
280     ),
281 )
282 @click.option(
283     "--pyi",
284     is_flag=True,
285     help=(
286         "Format all input files like typing stubs regardless of file extension "
287         "(useful when piping source on standard input)."
288     ),
289 )
290 @click.option(
291     "-S",
292     "--skip-string-normalization",
293     is_flag=True,
294     help="Don't normalize string quotes or prefixes.",
295 )
296 @click.option(
297     "--check",
298     is_flag=True,
299     help=(
300         "Don't write the files back, just return the status.  Return code 0 "
301         "means nothing would change.  Return code 1 means some files would be "
302         "reformatted.  Return code 123 means there was an internal error."
303     ),
304 )
305 @click.option(
306     "--diff",
307     is_flag=True,
308     help="Don't write the files back, just output a diff for each file on stdout.",
309 )
310 @click.option(
311     "--fast/--safe",
312     is_flag=True,
313     help="If --fast given, skip temporary sanity checks. [default: --safe]",
314 )
315 @click.option(
316     "--include",
317     type=str,
318     default=DEFAULT_INCLUDES,
319     help=(
320         "A regular expression that matches files and directories that should be "
321         "included on recursive searches.  An empty value means all files are "
322         "included regardless of the name.  Use forward slashes for directories on "
323         "all platforms (Windows, too).  Exclusions are calculated first, inclusions "
324         "later."
325     ),
326     show_default=True,
327 )
328 @click.option(
329     "--exclude",
330     type=str,
331     default=DEFAULT_EXCLUDES,
332     help=(
333         "A regular expression that matches files and directories that should be "
334         "excluded on recursive searches.  An empty value means no paths are excluded. "
335         "Use forward slashes for directories on all platforms (Windows, too).  "
336         "Exclusions are calculated first, inclusions later."
337     ),
338     show_default=True,
339 )
340 @click.option(
341     "-q",
342     "--quiet",
343     is_flag=True,
344     help=(
345         "Don't emit non-error messages to stderr. Errors are still emitted; "
346         "silence those with 2>/dev/null."
347     ),
348 )
349 @click.option(
350     "-v",
351     "--verbose",
352     is_flag=True,
353     help=(
354         "Also emit messages to stderr about files that were not changed or were "
355         "ignored due to --exclude=."
356     ),
357 )
358 @click.version_option(version=__version__)
359 @click.argument(
360     "src",
361     nargs=-1,
362     type=click.Path(
363         exists=True, file_okay=True, dir_okay=True, readable=True, allow_dash=True
364     ),
365     is_eager=True,
366 )
367 @click.option(
368     "--config",
369     type=click.Path(
370         exists=False, file_okay=True, dir_okay=False, readable=True, allow_dash=False
371     ),
372     is_eager=True,
373     callback=read_pyproject_toml,
374     help="Read configuration from PATH.",
375 )
376 @click.pass_context
377 def main(
378     ctx: click.Context,
379     code: Optional[str],
380     line_length: int,
381     target_version: List[TargetVersion],
382     check: bool,
383     diff: bool,
384     fast: bool,
385     pyi: bool,
386     py36: bool,
387     skip_string_normalization: bool,
388     quiet: bool,
389     verbose: bool,
390     include: str,
391     exclude: str,
392     src: Tuple[str],
393     config: Optional[str],
394 ) -> None:
395     """The uncompromising code formatter."""
396     write_back = WriteBack.from_configuration(check=check, diff=diff)
397     if target_version:
398         if py36:
399             err(f"Cannot use both --target-version and --py36")
400             ctx.exit(2)
401         else:
402             versions = set(target_version)
403     elif py36:
404         err(
405             "--py36 is deprecated and will be removed in a future version. "
406             "Use --target-version py36 instead."
407         )
408         versions = PY36_VERSIONS
409     else:
410         # We'll autodetect later.
411         versions = set()
412     mode = FileMode(
413         target_versions=versions,
414         line_length=line_length,
415         is_pyi=pyi,
416         string_normalization=not skip_string_normalization,
417     )
418     if config and verbose:
419         out(f"Using configuration from {config}.", bold=False, fg="blue")
420     if code is not None:
421         print(format_str(code, mode=mode))
422         ctx.exit(0)
423     try:
424         include_regex = re_compile_maybe_verbose(include)
425     except re.error:
426         err(f"Invalid regular expression for include given: {include!r}")
427         ctx.exit(2)
428     try:
429         exclude_regex = re_compile_maybe_verbose(exclude)
430     except re.error:
431         err(f"Invalid regular expression for exclude given: {exclude!r}")
432         ctx.exit(2)
433     report = Report(check=check, quiet=quiet, verbose=verbose)
434     root = find_project_root(src)
435     sources: Set[Path] = set()
436     path_empty(src, quiet, verbose, ctx)
437     for s in src:
438         p = Path(s)
439         if p.is_dir():
440             sources.update(
441                 gen_python_files_in_dir(p, root, include_regex, exclude_regex, report)
442             )
443         elif p.is_file() or s == "-":
444             # if a file was explicitly given, we don't care about its extension
445             sources.add(p)
446         else:
447             err(f"invalid path: {s}")
448     if len(sources) == 0:
449         if verbose or not quiet:
450             out("No Python files are present to be formatted. Nothing to do 😴")
451         ctx.exit(0)
452
453     if len(sources) == 1:
454         reformat_one(
455             src=sources.pop(),
456             fast=fast,
457             write_back=write_back,
458             mode=mode,
459             report=report,
460         )
461     else:
462         reformat_many(
463             sources=sources, fast=fast, write_back=write_back, mode=mode, report=report
464         )
465
466     if verbose or not quiet:
467         out("Oh no! 💥 💔 💥" if report.return_code else "All done! ✨ 🍰 ✨")
468         click.secho(str(report), err=True)
469     ctx.exit(report.return_code)
470
471
472 def path_empty(src: Tuple[str], quiet: bool, verbose: bool, ctx: click.Context) -> None:
473     """
474     Exit if there is no `src` provided for formatting
475     """
476     if not src:
477         if verbose or not quiet:
478             out("No Path provided. Nothing to do 😴")
479             ctx.exit(0)
480
481
482 def reformat_one(
483     src: Path, fast: bool, write_back: WriteBack, mode: FileMode, report: "Report"
484 ) -> None:
485     """Reformat a single file under `src` without spawning child processes.
486
487     `fast`, `write_back`, and `mode` options are passed to
488     :func:`format_file_in_place` or :func:`format_stdin_to_stdout`.
489     """
490     try:
491         changed = Changed.NO
492         if not src.is_file() and str(src) == "-":
493             if format_stdin_to_stdout(fast=fast, write_back=write_back, mode=mode):
494                 changed = Changed.YES
495         else:
496             cache: Cache = {}
497             if write_back != WriteBack.DIFF:
498                 cache = read_cache(mode)
499                 res_src = src.resolve()
500                 if res_src in cache and cache[res_src] == get_cache_info(res_src):
501                     changed = Changed.CACHED
502             if changed is not Changed.CACHED and format_file_in_place(
503                 src, fast=fast, write_back=write_back, mode=mode
504             ):
505                 changed = Changed.YES
506             if (write_back is WriteBack.YES and changed is not Changed.CACHED) or (
507                 write_back is WriteBack.CHECK and changed is Changed.NO
508             ):
509                 write_cache(cache, [src], mode)
510         report.done(src, changed)
511     except Exception as exc:
512         report.failed(src, str(exc))
513
514
515 def reformat_many(
516     sources: Set[Path],
517     fast: bool,
518     write_back: WriteBack,
519     mode: FileMode,
520     report: "Report",
521 ) -> None:
522     """Reformat multiple files using a ProcessPoolExecutor."""
523     loop = asyncio.get_event_loop()
524     worker_count = os.cpu_count()
525     if sys.platform == "win32":
526         # Work around https://bugs.python.org/issue26903
527         worker_count = min(worker_count, 61)
528     executor = ProcessPoolExecutor(max_workers=worker_count)
529     try:
530         loop.run_until_complete(
531             schedule_formatting(
532                 sources=sources,
533                 fast=fast,
534                 write_back=write_back,
535                 mode=mode,
536                 report=report,
537                 loop=loop,
538                 executor=executor,
539             )
540         )
541     finally:
542         shutdown(loop)
543         executor.shutdown()
544
545
546 async def schedule_formatting(
547     sources: Set[Path],
548     fast: bool,
549     write_back: WriteBack,
550     mode: FileMode,
551     report: "Report",
552     loop: asyncio.AbstractEventLoop,
553     executor: Executor,
554 ) -> None:
555     """Run formatting of `sources` in parallel using the provided `executor`.
556
557     (Use ProcessPoolExecutors for actual parallelism.)
558
559     `write_back`, `fast`, and `mode` options are passed to
560     :func:`format_file_in_place`.
561     """
562     cache: Cache = {}
563     if write_back != WriteBack.DIFF:
564         cache = read_cache(mode)
565         sources, cached = filter_cached(cache, sources)
566         for src in sorted(cached):
567             report.done(src, Changed.CACHED)
568     if not sources:
569         return
570
571     cancelled = []
572     sources_to_cache = []
573     lock = None
574     if write_back == WriteBack.DIFF:
575         # For diff output, we need locks to ensure we don't interleave output
576         # from different processes.
577         manager = Manager()
578         lock = manager.Lock()
579     tasks = {
580         asyncio.ensure_future(
581             loop.run_in_executor(
582                 executor, format_file_in_place, src, fast, mode, write_back, lock
583             )
584         ): src
585         for src in sorted(sources)
586     }
587     pending: Iterable[asyncio.Future] = tasks.keys()
588     try:
589         loop.add_signal_handler(signal.SIGINT, cancel, pending)
590         loop.add_signal_handler(signal.SIGTERM, cancel, pending)
591     except NotImplementedError:
592         # There are no good alternatives for these on Windows.
593         pass
594     while pending:
595         done, _ = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
596         for task in done:
597             src = tasks.pop(task)
598             if task.cancelled():
599                 cancelled.append(task)
600             elif task.exception():
601                 report.failed(src, str(task.exception()))
602             else:
603                 changed = Changed.YES if task.result() else Changed.NO
604                 # If the file was written back or was successfully checked as
605                 # well-formatted, store this information in the cache.
606                 if write_back is WriteBack.YES or (
607                     write_back is WriteBack.CHECK and changed is Changed.NO
608                 ):
609                     sources_to_cache.append(src)
610                 report.done(src, changed)
611     if cancelled:
612         await asyncio.gather(*cancelled, loop=loop, return_exceptions=True)
613     if sources_to_cache:
614         write_cache(cache, sources_to_cache, mode)
615
616
617 def format_file_in_place(
618     src: Path,
619     fast: bool,
620     mode: FileMode,
621     write_back: WriteBack = WriteBack.NO,
622     lock: Any = None,  # multiprocessing.Manager().Lock() is some crazy proxy
623 ) -> bool:
624     """Format file under `src` path. Return True if changed.
625
626     If `write_back` is DIFF, write a diff to stdout. If it is YES, write reformatted
627     code to the file.
628     `mode` and `fast` options are passed to :func:`format_file_contents`.
629     """
630     if src.suffix == ".pyi":
631         mode = evolve(mode, is_pyi=True)
632
633     then = datetime.utcfromtimestamp(src.stat().st_mtime)
634     with open(src, "rb") as buf:
635         src_contents, encoding, newline = decode_bytes(buf.read())
636     try:
637         dst_contents = format_file_contents(src_contents, fast=fast, mode=mode)
638     except NothingChanged:
639         return False
640
641     if write_back == write_back.YES:
642         with open(src, "w", encoding=encoding, newline=newline) as f:
643             f.write(dst_contents)
644     elif write_back == write_back.DIFF:
645         now = datetime.utcnow()
646         src_name = f"{src}\t{then} +0000"
647         dst_name = f"{src}\t{now} +0000"
648         diff_contents = diff(src_contents, dst_contents, src_name, dst_name)
649
650         with lock or nullcontext():
651             f = io.TextIOWrapper(
652                 sys.stdout.buffer,
653                 encoding=encoding,
654                 newline=newline,
655                 write_through=True,
656             )
657             f.write(diff_contents)
658             f.detach()
659
660     return True
661
662
663 def format_stdin_to_stdout(
664     fast: bool, *, write_back: WriteBack = WriteBack.NO, mode: FileMode
665 ) -> bool:
666     """Format file on stdin. Return True if changed.
667
668     If `write_back` is YES, write reformatted code back to stdout. If it is DIFF,
669     write a diff to stdout. The `mode` argument is passed to
670     :func:`format_file_contents`.
671     """
672     then = datetime.utcnow()
673     src, encoding, newline = decode_bytes(sys.stdin.buffer.read())
674     dst = src
675     try:
676         dst = format_file_contents(src, fast=fast, mode=mode)
677         return True
678
679     except NothingChanged:
680         return False
681
682     finally:
683         f = io.TextIOWrapper(
684             sys.stdout.buffer, encoding=encoding, newline=newline, write_through=True
685         )
686         if write_back == WriteBack.YES:
687             f.write(dst)
688         elif write_back == WriteBack.DIFF:
689             now = datetime.utcnow()
690             src_name = f"STDIN\t{then} +0000"
691             dst_name = f"STDOUT\t{now} +0000"
692             f.write(diff(src, dst, src_name, dst_name))
693         f.detach()
694
695
696 def format_file_contents(
697     src_contents: str, *, fast: bool, mode: FileMode
698 ) -> FileContent:
699     """Reformat contents a file and return new contents.
700
701     If `fast` is False, additionally confirm that the reformatted code is
702     valid by calling :func:`assert_equivalent` and :func:`assert_stable` on it.
703     `mode` is passed to :func:`format_str`.
704     """
705     if src_contents.strip() == "":
706         raise NothingChanged
707
708     dst_contents = format_str(src_contents, mode=mode)
709     if src_contents == dst_contents:
710         raise NothingChanged
711
712     if not fast:
713         assert_equivalent(src_contents, dst_contents)
714         assert_stable(src_contents, dst_contents, mode=mode)
715     return dst_contents
716
717
718 def format_str(src_contents: str, *, mode: FileMode) -> FileContent:
719     """Reformat a string and return new contents.
720
721     `mode` determines formatting options, such as how many characters per line are
722     allowed.
723     """
724     src_node = lib2to3_parse(src_contents.lstrip(), mode.target_versions)
725     dst_contents = []
726     future_imports = get_future_imports(src_node)
727     if mode.target_versions:
728         versions = mode.target_versions
729     else:
730         versions = detect_target_versions(src_node)
731     normalize_fmt_off(src_node)
732     lines = LineGenerator(
733         remove_u_prefix="unicode_literals" in future_imports
734         or supports_feature(versions, Feature.UNICODE_LITERALS),
735         is_pyi=mode.is_pyi,
736         normalize_strings=mode.string_normalization,
737     )
738     elt = EmptyLineTracker(is_pyi=mode.is_pyi)
739     empty_line = Line()
740     after = 0
741     split_line_features = {
742         feature
743         for feature in {Feature.TRAILING_COMMA_IN_CALL, Feature.TRAILING_COMMA_IN_DEF}
744         if supports_feature(versions, feature)
745     }
746     for current_line in lines.visit(src_node):
747         for _ in range(after):
748             dst_contents.append(str(empty_line))
749         before, after = elt.maybe_empty_lines(current_line)
750         for _ in range(before):
751             dst_contents.append(str(empty_line))
752         for line in split_line(
753             current_line, line_length=mode.line_length, features=split_line_features
754         ):
755             dst_contents.append(str(line))
756     return "".join(dst_contents)
757
758
759 def decode_bytes(src: bytes) -> Tuple[FileContent, Encoding, NewLine]:
760     """Return a tuple of (decoded_contents, encoding, newline).
761
762     `newline` is either CRLF or LF but `decoded_contents` is decoded with
763     universal newlines (i.e. only contains LF).
764     """
765     srcbuf = io.BytesIO(src)
766     encoding, lines = tokenize.detect_encoding(srcbuf.readline)
767     if not lines:
768         return "", encoding, "\n"
769
770     newline = "\r\n" if b"\r\n" == lines[0][-2:] else "\n"
771     srcbuf.seek(0)
772     with io.TextIOWrapper(srcbuf, encoding) as tiow:
773         return tiow.read(), encoding, newline
774
775
776 def get_grammars(target_versions: Set[TargetVersion]) -> List[Grammar]:
777     if not target_versions:
778         # No target_version specified, so try all grammars.
779         return [
780             # Python 3.7+
781             pygram.python_grammar_no_print_statement_no_exec_statement_async_keywords,
782             # Python 3.0-3.6
783             pygram.python_grammar_no_print_statement_no_exec_statement,
784             # Python 2.7 with future print_function import
785             pygram.python_grammar_no_print_statement,
786             # Python 2.7
787             pygram.python_grammar,
788         ]
789     elif all(version.is_python2() for version in target_versions):
790         # Python 2-only code, so try Python 2 grammars.
791         return [
792             # Python 2.7 with future print_function import
793             pygram.python_grammar_no_print_statement,
794             # Python 2.7
795             pygram.python_grammar,
796         ]
797     else:
798         # Python 3-compatible code, so only try Python 3 grammar.
799         grammars = []
800         # If we have to parse both, try to parse async as a keyword first
801         if not supports_feature(target_versions, Feature.ASYNC_IDENTIFIERS):
802             # Python 3.7+
803             grammars.append(
804                 pygram.python_grammar_no_print_statement_no_exec_statement_async_keywords  # noqa: B950
805             )
806         if not supports_feature(target_versions, Feature.ASYNC_KEYWORDS):
807             # Python 3.0-3.6
808             grammars.append(pygram.python_grammar_no_print_statement_no_exec_statement)
809         # At least one of the above branches must have been taken, because every Python
810         # version has exactly one of the two 'ASYNC_*' flags
811         return grammars
812
813
814 def lib2to3_parse(src_txt: str, target_versions: Iterable[TargetVersion] = ()) -> Node:
815     """Given a string with source, return the lib2to3 Node."""
816     if src_txt[-1:] != "\n":
817         src_txt += "\n"
818
819     for grammar in get_grammars(set(target_versions)):
820         drv = driver.Driver(grammar, pytree.convert)
821         try:
822             result = drv.parse_string(src_txt, True)
823             break
824
825         except ParseError as pe:
826             lineno, column = pe.context[1]
827             lines = src_txt.splitlines()
828             try:
829                 faulty_line = lines[lineno - 1]
830             except IndexError:
831                 faulty_line = "<line number missing in source>"
832             exc = InvalidInput(f"Cannot parse: {lineno}:{column}: {faulty_line}")
833     else:
834         raise exc from None
835
836     if isinstance(result, Leaf):
837         result = Node(syms.file_input, [result])
838     return result
839
840
841 def lib2to3_unparse(node: Node) -> str:
842     """Given a lib2to3 node, return its string representation."""
843     code = str(node)
844     return code
845
846
847 T = TypeVar("T")
848
849
850 class Visitor(Generic[T]):
851     """Basic lib2to3 visitor that yields things of type `T` on `visit()`."""
852
853     def visit(self, node: LN) -> Iterator[T]:
854         """Main method to visit `node` and its children.
855
856         It tries to find a `visit_*()` method for the given `node.type`, like
857         `visit_simple_stmt` for Node objects or `visit_INDENT` for Leaf objects.
858         If no dedicated `visit_*()` method is found, chooses `visit_default()`
859         instead.
860
861         Then yields objects of type `T` from the selected visitor.
862         """
863         if node.type < 256:
864             name = token.tok_name[node.type]
865         else:
866             name = type_repr(node.type)
867         yield from getattr(self, f"visit_{name}", self.visit_default)(node)
868
869     def visit_default(self, node: LN) -> Iterator[T]:
870         """Default `visit_*()` implementation. Recurses to children of `node`."""
871         if isinstance(node, Node):
872             for child in node.children:
873                 yield from self.visit(child)
874
875
876 @dataclass
877 class DebugVisitor(Visitor[T]):
878     tree_depth: int = 0
879
880     def visit_default(self, node: LN) -> Iterator[T]:
881         indent = " " * (2 * self.tree_depth)
882         if isinstance(node, Node):
883             _type = type_repr(node.type)
884             out(f"{indent}{_type}", fg="yellow")
885             self.tree_depth += 1
886             for child in node.children:
887                 yield from self.visit(child)
888
889             self.tree_depth -= 1
890             out(f"{indent}/{_type}", fg="yellow", bold=False)
891         else:
892             _type = token.tok_name.get(node.type, str(node.type))
893             out(f"{indent}{_type}", fg="blue", nl=False)
894             if node.prefix:
895                 # We don't have to handle prefixes for `Node` objects since
896                 # that delegates to the first child anyway.
897                 out(f" {node.prefix!r}", fg="green", bold=False, nl=False)
898             out(f" {node.value!r}", fg="blue", bold=False)
899
900     @classmethod
901     def show(cls, code: Union[str, Leaf, Node]) -> None:
902         """Pretty-print the lib2to3 AST of a given string of `code`.
903
904         Convenience method for debugging.
905         """
906         v: DebugVisitor[None] = DebugVisitor()
907         if isinstance(code, str):
908             code = lib2to3_parse(code)
909         list(v.visit(code))
910
911
912 WHITESPACE = {token.DEDENT, token.INDENT, token.NEWLINE}
913 STATEMENT = {
914     syms.if_stmt,
915     syms.while_stmt,
916     syms.for_stmt,
917     syms.try_stmt,
918     syms.except_clause,
919     syms.with_stmt,
920     syms.funcdef,
921     syms.classdef,
922 }
923 STANDALONE_COMMENT = 153
924 token.tok_name[STANDALONE_COMMENT] = "STANDALONE_COMMENT"
925 LOGIC_OPERATORS = {"and", "or"}
926 COMPARATORS = {
927     token.LESS,
928     token.GREATER,
929     token.EQEQUAL,
930     token.NOTEQUAL,
931     token.LESSEQUAL,
932     token.GREATEREQUAL,
933 }
934 MATH_OPERATORS = {
935     token.VBAR,
936     token.CIRCUMFLEX,
937     token.AMPER,
938     token.LEFTSHIFT,
939     token.RIGHTSHIFT,
940     token.PLUS,
941     token.MINUS,
942     token.STAR,
943     token.SLASH,
944     token.DOUBLESLASH,
945     token.PERCENT,
946     token.AT,
947     token.TILDE,
948     token.DOUBLESTAR,
949 }
950 STARS = {token.STAR, token.DOUBLESTAR}
951 VARARGS_SPECIALS = STARS | {token.SLASH}
952 VARARGS_PARENTS = {
953     syms.arglist,
954     syms.argument,  # double star in arglist
955     syms.trailer,  # single argument to call
956     syms.typedargslist,
957     syms.varargslist,  # lambdas
958 }
959 UNPACKING_PARENTS = {
960     syms.atom,  # single element of a list or set literal
961     syms.dictsetmaker,
962     syms.listmaker,
963     syms.testlist_gexp,
964     syms.testlist_star_expr,
965 }
966 TEST_DESCENDANTS = {
967     syms.test,
968     syms.lambdef,
969     syms.or_test,
970     syms.and_test,
971     syms.not_test,
972     syms.comparison,
973     syms.star_expr,
974     syms.expr,
975     syms.xor_expr,
976     syms.and_expr,
977     syms.shift_expr,
978     syms.arith_expr,
979     syms.trailer,
980     syms.term,
981     syms.power,
982 }
983 ASSIGNMENTS = {
984     "=",
985     "+=",
986     "-=",
987     "*=",
988     "@=",
989     "/=",
990     "%=",
991     "&=",
992     "|=",
993     "^=",
994     "<<=",
995     ">>=",
996     "**=",
997     "//=",
998 }
999 COMPREHENSION_PRIORITY = 20
1000 COMMA_PRIORITY = 18
1001 TERNARY_PRIORITY = 16
1002 LOGIC_PRIORITY = 14
1003 STRING_PRIORITY = 12
1004 COMPARATOR_PRIORITY = 10
1005 MATH_PRIORITIES = {
1006     token.VBAR: 9,
1007     token.CIRCUMFLEX: 8,
1008     token.AMPER: 7,
1009     token.LEFTSHIFT: 6,
1010     token.RIGHTSHIFT: 6,
1011     token.PLUS: 5,
1012     token.MINUS: 5,
1013     token.STAR: 4,
1014     token.SLASH: 4,
1015     token.DOUBLESLASH: 4,
1016     token.PERCENT: 4,
1017     token.AT: 4,
1018     token.TILDE: 3,
1019     token.DOUBLESTAR: 2,
1020 }
1021 DOT_PRIORITY = 1
1022
1023
1024 @dataclass
1025 class BracketTracker:
1026     """Keeps track of brackets on a line."""
1027
1028     depth: int = 0
1029     bracket_match: Dict[Tuple[Depth, NodeType], Leaf] = Factory(dict)
1030     delimiters: Dict[LeafID, Priority] = Factory(dict)
1031     previous: Optional[Leaf] = None
1032     _for_loop_depths: List[int] = Factory(list)
1033     _lambda_argument_depths: List[int] = Factory(list)
1034
1035     def mark(self, leaf: Leaf) -> None:
1036         """Mark `leaf` with bracket-related metadata. Keep track of delimiters.
1037
1038         All leaves receive an int `bracket_depth` field that stores how deep
1039         within brackets a given leaf is. 0 means there are no enclosing brackets
1040         that started on this line.
1041
1042         If a leaf is itself a closing bracket, it receives an `opening_bracket`
1043         field that it forms a pair with. This is a one-directional link to
1044         avoid reference cycles.
1045
1046         If a leaf is a delimiter (a token on which Black can split the line if
1047         needed) and it's on depth 0, its `id()` is stored in the tracker's
1048         `delimiters` field.
1049         """
1050         if leaf.type == token.COMMENT:
1051             return
1052
1053         self.maybe_decrement_after_for_loop_variable(leaf)
1054         self.maybe_decrement_after_lambda_arguments(leaf)
1055         if leaf.type in CLOSING_BRACKETS:
1056             self.depth -= 1
1057             opening_bracket = self.bracket_match.pop((self.depth, leaf.type))
1058             leaf.opening_bracket = opening_bracket
1059         leaf.bracket_depth = self.depth
1060         if self.depth == 0:
1061             delim = is_split_before_delimiter(leaf, self.previous)
1062             if delim and self.previous is not None:
1063                 self.delimiters[id(self.previous)] = delim
1064             else:
1065                 delim = is_split_after_delimiter(leaf, self.previous)
1066                 if delim:
1067                     self.delimiters[id(leaf)] = delim
1068         if leaf.type in OPENING_BRACKETS:
1069             self.bracket_match[self.depth, BRACKET[leaf.type]] = leaf
1070             self.depth += 1
1071         self.previous = leaf
1072         self.maybe_increment_lambda_arguments(leaf)
1073         self.maybe_increment_for_loop_variable(leaf)
1074
1075     def any_open_brackets(self) -> bool:
1076         """Return True if there is an yet unmatched open bracket on the line."""
1077         return bool(self.bracket_match)
1078
1079     def max_delimiter_priority(self, exclude: Iterable[LeafID] = ()) -> Priority:
1080         """Return the highest priority of a delimiter found on the line.
1081
1082         Values are consistent with what `is_split_*_delimiter()` return.
1083         Raises ValueError on no delimiters.
1084         """
1085         return max(v for k, v in self.delimiters.items() if k not in exclude)
1086
1087     def delimiter_count_with_priority(self, priority: Priority = 0) -> int:
1088         """Return the number of delimiters with the given `priority`.
1089
1090         If no `priority` is passed, defaults to max priority on the line.
1091         """
1092         if not self.delimiters:
1093             return 0
1094
1095         priority = priority or self.max_delimiter_priority()
1096         return sum(1 for p in self.delimiters.values() if p == priority)
1097
1098     def maybe_increment_for_loop_variable(self, leaf: Leaf) -> bool:
1099         """In a for loop, or comprehension, the variables are often unpacks.
1100
1101         To avoid splitting on the comma in this situation, increase the depth of
1102         tokens between `for` and `in`.
1103         """
1104         if leaf.type == token.NAME and leaf.value == "for":
1105             self.depth += 1
1106             self._for_loop_depths.append(self.depth)
1107             return True
1108
1109         return False
1110
1111     def maybe_decrement_after_for_loop_variable(self, leaf: Leaf) -> bool:
1112         """See `maybe_increment_for_loop_variable` above for explanation."""
1113         if (
1114             self._for_loop_depths
1115             and self._for_loop_depths[-1] == self.depth
1116             and leaf.type == token.NAME
1117             and leaf.value == "in"
1118         ):
1119             self.depth -= 1
1120             self._for_loop_depths.pop()
1121             return True
1122
1123         return False
1124
1125     def maybe_increment_lambda_arguments(self, leaf: Leaf) -> bool:
1126         """In a lambda expression, there might be more than one argument.
1127
1128         To avoid splitting on the comma in this situation, increase the depth of
1129         tokens between `lambda` and `:`.
1130         """
1131         if leaf.type == token.NAME and leaf.value == "lambda":
1132             self.depth += 1
1133             self._lambda_argument_depths.append(self.depth)
1134             return True
1135
1136         return False
1137
1138     def maybe_decrement_after_lambda_arguments(self, leaf: Leaf) -> bool:
1139         """See `maybe_increment_lambda_arguments` above for explanation."""
1140         if (
1141             self._lambda_argument_depths
1142             and self._lambda_argument_depths[-1] == self.depth
1143             and leaf.type == token.COLON
1144         ):
1145             self.depth -= 1
1146             self._lambda_argument_depths.pop()
1147             return True
1148
1149         return False
1150
1151     def get_open_lsqb(self) -> Optional[Leaf]:
1152         """Return the most recent opening square bracket (if any)."""
1153         return self.bracket_match.get((self.depth - 1, token.RSQB))
1154
1155
1156 @dataclass
1157 class Line:
1158     """Holds leaves and comments. Can be printed with `str(line)`."""
1159
1160     depth: int = 0
1161     leaves: List[Leaf] = Factory(list)
1162     comments: Dict[LeafID, List[Leaf]] = Factory(dict)  # keys ordered like `leaves`
1163     bracket_tracker: BracketTracker = Factory(BracketTracker)
1164     inside_brackets: bool = False
1165     should_explode: bool = False
1166
1167     def append(self, leaf: Leaf, preformatted: bool = False) -> None:
1168         """Add a new `leaf` to the end of the line.
1169
1170         Unless `preformatted` is True, the `leaf` will receive a new consistent
1171         whitespace prefix and metadata applied by :class:`BracketTracker`.
1172         Trailing commas are maybe removed, unpacked for loop variables are
1173         demoted from being delimiters.
1174
1175         Inline comments are put aside.
1176         """
1177         has_value = leaf.type in BRACKETS or bool(leaf.value.strip())
1178         if not has_value:
1179             return
1180
1181         if token.COLON == leaf.type and self.is_class_paren_empty:
1182             del self.leaves[-2:]
1183         if self.leaves and not preformatted:
1184             # Note: at this point leaf.prefix should be empty except for
1185             # imports, for which we only preserve newlines.
1186             leaf.prefix += whitespace(
1187                 leaf, complex_subscript=self.is_complex_subscript(leaf)
1188             )
1189         if self.inside_brackets or not preformatted:
1190             self.bracket_tracker.mark(leaf)
1191             self.maybe_remove_trailing_comma(leaf)
1192         if not self.append_comment(leaf):
1193             self.leaves.append(leaf)
1194
1195     def append_safe(self, leaf: Leaf, preformatted: bool = False) -> None:
1196         """Like :func:`append()` but disallow invalid standalone comment structure.
1197
1198         Raises ValueError when any `leaf` is appended after a standalone comment
1199         or when a standalone comment is not the first leaf on the line.
1200         """
1201         if self.bracket_tracker.depth == 0:
1202             if self.is_comment:
1203                 raise ValueError("cannot append to standalone comments")
1204
1205             if self.leaves and leaf.type == STANDALONE_COMMENT:
1206                 raise ValueError(
1207                     "cannot append standalone comments to a populated line"
1208                 )
1209
1210         self.append(leaf, preformatted=preformatted)
1211
1212     @property
1213     def is_comment(self) -> bool:
1214         """Is this line a standalone comment?"""
1215         return len(self.leaves) == 1 and self.leaves[0].type == STANDALONE_COMMENT
1216
1217     @property
1218     def is_decorator(self) -> bool:
1219         """Is this line a decorator?"""
1220         return bool(self) and self.leaves[0].type == token.AT
1221
1222     @property
1223     def is_import(self) -> bool:
1224         """Is this an import line?"""
1225         return bool(self) and is_import(self.leaves[0])
1226
1227     @property
1228     def is_class(self) -> bool:
1229         """Is this line a class definition?"""
1230         return (
1231             bool(self)
1232             and self.leaves[0].type == token.NAME
1233             and self.leaves[0].value == "class"
1234         )
1235
1236     @property
1237     def is_stub_class(self) -> bool:
1238         """Is this line a class definition with a body consisting only of "..."?"""
1239         return self.is_class and self.leaves[-3:] == [
1240             Leaf(token.DOT, ".") for _ in range(3)
1241         ]
1242
1243     @property
1244     def is_def(self) -> bool:
1245         """Is this a function definition? (Also returns True for async defs.)"""
1246         try:
1247             first_leaf = self.leaves[0]
1248         except IndexError:
1249             return False
1250
1251         try:
1252             second_leaf: Optional[Leaf] = self.leaves[1]
1253         except IndexError:
1254             second_leaf = None
1255         return (first_leaf.type == token.NAME and first_leaf.value == "def") or (
1256             first_leaf.type == token.ASYNC
1257             and second_leaf is not None
1258             and second_leaf.type == token.NAME
1259             and second_leaf.value == "def"
1260         )
1261
1262     @property
1263     def is_class_paren_empty(self) -> bool:
1264         """Is this a class with no base classes but using parentheses?
1265
1266         Those are unnecessary and should be removed.
1267         """
1268         return (
1269             bool(self)
1270             and len(self.leaves) == 4
1271             and self.is_class
1272             and self.leaves[2].type == token.LPAR
1273             and self.leaves[2].value == "("
1274             and self.leaves[3].type == token.RPAR
1275             and self.leaves[3].value == ")"
1276         )
1277
1278     @property
1279     def is_triple_quoted_string(self) -> bool:
1280         """Is the line a triple quoted string?"""
1281         return (
1282             bool(self)
1283             and self.leaves[0].type == token.STRING
1284             and self.leaves[0].value.startswith(('"""', "'''"))
1285         )
1286
1287     def contains_standalone_comments(self, depth_limit: int = sys.maxsize) -> bool:
1288         """If so, needs to be split before emitting."""
1289         for leaf in self.leaves:
1290             if leaf.type == STANDALONE_COMMENT:
1291                 if leaf.bracket_depth <= depth_limit:
1292                     return True
1293         return False
1294
1295     def contains_uncollapsable_type_comments(self) -> bool:
1296         ignored_ids = set()
1297         try:
1298             last_leaf = self.leaves[-1]
1299             ignored_ids.add(id(last_leaf))
1300             if last_leaf.type == token.COMMA or (
1301                 last_leaf.type == token.RPAR and not last_leaf.value
1302             ):
1303                 # When trailing commas or optional parens are inserted by Black for
1304                 # consistency, comments after the previous last element are not moved
1305                 # (they don't have to, rendering will still be correct).  So we ignore
1306                 # trailing commas and invisible.
1307                 last_leaf = self.leaves[-2]
1308                 ignored_ids.add(id(last_leaf))
1309         except IndexError:
1310             return False
1311
1312         # A type comment is uncollapsable if it is attached to a leaf
1313         # that isn't at the end of the line (since that could cause it
1314         # to get associated to a different argument) or if there are
1315         # comments before it (since that could cause it to get hidden
1316         # behind a comment.
1317         comment_seen = False
1318         for leaf_id, comments in self.comments.items():
1319             for comment in comments:
1320                 if is_type_comment(comment):
1321                     if leaf_id not in ignored_ids or comment_seen:
1322                         return True
1323
1324             comment_seen = True
1325
1326         return False
1327
1328     def contains_multiline_strings(self) -> bool:
1329         for leaf in self.leaves:
1330             if is_multiline_string(leaf):
1331                 return True
1332
1333         return False
1334
1335     def maybe_remove_trailing_comma(self, closing: Leaf) -> bool:
1336         """Remove trailing comma if there is one and it's safe."""
1337         if not (
1338             self.leaves
1339             and self.leaves[-1].type == token.COMMA
1340             and closing.type in CLOSING_BRACKETS
1341         ):
1342             return False
1343
1344         if closing.type == token.RBRACE:
1345             self.remove_trailing_comma()
1346             return True
1347
1348         if closing.type == token.RSQB:
1349             comma = self.leaves[-1]
1350             if comma.parent and comma.parent.type == syms.listmaker:
1351                 self.remove_trailing_comma()
1352                 return True
1353
1354         # For parens let's check if it's safe to remove the comma.
1355         # Imports are always safe.
1356         if self.is_import:
1357             self.remove_trailing_comma()
1358             return True
1359
1360         # Otherwise, if the trailing one is the only one, we might mistakenly
1361         # change a tuple into a different type by removing the comma.
1362         depth = closing.bracket_depth + 1
1363         commas = 0
1364         opening = closing.opening_bracket
1365         for _opening_index, leaf in enumerate(self.leaves):
1366             if leaf is opening:
1367                 break
1368
1369         else:
1370             return False
1371
1372         for leaf in self.leaves[_opening_index + 1 :]:
1373             if leaf is closing:
1374                 break
1375
1376             bracket_depth = leaf.bracket_depth
1377             if bracket_depth == depth and leaf.type == token.COMMA:
1378                 commas += 1
1379                 if leaf.parent and leaf.parent.type in {
1380                     syms.arglist,
1381                     syms.typedargslist,
1382                 }:
1383                     commas += 1
1384                     break
1385
1386         if commas > 1:
1387             self.remove_trailing_comma()
1388             return True
1389
1390         return False
1391
1392     def append_comment(self, comment: Leaf) -> bool:
1393         """Add an inline or standalone comment to the line."""
1394         if (
1395             comment.type == STANDALONE_COMMENT
1396             and self.bracket_tracker.any_open_brackets()
1397         ):
1398             comment.prefix = ""
1399             return False
1400
1401         if comment.type != token.COMMENT:
1402             return False
1403
1404         if not self.leaves:
1405             comment.type = STANDALONE_COMMENT
1406             comment.prefix = ""
1407             return False
1408
1409         last_leaf = self.leaves[-1]
1410         if (
1411             last_leaf.type == token.RPAR
1412             and not last_leaf.value
1413             and last_leaf.parent
1414             and len(list(last_leaf.parent.leaves())) <= 3
1415             and not is_type_comment(comment)
1416         ):
1417             # Comments on an optional parens wrapping a single leaf should belong to
1418             # the wrapped node except if it's a type comment. Pinning the comment like
1419             # this avoids unstable formatting caused by comment migration.
1420             if len(self.leaves) < 2:
1421                 comment.type = STANDALONE_COMMENT
1422                 comment.prefix = ""
1423                 return False
1424             last_leaf = self.leaves[-2]
1425         self.comments.setdefault(id(last_leaf), []).append(comment)
1426         return True
1427
1428     def comments_after(self, leaf: Leaf) -> List[Leaf]:
1429         """Generate comments that should appear directly after `leaf`."""
1430         return self.comments.get(id(leaf), [])
1431
1432     def remove_trailing_comma(self) -> None:
1433         """Remove the trailing comma and moves the comments attached to it."""
1434         trailing_comma = self.leaves.pop()
1435         trailing_comma_comments = self.comments.pop(id(trailing_comma), [])
1436         self.comments.setdefault(id(self.leaves[-1]), []).extend(
1437             trailing_comma_comments
1438         )
1439
1440     def is_complex_subscript(self, leaf: Leaf) -> bool:
1441         """Return True iff `leaf` is part of a slice with non-trivial exprs."""
1442         open_lsqb = self.bracket_tracker.get_open_lsqb()
1443         if open_lsqb is None:
1444             return False
1445
1446         subscript_start = open_lsqb.next_sibling
1447
1448         if isinstance(subscript_start, Node):
1449             if subscript_start.type == syms.listmaker:
1450                 return False
1451
1452             if subscript_start.type == syms.subscriptlist:
1453                 subscript_start = child_towards(subscript_start, leaf)
1454         return subscript_start is not None and any(
1455             n.type in TEST_DESCENDANTS for n in subscript_start.pre_order()
1456         )
1457
1458     def __str__(self) -> str:
1459         """Render the line."""
1460         if not self:
1461             return "\n"
1462
1463         indent = "    " * self.depth
1464         leaves = iter(self.leaves)
1465         first = next(leaves)
1466         res = f"{first.prefix}{indent}{first.value}"
1467         for leaf in leaves:
1468             res += str(leaf)
1469         for comment in itertools.chain.from_iterable(self.comments.values()):
1470             res += str(comment)
1471         return res + "\n"
1472
1473     def __bool__(self) -> bool:
1474         """Return True if the line has leaves or comments."""
1475         return bool(self.leaves or self.comments)
1476
1477
1478 @dataclass
1479 class EmptyLineTracker:
1480     """Provides a stateful method that returns the number of potential extra
1481     empty lines needed before and after the currently processed line.
1482
1483     Note: this tracker works on lines that haven't been split yet.  It assumes
1484     the prefix of the first leaf consists of optional newlines.  Those newlines
1485     are consumed by `maybe_empty_lines()` and included in the computation.
1486     """
1487
1488     is_pyi: bool = False
1489     previous_line: Optional[Line] = None
1490     previous_after: int = 0
1491     previous_defs: List[int] = Factory(list)
1492
1493     def maybe_empty_lines(self, current_line: Line) -> Tuple[int, int]:
1494         """Return the number of extra empty lines before and after the `current_line`.
1495
1496         This is for separating `def`, `async def` and `class` with extra empty
1497         lines (two on module-level).
1498         """
1499         before, after = self._maybe_empty_lines(current_line)
1500         before = (
1501             # Black should not insert empty lines at the beginning
1502             # of the file
1503             0
1504             if self.previous_line is None
1505             else before - self.previous_after
1506         )
1507         self.previous_after = after
1508         self.previous_line = current_line
1509         return before, after
1510
1511     def _maybe_empty_lines(self, current_line: Line) -> Tuple[int, int]:
1512         max_allowed = 1
1513         if current_line.depth == 0:
1514             max_allowed = 1 if self.is_pyi else 2
1515         if current_line.leaves:
1516             # Consume the first leaf's extra newlines.
1517             first_leaf = current_line.leaves[0]
1518             before = first_leaf.prefix.count("\n")
1519             before = min(before, max_allowed)
1520             first_leaf.prefix = ""
1521         else:
1522             before = 0
1523         depth = current_line.depth
1524         while self.previous_defs and self.previous_defs[-1] >= depth:
1525             self.previous_defs.pop()
1526             if self.is_pyi:
1527                 before = 0 if depth else 1
1528             else:
1529                 before = 1 if depth else 2
1530         if current_line.is_decorator or current_line.is_def or current_line.is_class:
1531             return self._maybe_empty_lines_for_class_or_def(current_line, before)
1532
1533         if (
1534             self.previous_line
1535             and self.previous_line.is_import
1536             and not current_line.is_import
1537             and depth == self.previous_line.depth
1538         ):
1539             return (before or 1), 0
1540
1541         if (
1542             self.previous_line
1543             and self.previous_line.is_class
1544             and current_line.is_triple_quoted_string
1545         ):
1546             return before, 1
1547
1548         return before, 0
1549
1550     def _maybe_empty_lines_for_class_or_def(
1551         self, current_line: Line, before: int
1552     ) -> Tuple[int, int]:
1553         if not current_line.is_decorator:
1554             self.previous_defs.append(current_line.depth)
1555         if self.previous_line is None:
1556             # Don't insert empty lines before the first line in the file.
1557             return 0, 0
1558
1559         if self.previous_line.is_decorator:
1560             return 0, 0
1561
1562         if self.previous_line.depth < current_line.depth and (
1563             self.previous_line.is_class or self.previous_line.is_def
1564         ):
1565             return 0, 0
1566
1567         if (
1568             self.previous_line.is_comment
1569             and self.previous_line.depth == current_line.depth
1570             and before == 0
1571         ):
1572             return 0, 0
1573
1574         if self.is_pyi:
1575             if self.previous_line.depth > current_line.depth:
1576                 newlines = 1
1577             elif current_line.is_class or self.previous_line.is_class:
1578                 if current_line.is_stub_class and self.previous_line.is_stub_class:
1579                     # No blank line between classes with an empty body
1580                     newlines = 0
1581                 else:
1582                     newlines = 1
1583             elif current_line.is_def and not self.previous_line.is_def:
1584                 # Blank line between a block of functions and a block of non-functions
1585                 newlines = 1
1586             else:
1587                 newlines = 0
1588         else:
1589             newlines = 2
1590         if current_line.depth and newlines:
1591             newlines -= 1
1592         return newlines, 0
1593
1594
1595 @dataclass
1596 class LineGenerator(Visitor[Line]):
1597     """Generates reformatted Line objects.  Empty lines are not emitted.
1598
1599     Note: destroys the tree it's visiting by mutating prefixes of its leaves
1600     in ways that will no longer stringify to valid Python code on the tree.
1601     """
1602
1603     is_pyi: bool = False
1604     normalize_strings: bool = True
1605     current_line: Line = Factory(Line)
1606     remove_u_prefix: bool = False
1607
1608     def line(self, indent: int = 0) -> Iterator[Line]:
1609         """Generate a line.
1610
1611         If the line is empty, only emit if it makes sense.
1612         If the line is too long, split it first and then generate.
1613
1614         If any lines were generated, set up a new current_line.
1615         """
1616         if not self.current_line:
1617             self.current_line.depth += indent
1618             return  # Line is empty, don't emit. Creating a new one unnecessary.
1619
1620         complete_line = self.current_line
1621         self.current_line = Line(depth=complete_line.depth + indent)
1622         yield complete_line
1623
1624     def visit_default(self, node: LN) -> Iterator[Line]:
1625         """Default `visit_*()` implementation. Recurses to children of `node`."""
1626         if isinstance(node, Leaf):
1627             any_open_brackets = self.current_line.bracket_tracker.any_open_brackets()
1628             for comment in generate_comments(node):
1629                 if any_open_brackets:
1630                     # any comment within brackets is subject to splitting
1631                     self.current_line.append(comment)
1632                 elif comment.type == token.COMMENT:
1633                     # regular trailing comment
1634                     self.current_line.append(comment)
1635                     yield from self.line()
1636
1637                 else:
1638                     # regular standalone comment
1639                     yield from self.line()
1640
1641                     self.current_line.append(comment)
1642                     yield from self.line()
1643
1644             normalize_prefix(node, inside_brackets=any_open_brackets)
1645             if self.normalize_strings and node.type == token.STRING:
1646                 normalize_string_prefix(node, remove_u_prefix=self.remove_u_prefix)
1647                 normalize_string_quotes(node)
1648             if node.type == token.NUMBER:
1649                 normalize_numeric_literal(node)
1650             if node.type not in WHITESPACE:
1651                 self.current_line.append(node)
1652         yield from super().visit_default(node)
1653
1654     def visit_atom(self, node: Node) -> Iterator[Line]:
1655         # Always make parentheses invisible around a single node, because it should
1656         # not be needed (except in the case of yield, where removing the parentheses
1657         # produces a SyntaxError).
1658         if (
1659             len(node.children) == 3
1660             and isinstance(node.children[0], Leaf)
1661             and node.children[0].type == token.LPAR
1662             and isinstance(node.children[2], Leaf)
1663             and node.children[2].type == token.RPAR
1664             and isinstance(node.children[1], Leaf)
1665             and not (
1666                 node.children[1].type == token.NAME
1667                 and node.children[1].value == "yield"
1668             )
1669         ):
1670             node.children[0].value = ""
1671             node.children[2].value = ""
1672         yield from super().visit_default(node)
1673
1674     def visit_factor(self, node: Node) -> Iterator[Line]:
1675         """Force parentheses between a unary op and a binary power:
1676
1677         -2 ** 8 -> -(2 ** 8)
1678         """
1679         child = node.children[1]
1680         if child.type == syms.power and len(child.children) == 3:
1681             lpar = Leaf(token.LPAR, "(")
1682             rpar = Leaf(token.RPAR, ")")
1683             index = child.remove() or 0
1684             node.insert_child(index, Node(syms.atom, [lpar, child, rpar]))
1685         yield from self.visit_default(node)
1686
1687     def visit_INDENT(self, node: Node) -> Iterator[Line]:
1688         """Increase indentation level, maybe yield a line."""
1689         # In blib2to3 INDENT never holds comments.
1690         yield from self.line(+1)
1691         yield from self.visit_default(node)
1692
1693     def visit_DEDENT(self, node: Node) -> Iterator[Line]:
1694         """Decrease indentation level, maybe yield a line."""
1695         # The current line might still wait for trailing comments.  At DEDENT time
1696         # there won't be any (they would be prefixes on the preceding NEWLINE).
1697         # Emit the line then.
1698         yield from self.line()
1699
1700         # While DEDENT has no value, its prefix may contain standalone comments
1701         # that belong to the current indentation level.  Get 'em.
1702         yield from self.visit_default(node)
1703
1704         # Finally, emit the dedent.
1705         yield from self.line(-1)
1706
1707     def visit_stmt(
1708         self, node: Node, keywords: Set[str], parens: Set[str]
1709     ) -> Iterator[Line]:
1710         """Visit a statement.
1711
1712         This implementation is shared for `if`, `while`, `for`, `try`, `except`,
1713         `def`, `with`, `class`, `assert` and assignments.
1714
1715         The relevant Python language `keywords` for a given statement will be
1716         NAME leaves within it. This methods puts those on a separate line.
1717
1718         `parens` holds a set of string leaf values immediately after which
1719         invisible parens should be put.
1720         """
1721         normalize_invisible_parens(node, parens_after=parens)
1722         for child in node.children:
1723             if child.type == token.NAME and child.value in keywords:  # type: ignore
1724                 yield from self.line()
1725
1726             yield from self.visit(child)
1727
1728     def visit_suite(self, node: Node) -> Iterator[Line]:
1729         """Visit a suite."""
1730         if self.is_pyi and is_stub_suite(node):
1731             yield from self.visit(node.children[2])
1732         else:
1733             yield from self.visit_default(node)
1734
1735     def visit_simple_stmt(self, node: Node) -> Iterator[Line]:
1736         """Visit a statement without nested statements."""
1737         is_suite_like = node.parent and node.parent.type in STATEMENT
1738         if is_suite_like:
1739             if self.is_pyi and is_stub_body(node):
1740                 yield from self.visit_default(node)
1741             else:
1742                 yield from self.line(+1)
1743                 yield from self.visit_default(node)
1744                 yield from self.line(-1)
1745
1746         else:
1747             if not self.is_pyi or not node.parent or not is_stub_suite(node.parent):
1748                 yield from self.line()
1749             yield from self.visit_default(node)
1750
1751     def visit_async_stmt(self, node: Node) -> Iterator[Line]:
1752         """Visit `async def`, `async for`, `async with`."""
1753         yield from self.line()
1754
1755         children = iter(node.children)
1756         for child in children:
1757             yield from self.visit(child)
1758
1759             if child.type == token.ASYNC:
1760                 break
1761
1762         internal_stmt = next(children)
1763         for child in internal_stmt.children:
1764             yield from self.visit(child)
1765
1766     def visit_decorators(self, node: Node) -> Iterator[Line]:
1767         """Visit decorators."""
1768         for child in node.children:
1769             yield from self.line()
1770             yield from self.visit(child)
1771
1772     def visit_SEMI(self, leaf: Leaf) -> Iterator[Line]:
1773         """Remove a semicolon and put the other statement on a separate line."""
1774         yield from self.line()
1775
1776     def visit_ENDMARKER(self, leaf: Leaf) -> Iterator[Line]:
1777         """End of file. Process outstanding comments and end with a newline."""
1778         yield from self.visit_default(leaf)
1779         yield from self.line()
1780
1781     def visit_STANDALONE_COMMENT(self, leaf: Leaf) -> Iterator[Line]:
1782         if not self.current_line.bracket_tracker.any_open_brackets():
1783             yield from self.line()
1784         yield from self.visit_default(leaf)
1785
1786     def __attrs_post_init__(self) -> None:
1787         """You are in a twisty little maze of passages."""
1788         v = self.visit_stmt
1789         Ø: Set[str] = set()
1790         self.visit_assert_stmt = partial(v, keywords={"assert"}, parens={"assert", ","})
1791         self.visit_if_stmt = partial(
1792             v, keywords={"if", "else", "elif"}, parens={"if", "elif"}
1793         )
1794         self.visit_while_stmt = partial(v, keywords={"while", "else"}, parens={"while"})
1795         self.visit_for_stmt = partial(v, keywords={"for", "else"}, parens={"for", "in"})
1796         self.visit_try_stmt = partial(
1797             v, keywords={"try", "except", "else", "finally"}, parens=Ø
1798         )
1799         self.visit_except_clause = partial(v, keywords={"except"}, parens=Ø)
1800         self.visit_with_stmt = partial(v, keywords={"with"}, parens=Ø)
1801         self.visit_funcdef = partial(v, keywords={"def"}, parens=Ø)
1802         self.visit_classdef = partial(v, keywords={"class"}, parens=Ø)
1803         self.visit_expr_stmt = partial(v, keywords=Ø, parens=ASSIGNMENTS)
1804         self.visit_return_stmt = partial(v, keywords={"return"}, parens={"return"})
1805         self.visit_import_from = partial(v, keywords=Ø, parens={"import"})
1806         self.visit_del_stmt = partial(v, keywords=Ø, parens={"del"})
1807         self.visit_async_funcdef = self.visit_async_stmt
1808         self.visit_decorated = self.visit_decorators
1809
1810
1811 IMPLICIT_TUPLE = {syms.testlist, syms.testlist_star_expr, syms.exprlist}
1812 BRACKET = {token.LPAR: token.RPAR, token.LSQB: token.RSQB, token.LBRACE: token.RBRACE}
1813 OPENING_BRACKETS = set(BRACKET.keys())
1814 CLOSING_BRACKETS = set(BRACKET.values())
1815 BRACKETS = OPENING_BRACKETS | CLOSING_BRACKETS
1816 ALWAYS_NO_SPACE = CLOSING_BRACKETS | {token.COMMA, STANDALONE_COMMENT}
1817
1818
1819 def whitespace(leaf: Leaf, *, complex_subscript: bool) -> str:  # noqa: C901
1820     """Return whitespace prefix if needed for the given `leaf`.
1821
1822     `complex_subscript` signals whether the given leaf is part of a subscription
1823     which has non-trivial arguments, like arithmetic expressions or function calls.
1824     """
1825     NO = ""
1826     SPACE = " "
1827     DOUBLESPACE = "  "
1828     t = leaf.type
1829     p = leaf.parent
1830     v = leaf.value
1831     if t in ALWAYS_NO_SPACE:
1832         return NO
1833
1834     if t == token.COMMENT:
1835         return DOUBLESPACE
1836
1837     assert p is not None, f"INTERNAL ERROR: hand-made leaf without parent: {leaf!r}"
1838     if t == token.COLON and p.type not in {
1839         syms.subscript,
1840         syms.subscriptlist,
1841         syms.sliceop,
1842     }:
1843         return NO
1844
1845     prev = leaf.prev_sibling
1846     if not prev:
1847         prevp = preceding_leaf(p)
1848         if not prevp or prevp.type in OPENING_BRACKETS:
1849             return NO
1850
1851         if t == token.COLON:
1852             if prevp.type == token.COLON:
1853                 return NO
1854
1855             elif prevp.type != token.COMMA and not complex_subscript:
1856                 return NO
1857
1858             return SPACE
1859
1860         if prevp.type == token.EQUAL:
1861             if prevp.parent:
1862                 if prevp.parent.type in {
1863                     syms.arglist,
1864                     syms.argument,
1865                     syms.parameters,
1866                     syms.varargslist,
1867                 }:
1868                     return NO
1869
1870                 elif prevp.parent.type == syms.typedargslist:
1871                     # A bit hacky: if the equal sign has whitespace, it means we
1872                     # previously found it's a typed argument.  So, we're using
1873                     # that, too.
1874                     return prevp.prefix
1875
1876         elif prevp.type in VARARGS_SPECIALS:
1877             if is_vararg(prevp, within=VARARGS_PARENTS | UNPACKING_PARENTS):
1878                 return NO
1879
1880         elif prevp.type == token.COLON:
1881             if prevp.parent and prevp.parent.type in {syms.subscript, syms.sliceop}:
1882                 return SPACE if complex_subscript else NO
1883
1884         elif (
1885             prevp.parent
1886             and prevp.parent.type == syms.factor
1887             and prevp.type in MATH_OPERATORS
1888         ):
1889             return NO
1890
1891         elif (
1892             prevp.type == token.RIGHTSHIFT
1893             and prevp.parent
1894             and prevp.parent.type == syms.shift_expr
1895             and prevp.prev_sibling
1896             and prevp.prev_sibling.type == token.NAME
1897             and prevp.prev_sibling.value == "print"  # type: ignore
1898         ):
1899             # Python 2 print chevron
1900             return NO
1901
1902     elif prev.type in OPENING_BRACKETS:
1903         return NO
1904
1905     if p.type in {syms.parameters, syms.arglist}:
1906         # untyped function signatures or calls
1907         if not prev or prev.type != token.COMMA:
1908             return NO
1909
1910     elif p.type == syms.varargslist:
1911         # lambdas
1912         if prev and prev.type != token.COMMA:
1913             return NO
1914
1915     elif p.type == syms.typedargslist:
1916         # typed function signatures
1917         if not prev:
1918             return NO
1919
1920         if t == token.EQUAL:
1921             if prev.type != syms.tname:
1922                 return NO
1923
1924         elif prev.type == token.EQUAL:
1925             # A bit hacky: if the equal sign has whitespace, it means we
1926             # previously found it's a typed argument.  So, we're using that, too.
1927             return prev.prefix
1928
1929         elif prev.type != token.COMMA:
1930             return NO
1931
1932     elif p.type == syms.tname:
1933         # type names
1934         if not prev:
1935             prevp = preceding_leaf(p)
1936             if not prevp or prevp.type != token.COMMA:
1937                 return NO
1938
1939     elif p.type == syms.trailer:
1940         # attributes and calls
1941         if t == token.LPAR or t == token.RPAR:
1942             return NO
1943
1944         if not prev:
1945             if t == token.DOT:
1946                 prevp = preceding_leaf(p)
1947                 if not prevp or prevp.type != token.NUMBER:
1948                     return NO
1949
1950             elif t == token.LSQB:
1951                 return NO
1952
1953         elif prev.type != token.COMMA:
1954             return NO
1955
1956     elif p.type == syms.argument:
1957         # single argument
1958         if t == token.EQUAL:
1959             return NO
1960
1961         if not prev:
1962             prevp = preceding_leaf(p)
1963             if not prevp or prevp.type == token.LPAR:
1964                 return NO
1965
1966         elif prev.type in {token.EQUAL} | VARARGS_SPECIALS:
1967             return NO
1968
1969     elif p.type == syms.decorator:
1970         # decorators
1971         return NO
1972
1973     elif p.type == syms.dotted_name:
1974         if prev:
1975             return NO
1976
1977         prevp = preceding_leaf(p)
1978         if not prevp or prevp.type == token.AT or prevp.type == token.DOT:
1979             return NO
1980
1981     elif p.type == syms.classdef:
1982         if t == token.LPAR:
1983             return NO
1984
1985         if prev and prev.type == token.LPAR:
1986             return NO
1987
1988     elif p.type in {syms.subscript, syms.sliceop}:
1989         # indexing
1990         if not prev:
1991             assert p.parent is not None, "subscripts are always parented"
1992             if p.parent.type == syms.subscriptlist:
1993                 return SPACE
1994
1995             return NO
1996
1997         elif not complex_subscript:
1998             return NO
1999
2000     elif p.type == syms.atom:
2001         if prev and t == token.DOT:
2002             # dots, but not the first one.
2003             return NO
2004
2005     elif p.type == syms.dictsetmaker:
2006         # dict unpacking
2007         if prev and prev.type == token.DOUBLESTAR:
2008             return NO
2009
2010     elif p.type in {syms.factor, syms.star_expr}:
2011         # unary ops
2012         if not prev:
2013             prevp = preceding_leaf(p)
2014             if not prevp or prevp.type in OPENING_BRACKETS:
2015                 return NO
2016
2017             prevp_parent = prevp.parent
2018             assert prevp_parent is not None
2019             if prevp.type == token.COLON and prevp_parent.type in {
2020                 syms.subscript,
2021                 syms.sliceop,
2022             }:
2023                 return NO
2024
2025             elif prevp.type == token.EQUAL and prevp_parent.type == syms.argument:
2026                 return NO
2027
2028         elif t in {token.NAME, token.NUMBER, token.STRING}:
2029             return NO
2030
2031     elif p.type == syms.import_from:
2032         if t == token.DOT:
2033             if prev and prev.type == token.DOT:
2034                 return NO
2035
2036         elif t == token.NAME:
2037             if v == "import":
2038                 return SPACE
2039
2040             if prev and prev.type == token.DOT:
2041                 return NO
2042
2043     elif p.type == syms.sliceop:
2044         return NO
2045
2046     return SPACE
2047
2048
2049 def preceding_leaf(node: Optional[LN]) -> Optional[Leaf]:
2050     """Return the first leaf that precedes `node`, if any."""
2051     while node:
2052         res = node.prev_sibling
2053         if res:
2054             if isinstance(res, Leaf):
2055                 return res
2056
2057             try:
2058                 return list(res.leaves())[-1]
2059
2060             except IndexError:
2061                 return None
2062
2063         node = node.parent
2064     return None
2065
2066
2067 def child_towards(ancestor: Node, descendant: LN) -> Optional[LN]:
2068     """Return the child of `ancestor` that contains `descendant`."""
2069     node: Optional[LN] = descendant
2070     while node and node.parent != ancestor:
2071         node = node.parent
2072     return node
2073
2074
2075 def container_of(leaf: Leaf) -> LN:
2076     """Return `leaf` or one of its ancestors that is the topmost container of it.
2077
2078     By "container" we mean a node where `leaf` is the very first child.
2079     """
2080     same_prefix = leaf.prefix
2081     container: LN = leaf
2082     while container:
2083         parent = container.parent
2084         if parent is None:
2085             break
2086
2087         if parent.children[0].prefix != same_prefix:
2088             break
2089
2090         if parent.type == syms.file_input:
2091             break
2092
2093         if parent.prev_sibling is not None and parent.prev_sibling.type in BRACKETS:
2094             break
2095
2096         container = parent
2097     return container
2098
2099
2100 def is_split_after_delimiter(leaf: Leaf, previous: Optional[Leaf] = None) -> Priority:
2101     """Return the priority of the `leaf` delimiter, given a line break after it.
2102
2103     The delimiter priorities returned here are from those delimiters that would
2104     cause a line break after themselves.
2105
2106     Higher numbers are higher priority.
2107     """
2108     if leaf.type == token.COMMA:
2109         return COMMA_PRIORITY
2110
2111     return 0
2112
2113
2114 def is_split_before_delimiter(leaf: Leaf, previous: Optional[Leaf] = None) -> Priority:
2115     """Return the priority of the `leaf` delimiter, given a line break before it.
2116
2117     The delimiter priorities returned here are from those delimiters that would
2118     cause a line break before themselves.
2119
2120     Higher numbers are higher priority.
2121     """
2122     if is_vararg(leaf, within=VARARGS_PARENTS | UNPACKING_PARENTS):
2123         # * and ** might also be MATH_OPERATORS but in this case they are not.
2124         # Don't treat them as a delimiter.
2125         return 0
2126
2127     if (
2128         leaf.type == token.DOT
2129         and leaf.parent
2130         and leaf.parent.type not in {syms.import_from, syms.dotted_name}
2131         and (previous is None or previous.type in CLOSING_BRACKETS)
2132     ):
2133         return DOT_PRIORITY
2134
2135     if (
2136         leaf.type in MATH_OPERATORS
2137         and leaf.parent
2138         and leaf.parent.type not in {syms.factor, syms.star_expr}
2139     ):
2140         return MATH_PRIORITIES[leaf.type]
2141
2142     if leaf.type in COMPARATORS:
2143         return COMPARATOR_PRIORITY
2144
2145     if (
2146         leaf.type == token.STRING
2147         and previous is not None
2148         and previous.type == token.STRING
2149     ):
2150         return STRING_PRIORITY
2151
2152     if leaf.type not in {token.NAME, token.ASYNC}:
2153         return 0
2154
2155     if (
2156         leaf.value == "for"
2157         and leaf.parent
2158         and leaf.parent.type in {syms.comp_for, syms.old_comp_for}
2159         or leaf.type == token.ASYNC
2160     ):
2161         if (
2162             not isinstance(leaf.prev_sibling, Leaf)
2163             or leaf.prev_sibling.value != "async"
2164         ):
2165             return COMPREHENSION_PRIORITY
2166
2167     if (
2168         leaf.value == "if"
2169         and leaf.parent
2170         and leaf.parent.type in {syms.comp_if, syms.old_comp_if}
2171     ):
2172         return COMPREHENSION_PRIORITY
2173
2174     if leaf.value in {"if", "else"} and leaf.parent and leaf.parent.type == syms.test:
2175         return TERNARY_PRIORITY
2176
2177     if leaf.value == "is":
2178         return COMPARATOR_PRIORITY
2179
2180     if (
2181         leaf.value == "in"
2182         and leaf.parent
2183         and leaf.parent.type in {syms.comp_op, syms.comparison}
2184         and not (
2185             previous is not None
2186             and previous.type == token.NAME
2187             and previous.value == "not"
2188         )
2189     ):
2190         return COMPARATOR_PRIORITY
2191
2192     if (
2193         leaf.value == "not"
2194         and leaf.parent
2195         and leaf.parent.type == syms.comp_op
2196         and not (
2197             previous is not None
2198             and previous.type == token.NAME
2199             and previous.value == "is"
2200         )
2201     ):
2202         return COMPARATOR_PRIORITY
2203
2204     if leaf.value in LOGIC_OPERATORS and leaf.parent:
2205         return LOGIC_PRIORITY
2206
2207     return 0
2208
2209
2210 FMT_OFF = {"# fmt: off", "# fmt:off", "# yapf: disable"}
2211 FMT_ON = {"# fmt: on", "# fmt:on", "# yapf: enable"}
2212
2213
2214 def generate_comments(leaf: LN) -> Iterator[Leaf]:
2215     """Clean the prefix of the `leaf` and generate comments from it, if any.
2216
2217     Comments in lib2to3 are shoved into the whitespace prefix.  This happens
2218     in `pgen2/driver.py:Driver.parse_tokens()`.  This was a brilliant implementation
2219     move because it does away with modifying the grammar to include all the
2220     possible places in which comments can be placed.
2221
2222     The sad consequence for us though is that comments don't "belong" anywhere.
2223     This is why this function generates simple parentless Leaf objects for
2224     comments.  We simply don't know what the correct parent should be.
2225
2226     No matter though, we can live without this.  We really only need to
2227     differentiate between inline and standalone comments.  The latter don't
2228     share the line with any code.
2229
2230     Inline comments are emitted as regular token.COMMENT leaves.  Standalone
2231     are emitted with a fake STANDALONE_COMMENT token identifier.
2232     """
2233     for pc in list_comments(leaf.prefix, is_endmarker=leaf.type == token.ENDMARKER):
2234         yield Leaf(pc.type, pc.value, prefix="\n" * pc.newlines)
2235
2236
2237 @dataclass
2238 class ProtoComment:
2239     """Describes a piece of syntax that is a comment.
2240
2241     It's not a :class:`blib2to3.pytree.Leaf` so that:
2242
2243     * it can be cached (`Leaf` objects should not be reused more than once as
2244       they store their lineno, column, prefix, and parent information);
2245     * `newlines` and `consumed` fields are kept separate from the `value`. This
2246       simplifies handling of special marker comments like ``# fmt: off/on``.
2247     """
2248
2249     type: int  # token.COMMENT or STANDALONE_COMMENT
2250     value: str  # content of the comment
2251     newlines: int  # how many newlines before the comment
2252     consumed: int  # how many characters of the original leaf's prefix did we consume
2253
2254
2255 @lru_cache(maxsize=4096)
2256 def list_comments(prefix: str, *, is_endmarker: bool) -> List[ProtoComment]:
2257     """Return a list of :class:`ProtoComment` objects parsed from the given `prefix`."""
2258     result: List[ProtoComment] = []
2259     if not prefix or "#" not in prefix:
2260         return result
2261
2262     consumed = 0
2263     nlines = 0
2264     ignored_lines = 0
2265     for index, line in enumerate(prefix.split("\n")):
2266         consumed += len(line) + 1  # adding the length of the split '\n'
2267         line = line.lstrip()
2268         if not line:
2269             nlines += 1
2270         if not line.startswith("#"):
2271             # Escaped newlines outside of a comment are not really newlines at
2272             # all. We treat a single-line comment following an escaped newline
2273             # as a simple trailing comment.
2274             if line.endswith("\\"):
2275                 ignored_lines += 1
2276             continue
2277
2278         if index == ignored_lines and not is_endmarker:
2279             comment_type = token.COMMENT  # simple trailing comment
2280         else:
2281             comment_type = STANDALONE_COMMENT
2282         comment = make_comment(line)
2283         result.append(
2284             ProtoComment(
2285                 type=comment_type, value=comment, newlines=nlines, consumed=consumed
2286             )
2287         )
2288         nlines = 0
2289     return result
2290
2291
2292 def make_comment(content: str) -> str:
2293     """Return a consistently formatted comment from the given `content` string.
2294
2295     All comments (except for "##", "#!", "#:", '#'", "#%%") should have a single
2296     space between the hash sign and the content.
2297
2298     If `content` didn't start with a hash sign, one is provided.
2299     """
2300     content = content.rstrip()
2301     if not content:
2302         return "#"
2303
2304     if content[0] == "#":
2305         content = content[1:]
2306     if content and content[0] not in " !:#'%":
2307         content = " " + content
2308     return "#" + content
2309
2310
2311 def split_line(
2312     line: Line,
2313     line_length: int,
2314     inner: bool = False,
2315     features: Collection[Feature] = (),
2316 ) -> Iterator[Line]:
2317     """Split a `line` into potentially many lines.
2318
2319     They should fit in the allotted `line_length` but might not be able to.
2320     `inner` signifies that there were a pair of brackets somewhere around the
2321     current `line`, possibly transitively. This means we can fallback to splitting
2322     by delimiters if the LHS/RHS don't yield any results.
2323
2324     `features` are syntactical features that may be used in the output.
2325     """
2326     if line.is_comment:
2327         yield line
2328         return
2329
2330     line_str = str(line).strip("\n")
2331
2332     if (
2333         not line.contains_uncollapsable_type_comments()
2334         and not line.should_explode
2335         and is_line_short_enough(line, line_length=line_length, line_str=line_str)
2336     ):
2337         yield line
2338         return
2339
2340     split_funcs: List[SplitFunc]
2341     if line.is_def:
2342         split_funcs = [left_hand_split]
2343     else:
2344
2345         def rhs(line: Line, features: Collection[Feature]) -> Iterator[Line]:
2346             for omit in generate_trailers_to_omit(line, line_length):
2347                 lines = list(right_hand_split(line, line_length, features, omit=omit))
2348                 if is_line_short_enough(lines[0], line_length=line_length):
2349                     yield from lines
2350                     return
2351
2352             # All splits failed, best effort split with no omits.
2353             # This mostly happens to multiline strings that are by definition
2354             # reported as not fitting a single line.
2355             yield from right_hand_split(line, line_length, features=features)
2356
2357         if line.inside_brackets:
2358             split_funcs = [delimiter_split, standalone_comment_split, rhs]
2359         else:
2360             split_funcs = [rhs]
2361     for split_func in split_funcs:
2362         # We are accumulating lines in `result` because we might want to abort
2363         # mission and return the original line in the end, or attempt a different
2364         # split altogether.
2365         result: List[Line] = []
2366         try:
2367             for l in split_func(line, features):
2368                 if str(l).strip("\n") == line_str:
2369                     raise CannotSplit("Split function returned an unchanged result")
2370
2371                 result.extend(
2372                     split_line(
2373                         l, line_length=line_length, inner=True, features=features
2374                     )
2375                 )
2376         except CannotSplit:
2377             continue
2378
2379         else:
2380             yield from result
2381             break
2382
2383     else:
2384         yield line
2385
2386
2387 def left_hand_split(line: Line, features: Collection[Feature] = ()) -> Iterator[Line]:
2388     """Split line into many lines, starting with the first matching bracket pair.
2389
2390     Note: this usually looks weird, only use this for function definitions.
2391     Prefer RHS otherwise.  This is why this function is not symmetrical with
2392     :func:`right_hand_split` which also handles optional parentheses.
2393     """
2394     tail_leaves: List[Leaf] = []
2395     body_leaves: List[Leaf] = []
2396     head_leaves: List[Leaf] = []
2397     current_leaves = head_leaves
2398     matching_bracket = None
2399     for leaf in line.leaves:
2400         if (
2401             current_leaves is body_leaves
2402             and leaf.type in CLOSING_BRACKETS
2403             and leaf.opening_bracket is matching_bracket
2404         ):
2405             current_leaves = tail_leaves if body_leaves else head_leaves
2406         current_leaves.append(leaf)
2407         if current_leaves is head_leaves:
2408             if leaf.type in OPENING_BRACKETS:
2409                 matching_bracket = leaf
2410                 current_leaves = body_leaves
2411     if not matching_bracket:
2412         raise CannotSplit("No brackets found")
2413
2414     head = bracket_split_build_line(head_leaves, line, matching_bracket)
2415     body = bracket_split_build_line(body_leaves, line, matching_bracket, is_body=True)
2416     tail = bracket_split_build_line(tail_leaves, line, matching_bracket)
2417     bracket_split_succeeded_or_raise(head, body, tail)
2418     for result in (head, body, tail):
2419         if result:
2420             yield result
2421
2422
2423 def right_hand_split(
2424     line: Line,
2425     line_length: int,
2426     features: Collection[Feature] = (),
2427     omit: Collection[LeafID] = (),
2428 ) -> Iterator[Line]:
2429     """Split line into many lines, starting with the last matching bracket pair.
2430
2431     If the split was by optional parentheses, attempt splitting without them, too.
2432     `omit` is a collection of closing bracket IDs that shouldn't be considered for
2433     this split.
2434
2435     Note: running this function modifies `bracket_depth` on the leaves of `line`.
2436     """
2437     tail_leaves: List[Leaf] = []
2438     body_leaves: List[Leaf] = []
2439     head_leaves: List[Leaf] = []
2440     current_leaves = tail_leaves
2441     opening_bracket = None
2442     closing_bracket = None
2443     for leaf in reversed(line.leaves):
2444         if current_leaves is body_leaves:
2445             if leaf is opening_bracket:
2446                 current_leaves = head_leaves if body_leaves else tail_leaves
2447         current_leaves.append(leaf)
2448         if current_leaves is tail_leaves:
2449             if leaf.type in CLOSING_BRACKETS and id(leaf) not in omit:
2450                 opening_bracket = leaf.opening_bracket
2451                 closing_bracket = leaf
2452                 current_leaves = body_leaves
2453     if not (opening_bracket and closing_bracket and head_leaves):
2454         # If there is no opening or closing_bracket that means the split failed and
2455         # all content is in the tail.  Otherwise, if `head_leaves` are empty, it means
2456         # the matching `opening_bracket` wasn't available on `line` anymore.
2457         raise CannotSplit("No brackets found")
2458
2459     tail_leaves.reverse()
2460     body_leaves.reverse()
2461     head_leaves.reverse()
2462     head = bracket_split_build_line(head_leaves, line, opening_bracket)
2463     body = bracket_split_build_line(body_leaves, line, opening_bracket, is_body=True)
2464     tail = bracket_split_build_line(tail_leaves, line, opening_bracket)
2465     bracket_split_succeeded_or_raise(head, body, tail)
2466     if (
2467         # the body shouldn't be exploded
2468         not body.should_explode
2469         # the opening bracket is an optional paren
2470         and opening_bracket.type == token.LPAR
2471         and not opening_bracket.value
2472         # the closing bracket is an optional paren
2473         and closing_bracket.type == token.RPAR
2474         and not closing_bracket.value
2475         # it's not an import (optional parens are the only thing we can split on
2476         # in this case; attempting a split without them is a waste of time)
2477         and not line.is_import
2478         # there are no standalone comments in the body
2479         and not body.contains_standalone_comments(0)
2480         # and we can actually remove the parens
2481         and can_omit_invisible_parens(body, line_length)
2482     ):
2483         omit = {id(closing_bracket), *omit}
2484         try:
2485             yield from right_hand_split(line, line_length, features=features, omit=omit)
2486             return
2487
2488         except CannotSplit:
2489             if not (
2490                 can_be_split(body)
2491                 or is_line_short_enough(body, line_length=line_length)
2492             ):
2493                 raise CannotSplit(
2494                     "Splitting failed, body is still too long and can't be split."
2495                 )
2496
2497             elif head.contains_multiline_strings() or tail.contains_multiline_strings():
2498                 raise CannotSplit(
2499                     "The current optional pair of parentheses is bound to fail to "
2500                     "satisfy the splitting algorithm because the head or the tail "
2501                     "contains multiline strings which by definition never fit one "
2502                     "line."
2503                 )
2504
2505     ensure_visible(opening_bracket)
2506     ensure_visible(closing_bracket)
2507     for result in (head, body, tail):
2508         if result:
2509             yield result
2510
2511
2512 def bracket_split_succeeded_or_raise(head: Line, body: Line, tail: Line) -> None:
2513     """Raise :exc:`CannotSplit` if the last left- or right-hand split failed.
2514
2515     Do nothing otherwise.
2516
2517     A left- or right-hand split is based on a pair of brackets. Content before
2518     (and including) the opening bracket is left on one line, content inside the
2519     brackets is put on a separate line, and finally content starting with and
2520     following the closing bracket is put on a separate line.
2521
2522     Those are called `head`, `body`, and `tail`, respectively. If the split
2523     produced the same line (all content in `head`) or ended up with an empty `body`
2524     and the `tail` is just the closing bracket, then it's considered failed.
2525     """
2526     tail_len = len(str(tail).strip())
2527     if not body:
2528         if tail_len == 0:
2529             raise CannotSplit("Splitting brackets produced the same line")
2530
2531         elif tail_len < 3:
2532             raise CannotSplit(
2533                 f"Splitting brackets on an empty body to save "
2534                 f"{tail_len} characters is not worth it"
2535             )
2536
2537
2538 def bracket_split_build_line(
2539     leaves: List[Leaf], original: Line, opening_bracket: Leaf, *, is_body: bool = False
2540 ) -> Line:
2541     """Return a new line with given `leaves` and respective comments from `original`.
2542
2543     If `is_body` is True, the result line is one-indented inside brackets and as such
2544     has its first leaf's prefix normalized and a trailing comma added when expected.
2545     """
2546     result = Line(depth=original.depth)
2547     if is_body:
2548         result.inside_brackets = True
2549         result.depth += 1
2550         if leaves:
2551             # Since body is a new indent level, remove spurious leading whitespace.
2552             normalize_prefix(leaves[0], inside_brackets=True)
2553             # Ensure a trailing comma for imports and standalone function arguments, but
2554             # be careful not to add one after any comments.
2555             no_commas = original.is_def and not any(
2556                 l.type == token.COMMA for l in leaves
2557             )
2558
2559             if original.is_import or no_commas:
2560                 for i in range(len(leaves) - 1, -1, -1):
2561                     if leaves[i].type == STANDALONE_COMMENT:
2562                         continue
2563                     elif leaves[i].type == token.COMMA:
2564                         break
2565                     else:
2566                         leaves.insert(i + 1, Leaf(token.COMMA, ","))
2567                         break
2568     # Populate the line
2569     for leaf in leaves:
2570         result.append(leaf, preformatted=True)
2571         for comment_after in original.comments_after(leaf):
2572             result.append(comment_after, preformatted=True)
2573     if is_body:
2574         result.should_explode = should_explode(result, opening_bracket)
2575     return result
2576
2577
2578 def dont_increase_indentation(split_func: SplitFunc) -> SplitFunc:
2579     """Normalize prefix of the first leaf in every line returned by `split_func`.
2580
2581     This is a decorator over relevant split functions.
2582     """
2583
2584     @wraps(split_func)
2585     def split_wrapper(line: Line, features: Collection[Feature] = ()) -> Iterator[Line]:
2586         for l in split_func(line, features):
2587             normalize_prefix(l.leaves[0], inside_brackets=True)
2588             yield l
2589
2590     return split_wrapper
2591
2592
2593 @dont_increase_indentation
2594 def delimiter_split(line: Line, features: Collection[Feature] = ()) -> Iterator[Line]:
2595     """Split according to delimiters of the highest priority.
2596
2597     If the appropriate Features are given, the split will add trailing commas
2598     also in function signatures and calls that contain `*` and `**`.
2599     """
2600     try:
2601         last_leaf = line.leaves[-1]
2602     except IndexError:
2603         raise CannotSplit("Line empty")
2604
2605     bt = line.bracket_tracker
2606     try:
2607         delimiter_priority = bt.max_delimiter_priority(exclude={id(last_leaf)})
2608     except ValueError:
2609         raise CannotSplit("No delimiters found")
2610
2611     if delimiter_priority == DOT_PRIORITY:
2612         if bt.delimiter_count_with_priority(delimiter_priority) == 1:
2613             raise CannotSplit("Splitting a single attribute from its owner looks wrong")
2614
2615     current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
2616     lowest_depth = sys.maxsize
2617     trailing_comma_safe = True
2618
2619     def append_to_line(leaf: Leaf) -> Iterator[Line]:
2620         """Append `leaf` to current line or to new line if appending impossible."""
2621         nonlocal current_line
2622         try:
2623             current_line.append_safe(leaf, preformatted=True)
2624         except ValueError:
2625             yield current_line
2626
2627             current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
2628             current_line.append(leaf)
2629
2630     for leaf in line.leaves:
2631         yield from append_to_line(leaf)
2632
2633         for comment_after in line.comments_after(leaf):
2634             yield from append_to_line(comment_after)
2635
2636         lowest_depth = min(lowest_depth, leaf.bracket_depth)
2637         if leaf.bracket_depth == lowest_depth:
2638             if is_vararg(leaf, within={syms.typedargslist}):
2639                 trailing_comma_safe = (
2640                     trailing_comma_safe and Feature.TRAILING_COMMA_IN_DEF in features
2641                 )
2642             elif is_vararg(leaf, within={syms.arglist, syms.argument}):
2643                 trailing_comma_safe = (
2644                     trailing_comma_safe and Feature.TRAILING_COMMA_IN_CALL in features
2645                 )
2646
2647         leaf_priority = bt.delimiters.get(id(leaf))
2648         if leaf_priority == delimiter_priority:
2649             yield current_line
2650
2651             current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
2652     if current_line:
2653         if (
2654             trailing_comma_safe
2655             and delimiter_priority == COMMA_PRIORITY
2656             and current_line.leaves[-1].type != token.COMMA
2657             and current_line.leaves[-1].type != STANDALONE_COMMENT
2658         ):
2659             current_line.append(Leaf(token.COMMA, ","))
2660         yield current_line
2661
2662
2663 @dont_increase_indentation
2664 def standalone_comment_split(
2665     line: Line, features: Collection[Feature] = ()
2666 ) -> Iterator[Line]:
2667     """Split standalone comments from the rest of the line."""
2668     if not line.contains_standalone_comments(0):
2669         raise CannotSplit("Line does not have any standalone comments")
2670
2671     current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
2672
2673     def append_to_line(leaf: Leaf) -> Iterator[Line]:
2674         """Append `leaf` to current line or to new line if appending impossible."""
2675         nonlocal current_line
2676         try:
2677             current_line.append_safe(leaf, preformatted=True)
2678         except ValueError:
2679             yield current_line
2680
2681             current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
2682             current_line.append(leaf)
2683
2684     for leaf in line.leaves:
2685         yield from append_to_line(leaf)
2686
2687         for comment_after in line.comments_after(leaf):
2688             yield from append_to_line(comment_after)
2689
2690     if current_line:
2691         yield current_line
2692
2693
2694 def is_import(leaf: Leaf) -> bool:
2695     """Return True if the given leaf starts an import statement."""
2696     p = leaf.parent
2697     t = leaf.type
2698     v = leaf.value
2699     return bool(
2700         t == token.NAME
2701         and (
2702             (v == "import" and p and p.type == syms.import_name)
2703             or (v == "from" and p and p.type == syms.import_from)
2704         )
2705     )
2706
2707
2708 def is_type_comment(leaf: Leaf) -> bool:
2709     """Return True if the given leaf is a special comment.
2710     Only returns true for type comments for now."""
2711     t = leaf.type
2712     v = leaf.value
2713     return t in {token.COMMENT, t == STANDALONE_COMMENT} and v.startswith("# type:")
2714
2715
2716 def normalize_prefix(leaf: Leaf, *, inside_brackets: bool) -> None:
2717     """Leave existing extra newlines if not `inside_brackets`. Remove everything
2718     else.
2719
2720     Note: don't use backslashes for formatting or you'll lose your voting rights.
2721     """
2722     if not inside_brackets:
2723         spl = leaf.prefix.split("#")
2724         if "\\" not in spl[0]:
2725             nl_count = spl[-1].count("\n")
2726             if len(spl) > 1:
2727                 nl_count -= 1
2728             leaf.prefix = "\n" * nl_count
2729             return
2730
2731     leaf.prefix = ""
2732
2733
2734 def normalize_string_prefix(leaf: Leaf, remove_u_prefix: bool = False) -> None:
2735     """Make all string prefixes lowercase.
2736
2737     If remove_u_prefix is given, also removes any u prefix from the string.
2738
2739     Note: Mutates its argument.
2740     """
2741     match = re.match(r"^([furbFURB]*)(.*)$", leaf.value, re.DOTALL)
2742     assert match is not None, f"failed to match string {leaf.value!r}"
2743     orig_prefix = match.group(1)
2744     new_prefix = orig_prefix.lower()
2745     if remove_u_prefix:
2746         new_prefix = new_prefix.replace("u", "")
2747     leaf.value = f"{new_prefix}{match.group(2)}"
2748
2749
2750 def normalize_string_quotes(leaf: Leaf) -> None:
2751     """Prefer double quotes but only if it doesn't cause more escaping.
2752
2753     Adds or removes backslashes as appropriate. Doesn't parse and fix
2754     strings nested in f-strings (yet).
2755
2756     Note: Mutates its argument.
2757     """
2758     value = leaf.value.lstrip("furbFURB")
2759     if value[:3] == '"""':
2760         return
2761
2762     elif value[:3] == "'''":
2763         orig_quote = "'''"
2764         new_quote = '"""'
2765     elif value[0] == '"':
2766         orig_quote = '"'
2767         new_quote = "'"
2768     else:
2769         orig_quote = "'"
2770         new_quote = '"'
2771     first_quote_pos = leaf.value.find(orig_quote)
2772     if first_quote_pos == -1:
2773         return  # There's an internal error
2774
2775     prefix = leaf.value[:first_quote_pos]
2776     unescaped_new_quote = re.compile(rf"(([^\\]|^)(\\\\)*){new_quote}")
2777     escaped_new_quote = re.compile(rf"([^\\]|^)\\((?:\\\\)*){new_quote}")
2778     escaped_orig_quote = re.compile(rf"([^\\]|^)\\((?:\\\\)*){orig_quote}")
2779     body = leaf.value[first_quote_pos + len(orig_quote) : -len(orig_quote)]
2780     if "r" in prefix.casefold():
2781         if unescaped_new_quote.search(body):
2782             # There's at least one unescaped new_quote in this raw string
2783             # so converting is impossible
2784             return
2785
2786         # Do not introduce or remove backslashes in raw strings
2787         new_body = body
2788     else:
2789         # remove unnecessary escapes
2790         new_body = sub_twice(escaped_new_quote, rf"\1\2{new_quote}", body)
2791         if body != new_body:
2792             # Consider the string without unnecessary escapes as the original
2793             body = new_body
2794             leaf.value = f"{prefix}{orig_quote}{body}{orig_quote}"
2795         new_body = sub_twice(escaped_orig_quote, rf"\1\2{orig_quote}", new_body)
2796         new_body = sub_twice(unescaped_new_quote, rf"\1\\{new_quote}", new_body)
2797     if "f" in prefix.casefold():
2798         matches = re.findall(
2799             r"""
2800             (?:[^{]|^)\{  # start of the string or a non-{ followed by a single {
2801                 ([^{].*?)  # contents of the brackets except if begins with {{
2802             \}(?:[^}]|$)  # A } followed by end of the string or a non-}
2803             """,
2804             new_body,
2805             re.VERBOSE,
2806         )
2807         for m in matches:
2808             if "\\" in str(m):
2809                 # Do not introduce backslashes in interpolated expressions
2810                 return
2811     if new_quote == '"""' and new_body[-1:] == '"':
2812         # edge case:
2813         new_body = new_body[:-1] + '\\"'
2814     orig_escape_count = body.count("\\")
2815     new_escape_count = new_body.count("\\")
2816     if new_escape_count > orig_escape_count:
2817         return  # Do not introduce more escaping
2818
2819     if new_escape_count == orig_escape_count and orig_quote == '"':
2820         return  # Prefer double quotes
2821
2822     leaf.value = f"{prefix}{new_quote}{new_body}{new_quote}"
2823
2824
2825 def normalize_numeric_literal(leaf: Leaf) -> None:
2826     """Normalizes numeric (float, int, and complex) literals.
2827
2828     All letters used in the representation are normalized to lowercase (except
2829     in Python 2 long literals).
2830     """
2831     text = leaf.value.lower()
2832     if text.startswith(("0o", "0b")):
2833         # Leave octal and binary literals alone.
2834         pass
2835     elif text.startswith("0x"):
2836         # Change hex literals to upper case.
2837         before, after = text[:2], text[2:]
2838         text = f"{before}{after.upper()}"
2839     elif "e" in text:
2840         before, after = text.split("e")
2841         sign = ""
2842         if after.startswith("-"):
2843             after = after[1:]
2844             sign = "-"
2845         elif after.startswith("+"):
2846             after = after[1:]
2847         before = format_float_or_int_string(before)
2848         text = f"{before}e{sign}{after}"
2849     elif text.endswith(("j", "l")):
2850         number = text[:-1]
2851         suffix = text[-1]
2852         # Capitalize in "2L" because "l" looks too similar to "1".
2853         if suffix == "l":
2854             suffix = "L"
2855         text = f"{format_float_or_int_string(number)}{suffix}"
2856     else:
2857         text = format_float_or_int_string(text)
2858     leaf.value = text
2859
2860
2861 def format_float_or_int_string(text: str) -> str:
2862     """Formats a float string like "1.0"."""
2863     if "." not in text:
2864         return text
2865
2866     before, after = text.split(".")
2867     return f"{before or 0}.{after or 0}"
2868
2869
2870 def normalize_invisible_parens(node: Node, parens_after: Set[str]) -> None:
2871     """Make existing optional parentheses invisible or create new ones.
2872
2873     `parens_after` is a set of string leaf values immediately after which parens
2874     should be put.
2875
2876     Standardizes on visible parentheses for single-element tuples, and keeps
2877     existing visible parentheses for other tuples and generator expressions.
2878     """
2879     for pc in list_comments(node.prefix, is_endmarker=False):
2880         if pc.value in FMT_OFF:
2881             # This `node` has a prefix with `# fmt: off`, don't mess with parens.
2882             return
2883
2884     check_lpar = False
2885     for index, child in enumerate(list(node.children)):
2886         # Add parentheses around long tuple unpacking in assignments.
2887         if (
2888             index == 0
2889             and isinstance(child, Node)
2890             and child.type == syms.testlist_star_expr
2891         ):
2892             check_lpar = True
2893
2894         if check_lpar:
2895             if is_walrus_assignment(child):
2896                 continue
2897             if child.type == syms.atom:
2898                 # Determines if the underlying atom should be surrounded with
2899                 # invisible params - also makes parens invisible recursively
2900                 # within the atom and removes repeated invisible parens within
2901                 # the atom
2902                 should_surround_with_parens = maybe_make_parens_invisible_in_atom(
2903                     child, parent=node
2904                 )
2905
2906                 if should_surround_with_parens:
2907                     lpar = Leaf(token.LPAR, "")
2908                     rpar = Leaf(token.RPAR, "")
2909                     index = child.remove() or 0
2910                     node.insert_child(index, Node(syms.atom, [lpar, child, rpar]))
2911             elif is_one_tuple(child):
2912                 # wrap child in visible parentheses
2913                 lpar = Leaf(token.LPAR, "(")
2914                 rpar = Leaf(token.RPAR, ")")
2915                 child.remove()
2916                 node.insert_child(index, Node(syms.atom, [lpar, child, rpar]))
2917             elif node.type == syms.import_from:
2918                 # "import from" nodes store parentheses directly as part of
2919                 # the statement
2920                 if child.type == token.LPAR:
2921                     # make parentheses invisible
2922                     child.value = ""  # type: ignore
2923                     node.children[-1].value = ""  # type: ignore
2924                 elif child.type != token.STAR:
2925                     # insert invisible parentheses
2926                     node.insert_child(index, Leaf(token.LPAR, ""))
2927                     node.append_child(Leaf(token.RPAR, ""))
2928                 break
2929
2930             elif not (isinstance(child, Leaf) and is_multiline_string(child)):
2931                 # wrap child in invisible parentheses
2932                 lpar = Leaf(token.LPAR, "")
2933                 rpar = Leaf(token.RPAR, "")
2934                 index = child.remove() or 0
2935                 prefix = child.prefix
2936                 child.prefix = ""
2937                 new_child = Node(syms.atom, [lpar, child, rpar])
2938                 new_child.prefix = prefix
2939                 node.insert_child(index, new_child)
2940
2941         check_lpar = isinstance(child, Leaf) and child.value in parens_after
2942
2943
2944 def normalize_fmt_off(node: Node) -> None:
2945     """Convert content between `# fmt: off`/`# fmt: on` into standalone comments."""
2946     try_again = True
2947     while try_again:
2948         try_again = convert_one_fmt_off_pair(node)
2949
2950
2951 def convert_one_fmt_off_pair(node: Node) -> bool:
2952     """Convert content of a single `# fmt: off`/`# fmt: on` into a standalone comment.
2953
2954     Returns True if a pair was converted.
2955     """
2956     for leaf in node.leaves():
2957         previous_consumed = 0
2958         for comment in list_comments(leaf.prefix, is_endmarker=False):
2959             if comment.value in FMT_OFF:
2960                 # We only want standalone comments. If there's no previous leaf or
2961                 # the previous leaf is indentation, it's a standalone comment in
2962                 # disguise.
2963                 if comment.type != STANDALONE_COMMENT:
2964                     prev = preceding_leaf(leaf)
2965                     if prev and prev.type not in WHITESPACE:
2966                         continue
2967
2968                 ignored_nodes = list(generate_ignored_nodes(leaf))
2969                 if not ignored_nodes:
2970                     continue
2971
2972                 first = ignored_nodes[0]  # Can be a container node with the `leaf`.
2973                 parent = first.parent
2974                 prefix = first.prefix
2975                 first.prefix = prefix[comment.consumed :]
2976                 hidden_value = (
2977                     comment.value + "\n" + "".join(str(n) for n in ignored_nodes)
2978                 )
2979                 if hidden_value.endswith("\n"):
2980                     # That happens when one of the `ignored_nodes` ended with a NEWLINE
2981                     # leaf (possibly followed by a DEDENT).
2982                     hidden_value = hidden_value[:-1]
2983                 first_idx = None
2984                 for ignored in ignored_nodes:
2985                     index = ignored.remove()
2986                     if first_idx is None:
2987                         first_idx = index
2988                 assert parent is not None, "INTERNAL ERROR: fmt: on/off handling (1)"
2989                 assert first_idx is not None, "INTERNAL ERROR: fmt: on/off handling (2)"
2990                 parent.insert_child(
2991                     first_idx,
2992                     Leaf(
2993                         STANDALONE_COMMENT,
2994                         hidden_value,
2995                         prefix=prefix[:previous_consumed] + "\n" * comment.newlines,
2996                     ),
2997                 )
2998                 return True
2999
3000             previous_consumed = comment.consumed
3001
3002     return False
3003
3004
3005 def generate_ignored_nodes(leaf: Leaf) -> Iterator[LN]:
3006     """Starting from the container of `leaf`, generate all leaves until `# fmt: on`.
3007
3008     Stops at the end of the block.
3009     """
3010     container: Optional[LN] = container_of(leaf)
3011     while container is not None and container.type != token.ENDMARKER:
3012         for comment in list_comments(container.prefix, is_endmarker=False):
3013             if comment.value in FMT_ON:
3014                 return
3015
3016         yield container
3017
3018         container = container.next_sibling
3019
3020
3021 def maybe_make_parens_invisible_in_atom(node: LN, parent: LN) -> bool:
3022     """If it's safe, make the parens in the atom `node` invisible, recursively.
3023     Additionally, remove repeated, adjacent invisible parens from the atom `node`
3024     as they are redundant.
3025
3026     Returns whether the node should itself be wrapped in invisible parentheses.
3027
3028     """
3029     if (
3030         node.type != syms.atom
3031         or is_empty_tuple(node)
3032         or is_one_tuple(node)
3033         or (is_yield(node) and parent.type != syms.expr_stmt)
3034         or max_delimiter_priority_in_atom(node) >= COMMA_PRIORITY
3035     ):
3036         return False
3037
3038     first = node.children[0]
3039     last = node.children[-1]
3040     if first.type == token.LPAR and last.type == token.RPAR:
3041         middle = node.children[1]
3042         # make parentheses invisible
3043         first.value = ""  # type: ignore
3044         last.value = ""  # type: ignore
3045         maybe_make_parens_invisible_in_atom(middle, parent=parent)
3046
3047         if is_atom_with_invisible_parens(middle):
3048             # Strip the invisible parens from `middle` by replacing
3049             # it with the child in-between the invisible parens
3050             middle.replace(middle.children[1])
3051
3052         return False
3053
3054     return True
3055
3056
3057 def is_atom_with_invisible_parens(node: LN) -> bool:
3058     """Given a `LN`, determines whether it's an atom `node` with invisible
3059     parens. Useful in dedupe-ing and normalizing parens.
3060     """
3061     if isinstance(node, Leaf) or node.type != syms.atom:
3062         return False
3063
3064     first, last = node.children[0], node.children[-1]
3065     return (
3066         isinstance(first, Leaf)
3067         and first.type == token.LPAR
3068         and first.value == ""
3069         and isinstance(last, Leaf)
3070         and last.type == token.RPAR
3071         and last.value == ""
3072     )
3073
3074
3075 def is_empty_tuple(node: LN) -> bool:
3076     """Return True if `node` holds an empty tuple."""
3077     return (
3078         node.type == syms.atom
3079         and len(node.children) == 2
3080         and node.children[0].type == token.LPAR
3081         and node.children[1].type == token.RPAR
3082     )
3083
3084
3085 def unwrap_singleton_parenthesis(node: LN) -> Optional[LN]:
3086     """Returns `wrapped` if `node` is of the shape ( wrapped ).
3087
3088     Parenthesis can be optional. Returns None otherwise"""
3089     if len(node.children) != 3:
3090         return None
3091     lpar, wrapped, rpar = node.children
3092     if not (lpar.type == token.LPAR and rpar.type == token.RPAR):
3093         return None
3094
3095     return wrapped
3096
3097
3098 def is_one_tuple(node: LN) -> bool:
3099     """Return True if `node` holds a tuple with one element, with or without parens."""
3100     if node.type == syms.atom:
3101         gexp = unwrap_singleton_parenthesis(node)
3102         if gexp is None or gexp.type != syms.testlist_gexp:
3103             return False
3104
3105         return len(gexp.children) == 2 and gexp.children[1].type == token.COMMA
3106
3107     return (
3108         node.type in IMPLICIT_TUPLE
3109         and len(node.children) == 2
3110         and node.children[1].type == token.COMMA
3111     )
3112
3113
3114 def is_walrus_assignment(node: LN) -> bool:
3115     """Return True iff `node` is of the shape ( test := test )"""
3116     inner = unwrap_singleton_parenthesis(node)
3117     return inner is not None and inner.type == syms.namedexpr_test
3118
3119
3120 def is_yield(node: LN) -> bool:
3121     """Return True if `node` holds a `yield` or `yield from` expression."""
3122     if node.type == syms.yield_expr:
3123         return True
3124
3125     if node.type == token.NAME and node.value == "yield":  # type: ignore
3126         return True
3127
3128     if node.type != syms.atom:
3129         return False
3130
3131     if len(node.children) != 3:
3132         return False
3133
3134     lpar, expr, rpar = node.children
3135     if lpar.type == token.LPAR and rpar.type == token.RPAR:
3136         return is_yield(expr)
3137
3138     return False
3139
3140
3141 def is_vararg(leaf: Leaf, within: Set[NodeType]) -> bool:
3142     """Return True if `leaf` is a star or double star in a vararg or kwarg.
3143
3144     If `within` includes VARARGS_PARENTS, this applies to function signatures.
3145     If `within` includes UNPACKING_PARENTS, it applies to right hand-side
3146     extended iterable unpacking (PEP 3132) and additional unpacking
3147     generalizations (PEP 448).
3148     """
3149     if leaf.type not in VARARGS_SPECIALS or not leaf.parent:
3150         return False
3151
3152     p = leaf.parent
3153     if p.type == syms.star_expr:
3154         # Star expressions are also used as assignment targets in extended
3155         # iterable unpacking (PEP 3132).  See what its parent is instead.
3156         if not p.parent:
3157             return False
3158
3159         p = p.parent
3160
3161     return p.type in within
3162
3163
3164 def is_multiline_string(leaf: Leaf) -> bool:
3165     """Return True if `leaf` is a multiline string that actually spans many lines."""
3166     value = leaf.value.lstrip("furbFURB")
3167     return value[:3] in {'"""', "'''"} and "\n" in value
3168
3169
3170 def is_stub_suite(node: Node) -> bool:
3171     """Return True if `node` is a suite with a stub body."""
3172     if (
3173         len(node.children) != 4
3174         or node.children[0].type != token.NEWLINE
3175         or node.children[1].type != token.INDENT
3176         or node.children[3].type != token.DEDENT
3177     ):
3178         return False
3179
3180     return is_stub_body(node.children[2])
3181
3182
3183 def is_stub_body(node: LN) -> bool:
3184     """Return True if `node` is a simple statement containing an ellipsis."""
3185     if not isinstance(node, Node) or node.type != syms.simple_stmt:
3186         return False
3187
3188     if len(node.children) != 2:
3189         return False
3190
3191     child = node.children[0]
3192     return (
3193         child.type == syms.atom
3194         and len(child.children) == 3
3195         and all(leaf == Leaf(token.DOT, ".") for leaf in child.children)
3196     )
3197
3198
3199 def max_delimiter_priority_in_atom(node: LN) -> Priority:
3200     """Return maximum delimiter priority inside `node`.
3201
3202     This is specific to atoms with contents contained in a pair of parentheses.
3203     If `node` isn't an atom or there are no enclosing parentheses, returns 0.
3204     """
3205     if node.type != syms.atom:
3206         return 0
3207
3208     first = node.children[0]
3209     last = node.children[-1]
3210     if not (first.type == token.LPAR and last.type == token.RPAR):
3211         return 0
3212
3213     bt = BracketTracker()
3214     for c in node.children[1:-1]:
3215         if isinstance(c, Leaf):
3216             bt.mark(c)
3217         else:
3218             for leaf in c.leaves():
3219                 bt.mark(leaf)
3220     try:
3221         return bt.max_delimiter_priority()
3222
3223     except ValueError:
3224         return 0
3225
3226
3227 def ensure_visible(leaf: Leaf) -> None:
3228     """Make sure parentheses are visible.
3229
3230     They could be invisible as part of some statements (see
3231     :func:`normalize_invisible_parens` and :func:`visit_import_from`).
3232     """
3233     if leaf.type == token.LPAR:
3234         leaf.value = "("
3235     elif leaf.type == token.RPAR:
3236         leaf.value = ")"
3237
3238
3239 def should_explode(line: Line, opening_bracket: Leaf) -> bool:
3240     """Should `line` immediately be split with `delimiter_split()` after RHS?"""
3241
3242     if not (
3243         opening_bracket.parent
3244         and opening_bracket.parent.type in {syms.atom, syms.import_from}
3245         and opening_bracket.value in "[{("
3246     ):
3247         return False
3248
3249     try:
3250         last_leaf = line.leaves[-1]
3251         exclude = {id(last_leaf)} if last_leaf.type == token.COMMA else set()
3252         max_priority = line.bracket_tracker.max_delimiter_priority(exclude=exclude)
3253     except (IndexError, ValueError):
3254         return False
3255
3256     return max_priority == COMMA_PRIORITY
3257
3258
3259 def get_features_used(node: Node) -> Set[Feature]:
3260     """Return a set of (relatively) new Python features used in this file.
3261
3262     Currently looking for:
3263     - f-strings;
3264     - underscores in numeric literals;
3265     - trailing commas after * or ** in function signatures and calls;
3266     - positional only arguments in function signatures and lambdas;
3267     """
3268     features: Set[Feature] = set()
3269     for n in node.pre_order():
3270         if n.type == token.STRING:
3271             value_head = n.value[:2]  # type: ignore
3272             if value_head in {'f"', 'F"', "f'", "F'", "rf", "fr", "RF", "FR"}:
3273                 features.add(Feature.F_STRINGS)
3274
3275         elif n.type == token.NUMBER:
3276             if "_" in n.value:  # type: ignore
3277                 features.add(Feature.NUMERIC_UNDERSCORES)
3278
3279         elif n.type == token.SLASH:
3280             if n.parent and n.parent.type in {syms.typedargslist, syms.arglist}:
3281                 features.add(Feature.POS_ONLY_ARGUMENTS)
3282
3283         elif n.type == token.COLONEQUAL:
3284             features.add(Feature.ASSIGNMENT_EXPRESSIONS)
3285
3286         elif (
3287             n.type in {syms.typedargslist, syms.arglist}
3288             and n.children
3289             and n.children[-1].type == token.COMMA
3290         ):
3291             if n.type == syms.typedargslist:
3292                 feature = Feature.TRAILING_COMMA_IN_DEF
3293             else:
3294                 feature = Feature.TRAILING_COMMA_IN_CALL
3295
3296             for ch in n.children:
3297                 if ch.type in STARS:
3298                     features.add(feature)
3299
3300                 if ch.type == syms.argument:
3301                     for argch in ch.children:
3302                         if argch.type in STARS:
3303                             features.add(feature)
3304
3305     return features
3306
3307
3308 def detect_target_versions(node: Node) -> Set[TargetVersion]:
3309     """Detect the version to target based on the nodes used."""
3310     features = get_features_used(node)
3311     return {
3312         version for version in TargetVersion if features <= VERSION_TO_FEATURES[version]
3313     }
3314
3315
3316 def generate_trailers_to_omit(line: Line, line_length: int) -> Iterator[Set[LeafID]]:
3317     """Generate sets of closing bracket IDs that should be omitted in a RHS.
3318
3319     Brackets can be omitted if the entire trailer up to and including
3320     a preceding closing bracket fits in one line.
3321
3322     Yielded sets are cumulative (contain results of previous yields, too).  First
3323     set is empty.
3324     """
3325
3326     omit: Set[LeafID] = set()
3327     yield omit
3328
3329     length = 4 * line.depth
3330     opening_bracket = None
3331     closing_bracket = None
3332     inner_brackets: Set[LeafID] = set()
3333     for index, leaf, leaf_length in enumerate_with_length(line, reversed=True):
3334         length += leaf_length
3335         if length > line_length:
3336             break
3337
3338         has_inline_comment = leaf_length > len(leaf.value) + len(leaf.prefix)
3339         if leaf.type == STANDALONE_COMMENT or has_inline_comment:
3340             break
3341
3342         if opening_bracket:
3343             if leaf is opening_bracket:
3344                 opening_bracket = None
3345             elif leaf.type in CLOSING_BRACKETS:
3346                 inner_brackets.add(id(leaf))
3347         elif leaf.type in CLOSING_BRACKETS:
3348             if index > 0 and line.leaves[index - 1].type in OPENING_BRACKETS:
3349                 # Empty brackets would fail a split so treat them as "inner"
3350                 # brackets (e.g. only add them to the `omit` set if another
3351                 # pair of brackets was good enough.
3352                 inner_brackets.add(id(leaf))
3353                 continue
3354
3355             if closing_bracket:
3356                 omit.add(id(closing_bracket))
3357                 omit.update(inner_brackets)
3358                 inner_brackets.clear()
3359                 yield omit
3360
3361             if leaf.value:
3362                 opening_bracket = leaf.opening_bracket
3363                 closing_bracket = leaf
3364
3365
3366 def get_future_imports(node: Node) -> Set[str]:
3367     """Return a set of __future__ imports in the file."""
3368     imports: Set[str] = set()
3369
3370     def get_imports_from_children(children: List[LN]) -> Generator[str, None, None]:
3371         for child in children:
3372             if isinstance(child, Leaf):
3373                 if child.type == token.NAME:
3374                     yield child.value
3375             elif child.type == syms.import_as_name:
3376                 orig_name = child.children[0]
3377                 assert isinstance(orig_name, Leaf), "Invalid syntax parsing imports"
3378                 assert orig_name.type == token.NAME, "Invalid syntax parsing imports"
3379                 yield orig_name.value
3380             elif child.type == syms.import_as_names:
3381                 yield from get_imports_from_children(child.children)
3382             else:
3383                 raise AssertionError("Invalid syntax parsing imports")
3384
3385     for child in node.children:
3386         if child.type != syms.simple_stmt:
3387             break
3388         first_child = child.children[0]
3389         if isinstance(first_child, Leaf):
3390             # Continue looking if we see a docstring; otherwise stop.
3391             if (
3392                 len(child.children) == 2
3393                 and first_child.type == token.STRING
3394                 and child.children[1].type == token.NEWLINE
3395             ):
3396                 continue
3397             else:
3398                 break
3399         elif first_child.type == syms.import_from:
3400             module_name = first_child.children[1]
3401             if not isinstance(module_name, Leaf) or module_name.value != "__future__":
3402                 break
3403             imports |= set(get_imports_from_children(first_child.children[3:]))
3404         else:
3405             break
3406     return imports
3407
3408
3409 def gen_python_files_in_dir(
3410     path: Path,
3411     root: Path,
3412     include: Pattern[str],
3413     exclude: Pattern[str],
3414     report: "Report",
3415 ) -> Iterator[Path]:
3416     """Generate all files under `path` whose paths are not excluded by the
3417     `exclude` regex, but are included by the `include` regex.
3418
3419     Symbolic links pointing outside of the `root` directory are ignored.
3420
3421     `report` is where output about exclusions goes.
3422     """
3423     assert root.is_absolute(), f"INTERNAL ERROR: `root` must be absolute but is {root}"
3424     for child in path.iterdir():
3425         try:
3426             normalized_path = "/" + child.resolve().relative_to(root).as_posix()
3427         except ValueError:
3428             if child.is_symlink():
3429                 report.path_ignored(
3430                     child, f"is a symbolic link that points outside {root}"
3431                 )
3432                 continue
3433
3434             raise
3435
3436         if child.is_dir():
3437             normalized_path += "/"
3438         exclude_match = exclude.search(normalized_path)
3439         if exclude_match and exclude_match.group(0):
3440             report.path_ignored(child, f"matches the --exclude regular expression")
3441             continue
3442
3443         if child.is_dir():
3444             yield from gen_python_files_in_dir(child, root, include, exclude, report)
3445
3446         elif child.is_file():
3447             include_match = include.search(normalized_path)
3448             if include_match:
3449                 yield child
3450
3451
3452 @lru_cache()
3453 def find_project_root(srcs: Iterable[str]) -> Path:
3454     """Return a directory containing .git, .hg, or pyproject.toml.
3455
3456     That directory can be one of the directories passed in `srcs` or their
3457     common parent.
3458
3459     If no directory in the tree contains a marker that would specify it's the
3460     project root, the root of the file system is returned.
3461     """
3462     if not srcs:
3463         return Path("/").resolve()
3464
3465     common_base = min(Path(src).resolve() for src in srcs)
3466     if common_base.is_dir():
3467         # Append a fake file so `parents` below returns `common_base_dir`, too.
3468         common_base /= "fake-file"
3469     for directory in common_base.parents:
3470         if (directory / ".git").is_dir():
3471             return directory
3472
3473         if (directory / ".hg").is_dir():
3474             return directory
3475
3476         if (directory / "pyproject.toml").is_file():
3477             return directory
3478
3479     return directory
3480
3481
3482 @dataclass
3483 class Report:
3484     """Provides a reformatting counter. Can be rendered with `str(report)`."""
3485
3486     check: bool = False
3487     quiet: bool = False
3488     verbose: bool = False
3489     change_count: int = 0
3490     same_count: int = 0
3491     failure_count: int = 0
3492
3493     def done(self, src: Path, changed: Changed) -> None:
3494         """Increment the counter for successful reformatting. Write out a message."""
3495         if changed is Changed.YES:
3496             reformatted = "would reformat" if self.check else "reformatted"
3497             if self.verbose or not self.quiet:
3498                 out(f"{reformatted} {src}")
3499             self.change_count += 1
3500         else:
3501             if self.verbose:
3502                 if changed is Changed.NO:
3503                     msg = f"{src} already well formatted, good job."
3504                 else:
3505                     msg = f"{src} wasn't modified on disk since last run."
3506                 out(msg, bold=False)
3507             self.same_count += 1
3508
3509     def failed(self, src: Path, message: str) -> None:
3510         """Increment the counter for failed reformatting. Write out a message."""
3511         err(f"error: cannot format {src}: {message}")
3512         self.failure_count += 1
3513
3514     def path_ignored(self, path: Path, message: str) -> None:
3515         if self.verbose:
3516             out(f"{path} ignored: {message}", bold=False)
3517
3518     @property
3519     def return_code(self) -> int:
3520         """Return the exit code that the app should use.
3521
3522         This considers the current state of changed files and failures:
3523         - if there were any failures, return 123;
3524         - if any files were changed and --check is being used, return 1;
3525         - otherwise return 0.
3526         """
3527         # According to http://tldp.org/LDP/abs/html/exitcodes.html starting with
3528         # 126 we have special return codes reserved by the shell.
3529         if self.failure_count:
3530             return 123
3531
3532         elif self.change_count and self.check:
3533             return 1
3534
3535         return 0
3536
3537     def __str__(self) -> str:
3538         """Render a color report of the current state.
3539
3540         Use `click.unstyle` to remove colors.
3541         """
3542         if self.check:
3543             reformatted = "would be reformatted"
3544             unchanged = "would be left unchanged"
3545             failed = "would fail to reformat"
3546         else:
3547             reformatted = "reformatted"
3548             unchanged = "left unchanged"
3549             failed = "failed to reformat"
3550         report = []
3551         if self.change_count:
3552             s = "s" if self.change_count > 1 else ""
3553             report.append(
3554                 click.style(f"{self.change_count} file{s} {reformatted}", bold=True)
3555             )
3556         if self.same_count:
3557             s = "s" if self.same_count > 1 else ""
3558             report.append(f"{self.same_count} file{s} {unchanged}")
3559         if self.failure_count:
3560             s = "s" if self.failure_count > 1 else ""
3561             report.append(
3562                 click.style(f"{self.failure_count} file{s} {failed}", fg="red")
3563             )
3564         return ", ".join(report) + "."
3565
3566
3567 def parse_ast(src: str) -> Union[ast.AST, ast3.AST, ast27.AST]:
3568     filename = "<unknown>"
3569     if sys.version_info >= (3, 8):
3570         # TODO: support Python 4+ ;)
3571         for minor_version in range(sys.version_info[1], 4, -1):
3572             try:
3573                 return ast.parse(src, filename, feature_version=(3, minor_version))
3574             except SyntaxError:
3575                 continue
3576     else:
3577         for feature_version in (7, 6):
3578             try:
3579                 return ast3.parse(src, filename, feature_version=feature_version)
3580             except SyntaxError:
3581                 continue
3582
3583     return ast27.parse(src)
3584
3585
3586 def _fixup_ast_constants(
3587     node: Union[ast.AST, ast3.AST, ast27.AST]
3588 ) -> Union[ast.AST, ast3.AST, ast27.AST]:
3589     """Map ast nodes deprecated in 3.8 to Constant."""
3590     # casts are required until this is released:
3591     # https://github.com/python/typeshed/pull/3142
3592     if isinstance(node, (ast.Str, ast3.Str, ast27.Str, ast.Bytes, ast3.Bytes)):
3593         return cast(ast.AST, ast.Constant(value=node.s))
3594     elif isinstance(node, (ast.Num, ast3.Num, ast27.Num)):
3595         return cast(ast.AST, ast.Constant(value=node.n))
3596     elif isinstance(node, (ast.NameConstant, ast3.NameConstant)):
3597         return cast(ast.AST, ast.Constant(value=node.value))
3598     return node
3599
3600
3601 def assert_equivalent(src: str, dst: str) -> None:
3602     """Raise AssertionError if `src` and `dst` aren't equivalent."""
3603
3604     def _v(node: Union[ast.AST, ast3.AST, ast27.AST], depth: int = 0) -> Iterator[str]:
3605         """Simple visitor generating strings to compare ASTs by content."""
3606
3607         node = _fixup_ast_constants(node)
3608
3609         yield f"{'  ' * depth}{node.__class__.__name__}("
3610
3611         for field in sorted(node._fields):
3612             # TypeIgnore has only one field 'lineno' which breaks this comparison
3613             type_ignore_classes = (ast3.TypeIgnore, ast27.TypeIgnore)
3614             if sys.version_info >= (3, 8):
3615                 type_ignore_classes += (ast.TypeIgnore,)
3616             if isinstance(node, type_ignore_classes):
3617                 break
3618
3619             try:
3620                 value = getattr(node, field)
3621             except AttributeError:
3622                 continue
3623
3624             yield f"{'  ' * (depth+1)}{field}="
3625
3626             if isinstance(value, list):
3627                 for item in value:
3628                     # Ignore nested tuples within del statements, because we may insert
3629                     # parentheses and they change the AST.
3630                     if (
3631                         field == "targets"
3632                         and isinstance(node, (ast.Delete, ast3.Delete, ast27.Delete))
3633                         and isinstance(item, (ast.Tuple, ast3.Tuple, ast27.Tuple))
3634                     ):
3635                         for item in item.elts:
3636                             yield from _v(item, depth + 2)
3637                     elif isinstance(item, (ast.AST, ast3.AST, ast27.AST)):
3638                         yield from _v(item, depth + 2)
3639
3640             elif isinstance(value, (ast.AST, ast3.AST, ast27.AST)):
3641                 yield from _v(value, depth + 2)
3642
3643             else:
3644                 yield f"{'  ' * (depth+2)}{value!r},  # {value.__class__.__name__}"
3645
3646         yield f"{'  ' * depth})  # /{node.__class__.__name__}"
3647
3648     try:
3649         src_ast = parse_ast(src)
3650     except Exception as exc:
3651         raise AssertionError(
3652             f"cannot use --safe with this file; failed to parse source file.  "
3653             f"AST error message: {exc}"
3654         )
3655
3656     try:
3657         dst_ast = parse_ast(dst)
3658     except Exception as exc:
3659         log = dump_to_file("".join(traceback.format_tb(exc.__traceback__)), dst)
3660         raise AssertionError(
3661             f"INTERNAL ERROR: Black produced invalid code: {exc}. "
3662             f"Please report a bug on https://github.com/psf/black/issues.  "
3663             f"This invalid output might be helpful: {log}"
3664         ) from None
3665
3666     src_ast_str = "\n".join(_v(src_ast))
3667     dst_ast_str = "\n".join(_v(dst_ast))
3668     if src_ast_str != dst_ast_str:
3669         log = dump_to_file(diff(src_ast_str, dst_ast_str, "src", "dst"))
3670         raise AssertionError(
3671             f"INTERNAL ERROR: Black produced code that is not equivalent to "
3672             f"the source.  "
3673             f"Please report a bug on https://github.com/psf/black/issues.  "
3674             f"This diff might be helpful: {log}"
3675         ) from None
3676
3677
3678 def assert_stable(src: str, dst: str, mode: FileMode) -> None:
3679     """Raise AssertionError if `dst` reformats differently the second time."""
3680     newdst = format_str(dst, mode=mode)
3681     if dst != newdst:
3682         log = dump_to_file(
3683             diff(src, dst, "source", "first pass"),
3684             diff(dst, newdst, "first pass", "second pass"),
3685         )
3686         raise AssertionError(
3687             f"INTERNAL ERROR: Black produced different code on the second pass "
3688             f"of the formatter.  "
3689             f"Please report a bug on https://github.com/psf/black/issues.  "
3690             f"This diff might be helpful: {log}"
3691         ) from None
3692
3693
3694 def dump_to_file(*output: str) -> str:
3695     """Dump `output` to a temporary file. Return path to the file."""
3696     with tempfile.NamedTemporaryFile(
3697         mode="w", prefix="blk_", suffix=".log", delete=False, encoding="utf8"
3698     ) as f:
3699         for lines in output:
3700             f.write(lines)
3701             if lines and lines[-1] != "\n":
3702                 f.write("\n")
3703     return f.name
3704
3705
3706 @contextmanager
3707 def nullcontext() -> Iterator[None]:
3708     """Return context manager that does nothing.
3709     Similar to `nullcontext` from python 3.7"""
3710     yield
3711
3712
3713 def diff(a: str, b: str, a_name: str, b_name: str) -> str:
3714     """Return a unified diff string between strings `a` and `b`."""
3715     import difflib
3716
3717     a_lines = [line + "\n" for line in a.split("\n")]
3718     b_lines = [line + "\n" for line in b.split("\n")]
3719     return "".join(
3720         difflib.unified_diff(a_lines, b_lines, fromfile=a_name, tofile=b_name, n=5)
3721     )
3722
3723
3724 def cancel(tasks: Iterable[asyncio.Task]) -> None:
3725     """asyncio signal handler that cancels all `tasks` and reports to stderr."""
3726     err("Aborted!")
3727     for task in tasks:
3728         task.cancel()
3729
3730
3731 def shutdown(loop: asyncio.AbstractEventLoop) -> None:
3732     """Cancel all pending tasks on `loop`, wait for them, and close the loop."""
3733     try:
3734         if sys.version_info[:2] >= (3, 7):
3735             all_tasks = asyncio.all_tasks
3736         else:
3737             all_tasks = asyncio.Task.all_tasks
3738         # This part is borrowed from asyncio/runners.py in Python 3.7b2.
3739         to_cancel = [task for task in all_tasks(loop) if not task.done()]
3740         if not to_cancel:
3741             return
3742
3743         for task in to_cancel:
3744             task.cancel()
3745         loop.run_until_complete(
3746             asyncio.gather(*to_cancel, loop=loop, return_exceptions=True)
3747         )
3748     finally:
3749         # `concurrent.futures.Future` objects cannot be cancelled once they
3750         # are already running. There might be some when the `shutdown()` happened.
3751         # Silence their logger's spew about the event loop being closed.
3752         cf_logger = logging.getLogger("concurrent.futures")
3753         cf_logger.setLevel(logging.CRITICAL)
3754         loop.close()
3755
3756
3757 def sub_twice(regex: Pattern[str], replacement: str, original: str) -> str:
3758     """Replace `regex` with `replacement` twice on `original`.
3759
3760     This is used by string normalization to perform replaces on
3761     overlapping matches.
3762     """
3763     return regex.sub(replacement, regex.sub(replacement, original))
3764
3765
3766 def re_compile_maybe_verbose(regex: str) -> Pattern[str]:
3767     """Compile a regular expression string in `regex`.
3768
3769     If it contains newlines, use verbose mode.
3770     """
3771     if "\n" in regex:
3772         regex = "(?x)" + regex
3773     return re.compile(regex)
3774
3775
3776 def enumerate_reversed(sequence: Sequence[T]) -> Iterator[Tuple[Index, T]]:
3777     """Like `reversed(enumerate(sequence))` if that were possible."""
3778     index = len(sequence) - 1
3779     for element in reversed(sequence):
3780         yield (index, element)
3781         index -= 1
3782
3783
3784 def enumerate_with_length(
3785     line: Line, reversed: bool = False
3786 ) -> Iterator[Tuple[Index, Leaf, int]]:
3787     """Return an enumeration of leaves with their length.
3788
3789     Stops prematurely on multiline strings and standalone comments.
3790     """
3791     op = cast(
3792         Callable[[Sequence[Leaf]], Iterator[Tuple[Index, Leaf]]],
3793         enumerate_reversed if reversed else enumerate,
3794     )
3795     for index, leaf in op(line.leaves):
3796         length = len(leaf.prefix) + len(leaf.value)
3797         if "\n" in leaf.value:
3798             return  # Multiline strings, we can't continue.
3799
3800         for comment in line.comments_after(leaf):
3801             length += len(comment.value)
3802
3803         yield index, leaf, length
3804
3805
3806 def is_line_short_enough(line: Line, *, line_length: int, line_str: str = "") -> bool:
3807     """Return True if `line` is no longer than `line_length`.
3808
3809     Uses the provided `line_str` rendering, if any, otherwise computes a new one.
3810     """
3811     if not line_str:
3812         line_str = str(line).strip("\n")
3813     return (
3814         len(line_str) <= line_length
3815         and "\n" not in line_str  # multiline strings
3816         and not line.contains_standalone_comments()
3817     )
3818
3819
3820 def can_be_split(line: Line) -> bool:
3821     """Return False if the line cannot be split *for sure*.
3822
3823     This is not an exhaustive search but a cheap heuristic that we can use to
3824     avoid some unfortunate formattings (mostly around wrapping unsplittable code
3825     in unnecessary parentheses).
3826     """
3827     leaves = line.leaves
3828     if len(leaves) < 2:
3829         return False
3830
3831     if leaves[0].type == token.STRING and leaves[1].type == token.DOT:
3832         call_count = 0
3833         dot_count = 0
3834         next = leaves[-1]
3835         for leaf in leaves[-2::-1]:
3836             if leaf.type in OPENING_BRACKETS:
3837                 if next.type not in CLOSING_BRACKETS:
3838                     return False
3839
3840                 call_count += 1
3841             elif leaf.type == token.DOT:
3842                 dot_count += 1
3843             elif leaf.type == token.NAME:
3844                 if not (next.type == token.DOT or next.type in OPENING_BRACKETS):
3845                     return False
3846
3847             elif leaf.type not in CLOSING_BRACKETS:
3848                 return False
3849
3850             if dot_count > 1 and call_count > 1:
3851                 return False
3852
3853     return True
3854
3855
3856 def can_omit_invisible_parens(line: Line, line_length: int) -> bool:
3857     """Does `line` have a shape safe to reformat without optional parens around it?
3858
3859     Returns True for only a subset of potentially nice looking formattings but
3860     the point is to not return false positives that end up producing lines that
3861     are too long.
3862     """
3863     bt = line.bracket_tracker
3864     if not bt.delimiters:
3865         # Without delimiters the optional parentheses are useless.
3866         return True
3867
3868     max_priority = bt.max_delimiter_priority()
3869     if bt.delimiter_count_with_priority(max_priority) > 1:
3870         # With more than one delimiter of a kind the optional parentheses read better.
3871         return False
3872
3873     if max_priority == DOT_PRIORITY:
3874         # A single stranded method call doesn't require optional parentheses.
3875         return True
3876
3877     assert len(line.leaves) >= 2, "Stranded delimiter"
3878
3879     first = line.leaves[0]
3880     second = line.leaves[1]
3881     penultimate = line.leaves[-2]
3882     last = line.leaves[-1]
3883
3884     # With a single delimiter, omit if the expression starts or ends with
3885     # a bracket.
3886     if first.type in OPENING_BRACKETS and second.type not in CLOSING_BRACKETS:
3887         remainder = False
3888         length = 4 * line.depth
3889         for _index, leaf, leaf_length in enumerate_with_length(line):
3890             if leaf.type in CLOSING_BRACKETS and leaf.opening_bracket is first:
3891                 remainder = True
3892             if remainder:
3893                 length += leaf_length
3894                 if length > line_length:
3895                     break
3896
3897                 if leaf.type in OPENING_BRACKETS:
3898                     # There are brackets we can further split on.
3899                     remainder = False
3900
3901         else:
3902             # checked the entire string and line length wasn't exceeded
3903             if len(line.leaves) == _index + 1:
3904                 return True
3905
3906         # Note: we are not returning False here because a line might have *both*
3907         # a leading opening bracket and a trailing closing bracket.  If the
3908         # opening bracket doesn't match our rule, maybe the closing will.
3909
3910     if (
3911         last.type == token.RPAR
3912         or last.type == token.RBRACE
3913         or (
3914             # don't use indexing for omitting optional parentheses;
3915             # it looks weird
3916             last.type == token.RSQB
3917             and last.parent
3918             and last.parent.type != syms.trailer
3919         )
3920     ):
3921         if penultimate.type in OPENING_BRACKETS:
3922             # Empty brackets don't help.
3923             return False
3924
3925         if is_multiline_string(first):
3926             # Additional wrapping of a multiline string in this situation is
3927             # unnecessary.
3928             return True
3929
3930         length = 4 * line.depth
3931         seen_other_brackets = False
3932         for _index, leaf, leaf_length in enumerate_with_length(line):
3933             length += leaf_length
3934             if leaf is last.opening_bracket:
3935                 if seen_other_brackets or length <= line_length:
3936                     return True
3937
3938             elif leaf.type in OPENING_BRACKETS:
3939                 # There are brackets we can further split on.
3940                 seen_other_brackets = True
3941
3942     return False
3943
3944
3945 def get_cache_file(mode: FileMode) -> Path:
3946     return CACHE_DIR / f"cache.{mode.get_cache_key()}.pickle"
3947
3948
3949 def read_cache(mode: FileMode) -> Cache:
3950     """Read the cache if it exists and is well formed.
3951
3952     If it is not well formed, the call to write_cache later should resolve the issue.
3953     """
3954     cache_file = get_cache_file(mode)
3955     if not cache_file.exists():
3956         return {}
3957
3958     with cache_file.open("rb") as fobj:
3959         try:
3960             cache: Cache = pickle.load(fobj)
3961         except pickle.UnpicklingError:
3962             return {}
3963
3964     return cache
3965
3966
3967 def get_cache_info(path: Path) -> CacheInfo:
3968     """Return the information used to check if a file is already formatted or not."""
3969     stat = path.stat()
3970     return stat.st_mtime, stat.st_size
3971
3972
3973 def filter_cached(cache: Cache, sources: Iterable[Path]) -> Tuple[Set[Path], Set[Path]]:
3974     """Split an iterable of paths in `sources` into two sets.
3975
3976     The first contains paths of files that modified on disk or are not in the
3977     cache. The other contains paths to non-modified files.
3978     """
3979     todo, done = set(), set()
3980     for src in sources:
3981         src = src.resolve()
3982         if cache.get(src) != get_cache_info(src):
3983             todo.add(src)
3984         else:
3985             done.add(src)
3986     return todo, done
3987
3988
3989 def write_cache(cache: Cache, sources: Iterable[Path], mode: FileMode) -> None:
3990     """Update the cache file."""
3991     cache_file = get_cache_file(mode)
3992     try:
3993         CACHE_DIR.mkdir(parents=True, exist_ok=True)
3994         new_cache = {**cache, **{src.resolve(): get_cache_info(src) for src in sources}}
3995         with tempfile.NamedTemporaryFile(dir=str(cache_file.parent), delete=False) as f:
3996             pickle.dump(new_cache, f, protocol=pickle.HIGHEST_PROTOCOL)
3997         os.replace(f.name, cache_file)
3998     except OSError:
3999         pass
4000
4001
4002 def patch_click() -> None:
4003     """Make Click not crash.
4004
4005     On certain misconfigured environments, Python 3 selects the ASCII encoding as the
4006     default which restricts paths that it can access during the lifetime of the
4007     application.  Click refuses to work in this scenario by raising a RuntimeError.
4008
4009     In case of Black the likelihood that non-ASCII characters are going to be used in
4010     file paths is minimal since it's Python source code.  Moreover, this crash was
4011     spurious on Python 3.7 thanks to PEP 538 and PEP 540.
4012     """
4013     try:
4014         from click import core
4015         from click import _unicodefun  # type: ignore
4016     except ModuleNotFoundError:
4017         return
4018
4019     for module in (core, _unicodefun):
4020         if hasattr(module, "_verify_python3_env"):
4021             module._verify_python3_env = lambda: None
4022
4023
4024 def patched_main() -> None:
4025     freeze_support()
4026     patch_click()
4027     main()
4028
4029
4030 if __name__ == "__main__":
4031     patched_main()