]> git.madduck.net Git - etc/vim.git/blob - black.py

madduck's git repository

Every one of the projects in this repository is available at the canonical URL git://git.madduck.net/madduck/pub/<projectpath> — see each project's metadata for the exact URL.

All patches and comments are welcome. Please squash your changes to logical commits before using git-format-patch and git-send-email to patches@git.madduck.net. If you'd read over the Git project's submission guidelines and adhered to them, I'd be especially grateful.

SSH access, as well as push access can be individually arranged.

If you use my repositories frequently, consider adding the following snippet to ~/.gitconfig and using the third clone URL listed for each project:

[url "git://git.madduck.net/madduck/"]
  insteadOf = madduck:

Simplify `is_trivial_*` methods
[etc/vim.git] / black.py
1 import asyncio
2 import pickle
3 from asyncio.base_events import BaseEventLoop
4 from concurrent.futures import Executor, ProcessPoolExecutor
5 from enum import Enum
6 from functools import partial, wraps
7 import keyword
8 import logging
9 from multiprocessing import Manager
10 import os
11 from pathlib import Path
12 import re
13 import tokenize
14 import signal
15 import sys
16 from typing import (
17     Any,
18     Callable,
19     Collection,
20     Dict,
21     Generic,
22     Iterable,
23     Iterator,
24     List,
25     Optional,
26     Pattern,
27     Sequence,
28     Set,
29     Tuple,
30     Type,
31     TypeVar,
32     Union,
33 )
34
35 from appdirs import user_cache_dir
36 from attr import dataclass, Factory
37 import click
38
39 # lib2to3 fork
40 from blib2to3.pytree import Node, Leaf, type_repr
41 from blib2to3 import pygram, pytree
42 from blib2to3.pgen2 import driver, token
43 from blib2to3.pgen2.parse import ParseError
44
45
46 __version__ = "18.4a6"
47 DEFAULT_LINE_LENGTH = 88
48
49 # types
50 syms = pygram.python_symbols
51 FileContent = str
52 Encoding = str
53 Depth = int
54 NodeType = int
55 LeafID = int
56 Priority = int
57 Index = int
58 LN = Union[Leaf, Node]
59 SplitFunc = Callable[["Line", bool], Iterator["Line"]]
60 Timestamp = float
61 FileSize = int
62 CacheInfo = Tuple[Timestamp, FileSize]
63 Cache = Dict[Path, CacheInfo]
64 out = partial(click.secho, bold=True, err=True)
65 err = partial(click.secho, fg="red", err=True)
66
67
68 class NothingChanged(UserWarning):
69     """Raised by :func:`format_file` when reformatted code is the same as source."""
70
71
72 class CannotSplit(Exception):
73     """A readable split that fits the allotted line length is impossible.
74
75     Raised by :func:`left_hand_split`, :func:`right_hand_split`, and
76     :func:`delimiter_split`.
77     """
78
79
80 class FormatError(Exception):
81     """Base exception for `# fmt: on` and `# fmt: off` handling.
82
83     It holds the number of bytes of the prefix consumed before the format
84     control comment appeared.
85     """
86
87     def __init__(self, consumed: int) -> None:
88         super().__init__(consumed)
89         self.consumed = consumed
90
91     def trim_prefix(self, leaf: Leaf) -> None:
92         leaf.prefix = leaf.prefix[self.consumed :]
93
94     def leaf_from_consumed(self, leaf: Leaf) -> Leaf:
95         """Returns a new Leaf from the consumed part of the prefix."""
96         unformatted_prefix = leaf.prefix[: self.consumed]
97         return Leaf(token.NEWLINE, unformatted_prefix)
98
99
100 class FormatOn(FormatError):
101     """Found a comment like `# fmt: on` in the file."""
102
103
104 class FormatOff(FormatError):
105     """Found a comment like `# fmt: off` in the file."""
106
107
108 class WriteBack(Enum):
109     NO = 0
110     YES = 1
111     DIFF = 2
112
113
114 class Changed(Enum):
115     NO = 0
116     CACHED = 1
117     YES = 2
118
119
120 @click.command()
121 @click.option(
122     "-l",
123     "--line-length",
124     type=int,
125     default=DEFAULT_LINE_LENGTH,
126     help="How many character per line to allow.",
127     show_default=True,
128 )
129 @click.option(
130     "--check",
131     is_flag=True,
132     help=(
133         "Don't write the files back, just return the status.  Return code 0 "
134         "means nothing would change.  Return code 1 means some files would be "
135         "reformatted.  Return code 123 means there was an internal error."
136     ),
137 )
138 @click.option(
139     "--diff",
140     is_flag=True,
141     help="Don't write the files back, just output a diff for each file on stdout.",
142 )
143 @click.option(
144     "--fast/--safe",
145     is_flag=True,
146     help="If --fast given, skip temporary sanity checks. [default: --safe]",
147 )
148 @click.option(
149     "-q",
150     "--quiet",
151     is_flag=True,
152     help=(
153         "Don't emit non-error messages to stderr. Errors are still emitted, "
154         "silence those with 2>/dev/null."
155     ),
156 )
157 @click.version_option(version=__version__)
158 @click.argument(
159     "src",
160     nargs=-1,
161     type=click.Path(
162         exists=True, file_okay=True, dir_okay=True, readable=True, allow_dash=True
163     ),
164 )
165 @click.pass_context
166 def main(
167     ctx: click.Context,
168     line_length: int,
169     check: bool,
170     diff: bool,
171     fast: bool,
172     quiet: bool,
173     src: List[str],
174 ) -> None:
175     """The uncompromising code formatter."""
176     sources: List[Path] = []
177     for s in src:
178         p = Path(s)
179         if p.is_dir():
180             sources.extend(gen_python_files_in_dir(p))
181         elif p.is_file():
182             # if a file was explicitly given, we don't care about its extension
183             sources.append(p)
184         elif s == "-":
185             sources.append(Path("-"))
186         else:
187             err(f"invalid path: {s}")
188
189     if check and not diff:
190         write_back = WriteBack.NO
191     elif diff:
192         write_back = WriteBack.DIFF
193     else:
194         write_back = WriteBack.YES
195     report = Report(check=check, quiet=quiet)
196     if len(sources) == 0:
197         out("No paths given. Nothing to do 😴")
198         ctx.exit(0)
199         return
200
201     elif len(sources) == 1:
202         reformat_one(sources[0], line_length, fast, write_back, report)
203     else:
204         loop = asyncio.get_event_loop()
205         executor = ProcessPoolExecutor(max_workers=os.cpu_count())
206         try:
207             loop.run_until_complete(
208                 schedule_formatting(
209                     sources, line_length, fast, write_back, report, loop, executor
210                 )
211             )
212         finally:
213             shutdown(loop)
214         if not quiet:
215             out("All done! ✨ 🍰 ✨")
216             click.echo(str(report))
217     ctx.exit(report.return_code)
218
219
220 def reformat_one(
221     src: Path, line_length: int, fast: bool, write_back: WriteBack, report: "Report"
222 ) -> None:
223     """Reformat a single file under `src` without spawning child processes.
224
225     If `quiet` is True, non-error messages are not output. `line_length`,
226     `write_back`, and `fast` options are passed to :func:`format_file_in_place`.
227     """
228     try:
229         changed = Changed.NO
230         if not src.is_file() and str(src) == "-":
231             if format_stdin_to_stdout(
232                 line_length=line_length, fast=fast, write_back=write_back
233             ):
234                 changed = Changed.YES
235         else:
236             cache: Cache = {}
237             if write_back != WriteBack.DIFF:
238                 cache = read_cache(line_length)
239                 src = src.resolve()
240                 if src in cache and cache[src] == get_cache_info(src):
241                     changed = Changed.CACHED
242             if (
243                 changed is not Changed.CACHED
244                 and format_file_in_place(
245                     src, line_length=line_length, fast=fast, write_back=write_back
246                 )
247             ):
248                 changed = Changed.YES
249             if write_back == WriteBack.YES and changed is not Changed.NO:
250                 write_cache(cache, [src], line_length)
251         report.done(src, changed)
252     except Exception as exc:
253         report.failed(src, str(exc))
254
255
256 async def schedule_formatting(
257     sources: List[Path],
258     line_length: int,
259     fast: bool,
260     write_back: WriteBack,
261     report: "Report",
262     loop: BaseEventLoop,
263     executor: Executor,
264 ) -> None:
265     """Run formatting of `sources` in parallel using the provided `executor`.
266
267     (Use ProcessPoolExecutors for actual parallelism.)
268
269     `line_length`, `write_back`, and `fast` options are passed to
270     :func:`format_file_in_place`.
271     """
272     cache: Cache = {}
273     if write_back != WriteBack.DIFF:
274         cache = read_cache(line_length)
275         sources, cached = filter_cached(cache, sources)
276         for src in cached:
277             report.done(src, Changed.CACHED)
278     cancelled = []
279     formatted = []
280     if sources:
281         lock = None
282         if write_back == WriteBack.DIFF:
283             # For diff output, we need locks to ensure we don't interleave output
284             # from different processes.
285             manager = Manager()
286             lock = manager.Lock()
287         tasks = {
288             src: loop.run_in_executor(
289                 executor, format_file_in_place, src, line_length, fast, write_back, lock
290             )
291             for src in sources
292         }
293         _task_values = list(tasks.values())
294         try:
295             loop.add_signal_handler(signal.SIGINT, cancel, _task_values)
296             loop.add_signal_handler(signal.SIGTERM, cancel, _task_values)
297         except NotImplementedError:
298             # There are no good alternatives for these on Windows
299             pass
300         await asyncio.wait(_task_values)
301         for src, task in tasks.items():
302             if not task.done():
303                 report.failed(src, "timed out, cancelling")
304                 task.cancel()
305                 cancelled.append(task)
306             elif task.cancelled():
307                 cancelled.append(task)
308             elif task.exception():
309                 report.failed(src, str(task.exception()))
310             else:
311                 formatted.append(src)
312                 report.done(src, Changed.YES if task.result() else Changed.NO)
313
314     if cancelled:
315         await asyncio.gather(*cancelled, loop=loop, return_exceptions=True)
316     if write_back == WriteBack.YES and formatted:
317         write_cache(cache, formatted, line_length)
318
319
320 def format_file_in_place(
321     src: Path,
322     line_length: int,
323     fast: bool,
324     write_back: WriteBack = WriteBack.NO,
325     lock: Any = None,  # multiprocessing.Manager().Lock() is some crazy proxy
326 ) -> bool:
327     """Format file under `src` path. Return True if changed.
328
329     If `write_back` is True, write reformatted code back to stdout.
330     `line_length` and `fast` options are passed to :func:`format_file_contents`.
331     """
332     is_pyi = src.suffix == ".pyi"
333
334     with tokenize.open(src) as src_buffer:
335         src_contents = src_buffer.read()
336     try:
337         dst_contents = format_file_contents(
338             src_contents, line_length=line_length, fast=fast, is_pyi=is_pyi
339         )
340     except NothingChanged:
341         return False
342
343     if write_back == write_back.YES:
344         with open(src, "w", encoding=src_buffer.encoding) as f:
345             f.write(dst_contents)
346     elif write_back == write_back.DIFF:
347         src_name = f"{src}  (original)"
348         dst_name = f"{src}  (formatted)"
349         diff_contents = diff(src_contents, dst_contents, src_name, dst_name)
350         if lock:
351             lock.acquire()
352         try:
353             sys.stdout.write(diff_contents)
354         finally:
355             if lock:
356                 lock.release()
357     return True
358
359
360 def format_stdin_to_stdout(
361     line_length: int, fast: bool, write_back: WriteBack = WriteBack.NO
362 ) -> bool:
363     """Format file on stdin. Return True if changed.
364
365     If `write_back` is True, write reformatted code back to stdout.
366     `line_length` and `fast` arguments are passed to :func:`format_file_contents`.
367     """
368     src = sys.stdin.read()
369     dst = src
370     try:
371         dst = format_file_contents(src, line_length=line_length, fast=fast)
372         return True
373
374     except NothingChanged:
375         return False
376
377     finally:
378         if write_back == WriteBack.YES:
379             sys.stdout.write(dst)
380         elif write_back == WriteBack.DIFF:
381             src_name = "<stdin>  (original)"
382             dst_name = "<stdin>  (formatted)"
383             sys.stdout.write(diff(src, dst, src_name, dst_name))
384
385
386 def format_file_contents(
387     src_contents: str, *, line_length: int, fast: bool, is_pyi: bool = False
388 ) -> FileContent:
389     """Reformat contents a file and return new contents.
390
391     If `fast` is False, additionally confirm that the reformatted code is
392     valid by calling :func:`assert_equivalent` and :func:`assert_stable` on it.
393     `line_length` is passed to :func:`format_str`.
394     """
395     if src_contents.strip() == "":
396         raise NothingChanged
397
398     dst_contents = format_str(src_contents, line_length=line_length, is_pyi=is_pyi)
399     if src_contents == dst_contents:
400         raise NothingChanged
401
402     if not fast:
403         assert_equivalent(src_contents, dst_contents)
404         assert_stable(
405             src_contents, dst_contents, line_length=line_length, is_pyi=is_pyi
406         )
407     return dst_contents
408
409
410 def format_str(
411     src_contents: str, line_length: int, *, is_pyi: bool = False
412 ) -> FileContent:
413     """Reformat a string and return new contents.
414
415     `line_length` determines how many characters per line are allowed.
416     """
417     src_node = lib2to3_parse(src_contents)
418     dst_contents = ""
419     future_imports = get_future_imports(src_node)
420     elt = EmptyLineTracker(is_pyi=is_pyi)
421     py36 = is_python36(src_node)
422     lines = LineGenerator(
423         remove_u_prefix=py36 or "unicode_literals" in future_imports, is_pyi=is_pyi
424     )
425     empty_line = Line()
426     after = 0
427     for current_line in lines.visit(src_node):
428         for _ in range(after):
429             dst_contents += str(empty_line)
430         before, after = elt.maybe_empty_lines(current_line)
431         for _ in range(before):
432             dst_contents += str(empty_line)
433         for line in split_line(current_line, line_length=line_length, py36=py36):
434             dst_contents += str(line)
435     return dst_contents
436
437
438 GRAMMARS = [
439     pygram.python_grammar_no_print_statement_no_exec_statement,
440     pygram.python_grammar_no_print_statement,
441     pygram.python_grammar,
442 ]
443
444
445 def lib2to3_parse(src_txt: str) -> Node:
446     """Given a string with source, return the lib2to3 Node."""
447     grammar = pygram.python_grammar_no_print_statement
448     if src_txt[-1] != "\n":
449         nl = "\r\n" if "\r\n" in src_txt[:1024] else "\n"
450         src_txt += nl
451     for grammar in GRAMMARS:
452         drv = driver.Driver(grammar, pytree.convert)
453         try:
454             result = drv.parse_string(src_txt, True)
455             break
456
457         except ParseError as pe:
458             lineno, column = pe.context[1]
459             lines = src_txt.splitlines()
460             try:
461                 faulty_line = lines[lineno - 1]
462             except IndexError:
463                 faulty_line = "<line number missing in source>"
464             exc = ValueError(f"Cannot parse: {lineno}:{column}: {faulty_line}")
465     else:
466         raise exc from None
467
468     if isinstance(result, Leaf):
469         result = Node(syms.file_input, [result])
470     return result
471
472
473 def lib2to3_unparse(node: Node) -> str:
474     """Given a lib2to3 node, return its string representation."""
475     code = str(node)
476     return code
477
478
479 T = TypeVar("T")
480
481
482 class Visitor(Generic[T]):
483     """Basic lib2to3 visitor that yields things of type `T` on `visit()`."""
484
485     def visit(self, node: LN) -> Iterator[T]:
486         """Main method to visit `node` and its children.
487
488         It tries to find a `visit_*()` method for the given `node.type`, like
489         `visit_simple_stmt` for Node objects or `visit_INDENT` for Leaf objects.
490         If no dedicated `visit_*()` method is found, chooses `visit_default()`
491         instead.
492
493         Then yields objects of type `T` from the selected visitor.
494         """
495         if node.type < 256:
496             name = token.tok_name[node.type]
497         else:
498             name = type_repr(node.type)
499         yield from getattr(self, f"visit_{name}", self.visit_default)(node)
500
501     def visit_default(self, node: LN) -> Iterator[T]:
502         """Default `visit_*()` implementation. Recurses to children of `node`."""
503         if isinstance(node, Node):
504             for child in node.children:
505                 yield from self.visit(child)
506
507
508 @dataclass
509 class DebugVisitor(Visitor[T]):
510     tree_depth: int = 0
511
512     def visit_default(self, node: LN) -> Iterator[T]:
513         indent = " " * (2 * self.tree_depth)
514         if isinstance(node, Node):
515             _type = type_repr(node.type)
516             out(f"{indent}{_type}", fg="yellow")
517             self.tree_depth += 1
518             for child in node.children:
519                 yield from self.visit(child)
520
521             self.tree_depth -= 1
522             out(f"{indent}/{_type}", fg="yellow", bold=False)
523         else:
524             _type = token.tok_name.get(node.type, str(node.type))
525             out(f"{indent}{_type}", fg="blue", nl=False)
526             if node.prefix:
527                 # We don't have to handle prefixes for `Node` objects since
528                 # that delegates to the first child anyway.
529                 out(f" {node.prefix!r}", fg="green", bold=False, nl=False)
530             out(f" {node.value!r}", fg="blue", bold=False)
531
532     @classmethod
533     def show(cls, code: str) -> None:
534         """Pretty-print the lib2to3 AST of a given string of `code`.
535
536         Convenience method for debugging.
537         """
538         v: DebugVisitor[None] = DebugVisitor()
539         list(v.visit(lib2to3_parse(code)))
540
541
542 KEYWORDS = set(keyword.kwlist)
543 WHITESPACE = {token.DEDENT, token.INDENT, token.NEWLINE}
544 FLOW_CONTROL = {"return", "raise", "break", "continue"}
545 STATEMENT = {
546     syms.if_stmt,
547     syms.while_stmt,
548     syms.for_stmt,
549     syms.try_stmt,
550     syms.except_clause,
551     syms.with_stmt,
552     syms.funcdef,
553     syms.classdef,
554 }
555 STANDALONE_COMMENT = 153
556 LOGIC_OPERATORS = {"and", "or"}
557 COMPARATORS = {
558     token.LESS,
559     token.GREATER,
560     token.EQEQUAL,
561     token.NOTEQUAL,
562     token.LESSEQUAL,
563     token.GREATEREQUAL,
564 }
565 MATH_OPERATORS = {
566     token.VBAR,
567     token.CIRCUMFLEX,
568     token.AMPER,
569     token.LEFTSHIFT,
570     token.RIGHTSHIFT,
571     token.PLUS,
572     token.MINUS,
573     token.STAR,
574     token.SLASH,
575     token.DOUBLESLASH,
576     token.PERCENT,
577     token.AT,
578     token.TILDE,
579     token.DOUBLESTAR,
580 }
581 STARS = {token.STAR, token.DOUBLESTAR}
582 VARARGS_PARENTS = {
583     syms.arglist,
584     syms.argument,  # double star in arglist
585     syms.trailer,  # single argument to call
586     syms.typedargslist,
587     syms.varargslist,  # lambdas
588 }
589 UNPACKING_PARENTS = {
590     syms.atom,  # single element of a list or set literal
591     syms.dictsetmaker,
592     syms.listmaker,
593     syms.testlist_gexp,
594 }
595 TEST_DESCENDANTS = {
596     syms.test,
597     syms.lambdef,
598     syms.or_test,
599     syms.and_test,
600     syms.not_test,
601     syms.comparison,
602     syms.star_expr,
603     syms.expr,
604     syms.xor_expr,
605     syms.and_expr,
606     syms.shift_expr,
607     syms.arith_expr,
608     syms.trailer,
609     syms.term,
610     syms.power,
611 }
612 ASSIGNMENTS = {
613     "=",
614     "+=",
615     "-=",
616     "*=",
617     "@=",
618     "/=",
619     "%=",
620     "&=",
621     "|=",
622     "^=",
623     "<<=",
624     ">>=",
625     "**=",
626     "//=",
627 }
628 COMPREHENSION_PRIORITY = 20
629 COMMA_PRIORITY = 18
630 TERNARY_PRIORITY = 16
631 LOGIC_PRIORITY = 14
632 STRING_PRIORITY = 12
633 COMPARATOR_PRIORITY = 10
634 MATH_PRIORITIES = {
635     token.VBAR: 8,
636     token.CIRCUMFLEX: 7,
637     token.AMPER: 6,
638     token.LEFTSHIFT: 5,
639     token.RIGHTSHIFT: 5,
640     token.PLUS: 4,
641     token.MINUS: 4,
642     token.STAR: 3,
643     token.SLASH: 3,
644     token.DOUBLESLASH: 3,
645     token.PERCENT: 3,
646     token.AT: 3,
647     token.TILDE: 2,
648     token.DOUBLESTAR: 1,
649 }
650
651
652 @dataclass
653 class BracketTracker:
654     """Keeps track of brackets on a line."""
655
656     depth: int = 0
657     bracket_match: Dict[Tuple[Depth, NodeType], Leaf] = Factory(dict)
658     delimiters: Dict[LeafID, Priority] = Factory(dict)
659     previous: Optional[Leaf] = None
660     _for_loop_variable: int = 0
661     _lambda_arguments: int = 0
662
663     def mark(self, leaf: Leaf) -> None:
664         """Mark `leaf` with bracket-related metadata. Keep track of delimiters.
665
666         All leaves receive an int `bracket_depth` field that stores how deep
667         within brackets a given leaf is. 0 means there are no enclosing brackets
668         that started on this line.
669
670         If a leaf is itself a closing bracket, it receives an `opening_bracket`
671         field that it forms a pair with. This is a one-directional link to
672         avoid reference cycles.
673
674         If a leaf is a delimiter (a token on which Black can split the line if
675         needed) and it's on depth 0, its `id()` is stored in the tracker's
676         `delimiters` field.
677         """
678         if leaf.type == token.COMMENT:
679             return
680
681         self.maybe_decrement_after_for_loop_variable(leaf)
682         self.maybe_decrement_after_lambda_arguments(leaf)
683         if leaf.type in CLOSING_BRACKETS:
684             self.depth -= 1
685             opening_bracket = self.bracket_match.pop((self.depth, leaf.type))
686             leaf.opening_bracket = opening_bracket
687         leaf.bracket_depth = self.depth
688         if self.depth == 0:
689             delim = is_split_before_delimiter(leaf, self.previous)
690             if delim and self.previous is not None:
691                 self.delimiters[id(self.previous)] = delim
692             else:
693                 delim = is_split_after_delimiter(leaf, self.previous)
694                 if delim:
695                     self.delimiters[id(leaf)] = delim
696         if leaf.type in OPENING_BRACKETS:
697             self.bracket_match[self.depth, BRACKET[leaf.type]] = leaf
698             self.depth += 1
699         self.previous = leaf
700         self.maybe_increment_lambda_arguments(leaf)
701         self.maybe_increment_for_loop_variable(leaf)
702
703     def any_open_brackets(self) -> bool:
704         """Return True if there is an yet unmatched open bracket on the line."""
705         return bool(self.bracket_match)
706
707     def max_delimiter_priority(self, exclude: Iterable[LeafID] = ()) -> int:
708         """Return the highest priority of a delimiter found on the line.
709
710         Values are consistent with what `is_split_*_delimiter()` return.
711         Raises ValueError on no delimiters.
712         """
713         return max(v for k, v in self.delimiters.items() if k not in exclude)
714
715     def maybe_increment_for_loop_variable(self, leaf: Leaf) -> bool:
716         """In a for loop, or comprehension, the variables are often unpacks.
717
718         To avoid splitting on the comma in this situation, increase the depth of
719         tokens between `for` and `in`.
720         """
721         if leaf.type == token.NAME and leaf.value == "for":
722             self.depth += 1
723             self._for_loop_variable += 1
724             return True
725
726         return False
727
728     def maybe_decrement_after_for_loop_variable(self, leaf: Leaf) -> bool:
729         """See `maybe_increment_for_loop_variable` above for explanation."""
730         if self._for_loop_variable and leaf.type == token.NAME and leaf.value == "in":
731             self.depth -= 1
732             self._for_loop_variable -= 1
733             return True
734
735         return False
736
737     def maybe_increment_lambda_arguments(self, leaf: Leaf) -> bool:
738         """In a lambda expression, there might be more than one argument.
739
740         To avoid splitting on the comma in this situation, increase the depth of
741         tokens between `lambda` and `:`.
742         """
743         if leaf.type == token.NAME and leaf.value == "lambda":
744             self.depth += 1
745             self._lambda_arguments += 1
746             return True
747
748         return False
749
750     def maybe_decrement_after_lambda_arguments(self, leaf: Leaf) -> bool:
751         """See `maybe_increment_lambda_arguments` above for explanation."""
752         if self._lambda_arguments and leaf.type == token.COLON:
753             self.depth -= 1
754             self._lambda_arguments -= 1
755             return True
756
757         return False
758
759     def get_open_lsqb(self) -> Optional[Leaf]:
760         """Return the most recent opening square bracket (if any)."""
761         return self.bracket_match.get((self.depth - 1, token.RSQB))
762
763
764 @dataclass
765 class Line:
766     """Holds leaves and comments. Can be printed with `str(line)`."""
767
768     depth: int = 0
769     leaves: List[Leaf] = Factory(list)
770     comments: List[Tuple[Index, Leaf]] = Factory(list)
771     bracket_tracker: BracketTracker = Factory(BracketTracker)
772     inside_brackets: bool = False
773
774     def append(self, leaf: Leaf, preformatted: bool = False) -> None:
775         """Add a new `leaf` to the end of the line.
776
777         Unless `preformatted` is True, the `leaf` will receive a new consistent
778         whitespace prefix and metadata applied by :class:`BracketTracker`.
779         Trailing commas are maybe removed, unpacked for loop variables are
780         demoted from being delimiters.
781
782         Inline comments are put aside.
783         """
784         has_value = leaf.type in BRACKETS or bool(leaf.value.strip())
785         if not has_value:
786             return
787
788         if token.COLON == leaf.type and self.is_class_paren_empty:
789             del self.leaves[-2:]
790         if self.leaves and not preformatted:
791             # Note: at this point leaf.prefix should be empty except for
792             # imports, for which we only preserve newlines.
793             leaf.prefix += whitespace(
794                 leaf, complex_subscript=self.is_complex_subscript(leaf)
795             )
796         if self.inside_brackets or not preformatted:
797             self.bracket_tracker.mark(leaf)
798             self.maybe_remove_trailing_comma(leaf)
799         if not self.append_comment(leaf):
800             self.leaves.append(leaf)
801
802     def append_safe(self, leaf: Leaf, preformatted: bool = False) -> None:
803         """Like :func:`append()` but disallow invalid standalone comment structure.
804
805         Raises ValueError when any `leaf` is appended after a standalone comment
806         or when a standalone comment is not the first leaf on the line.
807         """
808         if self.bracket_tracker.depth == 0:
809             if self.is_comment:
810                 raise ValueError("cannot append to standalone comments")
811
812             if self.leaves and leaf.type == STANDALONE_COMMENT:
813                 raise ValueError(
814                     "cannot append standalone comments to a populated line"
815                 )
816
817         self.append(leaf, preformatted=preformatted)
818
819     @property
820     def is_comment(self) -> bool:
821         """Is this line a standalone comment?"""
822         return len(self.leaves) == 1 and self.leaves[0].type == STANDALONE_COMMENT
823
824     @property
825     def is_decorator(self) -> bool:
826         """Is this line a decorator?"""
827         return bool(self) and self.leaves[0].type == token.AT
828
829     @property
830     def is_import(self) -> bool:
831         """Is this an import line?"""
832         return bool(self) and is_import(self.leaves[0])
833
834     @property
835     def is_class(self) -> bool:
836         """Is this line a class definition?"""
837         return (
838             bool(self)
839             and self.leaves[0].type == token.NAME
840             and self.leaves[0].value == "class"
841         )
842
843     @property
844     def is_stub_class(self) -> bool:
845         """Is this line a class definition with a body consisting only of "..."?"""
846         return (
847             self.is_class
848             and self.leaves[-3:] == [Leaf(token.DOT, ".") for _ in range(3)]
849         )
850
851     @property
852     def is_def(self) -> bool:
853         """Is this a function definition? (Also returns True for async defs.)"""
854         try:
855             first_leaf = self.leaves[0]
856         except IndexError:
857             return False
858
859         try:
860             second_leaf: Optional[Leaf] = self.leaves[1]
861         except IndexError:
862             second_leaf = None
863         return (
864             (first_leaf.type == token.NAME and first_leaf.value == "def")
865             or (
866                 first_leaf.type == token.ASYNC
867                 and second_leaf is not None
868                 and second_leaf.type == token.NAME
869                 and second_leaf.value == "def"
870             )
871         )
872
873     @property
874     def is_flow_control(self) -> bool:
875         """Is this line a flow control statement?
876
877         Those are `return`, `raise`, `break`, and `continue`.
878         """
879         return (
880             bool(self)
881             and self.leaves[0].type == token.NAME
882             and self.leaves[0].value in FLOW_CONTROL
883         )
884
885     @property
886     def is_yield(self) -> bool:
887         """Is this line a yield statement?"""
888         return (
889             bool(self)
890             and self.leaves[0].type == token.NAME
891             and self.leaves[0].value == "yield"
892         )
893
894     @property
895     def is_class_paren_empty(self) -> bool:
896         """Is this a class with no base classes but using parentheses?
897
898         Those are unnecessary and should be removed.
899         """
900         return (
901             bool(self)
902             and len(self.leaves) == 4
903             and self.is_class
904             and self.leaves[2].type == token.LPAR
905             and self.leaves[2].value == "("
906             and self.leaves[3].type == token.RPAR
907             and self.leaves[3].value == ")"
908         )
909
910     def contains_standalone_comments(self, depth_limit: int = sys.maxsize) -> bool:
911         """If so, needs to be split before emitting."""
912         for leaf in self.leaves:
913             if leaf.type == STANDALONE_COMMENT:
914                 if leaf.bracket_depth <= depth_limit:
915                     return True
916
917         return False
918
919     def maybe_remove_trailing_comma(self, closing: Leaf) -> bool:
920         """Remove trailing comma if there is one and it's safe."""
921         if not (
922             self.leaves
923             and self.leaves[-1].type == token.COMMA
924             and closing.type in CLOSING_BRACKETS
925         ):
926             return False
927
928         if closing.type == token.RBRACE:
929             self.remove_trailing_comma()
930             return True
931
932         if closing.type == token.RSQB:
933             comma = self.leaves[-1]
934             if comma.parent and comma.parent.type == syms.listmaker:
935                 self.remove_trailing_comma()
936                 return True
937
938         # For parens let's check if it's safe to remove the comma.
939         # Imports are always safe.
940         if self.is_import:
941             self.remove_trailing_comma()
942             return True
943
944         # Otheriwsse, if the trailing one is the only one, we might mistakenly
945         # change a tuple into a different type by removing the comma.
946         depth = closing.bracket_depth + 1
947         commas = 0
948         opening = closing.opening_bracket
949         for _opening_index, leaf in enumerate(self.leaves):
950             if leaf is opening:
951                 break
952
953         else:
954             return False
955
956         for leaf in self.leaves[_opening_index + 1 :]:
957             if leaf is closing:
958                 break
959
960             bracket_depth = leaf.bracket_depth
961             if bracket_depth == depth and leaf.type == token.COMMA:
962                 commas += 1
963                 if leaf.parent and leaf.parent.type == syms.arglist:
964                     commas += 1
965                     break
966
967         if commas > 1:
968             self.remove_trailing_comma()
969             return True
970
971         return False
972
973     def append_comment(self, comment: Leaf) -> bool:
974         """Add an inline or standalone comment to the line."""
975         if (
976             comment.type == STANDALONE_COMMENT
977             and self.bracket_tracker.any_open_brackets()
978         ):
979             comment.prefix = ""
980             return False
981
982         if comment.type != token.COMMENT:
983             return False
984
985         after = len(self.leaves) - 1
986         if after == -1:
987             comment.type = STANDALONE_COMMENT
988             comment.prefix = ""
989             return False
990
991         else:
992             self.comments.append((after, comment))
993             return True
994
995     def comments_after(self, leaf: Leaf, _index: int = -1) -> Iterator[Leaf]:
996         """Generate comments that should appear directly after `leaf`.
997
998         Provide a non-negative leaf `_index` to speed up the function.
999         """
1000         if _index == -1:
1001             for _index, _leaf in enumerate(self.leaves):
1002                 if leaf is _leaf:
1003                     break
1004
1005             else:
1006                 return
1007
1008         for index, comment_after in self.comments:
1009             if _index == index:
1010                 yield comment_after
1011
1012     def remove_trailing_comma(self) -> None:
1013         """Remove the trailing comma and moves the comments attached to it."""
1014         comma_index = len(self.leaves) - 1
1015         for i in range(len(self.comments)):
1016             comment_index, comment = self.comments[i]
1017             if comment_index == comma_index:
1018                 self.comments[i] = (comma_index - 1, comment)
1019         self.leaves.pop()
1020
1021     def is_complex_subscript(self, leaf: Leaf) -> bool:
1022         """Return True iff `leaf` is part of a slice with non-trivial exprs."""
1023         open_lsqb = (
1024             leaf if leaf.type == token.LSQB else self.bracket_tracker.get_open_lsqb()
1025         )
1026         if open_lsqb is None:
1027             return False
1028
1029         subscript_start = open_lsqb.next_sibling
1030         if (
1031             isinstance(subscript_start, Node)
1032             and subscript_start.type == syms.subscriptlist
1033         ):
1034             subscript_start = child_towards(subscript_start, leaf)
1035         return (
1036             subscript_start is not None
1037             and any(n.type in TEST_DESCENDANTS for n in subscript_start.pre_order())
1038         )
1039
1040     def __str__(self) -> str:
1041         """Render the line."""
1042         if not self:
1043             return "\n"
1044
1045         indent = "    " * self.depth
1046         leaves = iter(self.leaves)
1047         first = next(leaves)
1048         res = f"{first.prefix}{indent}{first.value}"
1049         for leaf in leaves:
1050             res += str(leaf)
1051         for _, comment in self.comments:
1052             res += str(comment)
1053         return res + "\n"
1054
1055     def __bool__(self) -> bool:
1056         """Return True if the line has leaves or comments."""
1057         return bool(self.leaves or self.comments)
1058
1059
1060 class UnformattedLines(Line):
1061     """Just like :class:`Line` but stores lines which aren't reformatted."""
1062
1063     def append(self, leaf: Leaf, preformatted: bool = True) -> None:
1064         """Just add a new `leaf` to the end of the lines.
1065
1066         The `preformatted` argument is ignored.
1067
1068         Keeps track of indentation `depth`, which is useful when the user
1069         says `# fmt: on`. Otherwise, doesn't do anything with the `leaf`.
1070         """
1071         try:
1072             list(generate_comments(leaf))
1073         except FormatOn as f_on:
1074             self.leaves.append(f_on.leaf_from_consumed(leaf))
1075             raise
1076
1077         self.leaves.append(leaf)
1078         if leaf.type == token.INDENT:
1079             self.depth += 1
1080         elif leaf.type == token.DEDENT:
1081             self.depth -= 1
1082
1083     def __str__(self) -> str:
1084         """Render unformatted lines from leaves which were added with `append()`.
1085
1086         `depth` is not used for indentation in this case.
1087         """
1088         if not self:
1089             return "\n"
1090
1091         res = ""
1092         for leaf in self.leaves:
1093             res += str(leaf)
1094         return res
1095
1096     def append_comment(self, comment: Leaf) -> bool:
1097         """Not implemented in this class. Raises `NotImplementedError`."""
1098         raise NotImplementedError("Unformatted lines don't store comments separately.")
1099
1100     def maybe_remove_trailing_comma(self, closing: Leaf) -> bool:
1101         """Does nothing and returns False."""
1102         return False
1103
1104     def maybe_increment_for_loop_variable(self, leaf: Leaf) -> bool:
1105         """Does nothing and returns False."""
1106         return False
1107
1108
1109 @dataclass
1110 class EmptyLineTracker:
1111     """Provides a stateful method that returns the number of potential extra
1112     empty lines needed before and after the currently processed line.
1113
1114     Note: this tracker works on lines that haven't been split yet.  It assumes
1115     the prefix of the first leaf consists of optional newlines.  Those newlines
1116     are consumed by `maybe_empty_lines()` and included in the computation.
1117     """
1118     is_pyi: bool = False
1119     previous_line: Optional[Line] = None
1120     previous_after: int = 0
1121     previous_defs: List[int] = Factory(list)
1122
1123     def maybe_empty_lines(self, current_line: Line) -> Tuple[int, int]:
1124         """Return the number of extra empty lines before and after the `current_line`.
1125
1126         This is for separating `def`, `async def` and `class` with extra empty
1127         lines (two on module-level), as well as providing an extra empty line
1128         after flow control keywords to make them more prominent.
1129         """
1130         if isinstance(current_line, UnformattedLines):
1131             return 0, 0
1132
1133         before, after = self._maybe_empty_lines(current_line)
1134         before -= self.previous_after
1135         self.previous_after = after
1136         self.previous_line = current_line
1137         return before, after
1138
1139     def _maybe_empty_lines(self, current_line: Line) -> Tuple[int, int]:
1140         max_allowed = 1
1141         if current_line.depth == 0:
1142             max_allowed = 1 if self.is_pyi else 2
1143         if current_line.leaves:
1144             # Consume the first leaf's extra newlines.
1145             first_leaf = current_line.leaves[0]
1146             before = first_leaf.prefix.count("\n")
1147             before = min(before, max_allowed)
1148             first_leaf.prefix = ""
1149         else:
1150             before = 0
1151         depth = current_line.depth
1152         while self.previous_defs and self.previous_defs[-1] >= depth:
1153             self.previous_defs.pop()
1154             if self.is_pyi:
1155                 before = 0 if depth else 1
1156             else:
1157                 before = 1 if depth else 2
1158         is_decorator = current_line.is_decorator
1159         if is_decorator or current_line.is_def or current_line.is_class:
1160             if not is_decorator:
1161                 self.previous_defs.append(depth)
1162             if self.previous_line is None:
1163                 # Don't insert empty lines before the first line in the file.
1164                 return 0, 0
1165
1166             if self.previous_line.is_decorator:
1167                 return 0, 0
1168
1169             if (
1170                 self.previous_line.is_comment
1171                 and self.previous_line.depth == current_line.depth
1172                 and before == 0
1173             ):
1174                 return 0, 0
1175
1176             if self.is_pyi:
1177                 if self.previous_line.depth > current_line.depth:
1178                     newlines = 1
1179                 elif current_line.is_class or self.previous_line.is_class:
1180                     if current_line.is_stub_class and self.previous_line.is_stub_class:
1181                         newlines = 0
1182                     else:
1183                         newlines = 1
1184                 else:
1185                     newlines = 0
1186             else:
1187                 newlines = 2
1188             if current_line.depth and newlines:
1189                 newlines -= 1
1190             return newlines, 0
1191
1192         if (
1193             self.previous_line
1194             and self.previous_line.is_import
1195             and not current_line.is_import
1196             and depth == self.previous_line.depth
1197         ):
1198             return (before or 1), 0
1199
1200         return before, 0
1201
1202
1203 @dataclass
1204 class LineGenerator(Visitor[Line]):
1205     """Generates reformatted Line objects.  Empty lines are not emitted.
1206
1207     Note: destroys the tree it's visiting by mutating prefixes of its leaves
1208     in ways that will no longer stringify to valid Python code on the tree.
1209     """
1210     is_pyi: bool = False
1211     current_line: Line = Factory(Line)
1212     remove_u_prefix: bool = False
1213
1214     def line(self, indent: int = 0, type: Type[Line] = Line) -> Iterator[Line]:
1215         """Generate a line.
1216
1217         If the line is empty, only emit if it makes sense.
1218         If the line is too long, split it first and then generate.
1219
1220         If any lines were generated, set up a new current_line.
1221         """
1222         if not self.current_line:
1223             if self.current_line.__class__ == type:
1224                 self.current_line.depth += indent
1225             else:
1226                 self.current_line = type(depth=self.current_line.depth + indent)
1227             return  # Line is empty, don't emit. Creating a new one unnecessary.
1228
1229         complete_line = self.current_line
1230         self.current_line = type(depth=complete_line.depth + indent)
1231         yield complete_line
1232
1233     def visit(self, node: LN) -> Iterator[Line]:
1234         """Main method to visit `node` and its children.
1235
1236         Yields :class:`Line` objects.
1237         """
1238         if isinstance(self.current_line, UnformattedLines):
1239             # File contained `# fmt: off`
1240             yield from self.visit_unformatted(node)
1241
1242         else:
1243             yield from super().visit(node)
1244
1245     def visit_default(self, node: LN) -> Iterator[Line]:
1246         """Default `visit_*()` implementation. Recurses to children of `node`."""
1247         if isinstance(node, Leaf):
1248             any_open_brackets = self.current_line.bracket_tracker.any_open_brackets()
1249             try:
1250                 for comment in generate_comments(node):
1251                     if any_open_brackets:
1252                         # any comment within brackets is subject to splitting
1253                         self.current_line.append(comment)
1254                     elif comment.type == token.COMMENT:
1255                         # regular trailing comment
1256                         self.current_line.append(comment)
1257                         yield from self.line()
1258
1259                     else:
1260                         # regular standalone comment
1261                         yield from self.line()
1262
1263                         self.current_line.append(comment)
1264                         yield from self.line()
1265
1266             except FormatOff as f_off:
1267                 f_off.trim_prefix(node)
1268                 yield from self.line(type=UnformattedLines)
1269                 yield from self.visit(node)
1270
1271             except FormatOn as f_on:
1272                 # This only happens here if somebody says "fmt: on" multiple
1273                 # times in a row.
1274                 f_on.trim_prefix(node)
1275                 yield from self.visit_default(node)
1276
1277             else:
1278                 normalize_prefix(node, inside_brackets=any_open_brackets)
1279                 if node.type == token.STRING:
1280                     normalize_string_prefix(node, remove_u_prefix=self.remove_u_prefix)
1281                     normalize_string_quotes(node)
1282                 if node.type not in WHITESPACE:
1283                     self.current_line.append(node)
1284         yield from super().visit_default(node)
1285
1286     def visit_INDENT(self, node: Node) -> Iterator[Line]:
1287         """Increase indentation level, maybe yield a line."""
1288         # In blib2to3 INDENT never holds comments.
1289         yield from self.line(+1)
1290         yield from self.visit_default(node)
1291
1292     def visit_DEDENT(self, node: Node) -> Iterator[Line]:
1293         """Decrease indentation level, maybe yield a line."""
1294         # The current line might still wait for trailing comments.  At DEDENT time
1295         # there won't be any (they would be prefixes on the preceding NEWLINE).
1296         # Emit the line then.
1297         yield from self.line()
1298
1299         # While DEDENT has no value, its prefix may contain standalone comments
1300         # that belong to the current indentation level.  Get 'em.
1301         yield from self.visit_default(node)
1302
1303         # Finally, emit the dedent.
1304         yield from self.line(-1)
1305
1306     def visit_stmt(
1307         self, node: Node, keywords: Set[str], parens: Set[str]
1308     ) -> Iterator[Line]:
1309         """Visit a statement.
1310
1311         This implementation is shared for `if`, `while`, `for`, `try`, `except`,
1312         `def`, `with`, `class`, `assert` and assignments.
1313
1314         The relevant Python language `keywords` for a given statement will be
1315         NAME leaves within it. This methods puts those on a separate line.
1316
1317         `parens` holds a set of string leaf values immeditely after which
1318         invisible parens should be put.
1319         """
1320         normalize_invisible_parens(node, parens_after=parens)
1321         for child in node.children:
1322             if child.type == token.NAME and child.value in keywords:  # type: ignore
1323                 yield from self.line()
1324
1325             yield from self.visit(child)
1326
1327     def visit_suite(self, node: Node) -> Iterator[Line]:
1328         """Visit a suite."""
1329         if self.is_pyi and is_stub_suite(node):
1330             yield from self.visit(node.children[2])
1331         else:
1332             yield from self.visit_default(node)
1333
1334     def visit_simple_stmt(self, node: Node) -> Iterator[Line]:
1335         """Visit a statement without nested statements."""
1336         is_suite_like = node.parent and node.parent.type in STATEMENT
1337         if is_suite_like:
1338             if self.is_pyi and is_stub_body(node):
1339                 yield from self.visit_default(node)
1340             else:
1341                 yield from self.line(+1)
1342                 yield from self.visit_default(node)
1343                 yield from self.line(-1)
1344
1345         else:
1346             if not self.is_pyi or not node.parent or not is_stub_suite(node.parent):
1347                 yield from self.line()
1348             yield from self.visit_default(node)
1349
1350     def visit_async_stmt(self, node: Node) -> Iterator[Line]:
1351         """Visit `async def`, `async for`, `async with`."""
1352         yield from self.line()
1353
1354         children = iter(node.children)
1355         for child in children:
1356             yield from self.visit(child)
1357
1358             if child.type == token.ASYNC:
1359                 break
1360
1361         internal_stmt = next(children)
1362         for child in internal_stmt.children:
1363             yield from self.visit(child)
1364
1365     def visit_decorators(self, node: Node) -> Iterator[Line]:
1366         """Visit decorators."""
1367         for child in node.children:
1368             yield from self.line()
1369             yield from self.visit(child)
1370
1371     def visit_import_from(self, node: Node) -> Iterator[Line]:
1372         """Visit import_from and maybe put invisible parentheses.
1373
1374         This is separate from `visit_stmt` because import statements don't
1375         support arbitrary atoms and thus handling of parentheses is custom.
1376         """
1377         check_lpar = False
1378         for index, child in enumerate(node.children):
1379             if check_lpar:
1380                 if child.type == token.LPAR:
1381                     # make parentheses invisible
1382                     child.value = ""  # type: ignore
1383                     node.children[-1].value = ""  # type: ignore
1384                 else:
1385                     # insert invisible parentheses
1386                     node.insert_child(index, Leaf(token.LPAR, ""))
1387                     node.append_child(Leaf(token.RPAR, ""))
1388                 break
1389
1390             check_lpar = (
1391                 child.type == token.NAME and child.value == "import"  # type: ignore
1392             )
1393
1394         for child in node.children:
1395             yield from self.visit(child)
1396
1397     def visit_SEMI(self, leaf: Leaf) -> Iterator[Line]:
1398         """Remove a semicolon and put the other statement on a separate line."""
1399         yield from self.line()
1400
1401     def visit_ENDMARKER(self, leaf: Leaf) -> Iterator[Line]:
1402         """End of file. Process outstanding comments and end with a newline."""
1403         yield from self.visit_default(leaf)
1404         yield from self.line()
1405
1406     def visit_unformatted(self, node: LN) -> Iterator[Line]:
1407         """Used when file contained a `# fmt: off`."""
1408         if isinstance(node, Node):
1409             for child in node.children:
1410                 yield from self.visit(child)
1411
1412         else:
1413             try:
1414                 self.current_line.append(node)
1415             except FormatOn as f_on:
1416                 f_on.trim_prefix(node)
1417                 yield from self.line()
1418                 yield from self.visit(node)
1419
1420             if node.type == token.ENDMARKER:
1421                 # somebody decided not to put a final `# fmt: on`
1422                 yield from self.line()
1423
1424     def __attrs_post_init__(self) -> None:
1425         """You are in a twisty little maze of passages."""
1426         v = self.visit_stmt
1427         Ø: Set[str] = set()
1428         self.visit_assert_stmt = partial(v, keywords={"assert"}, parens={"assert", ","})
1429         self.visit_if_stmt = partial(
1430             v, keywords={"if", "else", "elif"}, parens={"if", "elif"}
1431         )
1432         self.visit_while_stmt = partial(v, keywords={"while", "else"}, parens={"while"})
1433         self.visit_for_stmt = partial(v, keywords={"for", "else"}, parens={"for", "in"})
1434         self.visit_try_stmt = partial(
1435             v, keywords={"try", "except", "else", "finally"}, parens=Ø
1436         )
1437         self.visit_except_clause = partial(v, keywords={"except"}, parens=Ø)
1438         self.visit_with_stmt = partial(v, keywords={"with"}, parens=Ø)
1439         self.visit_funcdef = partial(v, keywords={"def"}, parens=Ø)
1440         self.visit_classdef = partial(v, keywords={"class"}, parens=Ø)
1441         self.visit_expr_stmt = partial(v, keywords=Ø, parens=ASSIGNMENTS)
1442         self.visit_return_stmt = partial(v, keywords={"return"}, parens={"return"})
1443         self.visit_async_funcdef = self.visit_async_stmt
1444         self.visit_decorated = self.visit_decorators
1445
1446
1447 IMPLICIT_TUPLE = {syms.testlist, syms.testlist_star_expr, syms.exprlist}
1448 BRACKET = {token.LPAR: token.RPAR, token.LSQB: token.RSQB, token.LBRACE: token.RBRACE}
1449 OPENING_BRACKETS = set(BRACKET.keys())
1450 CLOSING_BRACKETS = set(BRACKET.values())
1451 BRACKETS = OPENING_BRACKETS | CLOSING_BRACKETS
1452 ALWAYS_NO_SPACE = CLOSING_BRACKETS | {token.COMMA, STANDALONE_COMMENT}
1453
1454
1455 def whitespace(leaf: Leaf, *, complex_subscript: bool) -> str:  # noqa C901
1456     """Return whitespace prefix if needed for the given `leaf`.
1457
1458     `complex_subscript` signals whether the given leaf is part of a subscription
1459     which has non-trivial arguments, like arithmetic expressions or function calls.
1460     """
1461     NO = ""
1462     SPACE = " "
1463     DOUBLESPACE = "  "
1464     t = leaf.type
1465     p = leaf.parent
1466     v = leaf.value
1467     if t in ALWAYS_NO_SPACE:
1468         return NO
1469
1470     if t == token.COMMENT:
1471         return DOUBLESPACE
1472
1473     assert p is not None, f"INTERNAL ERROR: hand-made leaf without parent: {leaf!r}"
1474     if (
1475         t == token.COLON
1476         and p.type not in {syms.subscript, syms.subscriptlist, syms.sliceop}
1477     ):
1478         return NO
1479
1480     prev = leaf.prev_sibling
1481     if not prev:
1482         prevp = preceding_leaf(p)
1483         if not prevp or prevp.type in OPENING_BRACKETS:
1484             return NO
1485
1486         if t == token.COLON:
1487             if prevp.type == token.COLON:
1488                 return NO
1489
1490             elif prevp.type != token.COMMA and not complex_subscript:
1491                 return NO
1492
1493             return SPACE
1494
1495         if prevp.type == token.EQUAL:
1496             if prevp.parent:
1497                 if prevp.parent.type in {
1498                     syms.arglist, syms.argument, syms.parameters, syms.varargslist
1499                 }:
1500                     return NO
1501
1502                 elif prevp.parent.type == syms.typedargslist:
1503                     # A bit hacky: if the equal sign has whitespace, it means we
1504                     # previously found it's a typed argument.  So, we're using
1505                     # that, too.
1506                     return prevp.prefix
1507
1508         elif prevp.type in STARS:
1509             if is_vararg(prevp, within=VARARGS_PARENTS | UNPACKING_PARENTS):
1510                 return NO
1511
1512         elif prevp.type == token.COLON:
1513             if prevp.parent and prevp.parent.type in {syms.subscript, syms.sliceop}:
1514                 return SPACE if complex_subscript else NO
1515
1516         elif (
1517             prevp.parent
1518             and prevp.parent.type == syms.factor
1519             and prevp.type in MATH_OPERATORS
1520         ):
1521             return NO
1522
1523         elif (
1524             prevp.type == token.RIGHTSHIFT
1525             and prevp.parent
1526             and prevp.parent.type == syms.shift_expr
1527             and prevp.prev_sibling
1528             and prevp.prev_sibling.type == token.NAME
1529             and prevp.prev_sibling.value == "print"  # type: ignore
1530         ):
1531             # Python 2 print chevron
1532             return NO
1533
1534     elif prev.type in OPENING_BRACKETS:
1535         return NO
1536
1537     if p.type in {syms.parameters, syms.arglist}:
1538         # untyped function signatures or calls
1539         if not prev or prev.type != token.COMMA:
1540             return NO
1541
1542     elif p.type == syms.varargslist:
1543         # lambdas
1544         if prev and prev.type != token.COMMA:
1545             return NO
1546
1547     elif p.type == syms.typedargslist:
1548         # typed function signatures
1549         if not prev:
1550             return NO
1551
1552         if t == token.EQUAL:
1553             if prev.type != syms.tname:
1554                 return NO
1555
1556         elif prev.type == token.EQUAL:
1557             # A bit hacky: if the equal sign has whitespace, it means we
1558             # previously found it's a typed argument.  So, we're using that, too.
1559             return prev.prefix
1560
1561         elif prev.type != token.COMMA:
1562             return NO
1563
1564     elif p.type == syms.tname:
1565         # type names
1566         if not prev:
1567             prevp = preceding_leaf(p)
1568             if not prevp or prevp.type != token.COMMA:
1569                 return NO
1570
1571     elif p.type == syms.trailer:
1572         # attributes and calls
1573         if t == token.LPAR or t == token.RPAR:
1574             return NO
1575
1576         if not prev:
1577             if t == token.DOT:
1578                 prevp = preceding_leaf(p)
1579                 if not prevp or prevp.type != token.NUMBER:
1580                     return NO
1581
1582             elif t == token.LSQB:
1583                 return NO
1584
1585         elif prev.type != token.COMMA:
1586             return NO
1587
1588     elif p.type == syms.argument:
1589         # single argument
1590         if t == token.EQUAL:
1591             return NO
1592
1593         if not prev:
1594             prevp = preceding_leaf(p)
1595             if not prevp or prevp.type == token.LPAR:
1596                 return NO
1597
1598         elif prev.type in {token.EQUAL} | STARS:
1599             return NO
1600
1601     elif p.type == syms.decorator:
1602         # decorators
1603         return NO
1604
1605     elif p.type == syms.dotted_name:
1606         if prev:
1607             return NO
1608
1609         prevp = preceding_leaf(p)
1610         if not prevp or prevp.type == token.AT or prevp.type == token.DOT:
1611             return NO
1612
1613     elif p.type == syms.classdef:
1614         if t == token.LPAR:
1615             return NO
1616
1617         if prev and prev.type == token.LPAR:
1618             return NO
1619
1620     elif p.type in {syms.subscript, syms.sliceop}:
1621         # indexing
1622         if not prev:
1623             assert p.parent is not None, "subscripts are always parented"
1624             if p.parent.type == syms.subscriptlist:
1625                 return SPACE
1626
1627             return NO
1628
1629         elif not complex_subscript:
1630             return NO
1631
1632     elif p.type == syms.atom:
1633         if prev and t == token.DOT:
1634             # dots, but not the first one.
1635             return NO
1636
1637     elif p.type == syms.dictsetmaker:
1638         # dict unpacking
1639         if prev and prev.type == token.DOUBLESTAR:
1640             return NO
1641
1642     elif p.type in {syms.factor, syms.star_expr}:
1643         # unary ops
1644         if not prev:
1645             prevp = preceding_leaf(p)
1646             if not prevp or prevp.type in OPENING_BRACKETS:
1647                 return NO
1648
1649             prevp_parent = prevp.parent
1650             assert prevp_parent is not None
1651             if (
1652                 prevp.type == token.COLON
1653                 and prevp_parent.type in {syms.subscript, syms.sliceop}
1654             ):
1655                 return NO
1656
1657             elif prevp.type == token.EQUAL and prevp_parent.type == syms.argument:
1658                 return NO
1659
1660         elif t == token.NAME or t == token.NUMBER:
1661             return NO
1662
1663     elif p.type == syms.import_from:
1664         if t == token.DOT:
1665             if prev and prev.type == token.DOT:
1666                 return NO
1667
1668         elif t == token.NAME:
1669             if v == "import":
1670                 return SPACE
1671
1672             if prev and prev.type == token.DOT:
1673                 return NO
1674
1675     elif p.type == syms.sliceop:
1676         return NO
1677
1678     return SPACE
1679
1680
1681 def preceding_leaf(node: Optional[LN]) -> Optional[Leaf]:
1682     """Return the first leaf that precedes `node`, if any."""
1683     while node:
1684         res = node.prev_sibling
1685         if res:
1686             if isinstance(res, Leaf):
1687                 return res
1688
1689             try:
1690                 return list(res.leaves())[-1]
1691
1692             except IndexError:
1693                 return None
1694
1695         node = node.parent
1696     return None
1697
1698
1699 def child_towards(ancestor: Node, descendant: LN) -> Optional[LN]:
1700     """Return the child of `ancestor` that contains `descendant`."""
1701     node: Optional[LN] = descendant
1702     while node and node.parent != ancestor:
1703         node = node.parent
1704     return node
1705
1706
1707 def is_split_after_delimiter(leaf: Leaf, previous: Leaf = None) -> int:
1708     """Return the priority of the `leaf` delimiter, given a line break after it.
1709
1710     The delimiter priorities returned here are from those delimiters that would
1711     cause a line break after themselves.
1712
1713     Higher numbers are higher priority.
1714     """
1715     if leaf.type == token.COMMA:
1716         return COMMA_PRIORITY
1717
1718     return 0
1719
1720
1721 def is_split_before_delimiter(leaf: Leaf, previous: Leaf = None) -> int:
1722     """Return the priority of the `leaf` delimiter, given a line before after it.
1723
1724     The delimiter priorities returned here are from those delimiters that would
1725     cause a line break before themselves.
1726
1727     Higher numbers are higher priority.
1728     """
1729     if is_vararg(leaf, within=VARARGS_PARENTS | UNPACKING_PARENTS):
1730         # * and ** might also be MATH_OPERATORS but in this case they are not.
1731         # Don't treat them as a delimiter.
1732         return 0
1733
1734     if (
1735         leaf.type in MATH_OPERATORS
1736         and leaf.parent
1737         and leaf.parent.type not in {syms.factor, syms.star_expr}
1738     ):
1739         return MATH_PRIORITIES[leaf.type]
1740
1741     if leaf.type in COMPARATORS:
1742         return COMPARATOR_PRIORITY
1743
1744     if (
1745         leaf.type == token.STRING
1746         and previous is not None
1747         and previous.type == token.STRING
1748     ):
1749         return STRING_PRIORITY
1750
1751     if (
1752         leaf.type == token.NAME
1753         and leaf.value == "for"
1754         and leaf.parent
1755         and leaf.parent.type in {syms.comp_for, syms.old_comp_for}
1756     ):
1757         return COMPREHENSION_PRIORITY
1758
1759     if (
1760         leaf.type == token.NAME
1761         and leaf.value == "if"
1762         and leaf.parent
1763         and leaf.parent.type in {syms.comp_if, syms.old_comp_if}
1764     ):
1765         return COMPREHENSION_PRIORITY
1766
1767     if (
1768         leaf.type == token.NAME
1769         and leaf.value in {"if", "else"}
1770         and leaf.parent
1771         and leaf.parent.type == syms.test
1772     ):
1773         return TERNARY_PRIORITY
1774
1775     if leaf.type == token.NAME and leaf.value in LOGIC_OPERATORS and leaf.parent:
1776         return LOGIC_PRIORITY
1777
1778     return 0
1779
1780
1781 def generate_comments(leaf: Leaf) -> Iterator[Leaf]:
1782     """Clean the prefix of the `leaf` and generate comments from it, if any.
1783
1784     Comments in lib2to3 are shoved into the whitespace prefix.  This happens
1785     in `pgen2/driver.py:Driver.parse_tokens()`.  This was a brilliant implementation
1786     move because it does away with modifying the grammar to include all the
1787     possible places in which comments can be placed.
1788
1789     The sad consequence for us though is that comments don't "belong" anywhere.
1790     This is why this function generates simple parentless Leaf objects for
1791     comments.  We simply don't know what the correct parent should be.
1792
1793     No matter though, we can live without this.  We really only need to
1794     differentiate between inline and standalone comments.  The latter don't
1795     share the line with any code.
1796
1797     Inline comments are emitted as regular token.COMMENT leaves.  Standalone
1798     are emitted with a fake STANDALONE_COMMENT token identifier.
1799     """
1800     p = leaf.prefix
1801     if not p:
1802         return
1803
1804     if "#" not in p:
1805         return
1806
1807     consumed = 0
1808     nlines = 0
1809     for index, line in enumerate(p.split("\n")):
1810         consumed += len(line) + 1  # adding the length of the split '\n'
1811         line = line.lstrip()
1812         if not line:
1813             nlines += 1
1814         if not line.startswith("#"):
1815             continue
1816
1817         if index == 0 and leaf.type != token.ENDMARKER:
1818             comment_type = token.COMMENT  # simple trailing comment
1819         else:
1820             comment_type = STANDALONE_COMMENT
1821         comment = make_comment(line)
1822         yield Leaf(comment_type, comment, prefix="\n" * nlines)
1823
1824         if comment in {"# fmt: on", "# yapf: enable"}:
1825             raise FormatOn(consumed)
1826
1827         if comment in {"# fmt: off", "# yapf: disable"}:
1828             if comment_type == STANDALONE_COMMENT:
1829                 raise FormatOff(consumed)
1830
1831             prev = preceding_leaf(leaf)
1832             if not prev or prev.type in WHITESPACE:  # standalone comment in disguise
1833                 raise FormatOff(consumed)
1834
1835         nlines = 0
1836
1837
1838 def make_comment(content: str) -> str:
1839     """Return a consistently formatted comment from the given `content` string.
1840
1841     All comments (except for "##", "#!", "#:") should have a single space between
1842     the hash sign and the content.
1843
1844     If `content` didn't start with a hash sign, one is provided.
1845     """
1846     content = content.rstrip()
1847     if not content:
1848         return "#"
1849
1850     if content[0] == "#":
1851         content = content[1:]
1852     if content and content[0] not in " !:#":
1853         content = " " + content
1854     return "#" + content
1855
1856
1857 def split_line(
1858     line: Line, line_length: int, inner: bool = False, py36: bool = False
1859 ) -> Iterator[Line]:
1860     """Split a `line` into potentially many lines.
1861
1862     They should fit in the allotted `line_length` but might not be able to.
1863     `inner` signifies that there were a pair of brackets somewhere around the
1864     current `line`, possibly transitively. This means we can fallback to splitting
1865     by delimiters if the LHS/RHS don't yield any results.
1866
1867     If `py36` is True, splitting may generate syntax that is only compatible
1868     with Python 3.6 and later.
1869     """
1870     if isinstance(line, UnformattedLines) or line.is_comment:
1871         yield line
1872         return
1873
1874     line_str = str(line).strip("\n")
1875     if is_line_short_enough(line, line_length=line_length, line_str=line_str):
1876         yield line
1877         return
1878
1879     split_funcs: List[SplitFunc]
1880     if line.is_def:
1881         split_funcs = [left_hand_split]
1882     elif line.is_import:
1883         split_funcs = [explode_split]
1884     else:
1885
1886         def rhs(line: Line, py36: bool = False) -> Iterator[Line]:
1887             for omit in generate_trailers_to_omit(line, line_length):
1888                 lines = list(right_hand_split(line, py36, omit=omit))
1889                 if is_line_short_enough(lines[0], line_length=line_length):
1890                     yield from lines
1891                     return
1892
1893             # All splits failed, best effort split with no omits.
1894             yield from right_hand_split(line, py36)
1895
1896         if line.inside_brackets:
1897             split_funcs = [delimiter_split, standalone_comment_split, rhs]
1898         else:
1899             split_funcs = [rhs]
1900     for split_func in split_funcs:
1901         # We are accumulating lines in `result` because we might want to abort
1902         # mission and return the original line in the end, or attempt a different
1903         # split altogether.
1904         result: List[Line] = []
1905         try:
1906             for l in split_func(line, py36):
1907                 if str(l).strip("\n") == line_str:
1908                     raise CannotSplit("Split function returned an unchanged result")
1909
1910                 result.extend(
1911                     split_line(l, line_length=line_length, inner=True, py36=py36)
1912                 )
1913         except CannotSplit as cs:
1914             continue
1915
1916         else:
1917             yield from result
1918             break
1919
1920     else:
1921         yield line
1922
1923
1924 def left_hand_split(line: Line, py36: bool = False) -> Iterator[Line]:
1925     """Split line into many lines, starting with the first matching bracket pair.
1926
1927     Note: this usually looks weird, only use this for function definitions.
1928     Prefer RHS otherwise.  This is why this function is not symmetrical with
1929     :func:`right_hand_split` which also handles optional parentheses.
1930     """
1931     head = Line(depth=line.depth)
1932     body = Line(depth=line.depth + 1, inside_brackets=True)
1933     tail = Line(depth=line.depth)
1934     tail_leaves: List[Leaf] = []
1935     body_leaves: List[Leaf] = []
1936     head_leaves: List[Leaf] = []
1937     current_leaves = head_leaves
1938     matching_bracket = None
1939     for leaf in line.leaves:
1940         if (
1941             current_leaves is body_leaves
1942             and leaf.type in CLOSING_BRACKETS
1943             and leaf.opening_bracket is matching_bracket
1944         ):
1945             current_leaves = tail_leaves if body_leaves else head_leaves
1946         current_leaves.append(leaf)
1947         if current_leaves is head_leaves:
1948             if leaf.type in OPENING_BRACKETS:
1949                 matching_bracket = leaf
1950                 current_leaves = body_leaves
1951     # Since body is a new indent level, remove spurious leading whitespace.
1952     if body_leaves:
1953         normalize_prefix(body_leaves[0], inside_brackets=True)
1954     # Build the new lines.
1955     for result, leaves in (head, head_leaves), (body, body_leaves), (tail, tail_leaves):
1956         for leaf in leaves:
1957             result.append(leaf, preformatted=True)
1958             for comment_after in line.comments_after(leaf):
1959                 result.append(comment_after, preformatted=True)
1960     bracket_split_succeeded_or_raise(head, body, tail)
1961     for result in (head, body, tail):
1962         if result:
1963             yield result
1964
1965
1966 def right_hand_split(
1967     line: Line, py36: bool = False, omit: Collection[LeafID] = ()
1968 ) -> Iterator[Line]:
1969     """Split line into many lines, starting with the last matching bracket pair.
1970
1971     If the split was by optional parentheses, attempt splitting without them, too.
1972     `omit` is a collection of closing bracket IDs that shouldn't be considered for
1973     this split.
1974
1975     Note: running this function modifies `bracket_depth` on the leaves of `line`.
1976     """
1977     head = Line(depth=line.depth)
1978     body = Line(depth=line.depth + 1, inside_brackets=True)
1979     tail = Line(depth=line.depth)
1980     tail_leaves: List[Leaf] = []
1981     body_leaves: List[Leaf] = []
1982     head_leaves: List[Leaf] = []
1983     current_leaves = tail_leaves
1984     opening_bracket = None
1985     closing_bracket = None
1986     for leaf in reversed(line.leaves):
1987         if current_leaves is body_leaves:
1988             if leaf is opening_bracket:
1989                 current_leaves = head_leaves if body_leaves else tail_leaves
1990         current_leaves.append(leaf)
1991         if current_leaves is tail_leaves:
1992             if leaf.type in CLOSING_BRACKETS and id(leaf) not in omit:
1993                 opening_bracket = leaf.opening_bracket
1994                 closing_bracket = leaf
1995                 current_leaves = body_leaves
1996     tail_leaves.reverse()
1997     body_leaves.reverse()
1998     head_leaves.reverse()
1999     # Since body is a new indent level, remove spurious leading whitespace.
2000     if body_leaves:
2001         normalize_prefix(body_leaves[0], inside_brackets=True)
2002     elif not head_leaves:
2003         # No `head` and no `body` means the split failed. `tail` has all content.
2004         raise CannotSplit("No brackets found")
2005
2006     # Build the new lines.
2007     for result, leaves in (head, head_leaves), (body, body_leaves), (tail, tail_leaves):
2008         for leaf in leaves:
2009             result.append(leaf, preformatted=True)
2010             for comment_after in line.comments_after(leaf):
2011                 result.append(comment_after, preformatted=True)
2012     bracket_split_succeeded_or_raise(head, body, tail)
2013     assert opening_bracket and closing_bracket
2014     if (
2015         # the opening bracket is an optional paren
2016         opening_bracket.type == token.LPAR
2017         and not opening_bracket.value
2018         # the closing bracket is an optional paren
2019         and closing_bracket.type == token.RPAR
2020         and not closing_bracket.value
2021         # there are no delimiters or standalone comments in the body
2022         and not body.bracket_tracker.delimiters
2023         and not line.contains_standalone_comments(0)
2024         # and it's not an import (optional parens are the only thing we can split
2025         # on in this case; attempting a split without them is a waste of time)
2026         and not line.is_import
2027     ):
2028         omit = {id(closing_bracket), *omit}
2029         try:
2030             yield from right_hand_split(line, py36=py36, omit=omit)
2031             return
2032         except CannotSplit:
2033             pass
2034
2035     ensure_visible(opening_bracket)
2036     ensure_visible(closing_bracket)
2037     for result in (head, body, tail):
2038         if result:
2039             yield result
2040
2041
2042 def bracket_split_succeeded_or_raise(head: Line, body: Line, tail: Line) -> None:
2043     """Raise :exc:`CannotSplit` if the last left- or right-hand split failed.
2044
2045     Do nothing otherwise.
2046
2047     A left- or right-hand split is based on a pair of brackets. Content before
2048     (and including) the opening bracket is left on one line, content inside the
2049     brackets is put on a separate line, and finally content starting with and
2050     following the closing bracket is put on a separate line.
2051
2052     Those are called `head`, `body`, and `tail`, respectively. If the split
2053     produced the same line (all content in `head`) or ended up with an empty `body`
2054     and the `tail` is just the closing bracket, then it's considered failed.
2055     """
2056     tail_len = len(str(tail).strip())
2057     if not body:
2058         if tail_len == 0:
2059             raise CannotSplit("Splitting brackets produced the same line")
2060
2061         elif tail_len < 3:
2062             raise CannotSplit(
2063                 f"Splitting brackets on an empty body to save "
2064                 f"{tail_len} characters is not worth it"
2065             )
2066
2067
2068 def dont_increase_indentation(split_func: SplitFunc) -> SplitFunc:
2069     """Normalize prefix of the first leaf in every line returned by `split_func`.
2070
2071     This is a decorator over relevant split functions.
2072     """
2073
2074     @wraps(split_func)
2075     def split_wrapper(line: Line, py36: bool = False) -> Iterator[Line]:
2076         for l in split_func(line, py36):
2077             normalize_prefix(l.leaves[0], inside_brackets=True)
2078             yield l
2079
2080     return split_wrapper
2081
2082
2083 @dont_increase_indentation
2084 def delimiter_split(line: Line, py36: bool = False) -> Iterator[Line]:
2085     """Split according to delimiters of the highest priority.
2086
2087     If `py36` is True, the split will add trailing commas also in function
2088     signatures that contain `*` and `**`.
2089     """
2090     try:
2091         last_leaf = line.leaves[-1]
2092     except IndexError:
2093         raise CannotSplit("Line empty")
2094
2095     delimiters = line.bracket_tracker.delimiters
2096     try:
2097         delimiter_priority = line.bracket_tracker.max_delimiter_priority(
2098             exclude={id(last_leaf)}
2099         )
2100     except ValueError:
2101         raise CannotSplit("No delimiters found")
2102
2103     current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
2104     lowest_depth = sys.maxsize
2105     trailing_comma_safe = True
2106
2107     def append_to_line(leaf: Leaf) -> Iterator[Line]:
2108         """Append `leaf` to current line or to new line if appending impossible."""
2109         nonlocal current_line
2110         try:
2111             current_line.append_safe(leaf, preformatted=True)
2112         except ValueError as ve:
2113             yield current_line
2114
2115             current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
2116             current_line.append(leaf)
2117
2118     for index, leaf in enumerate(line.leaves):
2119         yield from append_to_line(leaf)
2120
2121         for comment_after in line.comments_after(leaf, index):
2122             yield from append_to_line(comment_after)
2123
2124         lowest_depth = min(lowest_depth, leaf.bracket_depth)
2125         if (
2126             leaf.bracket_depth == lowest_depth
2127             and is_vararg(leaf, within=VARARGS_PARENTS)
2128         ):
2129             trailing_comma_safe = trailing_comma_safe and py36
2130         leaf_priority = delimiters.get(id(leaf))
2131         if leaf_priority == delimiter_priority:
2132             yield current_line
2133
2134             current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
2135     if current_line:
2136         if (
2137             trailing_comma_safe
2138             and delimiter_priority == COMMA_PRIORITY
2139             and current_line.leaves[-1].type != token.COMMA
2140             and current_line.leaves[-1].type != STANDALONE_COMMENT
2141         ):
2142             current_line.append(Leaf(token.COMMA, ","))
2143         yield current_line
2144
2145
2146 @dont_increase_indentation
2147 def standalone_comment_split(line: Line, py36: bool = False) -> Iterator[Line]:
2148     """Split standalone comments from the rest of the line."""
2149     if not line.contains_standalone_comments(0):
2150         raise CannotSplit("Line does not have any standalone comments")
2151
2152     current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
2153
2154     def append_to_line(leaf: Leaf) -> Iterator[Line]:
2155         """Append `leaf` to current line or to new line if appending impossible."""
2156         nonlocal current_line
2157         try:
2158             current_line.append_safe(leaf, preformatted=True)
2159         except ValueError as ve:
2160             yield current_line
2161
2162             current_line = Line(depth=line.depth, inside_brackets=line.inside_brackets)
2163             current_line.append(leaf)
2164
2165     for index, leaf in enumerate(line.leaves):
2166         yield from append_to_line(leaf)
2167
2168         for comment_after in line.comments_after(leaf, index):
2169             yield from append_to_line(comment_after)
2170
2171     if current_line:
2172         yield current_line
2173
2174
2175 def explode_split(
2176     line: Line, py36: bool = False, omit: Collection[LeafID] = ()
2177 ) -> Iterator[Line]:
2178     """Split by rightmost bracket and immediately split contents by a delimiter."""
2179     new_lines = list(right_hand_split(line, py36, omit))
2180     if len(new_lines) != 3:
2181         yield from new_lines
2182         return
2183
2184     yield new_lines[0]
2185
2186     try:
2187         yield from delimiter_split(new_lines[1], py36)
2188
2189     except CannotSplit:
2190         yield new_lines[1]
2191
2192     yield new_lines[2]
2193
2194
2195 def is_import(leaf: Leaf) -> bool:
2196     """Return True if the given leaf starts an import statement."""
2197     p = leaf.parent
2198     t = leaf.type
2199     v = leaf.value
2200     return bool(
2201         t == token.NAME
2202         and (
2203             (v == "import" and p and p.type == syms.import_name)
2204             or (v == "from" and p and p.type == syms.import_from)
2205         )
2206     )
2207
2208
2209 def normalize_prefix(leaf: Leaf, *, inside_brackets: bool) -> None:
2210     """Leave existing extra newlines if not `inside_brackets`. Remove everything
2211     else.
2212
2213     Note: don't use backslashes for formatting or you'll lose your voting rights.
2214     """
2215     if not inside_brackets:
2216         spl = leaf.prefix.split("#")
2217         if "\\" not in spl[0]:
2218             nl_count = spl[-1].count("\n")
2219             if len(spl) > 1:
2220                 nl_count -= 1
2221             leaf.prefix = "\n" * nl_count
2222             return
2223
2224     leaf.prefix = ""
2225
2226
2227 def normalize_string_prefix(leaf: Leaf, remove_u_prefix: bool = False) -> None:
2228     """Make all string prefixes lowercase.
2229
2230     If remove_u_prefix is given, also removes any u prefix from the string.
2231
2232     Note: Mutates its argument.
2233     """
2234     match = re.match(r"^([furbFURB]*)(.*)$", leaf.value, re.DOTALL)
2235     assert match is not None, f"failed to match string {leaf.value!r}"
2236     orig_prefix = match.group(1)
2237     new_prefix = orig_prefix.lower()
2238     if remove_u_prefix:
2239         new_prefix = new_prefix.replace("u", "")
2240     leaf.value = f"{new_prefix}{match.group(2)}"
2241
2242
2243 def normalize_string_quotes(leaf: Leaf) -> None:
2244     """Prefer double quotes but only if it doesn't cause more escaping.
2245
2246     Adds or removes backslashes as appropriate. Doesn't parse and fix
2247     strings nested in f-strings (yet).
2248
2249     Note: Mutates its argument.
2250     """
2251     value = leaf.value.lstrip("furbFURB")
2252     if value[:3] == '"""':
2253         return
2254
2255     elif value[:3] == "'''":
2256         orig_quote = "'''"
2257         new_quote = '"""'
2258     elif value[0] == '"':
2259         orig_quote = '"'
2260         new_quote = "'"
2261     else:
2262         orig_quote = "'"
2263         new_quote = '"'
2264     first_quote_pos = leaf.value.find(orig_quote)
2265     if first_quote_pos == -1:
2266         return  # There's an internal error
2267
2268     prefix = leaf.value[:first_quote_pos]
2269     unescaped_new_quote = re.compile(rf"(([^\\]|^)(\\\\)*){new_quote}")
2270     escaped_new_quote = re.compile(rf"([^\\]|^)\\(\\\\)*{new_quote}")
2271     escaped_orig_quote = re.compile(rf"([^\\]|^)\\(\\\\)*{orig_quote}")
2272     body = leaf.value[first_quote_pos + len(orig_quote) : -len(orig_quote)]
2273     if "r" in prefix.casefold():
2274         if unescaped_new_quote.search(body):
2275             # There's at least one unescaped new_quote in this raw string
2276             # so converting is impossible
2277             return
2278
2279         # Do not introduce or remove backslashes in raw strings
2280         new_body = body
2281     else:
2282         # remove unnecessary quotes
2283         new_body = sub_twice(escaped_new_quote, rf"\1\2{new_quote}", body)
2284         if body != new_body:
2285             # Consider the string without unnecessary quotes as the original
2286             body = new_body
2287             leaf.value = f"{prefix}{orig_quote}{body}{orig_quote}"
2288         new_body = sub_twice(escaped_orig_quote, rf"\1\2{orig_quote}", new_body)
2289         new_body = sub_twice(unescaped_new_quote, rf"\1\\{new_quote}", new_body)
2290     if new_quote == '"""' and new_body[-1] == '"':
2291         # edge case:
2292         new_body = new_body[:-1] + '\\"'
2293     orig_escape_count = body.count("\\")
2294     new_escape_count = new_body.count("\\")
2295     if new_escape_count > orig_escape_count:
2296         return  # Do not introduce more escaping
2297
2298     if new_escape_count == orig_escape_count and orig_quote == '"':
2299         return  # Prefer double quotes
2300
2301     leaf.value = f"{prefix}{new_quote}{new_body}{new_quote}"
2302
2303
2304 def normalize_invisible_parens(node: Node, parens_after: Set[str]) -> None:
2305     """Make existing optional parentheses invisible or create new ones.
2306
2307     `parens_after` is a set of string leaf values immeditely after which parens
2308     should be put.
2309
2310     Standardizes on visible parentheses for single-element tuples, and keeps
2311     existing visible parentheses for other tuples and generator expressions.
2312     """
2313     check_lpar = False
2314     for child in list(node.children):
2315         if check_lpar:
2316             if child.type == syms.atom:
2317                 maybe_make_parens_invisible_in_atom(child)
2318             elif is_one_tuple(child):
2319                 # wrap child in visible parentheses
2320                 lpar = Leaf(token.LPAR, "(")
2321                 rpar = Leaf(token.RPAR, ")")
2322                 index = child.remove() or 0
2323                 node.insert_child(index, Node(syms.atom, [lpar, child, rpar]))
2324             else:
2325                 # wrap child in invisible parentheses
2326                 lpar = Leaf(token.LPAR, "")
2327                 rpar = Leaf(token.RPAR, "")
2328                 index = child.remove() or 0
2329                 node.insert_child(index, Node(syms.atom, [lpar, child, rpar]))
2330
2331         check_lpar = isinstance(child, Leaf) and child.value in parens_after
2332
2333
2334 def maybe_make_parens_invisible_in_atom(node: LN) -> bool:
2335     """If it's safe, make the parens in the atom `node` invisible, recusively."""
2336     if (
2337         node.type != syms.atom
2338         or is_empty_tuple(node)
2339         or is_one_tuple(node)
2340         or is_yield(node)
2341         or max_delimiter_priority_in_atom(node) >= COMMA_PRIORITY
2342     ):
2343         return False
2344
2345     first = node.children[0]
2346     last = node.children[-1]
2347     if first.type == token.LPAR and last.type == token.RPAR:
2348         # make parentheses invisible
2349         first.value = ""  # type: ignore
2350         last.value = ""  # type: ignore
2351         if len(node.children) > 1:
2352             maybe_make_parens_invisible_in_atom(node.children[1])
2353         return True
2354
2355     return False
2356
2357
2358 def is_empty_tuple(node: LN) -> bool:
2359     """Return True if `node` holds an empty tuple."""
2360     return (
2361         node.type == syms.atom
2362         and len(node.children) == 2
2363         and node.children[0].type == token.LPAR
2364         and node.children[1].type == token.RPAR
2365     )
2366
2367
2368 def is_one_tuple(node: LN) -> bool:
2369     """Return True if `node` holds a tuple with one element, with or without parens."""
2370     if node.type == syms.atom:
2371         if len(node.children) != 3:
2372             return False
2373
2374         lpar, gexp, rpar = node.children
2375         if not (
2376             lpar.type == token.LPAR
2377             and gexp.type == syms.testlist_gexp
2378             and rpar.type == token.RPAR
2379         ):
2380             return False
2381
2382         return len(gexp.children) == 2 and gexp.children[1].type == token.COMMA
2383
2384     return (
2385         node.type in IMPLICIT_TUPLE
2386         and len(node.children) == 2
2387         and node.children[1].type == token.COMMA
2388     )
2389
2390
2391 def is_yield(node: LN) -> bool:
2392     """Return True if `node` holds a `yield` or `yield from` expression."""
2393     if node.type == syms.yield_expr:
2394         return True
2395
2396     if node.type == token.NAME and node.value == "yield":  # type: ignore
2397         return True
2398
2399     if node.type != syms.atom:
2400         return False
2401
2402     if len(node.children) != 3:
2403         return False
2404
2405     lpar, expr, rpar = node.children
2406     if lpar.type == token.LPAR and rpar.type == token.RPAR:
2407         return is_yield(expr)
2408
2409     return False
2410
2411
2412 def is_vararg(leaf: Leaf, within: Set[NodeType]) -> bool:
2413     """Return True if `leaf` is a star or double star in a vararg or kwarg.
2414
2415     If `within` includes VARARGS_PARENTS, this applies to function signatures.
2416     If `within` includes UNPACKING_PARENTS, it applies to right hand-side
2417     extended iterable unpacking (PEP 3132) and additional unpacking
2418     generalizations (PEP 448).
2419     """
2420     if leaf.type not in STARS or not leaf.parent:
2421         return False
2422
2423     p = leaf.parent
2424     if p.type == syms.star_expr:
2425         # Star expressions are also used as assignment targets in extended
2426         # iterable unpacking (PEP 3132).  See what its parent is instead.
2427         if not p.parent:
2428             return False
2429
2430         p = p.parent
2431
2432     return p.type in within
2433
2434
2435 def is_stub_suite(node: Node) -> bool:
2436     """Return True if `node` is a suite with a stub body."""
2437     if (
2438         len(node.children) != 4
2439         or node.children[0].type != token.NEWLINE
2440         or node.children[1].type != token.INDENT
2441         or node.children[3].type != token.DEDENT
2442     ):
2443         return False
2444
2445     return is_stub_body(node.children[2])
2446
2447
2448 def is_stub_body(node: LN) -> bool:
2449     """Return True if `node` is a simple statement containing an ellipsis."""
2450     if not isinstance(node, Node) or node.type != syms.simple_stmt:
2451         return False
2452
2453     if len(node.children) != 2:
2454         return False
2455
2456     child = node.children[0]
2457     return (
2458         child.type == syms.atom
2459         and len(child.children) == 3
2460         and all(leaf == Leaf(token.DOT, ".") for leaf in child.children)
2461     )
2462
2463
2464 def max_delimiter_priority_in_atom(node: LN) -> int:
2465     """Return maximum delimiter priority inside `node`.
2466
2467     This is specific to atoms with contents contained in a pair of parentheses.
2468     If `node` isn't an atom or there are no enclosing parentheses, returns 0.
2469     """
2470     if node.type != syms.atom:
2471         return 0
2472
2473     first = node.children[0]
2474     last = node.children[-1]
2475     if not (first.type == token.LPAR and last.type == token.RPAR):
2476         return 0
2477
2478     bt = BracketTracker()
2479     for c in node.children[1:-1]:
2480         if isinstance(c, Leaf):
2481             bt.mark(c)
2482         else:
2483             for leaf in c.leaves():
2484                 bt.mark(leaf)
2485     try:
2486         return bt.max_delimiter_priority()
2487
2488     except ValueError:
2489         return 0
2490
2491
2492 def ensure_visible(leaf: Leaf) -> None:
2493     """Make sure parentheses are visible.
2494
2495     They could be invisible as part of some statements (see
2496     :func:`normalize_invible_parens` and :func:`visit_import_from`).
2497     """
2498     if leaf.type == token.LPAR:
2499         leaf.value = "("
2500     elif leaf.type == token.RPAR:
2501         leaf.value = ")"
2502
2503
2504 def is_python36(node: Node) -> bool:
2505     """Return True if the current file is using Python 3.6+ features.
2506
2507     Currently looking for:
2508     - f-strings; and
2509     - trailing commas after * or ** in function signatures and calls.
2510     """
2511     for n in node.pre_order():
2512         if n.type == token.STRING:
2513             value_head = n.value[:2]  # type: ignore
2514             if value_head in {'f"', 'F"', "f'", "F'", "rf", "fr", "RF", "FR"}:
2515                 return True
2516
2517         elif (
2518             n.type in {syms.typedargslist, syms.arglist}
2519             and n.children
2520             and n.children[-1].type == token.COMMA
2521         ):
2522             for ch in n.children:
2523                 if ch.type in STARS:
2524                     return True
2525
2526                 if ch.type == syms.argument:
2527                     for argch in ch.children:
2528                         if argch.type in STARS:
2529                             return True
2530
2531     return False
2532
2533
2534 def generate_trailers_to_omit(line: Line, line_length: int) -> Iterator[Set[LeafID]]:
2535     """Generate sets of closing bracket IDs that should be omitted in a RHS.
2536
2537     Brackets can be omitted if the entire trailer up to and including
2538     a preceding closing bracket fits in one line.
2539
2540     Yielded sets are cumulative (contain results of previous yields, too).  First
2541     set is empty.
2542     """
2543
2544     omit: Set[LeafID] = set()
2545     yield omit
2546
2547     length = 4 * line.depth
2548     opening_bracket = None
2549     closing_bracket = None
2550     optional_brackets: Set[LeafID] = set()
2551     inner_brackets: Set[LeafID] = set()
2552     for index, leaf in enumerate_reversed(line.leaves):
2553         length += len(leaf.prefix) + len(leaf.value)
2554         if length > line_length:
2555             break
2556
2557         comment: Optional[Leaf]
2558         for comment in line.comments_after(leaf, index):
2559             if "\n" in comment.prefix:
2560                 break  # Oops, standalone comment!
2561
2562             length += len(comment.value)
2563         else:
2564             comment = None
2565         if comment is not None:
2566             break  # There was a standalone comment, we can't continue.
2567
2568         optional_brackets.discard(id(leaf))
2569         if opening_bracket:
2570             if leaf is opening_bracket:
2571                 opening_bracket = None
2572             elif leaf.type in CLOSING_BRACKETS:
2573                 inner_brackets.add(id(leaf))
2574         elif leaf.type in CLOSING_BRACKETS:
2575             if not leaf.value:
2576                 optional_brackets.add(id(opening_bracket))
2577                 continue
2578
2579             if index > 0 and line.leaves[index - 1].type in OPENING_BRACKETS:
2580                 # Empty brackets would fail a split so treat them as "inner"
2581                 # brackets (e.g. only add them to the `omit` set if another
2582                 # pair of brackets was good enough.
2583                 inner_brackets.add(id(leaf))
2584                 continue
2585
2586             opening_bracket = leaf.opening_bracket
2587             if closing_bracket:
2588                 omit.add(id(closing_bracket))
2589                 omit.update(inner_brackets)
2590                 inner_brackets.clear()
2591                 yield omit
2592             closing_bracket = leaf
2593
2594
2595 def get_future_imports(node: Node) -> Set[str]:
2596     """Return a set of __future__ imports in the file."""
2597     imports = set()
2598     for child in node.children:
2599         if child.type != syms.simple_stmt:
2600             break
2601         first_child = child.children[0]
2602         if isinstance(first_child, Leaf):
2603             # Continue looking if we see a docstring; otherwise stop.
2604             if (
2605                 len(child.children) == 2
2606                 and first_child.type == token.STRING
2607                 and child.children[1].type == token.NEWLINE
2608             ):
2609                 continue
2610             else:
2611                 break
2612         elif first_child.type == syms.import_from:
2613             module_name = first_child.children[1]
2614             if not isinstance(module_name, Leaf) or module_name.value != "__future__":
2615                 break
2616             for import_from_child in first_child.children[3:]:
2617                 if isinstance(import_from_child, Leaf):
2618                     if import_from_child.type == token.NAME:
2619                         imports.add(import_from_child.value)
2620                 else:
2621                     assert import_from_child.type == syms.import_as_names
2622                     for leaf in import_from_child.children:
2623                         if isinstance(leaf, Leaf) and leaf.type == token.NAME:
2624                             imports.add(leaf.value)
2625         else:
2626             break
2627     return imports
2628
2629
2630 PYTHON_EXTENSIONS = {".py", ".pyi"}
2631 BLACKLISTED_DIRECTORIES = {
2632     "build", "buck-out", "dist", "_build", ".git", ".hg", ".mypy_cache", ".tox", ".venv"
2633 }
2634
2635
2636 def gen_python_files_in_dir(path: Path) -> Iterator[Path]:
2637     """Generate all files under `path` which aren't under BLACKLISTED_DIRECTORIES
2638     and have one of the PYTHON_EXTENSIONS.
2639     """
2640     for child in path.iterdir():
2641         if child.is_dir():
2642             if child.name in BLACKLISTED_DIRECTORIES:
2643                 continue
2644
2645             yield from gen_python_files_in_dir(child)
2646
2647         elif child.is_file() and child.suffix in PYTHON_EXTENSIONS:
2648             yield child
2649
2650
2651 @dataclass
2652 class Report:
2653     """Provides a reformatting counter. Can be rendered with `str(report)`."""
2654     check: bool = False
2655     quiet: bool = False
2656     change_count: int = 0
2657     same_count: int = 0
2658     failure_count: int = 0
2659
2660     def done(self, src: Path, changed: Changed) -> None:
2661         """Increment the counter for successful reformatting. Write out a message."""
2662         if changed is Changed.YES:
2663             reformatted = "would reformat" if self.check else "reformatted"
2664             if not self.quiet:
2665                 out(f"{reformatted} {src}")
2666             self.change_count += 1
2667         else:
2668             if not self.quiet:
2669                 if changed is Changed.NO:
2670                     msg = f"{src} already well formatted, good job."
2671                 else:
2672                     msg = f"{src} wasn't modified on disk since last run."
2673                 out(msg, bold=False)
2674             self.same_count += 1
2675
2676     def failed(self, src: Path, message: str) -> None:
2677         """Increment the counter for failed reformatting. Write out a message."""
2678         err(f"error: cannot format {src}: {message}")
2679         self.failure_count += 1
2680
2681     @property
2682     def return_code(self) -> int:
2683         """Return the exit code that the app should use.
2684
2685         This considers the current state of changed files and failures:
2686         - if there were any failures, return 123;
2687         - if any files were changed and --check is being used, return 1;
2688         - otherwise return 0.
2689         """
2690         # According to http://tldp.org/LDP/abs/html/exitcodes.html starting with
2691         # 126 we have special returncodes reserved by the shell.
2692         if self.failure_count:
2693             return 123
2694
2695         elif self.change_count and self.check:
2696             return 1
2697
2698         return 0
2699
2700     def __str__(self) -> str:
2701         """Render a color report of the current state.
2702
2703         Use `click.unstyle` to remove colors.
2704         """
2705         if self.check:
2706             reformatted = "would be reformatted"
2707             unchanged = "would be left unchanged"
2708             failed = "would fail to reformat"
2709         else:
2710             reformatted = "reformatted"
2711             unchanged = "left unchanged"
2712             failed = "failed to reformat"
2713         report = []
2714         if self.change_count:
2715             s = "s" if self.change_count > 1 else ""
2716             report.append(
2717                 click.style(f"{self.change_count} file{s} {reformatted}", bold=True)
2718             )
2719         if self.same_count:
2720             s = "s" if self.same_count > 1 else ""
2721             report.append(f"{self.same_count} file{s} {unchanged}")
2722         if self.failure_count:
2723             s = "s" if self.failure_count > 1 else ""
2724             report.append(
2725                 click.style(f"{self.failure_count} file{s} {failed}", fg="red")
2726             )
2727         return ", ".join(report) + "."
2728
2729
2730 def assert_equivalent(src: str, dst: str) -> None:
2731     """Raise AssertionError if `src` and `dst` aren't equivalent."""
2732
2733     import ast
2734     import traceback
2735
2736     def _v(node: ast.AST, depth: int = 0) -> Iterator[str]:
2737         """Simple visitor generating strings to compare ASTs by content."""
2738         yield f"{'  ' * depth}{node.__class__.__name__}("
2739
2740         for field in sorted(node._fields):
2741             try:
2742                 value = getattr(node, field)
2743             except AttributeError:
2744                 continue
2745
2746             yield f"{'  ' * (depth+1)}{field}="
2747
2748             if isinstance(value, list):
2749                 for item in value:
2750                     if isinstance(item, ast.AST):
2751                         yield from _v(item, depth + 2)
2752
2753             elif isinstance(value, ast.AST):
2754                 yield from _v(value, depth + 2)
2755
2756             else:
2757                 yield f"{'  ' * (depth+2)}{value!r},  # {value.__class__.__name__}"
2758
2759         yield f"{'  ' * depth})  # /{node.__class__.__name__}"
2760
2761     try:
2762         src_ast = ast.parse(src)
2763     except Exception as exc:
2764         major, minor = sys.version_info[:2]
2765         raise AssertionError(
2766             f"cannot use --safe with this file; failed to parse source file "
2767             f"with Python {major}.{minor}'s builtin AST. Re-run with --fast "
2768             f"or stop using deprecated Python 2 syntax. AST error message: {exc}"
2769         )
2770
2771     try:
2772         dst_ast = ast.parse(dst)
2773     except Exception as exc:
2774         log = dump_to_file("".join(traceback.format_tb(exc.__traceback__)), dst)
2775         raise AssertionError(
2776             f"INTERNAL ERROR: Black produced invalid code: {exc}. "
2777             f"Please report a bug on https://github.com/ambv/black/issues.  "
2778             f"This invalid output might be helpful: {log}"
2779         ) from None
2780
2781     src_ast_str = "\n".join(_v(src_ast))
2782     dst_ast_str = "\n".join(_v(dst_ast))
2783     if src_ast_str != dst_ast_str:
2784         log = dump_to_file(diff(src_ast_str, dst_ast_str, "src", "dst"))
2785         raise AssertionError(
2786             f"INTERNAL ERROR: Black produced code that is not equivalent to "
2787             f"the source.  "
2788             f"Please report a bug on https://github.com/ambv/black/issues.  "
2789             f"This diff might be helpful: {log}"
2790         ) from None
2791
2792
2793 def assert_stable(src: str, dst: str, line_length: int, is_pyi: bool = False) -> None:
2794     """Raise AssertionError if `dst` reformats differently the second time."""
2795     newdst = format_str(dst, line_length=line_length, is_pyi=is_pyi)
2796     if dst != newdst:
2797         log = dump_to_file(
2798             diff(src, dst, "source", "first pass"),
2799             diff(dst, newdst, "first pass", "second pass"),
2800         )
2801         raise AssertionError(
2802             f"INTERNAL ERROR: Black produced different code on the second pass "
2803             f"of the formatter.  "
2804             f"Please report a bug on https://github.com/ambv/black/issues.  "
2805             f"This diff might be helpful: {log}"
2806         ) from None
2807
2808
2809 def dump_to_file(*output: str) -> str:
2810     """Dump `output` to a temporary file. Return path to the file."""
2811     import tempfile
2812
2813     with tempfile.NamedTemporaryFile(
2814         mode="w", prefix="blk_", suffix=".log", delete=False, encoding="utf8"
2815     ) as f:
2816         for lines in output:
2817             f.write(lines)
2818             if lines and lines[-1] != "\n":
2819                 f.write("\n")
2820     return f.name
2821
2822
2823 def diff(a: str, b: str, a_name: str, b_name: str) -> str:
2824     """Return a unified diff string between strings `a` and `b`."""
2825     import difflib
2826
2827     a_lines = [line + "\n" for line in a.split("\n")]
2828     b_lines = [line + "\n" for line in b.split("\n")]
2829     return "".join(
2830         difflib.unified_diff(a_lines, b_lines, fromfile=a_name, tofile=b_name, n=5)
2831     )
2832
2833
2834 def cancel(tasks: List[asyncio.Task]) -> None:
2835     """asyncio signal handler that cancels all `tasks` and reports to stderr."""
2836     err("Aborted!")
2837     for task in tasks:
2838         task.cancel()
2839
2840
2841 def shutdown(loop: BaseEventLoop) -> None:
2842     """Cancel all pending tasks on `loop`, wait for them, and close the loop."""
2843     try:
2844         # This part is borrowed from asyncio/runners.py in Python 3.7b2.
2845         to_cancel = [task for task in asyncio.Task.all_tasks(loop) if not task.done()]
2846         if not to_cancel:
2847             return
2848
2849         for task in to_cancel:
2850             task.cancel()
2851         loop.run_until_complete(
2852             asyncio.gather(*to_cancel, loop=loop, return_exceptions=True)
2853         )
2854     finally:
2855         # `concurrent.futures.Future` objects cannot be cancelled once they
2856         # are already running. There might be some when the `shutdown()` happened.
2857         # Silence their logger's spew about the event loop being closed.
2858         cf_logger = logging.getLogger("concurrent.futures")
2859         cf_logger.setLevel(logging.CRITICAL)
2860         loop.close()
2861
2862
2863 def sub_twice(regex: Pattern[str], replacement: str, original: str) -> str:
2864     """Replace `regex` with `replacement` twice on `original`.
2865
2866     This is used by string normalization to perform replaces on
2867     overlapping matches.
2868     """
2869     return regex.sub(replacement, regex.sub(replacement, original))
2870
2871
2872 def enumerate_reversed(sequence: Sequence[T]) -> Iterator[Tuple[Index, T]]:
2873     """Like `reversed(enumerate(sequence))` if that were possible."""
2874     index = len(sequence) - 1
2875     for element in reversed(sequence):
2876         yield (index, element)
2877         index -= 1
2878
2879
2880 def is_line_short_enough(line: Line, *, line_length: int, line_str: str = "") -> bool:
2881     """Return True if `line` is no longer than `line_length`.
2882
2883     Uses the provided `line_str` rendering, if any, otherwise computes a new one.
2884     """
2885     if not line_str:
2886         line_str = str(line).strip("\n")
2887     return (
2888         len(line_str) <= line_length
2889         and "\n" not in line_str  # multiline strings
2890         and not line.contains_standalone_comments()
2891     )
2892
2893
2894 CACHE_DIR = Path(user_cache_dir("black", version=__version__))
2895
2896
2897 def get_cache_file(line_length: int) -> Path:
2898     return CACHE_DIR / f"cache.{line_length}.pickle"
2899
2900
2901 def read_cache(line_length: int) -> Cache:
2902     """Read the cache if it exists and is well formed.
2903
2904     If it is not well formed, the call to write_cache later should resolve the issue.
2905     """
2906     cache_file = get_cache_file(line_length)
2907     if not cache_file.exists():
2908         return {}
2909
2910     with cache_file.open("rb") as fobj:
2911         try:
2912             cache: Cache = pickle.load(fobj)
2913         except pickle.UnpicklingError:
2914             return {}
2915
2916     return cache
2917
2918
2919 def get_cache_info(path: Path) -> CacheInfo:
2920     """Return the information used to check if a file is already formatted or not."""
2921     stat = path.stat()
2922     return stat.st_mtime, stat.st_size
2923
2924
2925 def filter_cached(
2926     cache: Cache, sources: Iterable[Path]
2927 ) -> Tuple[List[Path], List[Path]]:
2928     """Split a list of paths into two.
2929
2930     The first list contains paths of files that modified on disk or are not in the
2931     cache. The other list contains paths to non-modified files.
2932     """
2933     todo, done = [], []
2934     for src in sources:
2935         src = src.resolve()
2936         if cache.get(src) != get_cache_info(src):
2937             todo.append(src)
2938         else:
2939             done.append(src)
2940     return todo, done
2941
2942
2943 def write_cache(cache: Cache, sources: List[Path], line_length: int) -> None:
2944     """Update the cache file."""
2945     cache_file = get_cache_file(line_length)
2946     try:
2947         if not CACHE_DIR.exists():
2948             CACHE_DIR.mkdir(parents=True)
2949         new_cache = {**cache, **{src.resolve(): get_cache_info(src) for src in sources}}
2950         with cache_file.open("wb") as fobj:
2951             pickle.dump(new_cache, fobj, protocol=pickle.HIGHEST_PROTOCOL)
2952     except OSError:
2953         pass
2954
2955
2956 if __name__ == "__main__":
2957     main()