]> git.madduck.net Git - etc/vim.git/blob - black.py

madduck's git repository

Every one of the projects in this repository is available at the canonical URL git://git.madduck.net/madduck/pub/<projectpath> — see each project's metadata for the exact URL.

All patches and comments are welcome. Please squash your changes to logical commits before using git-format-patch and git-send-email to patches@git.madduck.net. If you'd read over the Git project's submission guidelines and adhered to them, I'd be especially grateful.

SSH access, as well as push access can be individually arranged.

If you use my repositories frequently, consider adding the following snippet to ~/.gitconfig and using the third clone URL listed for each project:

[url "git://git.madduck.net/madduck/"]
  insteadOf = madduck:

9938b37f6dfe61b219442056c17b10ce1183ecf9
[etc/vim.git] / black.py
1 import ast
2 import asyncio
3 from concurrent.futures import Executor, ProcessPoolExecutor
4 from contextlib import contextmanager
5 from datetime import datetime
6 from enum import Enum
7 from functools import lru_cache, partial, wraps
8 import io
9 import itertools
10 import logging
11 from multiprocessing import Manager, freeze_support
12 import os
13 from pathlib import Path
14 import pickle
15 import re
16 import signal
17 import sys
18 import tempfile
19 import tokenize
20 import traceback
21 from typing import (
22     Any,
23     Callable,
24     Collection,
25     Dict,
26     Generator,
27     Generic,
28     Iterable,
29     Iterator,
30     List,
31     Optional,
32     Pattern,
33     Sequence,
34     Set,
35     Tuple,
36     TypeVar,
37     Union,
38     cast,
39 )
40
41 from appdirs import user_cache_dir
42 from attr import dataclass, evolve, Factory
43 import click
44 import toml
45 from typed_ast import ast3, ast27
46
47 # lib2to3 fork
48 from blib2to3.pytree import Node, Leaf, type_repr
49 from blib2to3 import pygram, pytree
50 from blib2to3.pgen2 import driver, token
51 from blib2to3.pgen2.grammar import Grammar
52 from blib2to3.pgen2.parse import ParseError
53
54
55 __version__ = "19.3b0"
56 DEFAULT_LINE_LENGTH = 88
57 DEFAULT_EXCLUDES = (
58     r"/(\.eggs|\.git|\.hg|\.mypy_cache|\.nox|\.tox|\.venv|_build|buck-out|build|dist)/"
59 )
60 DEFAULT_INCLUDES = r"\.pyi?$"
61 CACHE_DIR = Path(user_cache_dir("black", version=__version__))
62
63
64 # types
65 FileContent = str
66 Encoding = str
67 NewLine = str
68 Depth = int
69 NodeType = int
70 LeafID = int
71 Priority = int
72 Index = int
73 LN = Union[Leaf, Node]
74 SplitFunc = Callable[["Line", Collection["Feature"]], Iterator["Line"]]
75 Timestamp = float
76 FileSize = int
77 CacheInfo = Tuple[Timestamp, FileSize]
78 Cache = Dict[Path, CacheInfo]
79 out = partial(click.secho, bold=True, err=True)
80 err = partial(click.secho, fg="red", err=True)
81
82 pygram.initialize(CACHE_DIR)
83 syms = pygram.python_symbols
84
85
86 class NothingChanged(UserWarning):
87     """Raised when reformatted code is the same as source."""
88
89
90 class CannotSplit(Exception):
91     """A readable split that fits the allotted line length is impossible."""
92
93
94 class InvalidInput(ValueError):
95     """Raised when input source code fails all parse attempts."""
96
97
98 class WriteBack(Enum):
99     NO = 0
100     YES = 1
101     DIFF = 2
102     CHECK = 3
103
104     @classmethod
105     def from_configuration(cls, *, check: bool, diff: bool) -> "WriteBack":
106         if check and not diff:
107             return cls.CHECK
108
109         return cls.DIFF if diff else cls.YES
110
111
112 class Changed(Enum):
113     NO = 0
114     CACHED = 1
115     YES = 2
116
117
118 class TargetVersion(Enum):
119     PY27 = 2
120     PY33 = 3
121     PY34 = 4
122     PY35 = 5
123     PY36 = 6
124     PY37 = 7
125     PY38 = 8
126
127     def is_python2(self) -> bool:
128         return self is TargetVersion.PY27
129
130
131 PY36_VERSIONS = {TargetVersion.PY36, TargetVersion.PY37, TargetVersion.PY38}
132
133
134 class Feature(Enum):
135     # All string literals are unicode
136     UNICODE_LITERALS = 1
137     F_STRINGS = 2
138     NUMERIC_UNDERSCORES = 3
139     TRAILING_COMMA_IN_CALL = 4
140     TRAILING_COMMA_IN_DEF = 5
141     # The following two feature-flags are mutually exclusive, and exactly one should be
142     # set for every version of python.
143     ASYNC_IDENTIFIERS = 6
144     ASYNC_KEYWORDS = 7
145     ASSIGNMENT_EXPRESSIONS = 8
146
147
148 VERSION_TO_FEATURES: Dict[TargetVersion, Set[Feature]] = {
149     TargetVersion.PY27: {Feature.ASYNC_IDENTIFIERS},
150     TargetVersion.PY33: {Feature.UNICODE_LITERALS, Feature.ASYNC_IDENTIFIERS},
151     TargetVersion.PY34: {Feature.UNICODE_LITERALS, Feature.ASYNC_IDENTIFIERS},
152     TargetVersion.PY35: {
153         Feature.UNICODE_LITERALS,
154         Feature.TRAILING_COMMA_IN_CALL,
155         Feature.ASYNC_IDENTIFIERS,
156     },
157     TargetVersion.PY36: {
158         Feature.UNICODE_LITERALS,
159         Feature.F_STRINGS,
160         Feature.NUMERIC_UNDERSCORES,
161         Feature.TRAILING_COMMA_IN_CALL,
162         Feature.TRAILING_COMMA_IN_DEF,
163         Feature.ASYNC_IDENTIFIERS,
164     },
165     TargetVersion.PY37: {
166         Feature.UNICODE_LITERALS,
167         Feature.F_STRINGS,
168         Feature.NUMERIC_UNDERSCORES,
169         Feature.TRAILING_COMMA_IN_CALL,
170         Feature.TRAILING_COMMA_IN_DEF,
171         Feature.ASYNC_KEYWORDS,
172     },
173     TargetVersion.PY38: {
174         Feature.UNICODE_LITERALS,
175         Feature.F_STRINGS,
176         Feature.NUMERIC_UNDERSCORES,
177         Feature.TRAILING_COMMA_IN_CALL,
178         Feature.TRAILING_COMMA_IN_DEF,
179         Feature.ASYNC_KEYWORDS,
180         Feature.ASSIGNMENT_EXPRESSIONS,
181     },
182 }
183
184
185 @dataclass
186 class FileMode:
187     target_versions: Set[TargetVersion] = Factory(set)
188     line_length: int = DEFAULT_LINE_LENGTH
189     string_normalization: bool = True
190     is_pyi: bool = False
191
192     def get_cache_key(self) -> str:
193         if self.target_versions:
194             version_str = ",".join(
195                 str(version.value)
196                 for version in sorted(self.target_versions, key=lambda v: v.value)
197             )
198         else:
199             version_str = "-"
200         parts = [
201             version_str,
202             str(self.line_length),
203             str(int(self.string_normalization)),
204             str(int(self.is_pyi)),
205         ]
206         return ".".join(parts)
207
208
209 def supports_feature(target_versions: Set[TargetVersion], feature: Feature) -> bool:
210     return all(feature in VERSION_TO_FEATURES[version] for version in target_versions)
211
212
213 def read_pyproject_toml(
214     ctx: click.Context, param: click.Parameter, value: Union[str, int, bool, None]
215 ) -> Optional[str]:
216     """Inject Black configuration from "pyproject.toml" into defaults in `ctx`.
217
218     Returns the path to a successfully found and read configuration file, None
219     otherwise.
220     """
221     assert not isinstance(value, (int, bool)), "Invalid parameter type passed"
222     if not value:
223         root = find_project_root(ctx.params.get("src", ()))
224         path = root / "pyproject.toml"
225         if path.is_file():
226             value = str(path)
227         else:
228             return None
229
230     try:
231         pyproject_toml = toml.load(value)
232         config = pyproject_toml.get("tool", {}).get("black", {})
233     except (toml.TomlDecodeError, OSError) as e:
234         raise click.FileError(
235             filename=value, hint=f"Error reading configuration file: {e}"
236         )
237
238     if not config:
239         return None
240
241     if ctx.default_map is None:
242         ctx.default_map = {}
243     ctx.default_map.update(  # type: ignore  # bad types in .pyi
244         {k.replace("--", "").replace("-", "_"): v for k, v in config.items()}
245     )
246     return value
247
248
249 @click.command(context_settings=dict(help_option_names=["-h", "--help"]))
250 @click.option("-c", "--code", type=str, help="Format the code passed in as a string.")
251 @click.option(
252     "-l",
253     "--line-length",
254     type=int,
255     default=DEFAULT_LINE_LENGTH,
256     help="How many characters per line to allow.",
257     show_default=True,
258 )
259 @click.option(
260     "-t",
261     "--target-version",
262     type=click.Choice([v.name.lower() for v in TargetVersion]),
263     callback=lambda c, p, v: [TargetVersion[val.upper()] for val in v],
264     multiple=True,
265     help=(
266         "Python versions that should be supported by Black's output. [default: "
267         "per-file auto-detection]"
268     ),
269 )
270 @click.option(
271     "--py36",
272     is_flag=True,
273     help=(
274         "Allow using Python 3.6-only syntax on all input files.  This will put "
275         "trailing commas in function signatures and calls also after *args and "
276         "**kwargs. Deprecated; use --target-version instead. "
277         "[default: per-file auto-detection]"
278     ),
279 )
280 @click.option(
281     "--pyi",
282     is_flag=True,
283     help=(
284         "Format all input files like typing stubs regardless of file extension "
285         "(useful when piping source on standard input)."
286     ),
287 )
288 @click.option(
289     "-S",
290     "--skip-string-normalization",
291     is_flag=True,
292     help="Don't normalize string quotes or prefixes.",
293 )
294 @click.option(
295     "--check",
296     is_flag=True,
297     help=(
298         "Don't write the files back, just return the status.  Return code 0 "
299         "means nothing would change.  Return code 1 means some files would be "
300         "reformatted.  Return code 123 means there was an internal error."
301     ),
302 )
303 @click.option(
304     "--diff",
305     is_flag=True,
306     help="Don't write the files back, just output a diff for each file on stdout.",
307 )
308 @click.option(
309     "--fast/--safe",
310     is_flag=True,
311     help="If --fast given, skip temporary sanity checks. [default: --safe]",
312 )
313 @click.option(
314     "--include",
315     type=str,
316     default=DEFAULT_INCLUDES,
317     help=(
318         "A regular expression that matches files and directories that should be "
319         "included on recursive searches.  An empty value means all files are "
320         "included regardless of the name.  Use forward slashes for directories on "
321         "all platforms (Windows, too).  Exclusions are calculated first, inclusions "
322         "later."
323     ),
324     show_default=True,
325 )
326 @click.option(
327     "--exclude",
328     type=str,
329     default=DEFAULT_EXCLUDES,
330     help=(
331         "A regular expression that matches files and directories that should be "
332         "excluded on recursive searches.  An empty value means no paths are excluded. "
333         "Use forward slashes for directories on all platforms (Windows, too).  "
334         "Exclusions are calculated first, inclusions later."
335     ),
336     show_default=True,
337 )
338 @click.option(
339     "-q",
340     "--quiet",
341     is_flag=True,
342     help=(
343         "Don't emit non-error messages to stderr. Errors are still emitted; "
344         "silence those with 2>/dev/null."
345     ),
346 )
347 @click.option(
348     "-v",
349     "--verbose",
350     is_flag=True,
351     help=(
352         "Also emit messages to stderr about files that were not changed or were "
353         "ignored due to --exclude=."
354     ),
355 )
356 @click.version_option(version=__version__)
357 @click.argument(
358     "src",
359     nargs=-1,
360     type=click.Path(
361         exists=True, file_okay=True, dir_okay=True, readable=True, allow_dash=True
362     ),
363     is_eager=True,
364 )
365 @click.option(
366     "--config",
367     type=click.Path(
368         exists=False, file_okay=True, dir_okay=False, readable=True, allow_dash=False
369     ),
370     is_eager=True,
371     callback=read_pyproject_toml,
372     help="Read configuration from PATH.",
373 )
374 @click.pass_context
375 def main(
376     ctx: click.Context,
377     code: Optional[str],
378     line_length: int,
379     target_version: List[TargetVersion],
380     check: bool,
381     diff: bool,
382     fast: bool,
383     pyi: bool,
384     py36: bool,
385     skip_string_normalization: bool,
386     quiet: bool,
387     verbose: bool,
388     include: str,
389     exclude: str,
390     src: Tuple[str],
391     config: Optional[str],
392 ) -> None:
393     """The uncompromising code formatter."""
394     write_back = WriteBack.from_configuration(check=check, diff=diff)
395     if target_version:
396         if py36:
397             err(f"Cannot use both --target-version and --py36")
398             ctx.exit(2)
399         else:
400             versions = set(target_version)
401     elif py36:
402         err(
403             "--py36 is deprecated and will be removed in a future version. "
404             "Use --target-version py36 instead."
405         )
406         versions = PY36_VERSIONS
407     else:
408         # We'll autodetect later.
409         versions = set()
410     mode = FileMode(
411         target_versions=versions,
412         line_length=line_length,
413         is_pyi=pyi,
414         string_normalization=not skip_string_normalization,
415     )
416     if config and verbose:
417         out(f"Using configuration from {config}.", bold=False, fg="blue")
418     if code is not None:
419         print(format_str(code, mode=mode))
420         ctx.exit(0)
421     try:
422         include_regex = re_compile_maybe_verbose(include)
423     except re.error:
424         err(f"Invalid regular expression for include given: {include!r}")
425         ctx.exit(2)
426     try:
427         exclude_regex = re_compile_maybe_verbose(exclude)
428     except re.error:
429         err(f"Invalid regular expression for exclude given: {exclude!r}")
430         ctx.exit(2)
431     report = Report(check=check, quiet=quiet, verbose=verbose)
432     root = find_project_root(src)
433     sources: Set[Path] = set()
434     for s in src:
435         p = Path(s)
436         if p.is_dir():
437             sources.update(
438                 gen_python_files_in_dir(p, root, include_regex, exclude_regex, report)
439             )
440         elif p.is_file() or s == "-":
441             # if a file was explicitly given, we don't care about its extension
442             sources.add(p)
443         else:
444             err(f"invalid path: {s}")
445     if len(sources) == 0:
446         if verbose or not quiet:
447             out("No paths given. Nothing to do 😴")
448         ctx.exit(0)
449
450     if len(sources) == 1:
451         reformat_one(
452             src=sources.pop(),
453             fast=fast,
454             write_back=write_back,
455             mode=mode,
456             report=report,
457         )
458     else:
459         reformat_many(
460             sources=sources, fast=fast, write_back=write_back, mode=mode, report=report
461         )
462
463     if verbose or not quiet:
464         out("Oh no! 💥 💔 💥" if report.return_code else "All done! ✨ 🍰 ✨")
465         click.secho(str(report), err=True)
466     ctx.exit(report.return_code)
467
468
469 def reformat_one(
470     src: Path, fast: bool, write_back: WriteBack, mode: FileMode, report: "Report"
471 ) -> None:
472     """Reformat a single file under `src` without spawning child processes.
473
474     `fast`, `write_back`, and `mode` options are passed to
475     :func:`format_file_in_place` or :func:`format_stdin_to_stdout`.
476     """
477     try:
478         changed = Changed.NO
479         if not src.is_file() and str(src) == "-":
480             if format_stdin_to_stdout(fast=fast, write_back=write_back, mode=mode):
481                 changed = Changed.YES
482         else:
483             cache: Cache = {}
484             if write_back != WriteBack.DIFF:
485                 cache = read_cache(mode)
486                 res_src = src.resolve()
487                 if res_src in cache and cache[res_src] == get_cache_info(res_src):
488                     changed = Changed.CACHED
489             if changed is not Changed.CACHED and format_file_in_place(
490                 src, fast=fast, write_back=write_back, mode=mode
491             ):
492                 changed = Changed.YES
493             if (write_back is WriteBack.YES and changed is not Changed.CACHED) or (
494                 write_back is WriteBack.CHECK and changed is Changed.NO
495             ):
496                 write_cache(cache, [src], mode)
497         report.done(src, changed)
498     except Exception as exc:
499         report.failed(src, str(exc))
500
501
502 def reformat_many(
503     sources: Set[Path],
504     fast: bool,
505     write_back: WriteBack,
506     mode: FileMode,
507     report: "Report",
508 ) -> None:
509     """Reformat multiple files using a ProcessPoolExecutor."""
510     loop = asyncio.get_event_loop()
511     worker_count = os.cpu_count()
512     if sys.platform == "win32":
513         # Work around https://bugs.python.org/issue26903
514         worker_count = min(worker_count, 61)
515     executor = ProcessPoolExecutor(max_workers=worker_count)
516     try:
517         loop.run_until_complete(
518             schedule_formatting(
519                 sources=sources,
520                 fast=fast,
521                 write_back=write_back,
522                 mode=mode,
523                 report=report,
524                 loop=loop,
525                 executor=executor,
526             )
527         )
528     finally:
529         shutdown(loop)
530         executor.shutdown()
531
532
533 async def schedule_formatting(
534     sources: Set[Path],
535     fast: bool,
536     write_back: WriteBack,
537     mode: FileMode,
538     report: "Report",
539     loop: asyncio.AbstractEventLoop,
540     executor: Executor,
541 ) -> None:
542     """Run formatting of `sources` in parallel using the provided `executor`.
543
544     (Use ProcessPoolExecutors for actual parallelism.)
545
546     `write_back`, `fast`, and `mode` options are passed to
547     :func:`format_file_in_place`.
548     """
549     cache: Cache = {}
550     if write_back != WriteBack.DIFF:
551         cache = read_cache(mode)
552         sources, cached = filter_cached(cache, sources)
553         for src in sorted(cached):
554             report.done(src, Changed.CACHED)
555     if not sources:
556         return
557
558     cancelled = []
559     sources_to_cache = []
560     lock = None
561     if write_back == WriteBack.DIFF:
562         # For diff output, we need locks to ensure we don't interleave output
563         # from different processes.
564         manager = Manager()
565         lock = manager.Lock()
566     tasks = {
567         asyncio.ensure_future(
568             loop.run_in_executor(
569                 executor, format_file_in_place, src, fast, mode, write_back, lock
570             )
571         ): src
572         for src in sorted(sources)
573     }
574     pending: Iterable[asyncio.Future] = tasks.keys()
575     try:
576         loop.add_signal_handler(signal.SIGINT, cancel, pending)
577         loop.add_signal_handler(signal.SIGTERM, cancel, pending)
578     except NotImplementedError:
579         # There are no good alternatives for these on Windows.
580         pass
581     while pending:
582         done, _ = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
583         for task in done:
584             src = tasks.pop(task)
585             if task.cancelled():
586                 cancelled.append(task)
587             elif task.exception():
588                 report.failed(src, str(task.exception()))
589             else:
590                 changed = Changed.YES if task.result() else Changed.NO
591                 # If the file was written back or was successfully checked as
592                 # well-formatted, store this information in the cache.
593                 if write_back is WriteBack.YES or (
594                     write_back is WriteBack.CHECK and changed is Changed.NO
595                 ):
596                     sources_to_cache.append(src)
597                 report.done(src, changed)
598     if cancelled:
599         await asyncio.gather(*cancelled, loop=loop, return_exceptions=True)
600     if sources_to_cache:
601         write_cache(cache, sources_to_cache, mode)
602
603
604 def format_file_in_place(
605     src: Path,
606     fast: bool,
607     mode: FileMode,
608     write_back: WriteBack = WriteBack.NO,
609     lock: Any = None,  # multiprocessing.Manager().Lock() is some crazy proxy
610 ) -> bool:
611     """Format file under `src` path. Return True if changed.
612
613     If `write_back` is DIFF, write a diff to stdout. If it is YES, write reformatted
614     code to the file.
615     `mode` and `fast` options are passed to :func:`format_file_contents`.
616     """
617     if src.suffix == ".pyi":
618         mode = evolve(mode, is_pyi=True)
619
620     then = datetime.utcfromtimestamp(src.stat().st_mtime)
621     with open(src, "rb") as buf:
622         src_contents, encoding, newline = decode_bytes(buf.read())
623     try:
624         dst_contents = format_file_contents(src_contents, fast=fast, mode=mode)
625     except NothingChanged:
626         return False
627
628     if write_back == write_back.YES:
629         with open(src, "w", encoding=encoding, newline=newline) as f:
630             f.write(dst_contents)
631     elif write_back == write_back.DIFF:
632         now = datetime.utcnow()
633         src_name = f"{src}\t{then} +0000"
634         dst_name = f"{src}\t{now} +0000"
635         diff_contents = diff(src_contents, dst_contents, src_name, dst_name)
636
637         with lock or nullcontext():
638             f = io.TextIOWrapper(
639                 sys.stdout.buffer,
640                 encoding=encoding,
641                 newline=newline,
642                 write_through=True,
643             )
644             f.write(diff_contents)
645             f.detach()
646
647     return True
648
649
650 def format_stdin_to_stdout(
651     fast: bool, *, write_back: WriteBack = WriteBack.NO, mode: FileMode
652 ) -> bool:
653     """Format file on stdin. Return True if changed.
654
655     If `write_back` is YES, write reformatted code back to stdout. If it is DIFF,
656     write a diff to stdout. The `mode` argument is passed to
657     :func:`format_file_contents`.
658     """
659     then = datetime.utcnow()
660     src, encoding, newline = decode_bytes(sys.stdin.buffer.read())
661     dst = src
662     try:
663         dst = format_file_contents(src, fast=fast, mode=mode)
664         return True
665
666     except NothingChanged:
667         return False
668
669     finally:
670         f = io.TextIOWrapper(
671             sys.stdout.buffer, encoding=encoding, newline=newline, write_through=True
672         )
673         if write_back == WriteBack.YES:
674             f.write(dst)
675         elif write_back == WriteBack.DIFF:
676             now = datetime.utcnow()
677             src_name = f"STDIN\t{then} +0000"
678             dst_name = f"STDOUT\t{now} +0000"
679             f.write(diff(src, dst, src_name, dst_name))
680         f.detach()
681
682
683 def format_file_contents(
684     src_contents: str, *, fast: bool, mode: FileMode
685 ) -> FileContent:
686     """Reformat contents a file and return new contents.
687
688     If `fast` is False, additionally confirm that the reformatted code is
689     valid by calling :func:`assert_equivalent` and :func:`assert_stable` on it.
690     `mode` is passed to :func:`format_str`.
691     """
692     if src_contents.strip() == "":
693         raise NothingChanged
694
695     dst_contents = format_str(src_contents, mode=mode)
696     if src_contents == dst_contents:
697         raise NothingChanged
698
699     if not fast:
700         assert_equivalent(src_contents, dst_contents)
701         assert_stable(src_contents, dst_contents, mode=mode)
702     return dst_contents
703
704
705 def format_str(src_contents: str, *, mode: FileMode) -> FileContent:
706     """Reformat a string and return new contents.
707
708     `mode` determines formatting options, such as how many characters per line are
709     allowed.
710     """
711     src_node = lib2to3_parse(src_contents.lstrip(), mode.target_versions)
712     dst_contents = []
713     future_imports = get_future_imports(src_node)
714     if mode.target_versions:
715         versions = mode.target_versions
716     else:
717         versions = detect_target_versions(src_node)
718     normalize_fmt_off(src_node)
719     lines = LineGenerator(
720         remove_u_prefix="unicode_literals" in future_imports
721         or supports_feature(versions, Feature.UNICODE_LITERALS),
722         is_pyi=mode.is_pyi,
723         normalize_strings=mode.string_normalization,
724     )
725     elt = EmptyLineTracker(is_pyi=mode.is_pyi)
726     empty_line = Line()
727     after = 0
728     split_line_features = {
729         feature
730         for feature in {Feature.TRAILING_COMMA_IN_CALL, Feature.TRAILING_COMMA_IN_DEF}
731         if supports_feature(versions, feature)
732     }
733     for current_line in lines.visit(src_node):
734         for _ in range(after):
735             dst_contents.append(str(empty_line))
736         before, after = elt.maybe_empty_lines(current_line)
737         for _ in range(before):
738             dst_contents.append(str(empty_line))
739         for line in split_line(
740             current_line, line_length=mode.line_length, features=split_line_features
741         ):
742             dst_contents.append(str(line))
743     return "".join(dst_contents)
744
745
746 def decode_bytes(src: bytes) -> Tuple[FileContent, Encoding, NewLine]:
747     """Return a tuple of (decoded_contents, encoding, newline).
748
749     `newline` is either CRLF or LF but `decoded_contents` is decoded with
750     universal newlines (i.e. only contains LF).
751     """
752     srcbuf = io.BytesIO(src)
753     encoding, lines = tokenize.detect_encoding(srcbuf.readline)
754     if not lines:
755         return "", encoding, "\n"
756
757     newline = "\r\n" if b"\r\n" == lines[0][-2:] else "\n"
758     srcbuf.seek(0)
759     with io.TextIOWrapper(srcbuf, encoding) as tiow:
760         return tiow.read(), encoding, newline
761
762
763 def get_grammars(target_versions: Set[TargetVersion]) -> List[Grammar]:
764     if not target_versions:
765         # No target_version specified, so try all grammars.
766         return [
767             # Python 3.7+
768             pygram.python_grammar_no_print_statement_no_exec_statement_async_keywords,
769             # Python 3.0-3.6
770             pygram.python_grammar_no_print_statement_no_exec_statement,
771             # Python 2.7 with future print_function import
772             pygram.python_grammar_no_print_statement,
773             # Python 2.7
774             pygram.python_grammar,
775         ]
776     elif all(version.is_python2() for version in target_versions):
777         # Python 2-only code, so try Python 2 grammars.
778         return [
779             # Python 2.7 with future print_function import
780             pygram.python_grammar_no_print_statement,
781             # Python 2.7
782             pygram.python_grammar,
783         ]
784     else:
785         # Python 3-compatible code, so only try Python 3 grammar.
786         grammars = []
787         # If we have to parse both, try to parse async as a keyword first
788         if not supports_feature(target_versions, Feature.ASYNC_IDENTIFIERS):
789             # Python 3.7+
790             grammars.append(
791                 pygram.python_grammar_no_print_statement_no_exec_statement_async_keywords  # noqa: B950
792             )
793         if not supports_feature(target_versions, Feature.ASYNC_KEYWORDS):
794             # Python 3.0-3.6
795             grammars.append(pygram.python_grammar_no_print_statement_no_exec_statement)
796         # At least one of the above branches must have been taken, because every Python
797         # version has exactly one of the two 'ASYNC_*' flags
798         return grammars
799
800
801 def lib2to3_parse(src_txt: str, target_versions: Iterable[TargetVersion] = ()) -> Node:
802     """Given a string with source, return the lib2to3 Node."""
803     if src_txt[-1:] != "\n":
804         src_txt += "\n"
805
806     for grammar in get_grammars(set(target_versions)):
807         drv = driver.Driver(grammar, pytree.convert)
808         try:
809             result = drv.parse_string(src_txt, True)
810             break
811
812         except ParseError as pe:
813             lineno, column = pe.context[1]
814             lines = src_txt.splitlines()
815             try:
816                 faulty_line = lines[lineno - 1]
817             except IndexError:
818                 faulty_line = "<line number missing in source>"
819             exc = InvalidInput(f"Cannot parse: {lineno}:{column}: {faulty_line}")
820     else:
821         raise exc from None
822
823     if isinstance(result, Leaf):
824         result = Node(syms.file_input, [result])
825     return result
826
827
828 def lib2to3_unparse(node: Node) -> str:
829     """Given a lib2to3 node, return its string representation."""
830     code = str(node)
831     return code
832
833
834 T = TypeVar("T")
835
836
837 class Visitor(Generic[T]):
838     """Basic lib2to3 visitor that yields things of type `T` on `visit()`."""
839
840     def visit(self, node: LN) -> Iterator[T]:
841         """Main method to visit `node` and its children.
842
843         It tries to find a `visit_*()` method for the given `node.type`, like
844         `visit_simple_stmt` for Node objects or `visit_INDENT` for Leaf objects.
845         If no dedicated `visit_*()` method is found, chooses `visit_default()`
846         instead.
847
848         Then yields objects of type `T` from the selected visitor.
849         """
850         if node.type < 256:
851             name = token.tok_name[node.type]
852         else:
853             name = type_repr(node.type)
854         yield from getattr(self, f"visit_{name}", self.visit_default)(node)
855
856     def visit_default(self, node: LN) -> Iterator[T]:
857         """Default `visit_*()` implementation. Recurses to children of `node`."""
858         if isinstance(node, Node):
859             for child in node.children:
860                 yield from self.visit(child)
861
862
863 @dataclass
864 class DebugVisitor(Visitor[T]):
865     tree_depth: int = 0
866
867     def visit_default(self, node: LN) -> Iterator[T]:
868         indent = " " * (2 * self.tree_depth)
869         if isinstance(node, Node):
870             _type = type_repr(node.type)
871             out(f"{indent}{_type}", fg="yellow")
872             self.tree_depth += 1
873             for child in node.children:
874                 yield from self.visit(child)
875
876             self.tree_depth -= 1
877             out(f"{indent}/{_type}", fg="yellow", bold=False)
878         else:
879             _type = token.tok_name.get(node.type, str(node.type))
880             out(f"{indent}{_type}", fg="blue", nl=False)
881             if node.prefix:
882                 # We don't have to handle prefixes for `Node` objects since
883                 # that delegates to the first child anyway.
884                 out(f" {node.prefix!r}", fg="green", bold=False, nl=False)
885             out(f" {node.value!r}", fg="blue", bold=False)
886
887     @classmethod
888     def show(cls, code: Union[str, Leaf, Node]) -> None:
889         """Pretty-print the lib2to3 AST of a given string of `code`.
890
891         Convenience method for debugging.
892         """
893         v: DebugVisitor[None] = DebugVisitor()
894         if isinstance(code, str):
895             code = lib2to3_parse(code)
896         list(v.visit(code))
897
898
899 WHITESPACE = {token.DEDENT, token.INDENT, token.NEWLINE}
900 STATEMENT = {
901     syms.if_stmt,
902     syms.while_stmt,
903     syms.for_stmt,
904     syms.try_stmt,
905     syms.except_clause,
906     syms.with_stmt,
907     syms.funcdef,
908     syms.classdef,
909 }
910 STANDALONE_COMMENT = 153
911 token.tok_name[STANDALONE_COMMENT] = "STANDALONE_COMMENT"
912 LOGIC_OPERATORS = {"and", "or"}
913 COMPARATORS = {
914     token.LESS,
915     token.GREATER,
916     token.EQEQUAL,
917     token.NOTEQUAL,
918     token.LESSEQUAL,
919     token.GREATEREQUAL,
920 }
921 MATH_OPERATORS = {
922     token.VBAR,
923     token.CIRCUMFLEX,
924     token.AMPER,
925     token.LEFTSHIFT,
926     token.RIGHTSHIFT,
927     token.PLUS,
928     token.MINUS,
929     token.STAR,
930     token.SLASH,
931     token.DOUBLESLASH,
932     token.PERCENT,
933     token.AT,
934     token.TILDE,
935     token.DOUBLESTAR,
936 }
937 STARS = {token.STAR, token.DOUBLESTAR}
938 VARARGS_PARENTS = {
939     syms.arglist,
940     syms.argument,  # double star in arglist
941     syms.trailer,  # single argument to call
942     syms.typedargslist,
943     syms.varargslist,  # lambdas
944 }
945 UNPACKING_PARENTS = {
946     syms.atom,  # single element of a list or set literal
947     syms.dictsetmaker,
948     syms.listmaker,
949     syms.testlist_gexp,
950     syms.testlist_star_expr,
951 }
952 TEST_DESCENDANTS = {
953     syms.test,
954     syms.lambdef,
955     syms.or_test,
956     syms.and_test,
957     syms.not_test,
958     syms.comparison,
959     syms.star_expr,
960     syms.expr,
961     syms.xor_expr,
962     syms.and_expr,
963     syms.shift_expr,
964     syms.arith_expr,
965     syms.trailer,
966     syms.term,
967     syms.power,
968 }
969 ASSIGNMENTS = {
970     "=",
971     "+=",
972     "-=",
973     "*=",
974     "@=",
975     "/=",
976     "%=",
977     "&=",
978     "|=",
979     "^=",
980     "<<=",
981     ">>=",
982     "**=",
983     "//=",
984 }
985 COMPREHENSION_PRIORITY = 20
986 COMMA_PRIORITY = 18
987 TERNARY_PRIORITY = 16
988 LOGIC_PRIORITY = 14
989 STRING_PRIORITY = 12
990 COMPARATOR_PRIORITY = 10
991 MATH_PRIORITIES = {
992     token.VBAR: 9,
993     token.CIRCUMFLEX: 8,
994     token.AMPER: 7,
995     token.LEFTSHIFT: 6,
996     token.RIGHTSHIFT: 6,
997     token.PLUS: 5,
998     token.MINUS: 5,
999     token.STAR: 4,
1000     token.SLASH: 4,
1001     token.DOUBLESLASH: 4,
1002     token.PERCENT: 4,
1003     token.AT: 4,
1004     token.TILDE: 3,
1005     token.DOUBLESTAR: 2,
1006 }
1007 DOT_PRIORITY = 1
1008
1009
1010 @dataclass
1011 class BracketTracker:
1012     """Keeps track of brackets on a line."""
1013
1014     depth: int = 0
1015     bracket_match: Dict[Tuple[Depth, NodeType], Leaf] = Factory(dict)
1016     delimiters: Dict[LeafID, Priority] = Factory(dict)
1017     previous: Optional[Leaf] = None
1018     _for_loop_depths: List[int] = Factory(list)
1019     _lambda_argument_depths: List[int] = Factory(list)
1020
1021     def mark(self, leaf: Leaf) -> None:
1022         """Mark `leaf` with bracket-related metadata. Keep track of delimiters.
1023
1024         All leaves receive an int `bracket_depth` field that stores how deep
1025         within brackets a given leaf is. 0 means there are no enclosing brackets
1026         that started on this line.
1027
1028         If a leaf is itself a closing bracket, it receives an `opening_bracket`
1029         field that it forms a pair with. This is a one-directional link to
1030         avoid reference cycles.
1031
1032         If a leaf is a delimiter (a token on which Black can split the line if
1033         needed) and it's on depth 0, its `id()` is stored in the tracker's
1034         `delimiters` field.
1035         """
1036         if leaf.type == token.COMMENT:
1037             return
1038
1039         self.maybe_decrement_after_for_loop_variable(leaf)
1040         self.maybe_decrement_after_lambda_arguments(leaf)
1041         if leaf.type in CLOSING_BRACKETS:
1042             self.depth -= 1
1043             opening_bracket = self.bracket_match.pop((self.depth, leaf.type))
1044             leaf.opening_bracket = opening_bracket
1045         leaf.bracket_depth = self.depth
1046         if self.depth == 0:
1047             delim = is_split_before_delimiter(leaf, self.previous)
1048             if delim and self.previous is not None:
1049                 self.delimiters[id(self.previous)] = delim
1050             else:
1051                 delim = is_split_after_delimiter(leaf, self.previous)
1052                 if delim:
1053                     self.delimiters[id(leaf)] = delim
1054         if leaf.type in OPENING_BRACKETS:
1055             self.bracket_match[self.depth, BRACKET[leaf.type]] = leaf
1056             self.depth += 1
1057         self.previous = leaf
1058         self.maybe_increment_lambda_arguments(leaf)
1059         self.maybe_increment_for_loop_variable(leaf)
1060
1061     def any_open_brackets(self) -> bool:
1062         """Return True if there is an yet unmatched open bracket on the line."""
1063         return bool(self.bracket_match)
1064
1065     def max_delimiter_priority(self, exclude: Iterable[LeafID] = ()) -> Priority:
1066         """Return the highest priority of a delimiter found on the line.
1067
1068         Values are consistent with what `is_split_*_delimiter()` return.
1069         Raises ValueError on no delimiters.
1070         """
1071         return max(v for k, v in self.delimiters.items() if k not in exclude)
1072
1073     def delimiter_count_with_priority(self, priority: Priority = 0) -> int:
1074         """Return the number of delimiters with the given `priority`.
1075
1076         If no `priority` is passed, defaults to max priority on the line.
1077         """
1078         if not self.delimiters:
1079             return 0
1080
1081         priority = priority or self.max_delimiter_priority()
1082         return sum(1 for p in self.delimiters.values() if p == priority)
1083
1084     def maybe_increment_for_loop_variable(self, leaf: Leaf) -> bool:
1085         """In a for loop, or comprehension, the variables are often unpacks.
1086
1087         To avoid splitting on the comma in this situation, increase the depth of
1088         tokens between `for` and `in`.
1089         """
1090         if leaf.type == token.NAME and leaf.value == "for":
1091             self.depth += 1
1092             self._for_loop_depths.append(self.depth)
1093             return True
1094
1095         return False
1096
1097     def maybe_decrement_after_for_loop_variable(self, leaf: Leaf) -> bool:
1098         """See `maybe_increment_for_loop_variable` above for explanation."""
1099         if (
1100             self._for_loop_depths
1101             and self._for_loop_depths[-1] == self.depth
1102             and leaf.type == token.NAME
1103             and leaf.value == "in"
1104         ):
1105             self.depth -= 1
1106             self._for_loop_depths.pop()
1107             return True
1108
1109         return False
1110
1111     def maybe_increment_lambda_arguments(self, leaf: Leaf) -> bool:
1112         """In a lambda expression, there might be more than one argument.
1113
1114         To avoid splitting on the comma in this situation, increase the depth of
1115         tokens between `lambda` and `:`.
1116         """
1117         if leaf.type == token.NAME and leaf.value == "lambda":
1118             self.depth += 1
1119             self._lambda_argument_depths.append(self.depth)
1120             return True
1121
1122         return False
1123
1124     def maybe_decrement_after_lambda_arguments(self, leaf: Leaf) -> bool:
1125         """See `maybe_increment_lambda_arguments` above for explanation."""
1126         if (
1127             self._lambda_argument_depths
1128             and self._lambda_argument_depths[-1] == self.depth
1129             and leaf.type == token.COLON
1130         ):
1131             self.depth -= 1
1132             self._lambda_argument_depths.pop()
1133             return True
1134
1135         return False
1136
1137     def get_open_lsqb(self) -> Optional[Leaf]:
1138         """Return the most recent opening square bracket (if any)."""
1139         return self.bracket_match.get((self.depth - 1, token.RSQB))
1140
1141
1142 @dataclass
1143 class Line:
1144     """Holds leaves and comments. Can be printed with `str(line)`."""
1145
1146     depth: int = 0
1147     leaves: List[Leaf] = Factory(list)
1148     comments: Dict[LeafID, List[Leaf]] = Factory(dict)  # keys ordered like `leaves`
1149     bracket_tracker: BracketTracker = Factory(BracketTracker)
1150     inside_brackets: bool = False
1151     should_explode: bool = False
1152
1153     def append(self, leaf: Leaf, preformatted: bool = False) -> None:
1154         """Add a new `leaf` to the end of the line.
1155
1156         Unless `preformatted` is True, the `leaf` will receive a new consistent
1157         whitespace prefix and metadata applied by :class:`BracketTracker`.
1158         Trailing commas are maybe removed, unpacked for loop variables are
1159         demoted from being delimiters.
1160
1161         Inline comments are put aside.
1162         """
1163         has_value = leaf.type in BRACKETS or bool(leaf.value.strip())
1164         if not has_value:
1165             return
1166
1167         if token.COLON == leaf.type and self.is_class_paren_empty:
1168             del self.leaves[-2:]
1169         if self.leaves and not preformatted:
1170             # Note: at this point leaf.prefix should be empty except for
1171             # imports, for which we only preserve newlines.
1172             leaf.prefix += whitespace(
1173                 leaf, complex_subscript=self.is_complex_subscript(leaf)
1174             )
1175         if self.inside_brackets or not preformatted:
1176             self.bracket_tracker.mark(leaf)
1177             self.maybe_remove_trailing_comma(leaf)
1178         if not self.append_comment(leaf):
1179             self.leaves.append(leaf)
1180
1181     def append_safe(self, leaf: Leaf, preformatted: bool = False) -> None:
1182         """Like :func:`append()` but disallow invalid standalone comment structure.
1183
1184         Raises ValueError when any `leaf` is appended after a standalone comment
1185         or when a standalone comment is not the first leaf on the line.
1186         """
1187         if self.bracket_tracker.depth == 0:
1188             if self.is_comment:
1189                 raise ValueError("cannot append to standalone comments")
1190
1191             if self.leaves and leaf.type == STANDALONE_COMMENT:
1192                 raise ValueError(
1193                     "cannot append standalone comments to a populated line"
1194                 )
1195
1196         self.append(leaf, preformatted=preformatted)
1197
1198     @property
1199     def is_comment(self) -> bool:
1200         """Is this line a standalone comment?"""
1201         return len(self.leaves) == 1 and self.leaves[0].type == STANDALONE_COMMENT
1202
1203     @property
1204     def is_decorator(self) -> bool:
1205         """Is this line a decorator?"""
1206         return bool(self) and self.leaves[0].type == token.AT
1207
1208     @property
1209     def is_import(self) -> bool:
1210         """Is this an import line?"""
1211         return bool(self) and is_import(self.leaves[0])
1212
1213     @property
1214     def is_class(self) -> bool:
1215         """Is this line a class definition?"""
1216         return (
1217             bool(self)
1218             and self.leaves[0].type == token.NAME
1219             and self.leaves[0].value == "class"
1220         )
1221
1222     @property
1223     def is_stub_class(self) -> bool:
1224         """Is this line a class definition with a body consisting only of "..."?"""
1225         return self.is_class and self.leaves[-3:] == [
1226             Leaf(token.DOT, ".") for _ in range(3)
1227         ]
1228
1229     @property
1230     def is_def(self) -> bool:
1231         """Is this a function definition? (Also returns True for async defs.)"""
1232         try:
1233             first_leaf = self.leaves[0]
1234         except IndexError:
1235             return False
1236
1237         try:
1238             second_leaf: Optional[Leaf] = self.leaves[1]
1239         except IndexError:
1240             second_leaf = None
1241         return (first_leaf.type == token.NAME and first_leaf.value == "def") or (
1242             first_leaf.type == token.ASYNC
1243             and second_leaf is not None
1244             and second_leaf.type == token.NAME
1245             and second_leaf.value == "def"
1246         )
1247
1248     @property
1249     def is_class_paren_empty(self) -> bool:
1250         """Is this a class with no base classes but using parentheses?
1251
1252         Those are unnecessary and should be removed.
1253         """
1254         return (
1255             bool(self)
1256             and len(self.leaves) == 4
1257             and self.is_class
1258             and self.leaves[2].type == token.LPAR
1259             and self.leaves[2].value == "("
1260             and self.leaves[3].type == token.RPAR
1261             and self.leaves[3].value == ")"
1262         )
1263
1264     @property
1265     def is_triple_quoted_string(self) -> bool:
1266         """Is the line a triple quoted string?"""
1267         return (
1268             bool(self)
1269             and self.leaves[0].type == token.STRING
1270             and self.leaves[0].value.startswith(('"""', "'''"))
1271         )
1272
1273     def contains_standalone_comments(self, depth_limit: int = sys.maxsize) -> bool:
1274         """If so, needs to be split before emitting."""
1275         for leaf in self.leaves:
1276             if leaf.type == STANDALONE_COMMENT:
1277                 if leaf.bracket_depth <= depth_limit:
1278                     return True
1279         return False
1280
1281     def contains_inner_type_comments(self) -> bool:
1282         ignored_ids = set()
1283         try:
1284             last_leaf = self.leaves[-1]
1285             ignored_ids.add(id(last_leaf))
1286             if last_leaf.type == token.COMMA or (
1287                 last_leaf.type == token.RPAR and not last_leaf.value
1288             ):
1289                 # When trailing commas or optional parens are inserted by Black for
1290                 # consistency, comments after the previous last element are not moved
1291                 # (they don't have to, rendering will still be correct).  So we ignore
1292                 # trailing commas and invisible.
1293                 last_leaf = self.leaves[-2]
1294                 ignored_ids.add(id(last_leaf))
1295         except IndexError:
1296             return False
1297
1298         for leaf_id, comments in self.comments.items():
1299             if leaf_id in ignored_ids:
1300                 continue
1301
1302             for comment in comments:
1303                 if is_type_comment(comment):
1304                     return True
1305
1306         return False
1307
1308     def contains_multiline_strings(self) -> bool:
1309         for leaf in self.leaves:
1310             if is_multiline_string(leaf):
1311                 return True
1312
1313         return False
1314
1315     def maybe_remove_trailing_comma(self, closing: Leaf) -> bool:
1316         """Remove trailing comma if there is one and it's safe."""
1317         if not (
1318             self.leaves
1319             and self.leaves[-1].type == token.COMMA
1320             and closing.type in CLOSING_BRACKETS
1321         ):
1322             return False
1323
1324         if closing.type == token.RBRACE:
1325             self.remove_trailing_comma()
1326             return True
1327
1328         if closing.type == token.RSQB:
1329             comma = self.leaves[-1]
1330             if comma.parent and comma.parent.type == syms.listmaker:
1331                 self.remove_trailing_comma()
1332                 return True
1333
1334         # For parens let's check if it's safe to remove the comma.
1335         # Imports are always safe.
1336         if self.is_import:
1337             self.remove_trailing_comma()
1338             return True
1339
1340         # Otherwise, if the trailing one is the only one, we might mistakenly
1341         # change a tuple into a different type by removing the comma.
1342         depth = closing.bracket_depth + 1
1343         commas = 0
1344         opening = closing.opening_bracket
1345         for _opening_index, leaf in enumerate(self.leaves):
1346             if leaf is opening:
1347                 break
1348
1349         else:
1350             return False
1351
1352         for leaf in self.leaves[_opening_index + 1 :]:
1353             if leaf is closing:
1354                 break
1355
1356             bracket_depth = leaf.bracket_depth
1357             if bracket_depth == depth and leaf.type == token.COMMA:
1358                 commas += 1
1359                 if leaf.parent and leaf.parent.type in {
1360                     syms.arglist,
1361                     syms.typedargslist,
1362                 }:
1363                     commas += 1
1364                     break
1365
1366         if commas > 1:
1367             self.remove_trailing_comma()
1368             return True
1369
1370         return False
1371
1372     def append_comment(self, comment: Leaf) -> bool:
1373         """Add an inline or standalone comment to the line."""
1374         if (
1375             comment.type == STANDALONE_COMMENT
1376             and self.bracket_tracker.any_open_brackets()
1377         ):
1378             comment.prefix = ""
1379             return False
1380
1381         if comment.type != token.COMMENT:
1382             return False
1383
1384         if not self.leaves:
1385             comment.type = STANDALONE_COMMENT
1386             comment.prefix = ""
1387             return False
1388
1389         last_leaf = self.leaves[-1]
1390         if (
1391             last_leaf.type == token.RPAR
1392             and not last_leaf.value
1393             and last_leaf.parent
1394             and len(list(last_leaf.parent.leaves())) <= 3
1395             and not is_type_comment(comment)
1396         ):
1397             # Comments on an optional parens wrapping a single leaf should belong to
1398             # the wrapped node except if it's a type comment. Pinning the comment like
1399             # this avoids unstable formatting caused by comment migration.
1400             if len(self.leaves) < 2:
1401                 comment.type = STANDALONE_COMMENT
1402                 comment.prefix = ""
1403                 return False
1404             last_leaf = self.leaves[-2]
1405         self.comments.setdefault(id(last_leaf), []).append(comment)
1406         return True
1407
1408     def comments_after(self, leaf: Leaf) -> List[Leaf]:
1409         """Generate comments that should appear directly after `leaf`."""
1410         return self.comments.get(id(leaf), [])
1411
1412     def remove_trailing_comma(self) -> None:
1413         """Remove the trailing comma and moves the comments attached to it."""
1414         trailing_comma = self.leaves.pop()
1415         trailing_comma_comments = self.comments.pop(id(trailing_comma), [])
1416         self.comments.setdefault(id(self.leaves[-1]), []).extend(
1417             trailing_comma_comments
1418         )
1419
1420     def is_complex_subscript(self, leaf: Leaf) -> bool:
1421         """Return True iff `leaf` is part of a slice with non-trivial exprs."""
1422         open_lsqb = self.bracket_tracker.get_open_lsqb()
1423         if open_lsqb is None:
1424             return False
1425
1426         subscript_start = open_lsqb.next_sibling
1427
1428         if isinstance(subscript_start, Node):
1429             if subscript_start.type == syms.listmaker:
1430                 return False
1431
1432             if subscript_start.type == syms.subscriptlist:
1433                 subscript_start = child_towards(subscript_start, leaf)
1434         return subscript_start is not None and any(
1435             n.type in TEST_DESCENDANTS for n in subscript_start.pre_order()
1436         )
1437
1438     def __str__(self) -> str:
1439         """Render the line."""
1440         if not self:
1441             return "\n"
1442
1443         indent = "    " * self.depth
1444         leaves = iter(self.leaves)
1445         first = next(leaves)
1446         res = f"{first.prefix}{indent}{first.value}"
1447         for leaf in leaves:
1448             res += str(leaf)
1449         for comment in itertools.chain.from_iterable(self.comments.values()):
1450             res += str(comment)
1451         return res + "\n"
1452
1453     def __bool__(self) -> bool:
1454         """Return True if the line has leaves or comments."""
1455         return bool(self.leaves or self.comments)
1456
1457
1458 @dataclass
1459 class EmptyLineTracker:
1460     """Provides a stateful method that returns the number of potential extra
1461     empty lines needed before and after the currently processed line.
1462
1463     Note: this tracker works on lines that haven't been split yet.  It assumes
1464     the prefix of the first leaf consists of optional newlines.  Those newlines
1465     are consumed by `maybe_empty_lines()` and included in the computation.
1466     """
1467
1468     is_pyi: bool = False
1469     previous_line: Optional[Line] = None
1470     previous_after: int = 0
1471     previous_defs: List[int] = Factory(list)
1472
1473     def maybe_empty_lines(self, current_line: Line) -> Tuple[int, int]:
1474         """Return the number of extra empty lines before and after the `current_line`.
1475
1476         This is for separating `def`, `async def` and `class` with extra empty
1477         lines (two on module-level).
1478         """
1479         before, after = self._maybe_empty_lines(current_line)
1480         before -= self.previous_after
1481         self.previous_after = after
1482         self.previous_line = current_line
1483         return before, after
1484
1485     def _maybe_empty_lines(self, current_line: Line) -> Tuple[int, int]:
1486         max_allowed = 1
1487         if current_line.depth == 0:
1488             max_allowed = 1 if self.is_pyi else 2
1489         if current_line.leaves:
1490             # Consume the first leaf's extra newlines.
1491             first_leaf = current_line.leaves[0]
1492             before = first_leaf.prefix.count("\n")
1493             before = min(before, max_allowed)
1494             first_leaf.prefix = ""
1495         else:
1496             before = 0
1497         depth = current_line.depth
1498         while self.previous_defs and self.previous_defs[-1] >= depth:
1499             self.previous_defs.pop()
1500             if self.is_pyi:
1501                 before = 0 if depth else 1
1502             else:
1503                 before = 1 if depth else 2
1504         if current_line.is_decorator or current_line.is_def or current_line.is_class:
1505             return self._maybe_empty_lines_for_class_or_def(current_line, before)
1506
1507         if (
1508             self.previous_line
1509             and self.previous_line.is_import
1510             and not current_line.is_import
1511             and depth == self.previous_line.depth
1512         ):
1513             return (before or 1), 0
1514
1515         if (
1516             self.previous_line
1517             and self.previous_line.is_class
1518             and current_line.is_triple_quoted_string
1519         ):
1520             return before, 1
1521
1522         return before, 0
1523
1524     def _maybe_empty_lines_for_class_or_def(
1525         self, current_line: Line, before: int
1526     ) -> Tuple[int, int]:
1527         if not current_line.is_decorator:
1528             self.previous_defs.append(current_line.depth)
1529         if self.previous_line is None:
1530             # Don't insert empty lines before the first line in the file.
1531             return 0, 0
1532
1533         if self.previous_line.is_decorator:
1534             return 0, 0
1535
1536         if self.previous_line.depth < current_line.depth and (
1537             self.previous_line.is_class or self.previous_line.is_def
1538         ):
1539             return 0, 0
1540
1541         if (
1542             self.previous_line.is_comment
1543             and self.previous_line.depth == current_line.depth
1544             and before == 0
1545         ):
1546             return 0, 0
1547
1548         if self.is_pyi:
1549             if self.previous_line.depth > current_line.depth:
1550                 newlines = 1
1551             elif current_line.is_class or self.previous_line.is_class:
1552                 if current_line.is_stub_class and self.previous_line.is_stub_class:
1553                     # No blank line between classes with an empty body
1554                     newlines = 0
1555                 else:
1556                     newlines = 1
1557             elif current_line.is_def and not self.previous_line.is_def:
1558                 # Blank line between a block of functions and a block of non-functions
1559                 newlines = 1
1560             else:
1561                 newlines = 0
1562         else:
1563             newlines = 2
1564         if current_line.depth and newlines:
1565             newlines -= 1
1566         return newlines, 0
1567
1568
1569 @dataclass
1570 class LineGenerator(Visitor[Line]):
1571     """Generates reformatted Line objects.  Empty lines are not emitted.
1572
1573     Note: destroys the tree it's visiting by mutating prefixes of its leaves
1574     in ways that will no longer stringify to valid Python code on the tree.
1575     """
1576
1577     is_pyi: bool = False
1578     normalize_strings: bool = True
1579     current_line: Line = Factory(Line)
1580     remove_u_prefix: bool = False
1581
1582     def line(self, indent: int = 0) -> Iterator[Line]:
1583         """Generate a line.
1584
1585         If the line is empty, only emit if it makes sense.
1586         If the line is too long, split it first and then generate.
1587
1588         If any lines were generated, set up a new current_line.
1589         """
1590         if not self.current_line:
1591             self.current_line.depth += indent
1592             return  # Line is empty, don't emit. Creating a new one unnecessary.
1593
1594         complete_line = self.current_line
1595         self.current_line = Line(depth=complete_line.depth + indent)
1596         yield complete_line
1597
1598     def visit_default(self, node: LN) -> Iterator[Line]:
1599         """Default `visit_*()` implementation. Recurses to children of `node`."""
1600         if isinstance(node, Leaf):
1601             any_open_brackets = self.current_line.bracket_tracker.any_open_brackets()
1602             for comment in generate_comments(node):
1603                 if any_open_brackets:
1604                     # any comment within brackets is subject to splitting
1605                     self.current_line.append(comment)
1606                 elif comment.type == token.COMMENT:
1607                     # regular trailing comment
1608                     self.current_line.append(comment)
1609                     yield from self.line()
1610
1611                 else:
1612                     # regular standalone comment
1613                     yield from self.line()
1614
1615                     self.current_line.append(comment)
1616                     yield from self.line()
1617
1618             normalize_prefix(node, inside_brackets=any_open_brackets)
1619             if self.normalize_strings and node.type == token.STRING:
1620                 normalize_string_prefix(node, remove_u_prefix=self.remove_u_prefix)
1621                 normalize_string_quotes(node)
1622             if node.type == token.NUMBER:
1623                 normalize_numeric_literal(node)
1624             if node.type not in WHITESPACE:
1625                 self.current_line.append(node)
1626         yield from super().visit_default(node)
1627
1628     def visit_atom(self, node: Node) -> Iterator[Line]:
1629         # Always make parentheses invisible around a single node, because it should
1630         # not be needed (except in the case of yield, where removing the parentheses
1631         # produces a SyntaxError).
1632         if (
1633             len(node.children) == 3
1634             and isinstance(node.children[0], Leaf)
1635             and node.children[0].type == token.LPAR
1636             and isinstance(node.children[2], Leaf)
1637             and node.children[2].type == token.RPAR
1638             and isinstance(node.children[1], Leaf)
1639             and not (
1640                 node.children[1].type == token.NAME
1641                 and node.children[1].value == "yield"
1642             )
1643         ):
1644             node.children[0].value = ""
1645             node.children[2].value = ""
1646         yield from super().visit_default(node)
1647
1648     def visit_factor(self, node: Node) -> Iterator[Line]:
1649         """Force parentheses between a unary op and a binary power:
1650
1651         -2 ** 8 -> -(2 ** 8)
1652         """
1653         child = node.children[1]
1654         if child.type == syms.power and len(child.children) == 3:
1655             lpar = Leaf(token.LPAR, "(")
1656             rpar = Leaf(token.RPAR, ")")
1657             index = child.remove() or 0
1658             node.insert_child(index, Node(syms.atom, [lpar, child, rpar]))
1659         yield from self.visit_default(node)
1660
1661     def visit_INDENT(self, node: Node) -> Iterator[Line]:
1662         """Increase indentation level, maybe yield a line."""
1663         # In blib2to3 INDENT never holds comments.
1664         yield from self.line(+1)
1665         yield from self.visit_default(node)
1666
1667     def visit_DEDENT(self, node: Node) -> Iterator[Line]:
1668         """Decrease indentation level, maybe yield a line."""
1669         # The current line might still wait for trailing comments.  At DEDENT time
1670         # there won't be any (they would be prefixes on the preceding NEWLINE).
1671         # Emit the line then.
1672         yield from self.line()
1673
1674         # While DEDENT has no value, its prefix may contain standalone comments
1675         # that belong to the current indentation level.  Get 'em.
1676         yield from self.visit_default(node)
1677
1678         # Finally, emit the dedent.
1679         yield from self.line(-1)
1680
1681     def visit_stmt(
1682         self, node: Node, keywords: Set[str], parens: Set[str]
1683     ) -> Iterator[Line]:
1684         """Visit a statement.
1685
1686         This implementation is shared for `if`, `while`, `for`, `try`, `except`,
1687         `def`, `with`, `class`, `assert` and assignments.
1688
1689         The relevant Python language `keywords` for a given statement will be
1690         NAME leaves within it. This methods puts those on a separate line.
1691
1692         `parens` holds a set of string leaf values immediately after which
1693         invisible parens should be put.
1694         """
1695         normalize_invisible_parens(node, parens_after=parens)
1696         for child in node.children:
1697             if child.type == token.NAME and child.value in keywords:  # type: ignore
1698                 yield from self.line()
1699
1700             yield from self.visit(child)
1701
1702     def visit_suite(self, node: Node) -> Iterator[Line]:
1703         """Visit a suite."""
1704         if self.is_pyi and is_stub_suite(node):
1705             yield from self.visit(node.children[2])
1706         else:
1707             yield from self.visit_default(node)
1708
1709     def visit_simple_stmt(self, node: Node) -> Iterator[Line]:
1710         """Visit a statement without nested statements."""
1711         is_suite_like = node.parent and node.parent.type in STATEMENT
1712         if is_suite_like:
1713             if self.is_pyi and is_stub_body(node):
1714                 yield from self.visit_default(node)
1715             else:
1716                 yield from self.line(+1)
1717                 yield from self.visit_default(node)
1718                 yield from self.line(-1)
1719
1720         else:
1721             if not self.is_pyi or not node.parent or not is_stub_suite(node.parent):
1722                 yield from self.line()
1723             yield from self.visit_default(node)
1724
1725     def visit_async_stmt(self, node: Node) -> Iterator[Line]:
1726         """Visit `async def`, `async for`, `async with`."""
1727         yield from self.line()
1728
1729         children = iter(node.children)
1730         for child in children:
1731             yield from self.visit(child)
1732
1733             if child.type == token.ASYNC:
1734                 break
1735
1736         internal_stmt = next(children)
1737         for child in internal_stmt.children:
1738             yield from self.visit(child)
1739
1740     def visit_decorators(self, node: Node) -> Iterator[Line]:
1741         """Visit decorators."""
1742         for child in node.children:
1743             yield from self.line()
1744             yield from self.visit(child)
1745
1746     def visit_SEMI(self, leaf: Leaf) -> Iterator[Line]:
1747         """Remove a semicolon and put the other statement on a separate line."""
1748         yield from self.line()
1749
1750     def visit_ENDMARKER(self, leaf: Leaf) -> Iterator[Line]:
1751         """End of file. Process outstanding comments and end with a newline."""
1752         yield from self.visit_default(leaf)
1753         yield from self.line()
1754
1755     def visit_STANDALONE_COMMENT(self, leaf: Leaf) -> Iterator[Line]:
1756         if not self.current_line.bracket_tracker.any_open_brackets():
1757             yield from self.line()
1758         yield from self.visit_default(leaf)
1759
1760     def __attrs_post_init__(self) -> None:
1761         """You are in a twisty little maze of passages."""
1762         v = self.visit_stmt
1763         Ø: Set[str] = set()
1764         self.visit_assert_stmt = partial(v, keywords={"assert"}, parens={"assert", ","})
1765         self.visit_if_stmt = partial(
1766             v, keywords={"if", "else", "elif"}, parens={"if", "elif"}
1767         )
1768         self.visit_while_stmt = partial(v, keywords={"while", "else"}, parens={"while"})
1769         self.visit_for_stmt = partial(v, keywords={"for", "else"}, parens={"for", "in"})
1770         self.visit_try_stmt = partial(
1771             v, keywords={"try", "except", "else", "finally"}, parens=Ø
1772         )
1773         self.visit_except_clause = partial(v, keywords={"except"}, parens=Ø)
1774         self.visit_with_stmt = partial(v, keywords={"with"}, parens=Ø)
1775         self.visit_funcdef = partial(v, keywords={"def"}, parens=Ø)
1776         self.visit_classdef = partial(v, keywords={"class"}, parens=Ø)
1777         self.visit_expr_stmt = partial(v, keywords=Ø, parens=ASSIGNMENTS)
1778         self.visit_return_stmt = partial(v, keywords={"return"}, parens={"return"})
1779         self.visit_import_from = partial(v, keywords=Ø, parens={"import"})
1780         self.visit_del_stmt = partial(v, keywords=Ø, parens={"del"})
1781         self.visit_async_funcdef = self.visit_async_stmt
1782         self.visit_decorated = self.visit_decorators
1783
1784
1785 IMPLICIT_TUPLE = {syms.testlist, syms.testlist_star_expr, syms.exprlist}
1786 BRACKET = {token.LPAR: token.RPAR, token.LSQB: token.RSQB, token.LBRACE: token.RBRACE}
1787 OPENING_BRACKETS = set(BRACKET.keys())
1788 CLOSING_BRACKETS = set(BRACKET.values())
1789 BRACKETS = OPENING_BRACKETS | CLOSING_BRACKETS
1790 ALWAYS_NO_SPACE = CLOSING_BRACKETS | {token.COMMA, STANDALONE_COMMENT}
1791
1792
1793 def whitespace(leaf: Leaf, *, complex_subscript: bool) -> str:  # noqa: C901
1794     """Return whitespace prefix if needed for the given `leaf`.
1795
1796     `complex_subscript` signals whether the given leaf is part of a subscription
1797     which has non-trivial arguments, like arithmetic expressions or function calls.
1798     """
1799     NO = ""
1800     SPACE = " "
1801     DOUBLESPACE = "  "
1802     t = leaf.type
1803     p = leaf.parent
1804     v = leaf.value
1805     if t in ALWAYS_NO_SPACE:
1806         return NO
1807
1808     if t == token.COMMENT:
1809         return DOUBLESPACE
1810
1811     assert p is not None, f"INTERNAL ERROR: hand-made leaf without parent: {leaf!r}"
1812     if t == token.COLON and p.type not in {
1813         syms.subscript,
1814         syms.subscriptlist,
1815         syms.sliceop,
1816     }:
1817         return NO
1818
1819     prev = leaf.prev_sibling
1820     if not prev:
1821         prevp = preceding_leaf(p)
1822         if not prevp or prevp.type in OPENING_BRACKETS:
1823             return NO
1824
1825         if t == token.COLON:
1826             if prevp.type == token.COLON:
1827                 return NO
1828
1829             elif prevp.type != token.COMMA and not complex_subscript:
1830                 return NO
1831
1832             return SPACE
1833
1834         if prevp.type == token.EQUAL:
1835             if prevp.parent:
1836                 if prevp.parent.type in {
1837                     syms.arglist,
1838                     syms.argument,
1839                     syms.parameters,
1840                     syms.varargslist,
1841                 }:
1842                     return NO
1843
1844                 elif prevp.parent.type == syms.typedargslist:
1845                     # A bit hacky: if the equal sign has whitespace, it means we
1846                     # previously found it's a typed argument.  So, we're using
1847                     # that, too.
1848                     return prevp.prefix
1849
1850         elif prevp.type in STARS:
1851             if is_vararg(prevp, within=VARARGS_PARENTS | UNPACKING_PARENTS):
1852                 return NO
1853
1854         elif prevp.type == token.COLON:
1855             if prevp.parent and prevp.parent.type in {syms.subscript, syms.sliceop}:
1856                 return SPACE if complex_subscript else NO
1857
1858         elif (
1859             prevp.parent
1860             and prevp.parent.type == syms.factor
1861             and prevp.type in MATH_OPERATORS
1862         ):
1863             return NO
1864
1865         elif (
1866             prevp.type == token.RIGHTSHIFT
1867             and prevp.parent
1868             and prevp.parent.type == syms.shift_expr
1869             and prevp.prev_sibling
1870             and prevp.prev_sibling.type == token.NAME
1871             and prevp.prev_sibling.value == "print"  # type: ignore
1872         ):
1873             # Python 2 print chevron
1874             return NO
1875
1876     elif prev.type in OPENING_BRACKETS:
1877         return NO
1878
1879     if p.type in {syms.parameters, syms.arglist}:
1880         # untyped function signatures or calls
1881         if not prev or prev.type != token.COMMA:
1882             return NO
1883
1884     elif p.type == syms.varargslist:
1885         # lambdas
1886         if prev and prev.type != token.COMMA:
1887             return NO
1888
1889     elif p.type == syms.typedargslist:
1890         # typed function signatures
1891         if not prev:
1892             return NO
1893
1894         if t == token.EQUAL:
1895             if prev.type != syms.tname:
1896                 return NO
1897
1898         elif prev.type == token.EQUAL:
1899             # A bit hacky: if the equal sign has whitespace, it means we
1900             # previously found it's a typed argument.  So, we're using that, too.
1901             return prev.prefix
1902
1903         elif prev.type != token.COMMA:
1904             return NO
1905
1906     elif p.type == syms.tname:
1907         # type names
1908         if not prev:
1909             prevp = preceding_leaf(p)
1910             if not prevp or prevp.type != token.COMMA:
1911                 return NO
1912
1913     elif p.type == syms.trailer:
1914         # attributes and calls
1915         if t == token.LPAR or t == token.RPAR:
1916             return NO
1917
1918         if not prev:
1919             if t == token.DOT:
1920                 prevp = preceding_leaf(p)
1921                 if not prevp or prevp.type != token.NUMBER:
1922                     return NO
1923
1924             elif t == token.LSQB:
1925                 return NO
1926
1927         elif prev.type != token.COMMA:
1928             return NO
1929
1930     elif p.type == syms.argument:
1931         # single argument
1932         if t == token.EQUAL:
1933             return NO
1934
1935         if not prev:
1936             prevp = preceding_leaf(p)
1937             if not prevp or prevp.type == token.LPAR:
1938                 return NO
1939
1940         elif prev.type in {token.EQUAL} | STARS:
1941             return NO
1942
1943     elif p.type == syms.decorator:
1944         # decorators
1945         return NO
1946
1947     elif p.type == syms.dotted_name:
1948         if prev:
1949             return NO
1950
1951         prevp = preceding_leaf(p)
1952         if not prevp or prevp.type == token.AT or prevp.type == token.DOT:
1953             return NO
1954
1955     elif p.type == syms.classdef:
1956         if t == token.LPAR:
1957             return NO
1958
1959         if prev and prev.type == token.LPAR:
1960             return NO
1961
1962     elif p.type in {syms.subscript, syms.sliceop}:
1963         # indexing
1964         if not prev:
1965             assert p.parent is not None, "subscripts are always parented"
1966             if p.parent.type == syms.subscriptlist:
1967                 return SPACE
1968
1969             return NO
1970
1971         elif not complex_subscript:
1972             return NO
1973
1974     elif p.type == syms.atom:
1975         if prev and t == token.DOT:
1976             # dots, but not the first one.
1977             return NO
1978
1979     elif p.type == syms.dictsetmaker:
1980         # dict unpacking
1981         if prev and prev.type == token.DOUBLESTAR:
1982             return NO
1983
1984     elif p.type in {syms.factor, syms.star_expr}:
1985         # unary ops
1986         if not prev:
1987             prevp = preceding_leaf(p)
1988             if not prevp or prevp.type in OPENING_BRACKETS:
1989                 return NO
1990
1991             prevp_parent = prevp.parent
1992             assert prevp_parent is not None
1993             if prevp.type == token.COLON and prevp_parent.type in {
1994                 syms.subscript,
1995                 syms.sliceop,
1996             }:
1997                 return NO
1998
1999             elif prevp.type == token.EQUAL and prevp_parent.type == syms.argument:
2000                 return NO
2001
2002         elif t in {token.NAME, token.NUMBER, token.STRING}:
2003             return NO
2004
2005     elif p.type == syms.import_from:
2006         if t == token.DOT:
2007             if prev and prev.type == token.DOT:
2008                 return NO
2009
2010         elif t == token.NAME:
2011             if v == "import":
2012                 return SPACE
2013
2014             if prev and prev.type == token.DOT:
2015                 return NO
2016
2017     elif p.type == syms.sliceop:
2018         return NO
2019
2020     return SPACE
2021
2022
2023 def preceding_leaf(node: Optional[LN]) -> Optional[Leaf]:
2024     """Return the first leaf that precedes `node`, if any."""
2025     while node:
2026         res = node.prev_sibling
2027         if res:
2028             if isinstance(res, Leaf):
2029                 return res
2030
2031             try:
2032                 return list(res.leaves())[-1]
2033
2034             except IndexError:
2035                 return None
2036
2037         node = node.parent
2038     return None
2039
2040
2041 def child_towards(ancestor: Node, descendant: LN) -> Optional[LN]:
2042     """Return the child of `ancestor` that contains `descendant`."""
2043     node: Optional[LN] = descendant
2044     while node and node.parent != ancestor:
2045         node = node.parent
2046     return node
2047
2048
2049 def container_of(leaf: Leaf) -> LN:
2050     """Return `leaf` or one of its ancestors that is the topmost container of it.
2051
2052     By "container" we mean a node where `leaf` is the very first child.
2053     """
2054     same_prefix = leaf.prefix
2055     container: LN = leaf
2056     while container:
2057         parent = container.parent
2058         if parent is None:
2059             break
2060
2061         if parent.children[0].prefix != same_prefix:
2062             break
2063
2064         if parent.type == syms.file_input:
2065             break
2066
2067         if parent.prev_sibling is not None and parent.prev_sibling.type in BRACKETS:
2068             break
2069
2070         container = parent
2071     return container
2072
2073
2074 def is_split_after_delimiter(leaf: Leaf, previous: Optional[Leaf] = None) -> Priority:
2075     """Return the priority of the `leaf` delimiter, given a line break after it.
2076
2077     The delimiter priorities returned here are from those delimiters that would
2078     cause a line break after themselves.
2079
2080     Higher numbers are higher priority.
2081     """
2082     if leaf.type == token.COMMA:
2083         return COMMA_PRIORITY
2084
2085     return 0
2086
2087
2088 def is_split_before_delimiter(leaf: Leaf, previous: Optional[Leaf] = None) -> Priority:
2089     """Return the priority of the `leaf` delimiter, given a line break before it.
2090
2091     The delimiter priorities returned here are from those delimiters that would
2092     cause a line break before themselves.
2093
2094     Higher numbers are higher priority.
2095     """
2096     if is_vararg(leaf, within=VARARGS_PARENTS | UNPACKING_PARENTS):
2097         # * and ** might also be MATH_OPERATORS but in this case they are not.
2098         # Don't treat them as a delimiter.
2099         return 0
2100
2101     if (
2102         leaf.type == token.DOT
2103         and leaf.parent
2104         and leaf.parent.type not in {syms.import_from, syms.dotted_name}
2105         and (previous is None or previous.type in CLOSING_BRACKETS)
2106     ):
2107         return DOT_PRIORITY
2108
2109     if (
2110         leaf.type in MATH_OPERATORS
2111         and leaf.parent
2112         and leaf.parent.type not in {syms.factor, syms.star_expr}
2113     ):
2114         return MATH_PRIORITIES[leaf.type]
2115
2116     if leaf.type in COMPARATORS:
2117         return COMPARATOR_PRIORITY
2118
2119     if (
2120         leaf.type == token.STRING
2121         and previous is not None
2122         and previous.type == token.STRING
2123     ):
2124         return STRING_PRIORITY
2125
2126     if leaf.type not in {token.NAME, token.ASYNC}:
2127         return 0
2128
2129     if (
2130         leaf.value == "for"
2131         and leaf.parent
2132         and leaf.parent.type in {syms.comp_for, syms.old_comp_for}
2133         or leaf.type == token.ASYNC
2134     ):
2135         if (
2136             not isinstance(leaf.prev_sibling, Leaf)
2137             or leaf.prev_sibling.value != "async"
2138         ):
2139             return COMPREHENSION_PRIORITY
2140
2141     if (
2142         leaf.value == "if"
2143         and leaf.parent
2144         and leaf.parent.type in {syms.comp_if, syms.old_comp_if}
2145     ):
2146         return COMPREHENSION_PRIORITY
2147
2148     if leaf.value in {"if", "else"} and leaf.parent and leaf.parent.type == syms.test:
2149         return TERNARY_PRIORITY
2150
2151     if leaf.value == "is":
2152         return COMPARATOR_PRIORITY
2153
2154     if (
2155         leaf.value == "in"
2156         and leaf.parent
2157         and leaf.parent.type in {syms.comp_op, syms.comparison}
2158         and not (
2159             previous is not None
2160             and previous.type == token.NAME
2161             and previous.value == "not"
2162         )
2163     ):
2164         return COMPARATOR_PRIORITY
2165
2166     if (
2167         leaf.value == "not"
2168         and leaf.parent
2169         and leaf.parent.type == syms.comp_op
2170         and not (
2171             previous is not None
2172             and previous.type == token.NAME
2173             and previous.value == "is"
2174         )
2175     ):
2176         return COMPARATOR_PRIORITY
2177
2178     if leaf.value in LOGIC_OPERATORS and leaf.parent:
2179         return LOGIC_PRIORITY
2180
2181     return 0
2182
2183
2184 FMT_OFF = {"# fmt: off", "# fmt:off", "# yapf: disable"}
2185 FMT_ON = {"# fmt: on", "# fmt:on", "# yapf: enable"}
2186
2187
2188 def generate_comments(leaf: LN) -> Iterator[Leaf]:
2189     """Clean the prefix of the `leaf` and generate comments from it, if any.
2190
2191     Comments in lib2to3 are shoved into the whitespace prefix.  This happens
2192     in `pgen2/driver.py:Driver.parse_tokens()`.  This was a brilliant implementation
2193     move because it does away with modifying the grammar to include all the
2194     possible places in which comments can be placed.
2195
2196     The sad consequence for us though is that comments don't "belong" anywhere.
2197     This is why this function generates simple parentless Leaf objects for
2198     comments.  We simply don't know what the correct parent should be.
2199
2200     No matter though, we can live without this.  We really only need to
2201     differentiate between inline and standalone comments.  The latter don't
2202     share the line with any code.
2203
2204     Inline comments are emitted as regular token.COMMENT leaves.  Standalone
2205     are emitted with a fake STANDALONE_COMMENT token identifier.
2206     """
2207     for pc in list_comments(leaf.prefix, is_endmarker=leaf.type == token.ENDMARKER):
2208         yield Leaf(pc.type, pc.value, prefix="\n" * pc.newlines)
2209
2210
2211 @dataclass
2212 class ProtoComment:
2213     """Describes a piece of syntax that is a comment.
2214
2215     It's not a :class:`blib2to3.pytree.Leaf` so that:
2216
2217     * it can be cached (`Leaf` objects should not be reused more than once as
2218       they store their lineno, column, prefix, and parent information);
2219     * `newlines` and `consumed` fields are kept separate from the `value`. This
2220       simplifies handling of special marker comments like ``# fmt: off/on``.
2221     """
2222
2223     type: int  # token.COMMENT or STANDALONE_COMMENT
2224     value: str  # content of the comment
2225     newlines: int  # how many newlines before the comment
2226     consumed: int  # how many characters of the original leaf's prefix did we consume
2227
2228
2229 @lru_cache(maxsize=4096)
2230 def list_comments(prefix: str, *, is_endmarker: bool) -> List[ProtoComment]:
2231     """Return a list of :class:`ProtoComment` objects parsed from the given `prefix`."""
2232     result: List[ProtoComment] = []
2233     if not prefix or "#" not in prefix:
2234         return result
2235
2236     consumed = 0
2237     nlines = 0
2238     ignored_lines = 0
2239     for index, line in enumerate(prefix.split("\n")):
2240         consumed += len(line) + 1  # adding the length of the split '\n'
2241         line = line.lstrip()
2242         if not line:
2243             nlines += 1
2244         if not line.startswith("#"):
2245             # Escaped newlines outside of a comment are not really newlines at
2246             # all. We treat a single-line comment following an escaped newline
2247             # as a simple trailing comment.
2248             if line.endswith("\\"):
2249                 ignored_lines += 1
2250             continue
2251
2252         if index == ignored_lines and not is_endmarker:
2253             comment_type = token.COMMENT  # simple trailing comment
2254         else:
2255             comment_type = STANDALONE_COMMENT
2256         comment = make_comment(line)
2257         result.append(
2258             ProtoComment(
2259                 type=comment_type, value=comment, newlines=nlines, consumed=consumed
2260             )
2261         )
2262         nlines = 0
2263     return result
2264
2265
2266 def make_comment(content: str) -> str:
2267     """Return a consistently formatted comment from the given `content` string.
2268
2269     All comments (except for "##", "#!", "#:", '#'", "#%%") should have a single
2270     space between the hash sign and the content.
2271
2272     If `content` didn't start with a hash sign, one is provided.
2273     """
2274     content = content.rstrip()
2275     if not content:
2276         return "#"
2277
2278     if content[0] == "#":
2279         content = content[1:]
2280     if content and content[0] not in " !:#'%":
2281         content = " " + content
2282     return "#" + content
2283
2284
2285 def split_line(
2286     line: Line,
2287     line_length: int,
2288     inner: bool = False,
2289     features: Collection[Feature] = (),
2290 ) -> Iterator[Line]:
2291     """Split a `line` into potentially many lines.
2292
2293     They should fit in the allotted `line_length` but might not be able to.
2294     `inner` signifies that there were a pair of brackets somewhere around the
2295     current `line`, possibly transitively. This means we can fallback to splitting
2296     by delimiters if the LHS/RHS don't yield any results.
2297
2298     `features` are syntactical features that may be used in the output.
2299     """
2300     if line.is_comment:
2301         yield line
2302         return
2303
2304     line_str = str(line).strip("\n")
2305
2306     if (
2307         not line.contains_inner_type_comments()
2308         and not line.should_explode
2309         and is_line_short_enough(line, line_length=line_length, line_str=line_str)
2310     ):
2311         yield line
2312         return
2313
2314     split_funcs: List[SplitFunc]
2315     if line.is_def:
2316         split_funcs = [left_hand_split]
2317     else:
2318
2319         def rhs(line: Line, features: Collection[Feature]) -> Iterator[Line]:
2320             for omit in generate_trailers_to_omit(line, line_length):
2321                 lines = list(right_hand_split(line, line_length, features, omit=omit))
2322                 if is_line_short_enough(lines[0], line_length=line_length):
2323                     yield from lines
2324                     return
2325
2326             # All splits failed, best effort split with no omits.
2327             # This mostly happens to multiline strings that are by definition
2328             # reported as not fitting a single line.
2329             yield from right_hand_split(line, line_length, features=features)
2330
2331         if line.inside_brackets:
2332             split_funcs = [delimiter_split, standalone_comment_split, rhs]
2333         else:
2334             split_funcs = [rhs]
2335     for split_func in split_funcs:
2336         # We are accumulating lines in `result` because we might want to abort
2337         # mission and return the original line in the end, or attempt a different
2338         # split altogether.
2339         result: List[Line] = []
2340         try:
2341             for l in split_func(line, features):
2342                 if str(l).strip("\n") == line_str:
2343                     raise CannotSplit("Split function returned an unchanged result")
2344
2345                 result.extend(
2346                     split_line(
2347                         l, line_length=line_length, inner=True, features=features
2348                     )
2349                 )
2350         except CannotSplit:
2351             continue
2352
2353         else:
2354             yield from result
2355             break
2356
2357     else:
2358         yield line
2359
2360
2361 def left_hand_split(line: Line, features: Collection[Feature] = ()) -> Iterator[Line]:
2362     """Split line into many lines, starting with the first matching bracket pair.
2363
2364     Note: this usually looks weird, only use this for function definitions.
2365     Prefer RHS otherwise.  This is why this function is not symmetrical with
2366     :func:`right_hand_split` which also handles optional parentheses.
2367     """
2368     tail_leaves: List[Leaf] = []
2369     body_leaves: List[Leaf] = []
2370     head_leaves: List[Leaf] = []
2371     current_leaves = head_leaves
2372     matching_bracket = None
2373     for leaf in line.leaves:
2374         if (
2375             current_leaves is body_leaves
2376             and leaf.type in CLOSING_BRACKETS
2377             and leaf.opening_bracket is matching_bracket
2378         ):
2379             current_leaves = tail_leaves if body_leaves else head_leaves
2380         current_leaves.append(leaf)
2381         if current_leaves is head_leaves:
2382             if leaf.type in OPENING_BRACKETS:
2383                 matching_bracket = leaf
2384                 current_leaves = body_leaves
2385     if not matching_bracket:
2386         raise CannotSplit("No brackets found")
2387
2388     head = bracket_split_build_line(head_leaves, line, matching_bracket)
2389     body = bracket_split_build_line(body_leaves, line, matching_bracket, is_body=True)
2390     tail = bracket_split_build_line(tail_leaves, line, matching_bracket)
2391     bracket_split_succeeded_or_raise(head, body, tail)
2392     for result in (head, body, tail):
2393         if result:
2394             yield result
2395
2396
2397 def right_hand_split(
2398     line: Line,
2399     line_length: int,
2400     features: Collection[Feature] = (),
2401     omit: Collection[LeafID] = (),
2402 ) -> Iterator[Line]:
2403     """Split line into many lines, starting with the last matching bracket pair.
2404
2405     If the split was by optional parentheses, attempt splitting without them, too.
2406     `omit` is a collection of closing bracket IDs that shouldn't be considered for
2407     this split.
2408
2409     Note: running this function modifies `bracket_depth` on the leaves of `line`.
2410     """
2411     tail_leaves: List[Leaf] = []
2412     body_leaves: List[Leaf] = []
2413     head_leaves: List[Leaf] = []
2414     current_leaves = tail_leaves
2415     opening_bracket = None
2416     closing_bracket = None
2417     for leaf in reversed(line.leaves):
2418         if current_leaves is body_leaves:
2419             if leaf is opening_bracket:
2420                 current_leaves = head_leaves if body_leaves else tail_leaves
2421         current_leaves.append(leaf)
2422         if current_leaves is tail_leaves:
2423             if leaf.type in CLOSING_BRACKETS and id(leaf) not in omit:
2424                 opening_bracket = leaf.opening_bracket
2425                 closing_bracket = leaf
2426                 current_leaves = body_leaves
2427     if not (opening_bracket and closing_bracket and head_leaves):
2428         # If there is no opening or closing_bracket that means the split failed and
2429         # all content is in the tail.  Otherwise, if `head_leaves` are empty, it means
2430         # the matching `opening_bracket` wasn't available on `line` anymore.
2431         raise CannotSplit("No brackets found")
2432
2433     tail_leaves.reverse()
2434     body_leaves.reverse()
2435     head_leaves.reverse()
2436     head = bracket_split_build_line(head_leaves, line, opening_bracket)
2437     body = bracket_split_build_line(body_leaves, line, opening_bracket, is_body=True)
2438     tail = bracket_split_build_line(tail_leaves, line, opening_bracket)
2439     bracket_split_succeeded_or_raise(head, body, tail)
2440     if (
2441         # the body shouldn't be exploded
2442         not body.should_explode
2443         # the opening bracket is an optional paren
2444         and opening_bracket.type == token.LPAR
2445         and not opening_bracket.value
2446         # the closing bracket is an optional paren
2447         and closing_bracket.type == token.RPAR
2448         and not closing_bracket.value
2449         # it's not an import (optional parens are the only thing we can split on
2450         # in this case; attempting a split without them is a waste of time)
2451         and not line.is_import
2452         # there are no standalone comments in the body
2453         and not body.contains_standalone_comments(0)
2454         # and we can actually remove the parens
2455         and can_omit_invisible_parens(body, line_length)
2456     ):
2457         omit = {id(closing_bracket), *omit}
2458         try:
2459             yield from right_hand_split(line, line_length, features=features, omit=omit)
2460             return
2461
2462         except CannotSplit:
2463             if not (
2464                 can_be_split(body)
2465                 or is_line_short_enough(body, line_length=line_length)
2466             ):
2467                 raise CannotSplit(
2468                     "Splitting failed, body is still too long and can't be split."
2469                 )
2470
2471             elif head.contains_multiline_strings() or tail.contains_multiline_strings():
2472                 raise CannotSplit(
2473                     "The current optional pair of parentheses is bound to fail to "
2474                     "satisfy the splitting algorithm because the head or the tail "
2475                     "contains multiline strings which by definition never fit one "
2476                     "line."
2477                 )
2478
2479     ensure_visible(opening_bracket)
2480     ensure_visible(closing_bracket)
2481     for result in (head, body, tail):
2482         if result:
2483             yield result
2484
2485
2486 def bracket_split_succeeded_or_raise(head: Line, body: Line, tail: Line) -> None:
2487     """Raise :exc:`CannotSplit` if the last left- or right-hand split failed.
2488
2489     Do nothing otherwise.
2490
2491     A left- or right-hand split is based on a pair of brackets. Content before
2492     (and including) the opening bracket is left on one line, content inside the
2493     brackets is put on a separate line, and finally content starting with and
2494     following the closing bracket is put on a separate line.
2495
2496     Those are called `head`, `body`, and `tail`, respectively. If the split
2497     produced the same line (all content in `head`) or ended up with an empty `body`
2498     and the `tail` is just the closing bracket, then it's considered failed.
2499     """
2500     tail_len = len(str(tail).strip())
2501     if not body:
2502         if tail_len == 0:
2503             raise CannotSplit("Splitting brackets produced the same line")
2504
2505         elif tail_len < 3:
2506             raise CannotSplit(
2507                 f"Splitting brackets on an empty body to save "
2508                 f"{tail_len} characters is not worth it"
2509             )
2510
2511
2512 def bracket_split_build_line(
2513     leaves: List[Leaf], original: Line, opening_bracket: Leaf, *, is_body: bool = False
2514 ) -> Line:
2515     """Return a new line with given `leaves` and respective comments from `original`.
2516
2517     If `is_body` is True, the result line is one-indented inside brackets and as such
2518     has its first leaf's prefix normalized and a trailing comma added when expected.
2519     """
2520     result = Line(depth=original.depth)
2521     if is_body:
2522         result.inside_brackets = True
2523         result.depth += 1
2524         if leaves:
2525             # Since body is a new indent level, remove spurious leading whitespace.
2526             normalize_prefix(leaves[0], inside_brackets=True)
2527             # Ensure a trailing comma for imports and standalone function arguments, but
2528             # be careful not to add one after any comments.
2529             no_commas = original.is_def and not any(
2530                 l.type == token.COMMA for l in leaves
2531             )
2532
2533             if original.is_import or no_commas:
2534                 for i in range(len(leaves) - 1, -1, -1):
2535                     if leaves[i].type == STANDALONE_COMMENT:
2536                         continue
2537                     elif leaves[i].type == token.COMMA:
2538                         break
2539                     else:
2540                         leaves.insert(i + 1, Leaf(token.COMMA, ","))
2541                         break
2542     # Populate the line
2543     for leaf in leaves:
2544         result.append(leaf, preformatted=True)
2545         for comment_after in original.comments_after(leaf):
2546             result.append(comment_after, preformatted=True)
2547     if is_body:
2548         result.should_explode = should_explode(result, opening_bracket)
2549     return result
2550
2551
2552 def dont_increase_indentation(split_func: SplitFunc) -> SplitFunc:
2553     """Normalize prefix of the first leaf in every line returned by `split_func`.
2554
2555     This is a decorator over relevant split functions.
2556     """
2557
2558     @wraps(split_func)
2559     def split_wrapper(line: Line, features: Collection[Feature] = ()) -> Iterator[Line]:
2560         for l in split_func(line, features):
2561             normalize_prefix(l.leaves[0], inside_brackets=True)
2562             yield l
2563
2564     return split_wrapper
2565
2566
2567 @dont_increase_indentation
2568 def delimiter_split(line: Line, features: Collection[Feature] = ()) -> Iterator[Line]:
2569     """Split according to delimiters of the highest priority.
2570
2571     If the appropriate Features are given, the split will add trailing commas
2572     also in function signatures and calls that contain `*` and `**`.
2573     """
2574     try:
2575         last_leaf = line.leaves[-1]
2576     except IndexError:
2577         raise CannotSplit("Line empty")
2578
2579     bt = line.bracket_tracker
2580     try:
2581         delimiter_priority = bt.max_delimiter_priority(exclude={id(last_leaf)})
2582     except ValueError:
2583         raise CannotSplit("No delimiters found")
2584
2585     if delimiter_priority == DOT_PRIORITY:
2586         if bt.delimiter_count_with_priority(delimiter_priority) == 1:
2587             raise CannotSplit("Splitting a single attribute from its owner looks wrong")
2588
2589     current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
2590     lowest_depth = sys.maxsize
2591     trailing_comma_safe = True
2592
2593     def append_to_line(leaf: Leaf) -> Iterator[Line]:
2594         """Append `leaf` to current line or to new line if appending impossible."""
2595         nonlocal current_line
2596         try:
2597             current_line.append_safe(leaf, preformatted=True)
2598         except ValueError:
2599             yield current_line
2600
2601             current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
2602             current_line.append(leaf)
2603
2604     for leaf in line.leaves:
2605         yield from append_to_line(leaf)
2606
2607         for comment_after in line.comments_after(leaf):
2608             yield from append_to_line(comment_after)
2609
2610         lowest_depth = min(lowest_depth, leaf.bracket_depth)
2611         if leaf.bracket_depth == lowest_depth:
2612             if is_vararg(leaf, within={syms.typedargslist}):
2613                 trailing_comma_safe = (
2614                     trailing_comma_safe and Feature.TRAILING_COMMA_IN_DEF in features
2615                 )
2616             elif is_vararg(leaf, within={syms.arglist, syms.argument}):
2617                 trailing_comma_safe = (
2618                     trailing_comma_safe and Feature.TRAILING_COMMA_IN_CALL in features
2619                 )
2620
2621         leaf_priority = bt.delimiters.get(id(leaf))
2622         if leaf_priority == delimiter_priority:
2623             yield current_line
2624
2625             current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
2626     if current_line:
2627         if (
2628             trailing_comma_safe
2629             and delimiter_priority == COMMA_PRIORITY
2630             and current_line.leaves[-1].type != token.COMMA
2631             and current_line.leaves[-1].type != STANDALONE_COMMENT
2632         ):
2633             current_line.append(Leaf(token.COMMA, ","))
2634         yield current_line
2635
2636
2637 @dont_increase_indentation
2638 def standalone_comment_split(
2639     line: Line, features: Collection[Feature] = ()
2640 ) -> Iterator[Line]:
2641     """Split standalone comments from the rest of the line."""
2642     if not line.contains_standalone_comments(0):
2643         raise CannotSplit("Line does not have any standalone comments")
2644
2645     current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
2646
2647     def append_to_line(leaf: Leaf) -> Iterator[Line]:
2648         """Append `leaf` to current line or to new line if appending impossible."""
2649         nonlocal current_line
2650         try:
2651             current_line.append_safe(leaf, preformatted=True)
2652         except ValueError:
2653             yield current_line
2654
2655             current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
2656             current_line.append(leaf)
2657
2658     for leaf in line.leaves:
2659         yield from append_to_line(leaf)
2660
2661         for comment_after in line.comments_after(leaf):
2662             yield from append_to_line(comment_after)
2663
2664     if current_line:
2665         yield current_line
2666
2667
2668 def is_import(leaf: Leaf) -> bool:
2669     """Return True if the given leaf starts an import statement."""
2670     p = leaf.parent
2671     t = leaf.type
2672     v = leaf.value
2673     return bool(
2674         t == token.NAME
2675         and (
2676             (v == "import" and p and p.type == syms.import_name)
2677             or (v == "from" and p and p.type == syms.import_from)
2678         )
2679     )
2680
2681
2682 def is_type_comment(leaf: Leaf) -> bool:
2683     """Return True if the given leaf is a special comment.
2684     Only returns true for type comments for now."""
2685     t = leaf.type
2686     v = leaf.value
2687     return t in {token.COMMENT, t == STANDALONE_COMMENT} and v.startswith("# type:")
2688
2689
2690 def normalize_prefix(leaf: Leaf, *, inside_brackets: bool) -> None:
2691     """Leave existing extra newlines if not `inside_brackets`. Remove everything
2692     else.
2693
2694     Note: don't use backslashes for formatting or you'll lose your voting rights.
2695     """
2696     if not inside_brackets:
2697         spl = leaf.prefix.split("#")
2698         if "\\" not in spl[0]:
2699             nl_count = spl[-1].count("\n")
2700             if len(spl) > 1:
2701                 nl_count -= 1
2702             leaf.prefix = "\n" * nl_count
2703             return
2704
2705     leaf.prefix = ""
2706
2707
2708 def normalize_string_prefix(leaf: Leaf, remove_u_prefix: bool = False) -> None:
2709     """Make all string prefixes lowercase.
2710
2711     If remove_u_prefix is given, also removes any u prefix from the string.
2712
2713     Note: Mutates its argument.
2714     """
2715     match = re.match(r"^([furbFURB]*)(.*)$", leaf.value, re.DOTALL)
2716     assert match is not None, f"failed to match string {leaf.value!r}"
2717     orig_prefix = match.group(1)
2718     new_prefix = orig_prefix.lower()
2719     if remove_u_prefix:
2720         new_prefix = new_prefix.replace("u", "")
2721     leaf.value = f"{new_prefix}{match.group(2)}"
2722
2723
2724 def normalize_string_quotes(leaf: Leaf) -> None:
2725     """Prefer double quotes but only if it doesn't cause more escaping.
2726
2727     Adds or removes backslashes as appropriate. Doesn't parse and fix
2728     strings nested in f-strings (yet).
2729
2730     Note: Mutates its argument.
2731     """
2732     value = leaf.value.lstrip("furbFURB")
2733     if value[:3] == '"""':
2734         return
2735
2736     elif value[:3] == "'''":
2737         orig_quote = "'''"
2738         new_quote = '"""'
2739     elif value[0] == '"':
2740         orig_quote = '"'
2741         new_quote = "'"
2742     else:
2743         orig_quote = "'"
2744         new_quote = '"'
2745     first_quote_pos = leaf.value.find(orig_quote)
2746     if first_quote_pos == -1:
2747         return  # There's an internal error
2748
2749     prefix = leaf.value[:first_quote_pos]
2750     unescaped_new_quote = re.compile(rf"(([^\\]|^)(\\\\)*){new_quote}")
2751     escaped_new_quote = re.compile(rf"([^\\]|^)\\((?:\\\\)*){new_quote}")
2752     escaped_orig_quote = re.compile(rf"([^\\]|^)\\((?:\\\\)*){orig_quote}")
2753     body = leaf.value[first_quote_pos + len(orig_quote) : -len(orig_quote)]
2754     if "r" in prefix.casefold():
2755         if unescaped_new_quote.search(body):
2756             # There's at least one unescaped new_quote in this raw string
2757             # so converting is impossible
2758             return
2759
2760         # Do not introduce or remove backslashes in raw strings
2761         new_body = body
2762     else:
2763         # remove unnecessary escapes
2764         new_body = sub_twice(escaped_new_quote, rf"\1\2{new_quote}", body)
2765         if body != new_body:
2766             # Consider the string without unnecessary escapes as the original
2767             body = new_body
2768             leaf.value = f"{prefix}{orig_quote}{body}{orig_quote}"
2769         new_body = sub_twice(escaped_orig_quote, rf"\1\2{orig_quote}", new_body)
2770         new_body = sub_twice(unescaped_new_quote, rf"\1\\{new_quote}", new_body)
2771     if "f" in prefix.casefold():
2772         matches = re.findall(
2773             r"""
2774             (?:[^{]|^)\{  # start of the string or a non-{ followed by a single {
2775                 ([^{].*?)  # contents of the brackets except if begins with {{
2776             \}(?:[^}]|$)  # A } followed by end of the string or a non-}
2777             """,
2778             new_body,
2779             re.VERBOSE,
2780         )
2781         for m in matches:
2782             if "\\" in str(m):
2783                 # Do not introduce backslashes in interpolated expressions
2784                 return
2785     if new_quote == '"""' and new_body[-1:] == '"':
2786         # edge case:
2787         new_body = new_body[:-1] + '\\"'
2788     orig_escape_count = body.count("\\")
2789     new_escape_count = new_body.count("\\")
2790     if new_escape_count > orig_escape_count:
2791         return  # Do not introduce more escaping
2792
2793     if new_escape_count == orig_escape_count and orig_quote == '"':
2794         return  # Prefer double quotes
2795
2796     leaf.value = f"{prefix}{new_quote}{new_body}{new_quote}"
2797
2798
2799 def normalize_numeric_literal(leaf: Leaf) -> None:
2800     """Normalizes numeric (float, int, and complex) literals.
2801
2802     All letters used in the representation are normalized to lowercase (except
2803     in Python 2 long literals).
2804     """
2805     text = leaf.value.lower()
2806     if text.startswith(("0o", "0b")):
2807         # Leave octal and binary literals alone.
2808         pass
2809     elif text.startswith("0x"):
2810         # Change hex literals to upper case.
2811         before, after = text[:2], text[2:]
2812         text = f"{before}{after.upper()}"
2813     elif "e" in text:
2814         before, after = text.split("e")
2815         sign = ""
2816         if after.startswith("-"):
2817             after = after[1:]
2818             sign = "-"
2819         elif after.startswith("+"):
2820             after = after[1:]
2821         before = format_float_or_int_string(before)
2822         text = f"{before}e{sign}{after}"
2823     elif text.endswith(("j", "l")):
2824         number = text[:-1]
2825         suffix = text[-1]
2826         # Capitalize in "2L" because "l" looks too similar to "1".
2827         if suffix == "l":
2828             suffix = "L"
2829         text = f"{format_float_or_int_string(number)}{suffix}"
2830     else:
2831         text = format_float_or_int_string(text)
2832     leaf.value = text
2833
2834
2835 def format_float_or_int_string(text: str) -> str:
2836     """Formats a float string like "1.0"."""
2837     if "." not in text:
2838         return text
2839
2840     before, after = text.split(".")
2841     return f"{before or 0}.{after or 0}"
2842
2843
2844 def normalize_invisible_parens(node: Node, parens_after: Set[str]) -> None:
2845     """Make existing optional parentheses invisible or create new ones.
2846
2847     `parens_after` is a set of string leaf values immediately after which parens
2848     should be put.
2849
2850     Standardizes on visible parentheses for single-element tuples, and keeps
2851     existing visible parentheses for other tuples and generator expressions.
2852     """
2853     for pc in list_comments(node.prefix, is_endmarker=False):
2854         if pc.value in FMT_OFF:
2855             # This `node` has a prefix with `# fmt: off`, don't mess with parens.
2856             return
2857
2858     check_lpar = False
2859     for index, child in enumerate(list(node.children)):
2860         # Add parentheses around long tuple unpacking in assignments.
2861         if (
2862             index == 0
2863             and isinstance(child, Node)
2864             and child.type == syms.testlist_star_expr
2865         ):
2866             check_lpar = True
2867
2868         if check_lpar:
2869             if is_walrus_assignment(child):
2870                 continue
2871             if child.type == syms.atom:
2872                 if maybe_make_parens_invisible_in_atom(child, parent=node):
2873                     lpar = Leaf(token.LPAR, "")
2874                     rpar = Leaf(token.RPAR, "")
2875                     index = child.remove() or 0
2876                     node.insert_child(index, Node(syms.atom, [lpar, child, rpar]))
2877             elif is_one_tuple(child):
2878                 # wrap child in visible parentheses
2879                 lpar = Leaf(token.LPAR, "(")
2880                 rpar = Leaf(token.RPAR, ")")
2881                 child.remove()
2882                 node.insert_child(index, Node(syms.atom, [lpar, child, rpar]))
2883             elif node.type == syms.import_from:
2884                 # "import from" nodes store parentheses directly as part of
2885                 # the statement
2886                 if child.type == token.LPAR:
2887                     # make parentheses invisible
2888                     child.value = ""  # type: ignore
2889                     node.children[-1].value = ""  # type: ignore
2890                 elif child.type != token.STAR:
2891                     # insert invisible parentheses
2892                     node.insert_child(index, Leaf(token.LPAR, ""))
2893                     node.append_child(Leaf(token.RPAR, ""))
2894                 break
2895
2896             elif not (isinstance(child, Leaf) and is_multiline_string(child)):
2897                 # wrap child in invisible parentheses
2898                 lpar = Leaf(token.LPAR, "")
2899                 rpar = Leaf(token.RPAR, "")
2900                 index = child.remove() or 0
2901                 prefix = child.prefix
2902                 child.prefix = ""
2903                 new_child = Node(syms.atom, [lpar, child, rpar])
2904                 new_child.prefix = prefix
2905                 node.insert_child(index, new_child)
2906
2907         check_lpar = isinstance(child, Leaf) and child.value in parens_after
2908
2909
2910 def normalize_fmt_off(node: Node) -> None:
2911     """Convert content between `# fmt: off`/`# fmt: on` into standalone comments."""
2912     try_again = True
2913     while try_again:
2914         try_again = convert_one_fmt_off_pair(node)
2915
2916
2917 def convert_one_fmt_off_pair(node: Node) -> bool:
2918     """Convert content of a single `# fmt: off`/`# fmt: on` into a standalone comment.
2919
2920     Returns True if a pair was converted.
2921     """
2922     for leaf in node.leaves():
2923         previous_consumed = 0
2924         for comment in list_comments(leaf.prefix, is_endmarker=False):
2925             if comment.value in FMT_OFF:
2926                 # We only want standalone comments. If there's no previous leaf or
2927                 # the previous leaf is indentation, it's a standalone comment in
2928                 # disguise.
2929                 if comment.type != STANDALONE_COMMENT:
2930                     prev = preceding_leaf(leaf)
2931                     if prev and prev.type not in WHITESPACE:
2932                         continue
2933
2934                 ignored_nodes = list(generate_ignored_nodes(leaf))
2935                 if not ignored_nodes:
2936                     continue
2937
2938                 first = ignored_nodes[0]  # Can be a container node with the `leaf`.
2939                 parent = first.parent
2940                 prefix = first.prefix
2941                 first.prefix = prefix[comment.consumed :]
2942                 hidden_value = (
2943                     comment.value + "\n" + "".join(str(n) for n in ignored_nodes)
2944                 )
2945                 if hidden_value.endswith("\n"):
2946                     # That happens when one of the `ignored_nodes` ended with a NEWLINE
2947                     # leaf (possibly followed by a DEDENT).
2948                     hidden_value = hidden_value[:-1]
2949                 first_idx = None
2950                 for ignored in ignored_nodes:
2951                     index = ignored.remove()
2952                     if first_idx is None:
2953                         first_idx = index
2954                 assert parent is not None, "INTERNAL ERROR: fmt: on/off handling (1)"
2955                 assert first_idx is not None, "INTERNAL ERROR: fmt: on/off handling (2)"
2956                 parent.insert_child(
2957                     first_idx,
2958                     Leaf(
2959                         STANDALONE_COMMENT,
2960                         hidden_value,
2961                         prefix=prefix[:previous_consumed] + "\n" * comment.newlines,
2962                     ),
2963                 )
2964                 return True
2965
2966             previous_consumed = comment.consumed
2967
2968     return False
2969
2970
2971 def generate_ignored_nodes(leaf: Leaf) -> Iterator[LN]:
2972     """Starting from the container of `leaf`, generate all leaves until `# fmt: on`.
2973
2974     Stops at the end of the block.
2975     """
2976     container: Optional[LN] = container_of(leaf)
2977     while container is not None and container.type != token.ENDMARKER:
2978         for comment in list_comments(container.prefix, is_endmarker=False):
2979             if comment.value in FMT_ON:
2980                 return
2981
2982         yield container
2983
2984         container = container.next_sibling
2985
2986
2987 def maybe_make_parens_invisible_in_atom(node: LN, parent: LN) -> bool:
2988     """If it's safe, make the parens in the atom `node` invisible, recursively.
2989
2990     Returns whether the node should itself be wrapped in invisible parentheses.
2991
2992     """
2993     if (
2994         node.type != syms.atom
2995         or is_empty_tuple(node)
2996         or is_one_tuple(node)
2997         or (is_yield(node) and parent.type != syms.expr_stmt)
2998         or max_delimiter_priority_in_atom(node) >= COMMA_PRIORITY
2999     ):
3000         return False
3001
3002     first = node.children[0]
3003     last = node.children[-1]
3004     if first.type == token.LPAR and last.type == token.RPAR:
3005         # make parentheses invisible
3006         first.value = ""  # type: ignore
3007         last.value = ""  # type: ignore
3008         if len(node.children) > 1:
3009             maybe_make_parens_invisible_in_atom(node.children[1], parent=parent)
3010         return False
3011
3012     return True
3013
3014
3015 def is_empty_tuple(node: LN) -> bool:
3016     """Return True if `node` holds an empty tuple."""
3017     return (
3018         node.type == syms.atom
3019         and len(node.children) == 2
3020         and node.children[0].type == token.LPAR
3021         and node.children[1].type == token.RPAR
3022     )
3023
3024
3025 def unwrap_singleton_parenthesis(node: LN) -> Optional[LN]:
3026     """Returns `wrapped` if `node` is of the shape ( wrapped ).
3027
3028     Parenthesis can be optional. Returns None otherwise"""
3029     if len(node.children) != 3:
3030         return None
3031     lpar, wrapped, rpar = node.children
3032     if not (lpar.type == token.LPAR and rpar.type == token.RPAR):
3033         return None
3034
3035     return wrapped
3036
3037
3038 def is_one_tuple(node: LN) -> bool:
3039     """Return True if `node` holds a tuple with one element, with or without parens."""
3040     if node.type == syms.atom:
3041         gexp = unwrap_singleton_parenthesis(node)
3042         if gexp is None or gexp.type != syms.testlist_gexp:
3043             return False
3044
3045         return len(gexp.children) == 2 and gexp.children[1].type == token.COMMA
3046
3047     return (
3048         node.type in IMPLICIT_TUPLE
3049         and len(node.children) == 2
3050         and node.children[1].type == token.COMMA
3051     )
3052
3053
3054 def is_walrus_assignment(node: LN) -> bool:
3055     """Return True iff `node` is of the shape ( test := test )"""
3056     inner = unwrap_singleton_parenthesis(node)
3057     return inner is not None and inner.type == syms.namedexpr_test
3058
3059
3060 def is_yield(node: LN) -> bool:
3061     """Return True if `node` holds a `yield` or `yield from` expression."""
3062     if node.type == syms.yield_expr:
3063         return True
3064
3065     if node.type == token.NAME and node.value == "yield":  # type: ignore
3066         return True
3067
3068     if node.type != syms.atom:
3069         return False
3070
3071     if len(node.children) != 3:
3072         return False
3073
3074     lpar, expr, rpar = node.children
3075     if lpar.type == token.LPAR and rpar.type == token.RPAR:
3076         return is_yield(expr)
3077
3078     return False
3079
3080
3081 def is_vararg(leaf: Leaf, within: Set[NodeType]) -> bool:
3082     """Return True if `leaf` is a star or double star in a vararg or kwarg.
3083
3084     If `within` includes VARARGS_PARENTS, this applies to function signatures.
3085     If `within` includes UNPACKING_PARENTS, it applies to right hand-side
3086     extended iterable unpacking (PEP 3132) and additional unpacking
3087     generalizations (PEP 448).
3088     """
3089     if leaf.type not in STARS or not leaf.parent:
3090         return False
3091
3092     p = leaf.parent
3093     if p.type == syms.star_expr:
3094         # Star expressions are also used as assignment targets in extended
3095         # iterable unpacking (PEP 3132).  See what its parent is instead.
3096         if not p.parent:
3097             return False
3098
3099         p = p.parent
3100
3101     return p.type in within
3102
3103
3104 def is_multiline_string(leaf: Leaf) -> bool:
3105     """Return True if `leaf` is a multiline string that actually spans many lines."""
3106     value = leaf.value.lstrip("furbFURB")
3107     return value[:3] in {'"""', "'''"} and "\n" in value
3108
3109
3110 def is_stub_suite(node: Node) -> bool:
3111     """Return True if `node` is a suite with a stub body."""
3112     if (
3113         len(node.children) != 4
3114         or node.children[0].type != token.NEWLINE
3115         or node.children[1].type != token.INDENT
3116         or node.children[3].type != token.DEDENT
3117     ):
3118         return False
3119
3120     return is_stub_body(node.children[2])
3121
3122
3123 def is_stub_body(node: LN) -> bool:
3124     """Return True if `node` is a simple statement containing an ellipsis."""
3125     if not isinstance(node, Node) or node.type != syms.simple_stmt:
3126         return False
3127
3128     if len(node.children) != 2:
3129         return False
3130
3131     child = node.children[0]
3132     return (
3133         child.type == syms.atom
3134         and len(child.children) == 3
3135         and all(leaf == Leaf(token.DOT, ".") for leaf in child.children)
3136     )
3137
3138
3139 def max_delimiter_priority_in_atom(node: LN) -> Priority:
3140     """Return maximum delimiter priority inside `node`.
3141
3142     This is specific to atoms with contents contained in a pair of parentheses.
3143     If `node` isn't an atom or there are no enclosing parentheses, returns 0.
3144     """
3145     if node.type != syms.atom:
3146         return 0
3147
3148     first = node.children[0]
3149     last = node.children[-1]
3150     if not (first.type == token.LPAR and last.type == token.RPAR):
3151         return 0
3152
3153     bt = BracketTracker()
3154     for c in node.children[1:-1]:
3155         if isinstance(c, Leaf):
3156             bt.mark(c)
3157         else:
3158             for leaf in c.leaves():
3159                 bt.mark(leaf)
3160     try:
3161         return bt.max_delimiter_priority()
3162
3163     except ValueError:
3164         return 0
3165
3166
3167 def ensure_visible(leaf: Leaf) -> None:
3168     """Make sure parentheses are visible.
3169
3170     They could be invisible as part of some statements (see
3171     :func:`normalize_invisible_parens` and :func:`visit_import_from`).
3172     """
3173     if leaf.type == token.LPAR:
3174         leaf.value = "("
3175     elif leaf.type == token.RPAR:
3176         leaf.value = ")"
3177
3178
3179 def should_explode(line: Line, opening_bracket: Leaf) -> bool:
3180     """Should `line` immediately be split with `delimiter_split()` after RHS?"""
3181
3182     if not (
3183         opening_bracket.parent
3184         and opening_bracket.parent.type in {syms.atom, syms.import_from}
3185         and opening_bracket.value in "[{("
3186     ):
3187         return False
3188
3189     try:
3190         last_leaf = line.leaves[-1]
3191         exclude = {id(last_leaf)} if last_leaf.type == token.COMMA else set()
3192         max_priority = line.bracket_tracker.max_delimiter_priority(exclude=exclude)
3193     except (IndexError, ValueError):
3194         return False
3195
3196     return max_priority == COMMA_PRIORITY
3197
3198
3199 def get_features_used(node: Node) -> Set[Feature]:
3200     """Return a set of (relatively) new Python features used in this file.
3201
3202     Currently looking for:
3203     - f-strings;
3204     - underscores in numeric literals; and
3205     - trailing commas after * or ** in function signatures and calls.
3206     """
3207     features: Set[Feature] = set()
3208     for n in node.pre_order():
3209         if n.type == token.STRING:
3210             value_head = n.value[:2]  # type: ignore
3211             if value_head in {'f"', 'F"', "f'", "F'", "rf", "fr", "RF", "FR"}:
3212                 features.add(Feature.F_STRINGS)
3213
3214         elif n.type == token.NUMBER:
3215             if "_" in n.value:  # type: ignore
3216                 features.add(Feature.NUMERIC_UNDERSCORES)
3217
3218         elif n.type == token.COLONEQUAL:
3219             features.add(Feature.ASSIGNMENT_EXPRESSIONS)
3220
3221         elif (
3222             n.type in {syms.typedargslist, syms.arglist}
3223             and n.children
3224             and n.children[-1].type == token.COMMA
3225         ):
3226             if n.type == syms.typedargslist:
3227                 feature = Feature.TRAILING_COMMA_IN_DEF
3228             else:
3229                 feature = Feature.TRAILING_COMMA_IN_CALL
3230
3231             for ch in n.children:
3232                 if ch.type in STARS:
3233                     features.add(feature)
3234
3235                 if ch.type == syms.argument:
3236                     for argch in ch.children:
3237                         if argch.type in STARS:
3238                             features.add(feature)
3239
3240     return features
3241
3242
3243 def detect_target_versions(node: Node) -> Set[TargetVersion]:
3244     """Detect the version to target based on the nodes used."""
3245     features = get_features_used(node)
3246     return {
3247         version for version in TargetVersion if features <= VERSION_TO_FEATURES[version]
3248     }
3249
3250
3251 def generate_trailers_to_omit(line: Line, line_length: int) -> Iterator[Set[LeafID]]:
3252     """Generate sets of closing bracket IDs that should be omitted in a RHS.
3253
3254     Brackets can be omitted if the entire trailer up to and including
3255     a preceding closing bracket fits in one line.
3256
3257     Yielded sets are cumulative (contain results of previous yields, too).  First
3258     set is empty.
3259     """
3260
3261     omit: Set[LeafID] = set()
3262     yield omit
3263
3264     length = 4 * line.depth
3265     opening_bracket = None
3266     closing_bracket = None
3267     inner_brackets: Set[LeafID] = set()
3268     for index, leaf, leaf_length in enumerate_with_length(line, reversed=True):
3269         length += leaf_length
3270         if length > line_length:
3271             break
3272
3273         has_inline_comment = leaf_length > len(leaf.value) + len(leaf.prefix)
3274         if leaf.type == STANDALONE_COMMENT or has_inline_comment:
3275             break
3276
3277         if opening_bracket:
3278             if leaf is opening_bracket:
3279                 opening_bracket = None
3280             elif leaf.type in CLOSING_BRACKETS:
3281                 inner_brackets.add(id(leaf))
3282         elif leaf.type in CLOSING_BRACKETS:
3283             if index > 0 and line.leaves[index - 1].type in OPENING_BRACKETS:
3284                 # Empty brackets would fail a split so treat them as "inner"
3285                 # brackets (e.g. only add them to the `omit` set if another
3286                 # pair of brackets was good enough.
3287                 inner_brackets.add(id(leaf))
3288                 continue
3289
3290             if closing_bracket:
3291                 omit.add(id(closing_bracket))
3292                 omit.update(inner_brackets)
3293                 inner_brackets.clear()
3294                 yield omit
3295
3296             if leaf.value:
3297                 opening_bracket = leaf.opening_bracket
3298                 closing_bracket = leaf
3299
3300
3301 def get_future_imports(node: Node) -> Set[str]:
3302     """Return a set of __future__ imports in the file."""
3303     imports: Set[str] = set()
3304
3305     def get_imports_from_children(children: List[LN]) -> Generator[str, None, None]:
3306         for child in children:
3307             if isinstance(child, Leaf):
3308                 if child.type == token.NAME:
3309                     yield child.value
3310             elif child.type == syms.import_as_name:
3311                 orig_name = child.children[0]
3312                 assert isinstance(orig_name, Leaf), "Invalid syntax parsing imports"
3313                 assert orig_name.type == token.NAME, "Invalid syntax parsing imports"
3314                 yield orig_name.value
3315             elif child.type == syms.import_as_names:
3316                 yield from get_imports_from_children(child.children)
3317             else:
3318                 raise AssertionError("Invalid syntax parsing imports")
3319
3320     for child in node.children:
3321         if child.type != syms.simple_stmt:
3322             break
3323         first_child = child.children[0]
3324         if isinstance(first_child, Leaf):
3325             # Continue looking if we see a docstring; otherwise stop.
3326             if (
3327                 len(child.children) == 2
3328                 and first_child.type == token.STRING
3329                 and child.children[1].type == token.NEWLINE
3330             ):
3331                 continue
3332             else:
3333                 break
3334         elif first_child.type == syms.import_from:
3335             module_name = first_child.children[1]
3336             if not isinstance(module_name, Leaf) or module_name.value != "__future__":
3337                 break
3338             imports |= set(get_imports_from_children(first_child.children[3:]))
3339         else:
3340             break
3341     return imports
3342
3343
3344 def gen_python_files_in_dir(
3345     path: Path,
3346     root: Path,
3347     include: Pattern[str],
3348     exclude: Pattern[str],
3349     report: "Report",
3350 ) -> Iterator[Path]:
3351     """Generate all files under `path` whose paths are not excluded by the
3352     `exclude` regex, but are included by the `include` regex.
3353
3354     Symbolic links pointing outside of the `root` directory are ignored.
3355
3356     `report` is where output about exclusions goes.
3357     """
3358     assert root.is_absolute(), f"INTERNAL ERROR: `root` must be absolute but is {root}"
3359     for child in path.iterdir():
3360         try:
3361             normalized_path = "/" + child.resolve().relative_to(root).as_posix()
3362         except ValueError:
3363             if child.is_symlink():
3364                 report.path_ignored(
3365                     child, f"is a symbolic link that points outside {root}"
3366                 )
3367                 continue
3368
3369             raise
3370
3371         if child.is_dir():
3372             normalized_path += "/"
3373         exclude_match = exclude.search(normalized_path)
3374         if exclude_match and exclude_match.group(0):
3375             report.path_ignored(child, f"matches the --exclude regular expression")
3376             continue
3377
3378         if child.is_dir():
3379             yield from gen_python_files_in_dir(child, root, include, exclude, report)
3380
3381         elif child.is_file():
3382             include_match = include.search(normalized_path)
3383             if include_match:
3384                 yield child
3385
3386
3387 @lru_cache()
3388 def find_project_root(srcs: Iterable[str]) -> Path:
3389     """Return a directory containing .git, .hg, or pyproject.toml.
3390
3391     That directory can be one of the directories passed in `srcs` or their
3392     common parent.
3393
3394     If no directory in the tree contains a marker that would specify it's the
3395     project root, the root of the file system is returned.
3396     """
3397     if not srcs:
3398         return Path("/").resolve()
3399
3400     common_base = min(Path(src).resolve() for src in srcs)
3401     if common_base.is_dir():
3402         # Append a fake file so `parents` below returns `common_base_dir`, too.
3403         common_base /= "fake-file"
3404     for directory in common_base.parents:
3405         if (directory / ".git").is_dir():
3406             return directory
3407
3408         if (directory / ".hg").is_dir():
3409             return directory
3410
3411         if (directory / "pyproject.toml").is_file():
3412             return directory
3413
3414     return directory
3415
3416
3417 @dataclass
3418 class Report:
3419     """Provides a reformatting counter. Can be rendered with `str(report)`."""
3420
3421     check: bool = False
3422     quiet: bool = False
3423     verbose: bool = False
3424     change_count: int = 0
3425     same_count: int = 0
3426     failure_count: int = 0
3427
3428     def done(self, src: Path, changed: Changed) -> None:
3429         """Increment the counter for successful reformatting. Write out a message."""
3430         if changed is Changed.YES:
3431             reformatted = "would reformat" if self.check else "reformatted"
3432             if self.verbose or not self.quiet:
3433                 out(f"{reformatted} {src}")
3434             self.change_count += 1
3435         else:
3436             if self.verbose:
3437                 if changed is Changed.NO:
3438                     msg = f"{src} already well formatted, good job."
3439                 else:
3440                     msg = f"{src} wasn't modified on disk since last run."
3441                 out(msg, bold=False)
3442             self.same_count += 1
3443
3444     def failed(self, src: Path, message: str) -> None:
3445         """Increment the counter for failed reformatting. Write out a message."""
3446         err(f"error: cannot format {src}: {message}")
3447         self.failure_count += 1
3448
3449     def path_ignored(self, path: Path, message: str) -> None:
3450         if self.verbose:
3451             out(f"{path} ignored: {message}", bold=False)
3452
3453     @property
3454     def return_code(self) -> int:
3455         """Return the exit code that the app should use.
3456
3457         This considers the current state of changed files and failures:
3458         - if there were any failures, return 123;
3459         - if any files were changed and --check is being used, return 1;
3460         - otherwise return 0.
3461         """
3462         # According to http://tldp.org/LDP/abs/html/exitcodes.html starting with
3463         # 126 we have special return codes reserved by the shell.
3464         if self.failure_count:
3465             return 123
3466
3467         elif self.change_count and self.check:
3468             return 1
3469
3470         return 0
3471
3472     def __str__(self) -> str:
3473         """Render a color report of the current state.
3474
3475         Use `click.unstyle` to remove colors.
3476         """
3477         if self.check:
3478             reformatted = "would be reformatted"
3479             unchanged = "would be left unchanged"
3480             failed = "would fail to reformat"
3481         else:
3482             reformatted = "reformatted"
3483             unchanged = "left unchanged"
3484             failed = "failed to reformat"
3485         report = []
3486         if self.change_count:
3487             s = "s" if self.change_count > 1 else ""
3488             report.append(
3489                 click.style(f"{self.change_count} file{s} {reformatted}", bold=True)
3490             )
3491         if self.same_count:
3492             s = "s" if self.same_count > 1 else ""
3493             report.append(f"{self.same_count} file{s} {unchanged}")
3494         if self.failure_count:
3495             s = "s" if self.failure_count > 1 else ""
3496             report.append(
3497                 click.style(f"{self.failure_count} file{s} {failed}", fg="red")
3498             )
3499         return ", ".join(report) + "."
3500
3501
3502 def parse_ast(src: str) -> Union[ast.AST, ast3.AST, ast27.AST]:
3503     filename = "<unknown>"
3504     if sys.version_info >= (3, 8):
3505         # TODO: support Python 4+ ;)
3506         for minor_version in range(sys.version_info[1], 4, -1):
3507             try:
3508                 return ast.parse(src, filename, feature_version=(3, minor_version))
3509             except SyntaxError:
3510                 continue
3511     else:
3512         for feature_version in (7, 6):
3513             try:
3514                 return ast3.parse(src, filename, feature_version=feature_version)
3515             except SyntaxError:
3516                 continue
3517
3518     return ast27.parse(src)
3519
3520
3521 def _fixup_ast_constants(
3522     node: Union[ast.AST, ast3.AST, ast27.AST]
3523 ) -> Union[ast.AST, ast3.AST, ast27.AST]:
3524     """Map ast nodes deprecated in 3.8 to Constant."""
3525     # casts are required until this is released:
3526     # https://github.com/python/typeshed/pull/3142
3527     if isinstance(node, (ast.Str, ast3.Str, ast27.Str, ast.Bytes, ast3.Bytes)):
3528         return cast(ast.AST, ast.Constant(value=node.s))
3529     elif isinstance(node, (ast.Num, ast3.Num, ast27.Num)):
3530         return cast(ast.AST, ast.Constant(value=node.n))
3531     elif isinstance(node, (ast.NameConstant, ast3.NameConstant)):
3532         return cast(ast.AST, ast.Constant(value=node.value))
3533     return node
3534
3535
3536 def assert_equivalent(src: str, dst: str) -> None:
3537     """Raise AssertionError if `src` and `dst` aren't equivalent."""
3538
3539     def _v(node: Union[ast.AST, ast3.AST, ast27.AST], depth: int = 0) -> Iterator[str]:
3540         """Simple visitor generating strings to compare ASTs by content."""
3541
3542         node = _fixup_ast_constants(node)
3543
3544         yield f"{'  ' * depth}{node.__class__.__name__}("
3545
3546         for field in sorted(node._fields):
3547             # TypeIgnore has only one field 'lineno' which breaks this comparison
3548             type_ignore_classes = (ast3.TypeIgnore, ast27.TypeIgnore)
3549             if sys.version_info >= (3, 8):
3550                 type_ignore_classes += (ast.TypeIgnore,)
3551             if isinstance(node, type_ignore_classes):
3552                 break
3553
3554             try:
3555                 value = getattr(node, field)
3556             except AttributeError:
3557                 continue
3558
3559             yield f"{'  ' * (depth+1)}{field}="
3560
3561             if isinstance(value, list):
3562                 for item in value:
3563                     # Ignore nested tuples within del statements, because we may insert
3564                     # parentheses and they change the AST.
3565                     if (
3566                         field == "targets"
3567                         and isinstance(node, (ast.Delete, ast3.Delete, ast27.Delete))
3568                         and isinstance(item, (ast.Tuple, ast3.Tuple, ast27.Tuple))
3569                     ):
3570                         for item in item.elts:
3571                             yield from _v(item, depth + 2)
3572                     elif isinstance(item, (ast.AST, ast3.AST, ast27.AST)):
3573                         yield from _v(item, depth + 2)
3574
3575             elif isinstance(value, (ast.AST, ast3.AST, ast27.AST)):
3576                 yield from _v(value, depth + 2)
3577
3578             else:
3579                 yield f"{'  ' * (depth+2)}{value!r},  # {value.__class__.__name__}"
3580
3581         yield f"{'  ' * depth})  # /{node.__class__.__name__}"
3582
3583     try:
3584         src_ast = parse_ast(src)
3585     except Exception as exc:
3586         raise AssertionError(
3587             f"cannot use --safe with this file; failed to parse source file.  "
3588             f"AST error message: {exc}"
3589         )
3590
3591     try:
3592         dst_ast = parse_ast(dst)
3593     except Exception as exc:
3594         log = dump_to_file("".join(traceback.format_tb(exc.__traceback__)), dst)
3595         raise AssertionError(
3596             f"INTERNAL ERROR: Black produced invalid code: {exc}. "
3597             f"Please report a bug on https://github.com/psf/black/issues.  "
3598             f"This invalid output might be helpful: {log}"
3599         ) from None
3600
3601     src_ast_str = "\n".join(_v(src_ast))
3602     dst_ast_str = "\n".join(_v(dst_ast))
3603     if src_ast_str != dst_ast_str:
3604         log = dump_to_file(diff(src_ast_str, dst_ast_str, "src", "dst"))
3605         raise AssertionError(
3606             f"INTERNAL ERROR: Black produced code that is not equivalent to "
3607             f"the source.  "
3608             f"Please report a bug on https://github.com/psf/black/issues.  "
3609             f"This diff might be helpful: {log}"
3610         ) from None
3611
3612
3613 def assert_stable(src: str, dst: str, mode: FileMode) -> None:
3614     """Raise AssertionError if `dst` reformats differently the second time."""
3615     newdst = format_str(dst, mode=mode)
3616     if dst != newdst:
3617         log = dump_to_file(
3618             diff(src, dst, "source", "first pass"),
3619             diff(dst, newdst, "first pass", "second pass"),
3620         )
3621         raise AssertionError(
3622             f"INTERNAL ERROR: Black produced different code on the second pass "
3623             f"of the formatter.  "
3624             f"Please report a bug on https://github.com/psf/black/issues.  "
3625             f"This diff might be helpful: {log}"
3626         ) from None
3627
3628
3629 def dump_to_file(*output: str) -> str:
3630     """Dump `output` to a temporary file. Return path to the file."""
3631     with tempfile.NamedTemporaryFile(
3632         mode="w", prefix="blk_", suffix=".log", delete=False, encoding="utf8"
3633     ) as f:
3634         for lines in output:
3635             f.write(lines)
3636             if lines and lines[-1] != "\n":
3637                 f.write("\n")
3638     return f.name
3639
3640
3641 @contextmanager
3642 def nullcontext() -> Iterator[None]:
3643     """Return context manager that does nothing.
3644     Similar to `nullcontext` from python 3.7"""
3645     yield
3646
3647
3648 def diff(a: str, b: str, a_name: str, b_name: str) -> str:
3649     """Return a unified diff string between strings `a` and `b`."""
3650     import difflib
3651
3652     a_lines = [line + "\n" for line in a.split("\n")]
3653     b_lines = [line + "\n" for line in b.split("\n")]
3654     return "".join(
3655         difflib.unified_diff(a_lines, b_lines, fromfile=a_name, tofile=b_name, n=5)
3656     )
3657
3658
3659 def cancel(tasks: Iterable[asyncio.Task]) -> None:
3660     """asyncio signal handler that cancels all `tasks` and reports to stderr."""
3661     err("Aborted!")
3662     for task in tasks:
3663         task.cancel()
3664
3665
3666 def shutdown(loop: asyncio.AbstractEventLoop) -> None:
3667     """Cancel all pending tasks on `loop`, wait for them, and close the loop."""
3668     try:
3669         if sys.version_info[:2] >= (3, 7):
3670             all_tasks = asyncio.all_tasks
3671         else:
3672             all_tasks = asyncio.Task.all_tasks
3673         # This part is borrowed from asyncio/runners.py in Python 3.7b2.
3674         to_cancel = [task for task in all_tasks(loop) if not task.done()]
3675         if not to_cancel:
3676             return
3677
3678         for task in to_cancel:
3679             task.cancel()
3680         loop.run_until_complete(
3681             asyncio.gather(*to_cancel, loop=loop, return_exceptions=True)
3682         )
3683     finally:
3684         # `concurrent.futures.Future` objects cannot be cancelled once they
3685         # are already running. There might be some when the `shutdown()` happened.
3686         # Silence their logger's spew about the event loop being closed.
3687         cf_logger = logging.getLogger("concurrent.futures")
3688         cf_logger.setLevel(logging.CRITICAL)
3689         loop.close()
3690
3691
3692 def sub_twice(regex: Pattern[str], replacement: str, original: str) -> str:
3693     """Replace `regex` with `replacement` twice on `original`.
3694
3695     This is used by string normalization to perform replaces on
3696     overlapping matches.
3697     """
3698     return regex.sub(replacement, regex.sub(replacement, original))
3699
3700
3701 def re_compile_maybe_verbose(regex: str) -> Pattern[str]:
3702     """Compile a regular expression string in `regex`.
3703
3704     If it contains newlines, use verbose mode.
3705     """
3706     if "\n" in regex:
3707         regex = "(?x)" + regex
3708     return re.compile(regex)
3709
3710
3711 def enumerate_reversed(sequence: Sequence[T]) -> Iterator[Tuple[Index, T]]:
3712     """Like `reversed(enumerate(sequence))` if that were possible."""
3713     index = len(sequence) - 1
3714     for element in reversed(sequence):
3715         yield (index, element)
3716         index -= 1
3717
3718
3719 def enumerate_with_length(
3720     line: Line, reversed: bool = False
3721 ) -> Iterator[Tuple[Index, Leaf, int]]:
3722     """Return an enumeration of leaves with their length.
3723
3724     Stops prematurely on multiline strings and standalone comments.
3725     """
3726     op = cast(
3727         Callable[[Sequence[Leaf]], Iterator[Tuple[Index, Leaf]]],
3728         enumerate_reversed if reversed else enumerate,
3729     )
3730     for index, leaf in op(line.leaves):
3731         length = len(leaf.prefix) + len(leaf.value)
3732         if "\n" in leaf.value:
3733             return  # Multiline strings, we can't continue.
3734
3735         for comment in line.comments_after(leaf):
3736             length += len(comment.value)
3737
3738         yield index, leaf, length
3739
3740
3741 def is_line_short_enough(line: Line, *, line_length: int, line_str: str = "") -> bool:
3742     """Return True if `line` is no longer than `line_length`.
3743
3744     Uses the provided `line_str` rendering, if any, otherwise computes a new one.
3745     """
3746     if not line_str:
3747         line_str = str(line).strip("\n")
3748     return (
3749         len(line_str) <= line_length
3750         and "\n" not in line_str  # multiline strings
3751         and not line.contains_standalone_comments()
3752     )
3753
3754
3755 def can_be_split(line: Line) -> bool:
3756     """Return False if the line cannot be split *for sure*.
3757
3758     This is not an exhaustive search but a cheap heuristic that we can use to
3759     avoid some unfortunate formattings (mostly around wrapping unsplittable code
3760     in unnecessary parentheses).
3761     """
3762     leaves = line.leaves
3763     if len(leaves) < 2:
3764         return False
3765
3766     if leaves[0].type == token.STRING and leaves[1].type == token.DOT:
3767         call_count = 0
3768         dot_count = 0
3769         next = leaves[-1]
3770         for leaf in leaves[-2::-1]:
3771             if leaf.type in OPENING_BRACKETS:
3772                 if next.type not in CLOSING_BRACKETS:
3773                     return False
3774
3775                 call_count += 1
3776             elif leaf.type == token.DOT:
3777                 dot_count += 1
3778             elif leaf.type == token.NAME:
3779                 if not (next.type == token.DOT or next.type in OPENING_BRACKETS):
3780                     return False
3781
3782             elif leaf.type not in CLOSING_BRACKETS:
3783                 return False
3784
3785             if dot_count > 1 and call_count > 1:
3786                 return False
3787
3788     return True
3789
3790
3791 def can_omit_invisible_parens(line: Line, line_length: int) -> bool:
3792     """Does `line` have a shape safe to reformat without optional parens around it?
3793
3794     Returns True for only a subset of potentially nice looking formattings but
3795     the point is to not return false positives that end up producing lines that
3796     are too long.
3797     """
3798     bt = line.bracket_tracker
3799     if not bt.delimiters:
3800         # Without delimiters the optional parentheses are useless.
3801         return True
3802
3803     max_priority = bt.max_delimiter_priority()
3804     if bt.delimiter_count_with_priority(max_priority) > 1:
3805         # With more than one delimiter of a kind the optional parentheses read better.
3806         return False
3807
3808     if max_priority == DOT_PRIORITY:
3809         # A single stranded method call doesn't require optional parentheses.
3810         return True
3811
3812     assert len(line.leaves) >= 2, "Stranded delimiter"
3813
3814     first = line.leaves[0]
3815     second = line.leaves[1]
3816     penultimate = line.leaves[-2]
3817     last = line.leaves[-1]
3818
3819     # With a single delimiter, omit if the expression starts or ends with
3820     # a bracket.
3821     if first.type in OPENING_BRACKETS and second.type not in CLOSING_BRACKETS:
3822         remainder = False
3823         length = 4 * line.depth
3824         for _index, leaf, leaf_length in enumerate_with_length(line):
3825             if leaf.type in CLOSING_BRACKETS and leaf.opening_bracket is first:
3826                 remainder = True
3827             if remainder:
3828                 length += leaf_length
3829                 if length > line_length:
3830                     break
3831
3832                 if leaf.type in OPENING_BRACKETS:
3833                     # There are brackets we can further split on.
3834                     remainder = False
3835
3836         else:
3837             # checked the entire string and line length wasn't exceeded
3838             if len(line.leaves) == _index + 1:
3839                 return True
3840
3841         # Note: we are not returning False here because a line might have *both*
3842         # a leading opening bracket and a trailing closing bracket.  If the
3843         # opening bracket doesn't match our rule, maybe the closing will.
3844
3845     if (
3846         last.type == token.RPAR
3847         or last.type == token.RBRACE
3848         or (
3849             # don't use indexing for omitting optional parentheses;
3850             # it looks weird
3851             last.type == token.RSQB
3852             and last.parent
3853             and last.parent.type != syms.trailer
3854         )
3855     ):
3856         if penultimate.type in OPENING_BRACKETS:
3857             # Empty brackets don't help.
3858             return False
3859
3860         if is_multiline_string(first):
3861             # Additional wrapping of a multiline string in this situation is
3862             # unnecessary.
3863             return True
3864
3865         length = 4 * line.depth
3866         seen_other_brackets = False
3867         for _index, leaf, leaf_length in enumerate_with_length(line):
3868             length += leaf_length
3869             if leaf is last.opening_bracket:
3870                 if seen_other_brackets or length <= line_length:
3871                     return True
3872
3873             elif leaf.type in OPENING_BRACKETS:
3874                 # There are brackets we can further split on.
3875                 seen_other_brackets = True
3876
3877     return False
3878
3879
3880 def get_cache_file(mode: FileMode) -> Path:
3881     return CACHE_DIR / f"cache.{mode.get_cache_key()}.pickle"
3882
3883
3884 def read_cache(mode: FileMode) -> Cache:
3885     """Read the cache if it exists and is well formed.
3886
3887     If it is not well formed, the call to write_cache later should resolve the issue.
3888     """
3889     cache_file = get_cache_file(mode)
3890     if not cache_file.exists():
3891         return {}
3892
3893     with cache_file.open("rb") as fobj:
3894         try:
3895             cache: Cache = pickle.load(fobj)
3896         except pickle.UnpicklingError:
3897             return {}
3898
3899     return cache
3900
3901
3902 def get_cache_info(path: Path) -> CacheInfo:
3903     """Return the information used to check if a file is already formatted or not."""
3904     stat = path.stat()
3905     return stat.st_mtime, stat.st_size
3906
3907
3908 def filter_cached(cache: Cache, sources: Iterable[Path]) -> Tuple[Set[Path], Set[Path]]:
3909     """Split an iterable of paths in `sources` into two sets.
3910
3911     The first contains paths of files that modified on disk or are not in the
3912     cache. The other contains paths to non-modified files.
3913     """
3914     todo, done = set(), set()
3915     for src in sources:
3916         src = src.resolve()
3917         if cache.get(src) != get_cache_info(src):
3918             todo.add(src)
3919         else:
3920             done.add(src)
3921     return todo, done
3922
3923
3924 def write_cache(cache: Cache, sources: Iterable[Path], mode: FileMode) -> None:
3925     """Update the cache file."""
3926     cache_file = get_cache_file(mode)
3927     try:
3928         CACHE_DIR.mkdir(parents=True, exist_ok=True)
3929         new_cache = {**cache, **{src.resolve(): get_cache_info(src) for src in sources}}
3930         with tempfile.NamedTemporaryFile(dir=str(cache_file.parent), delete=False) as f:
3931             pickle.dump(new_cache, f, protocol=pickle.HIGHEST_PROTOCOL)
3932         os.replace(f.name, cache_file)
3933     except OSError:
3934         pass
3935
3936
3937 def patch_click() -> None:
3938     """Make Click not crash.
3939
3940     On certain misconfigured environments, Python 3 selects the ASCII encoding as the
3941     default which restricts paths that it can access during the lifetime of the
3942     application.  Click refuses to work in this scenario by raising a RuntimeError.
3943
3944     In case of Black the likelihood that non-ASCII characters are going to be used in
3945     file paths is minimal since it's Python source code.  Moreover, this crash was
3946     spurious on Python 3.7 thanks to PEP 538 and PEP 540.
3947     """
3948     try:
3949         from click import core
3950         from click import _unicodefun  # type: ignore
3951     except ModuleNotFoundError:
3952         return
3953
3954     for module in (core, _unicodefun):
3955         if hasattr(module, "_verify_python3_env"):
3956             module._verify_python3_env = lambda: None
3957
3958
3959 def patched_main() -> None:
3960     freeze_support()
3961     patch_click()
3962     main()
3963
3964
3965 if __name__ == "__main__":
3966     patched_main()