+import ast
import asyncio
-from asyncio.base_events import BaseEventLoop
from concurrent.futures import Executor, ProcessPoolExecutor
+from contextlib import contextmanager
from datetime import datetime
from enum import Enum
from functools import lru_cache, partial, wraps
import os
from pathlib import Path
import pickle
-import re
+import regex as re
import signal
import sys
import tempfile
import tokenize
+import traceback
from typing import (
Any,
Callable,
import click
import toml
from typed_ast import ast3, ast27
+from pathspec import PathSpec
# lib2to3 fork
from blib2to3.pytree import Node, Leaf, type_repr
from blib2to3.pgen2.grammar import Grammar
from blib2to3.pgen2.parse import ParseError
+from _black_version import version as __version__
-__version__ = "19.3b0"
DEFAULT_LINE_LENGTH = 88
-DEFAULT_EXCLUDES = (
- r"/(\.eggs|\.git|\.hg|\.mypy_cache|\.nox|\.tox|\.venv|_build|buck-out|build|dist)/"
-)
+DEFAULT_EXCLUDES = r"/(\.eggs|\.git|\.hg|\.mypy_cache|\.nox|\.tox|\.venv|\.svn|_build|buck-out|build|dist)/" # noqa: B950
DEFAULT_INCLUDES = r"\.pyi?$"
CACHE_DIR = Path(user_cache_dir("black", version=__version__))
# set for every version of python.
ASYNC_IDENTIFIERS = 6
ASYNC_KEYWORDS = 7
+ ASSIGNMENT_EXPRESSIONS = 8
+ POS_ONLY_ARGUMENTS = 9
VERSION_TO_FEATURES: Dict[TargetVersion, Set[Feature]] = {
Feature.TRAILING_COMMA_IN_CALL,
Feature.TRAILING_COMMA_IN_DEF,
Feature.ASYNC_KEYWORDS,
+ Feature.ASSIGNMENT_EXPRESSIONS,
+ Feature.POS_ONLY_ARGUMENTS,
},
}
"--quiet",
is_flag=True,
help=(
- "Don't emit non-error messages to stderr. Errors are still emitted, "
+ "Don't emit non-error messages to stderr. Errors are still emitted; "
"silence those with 2>/dev/null."
),
)
report = Report(check=check, quiet=quiet, verbose=verbose)
root = find_project_root(src)
sources: Set[Path] = set()
+ path_empty(src, quiet, verbose, ctx)
for s in src:
p = Path(s)
if p.is_dir():
sources.update(
- gen_python_files_in_dir(p, root, include_regex, exclude_regex, report)
+ gen_python_files_in_dir(
+ p, root, include_regex, exclude_regex, report, get_gitignore(root)
+ )
)
elif p.is_file() or s == "-":
# if a file was explicitly given, we don't care about its extension
err(f"invalid path: {s}")
if len(sources) == 0:
if verbose or not quiet:
- out("No paths given. Nothing to do 😴")
+ out("No Python files are present to be formatted. Nothing to do 😴")
ctx.exit(0)
if len(sources) == 1:
ctx.exit(report.return_code)
+def path_empty(src: Tuple[str], quiet: bool, verbose: bool, ctx: click.Context) -> None:
+ """
+ Exit if there is no `src` provided for formatting
+ """
+ if not src:
+ if verbose or not quiet:
+ out("No Path provided. Nothing to do 😴")
+ ctx.exit(0)
+
+
def reformat_one(
src: Path, fast: bool, write_back: WriteBack, mode: FileMode, report: "Report"
) -> None:
"""Reformat a single file under `src` without spawning child processes.
- If `quiet` is True, non-error messages are not output. `line_length`,
- `write_back`, `fast` and `pyi` options are passed to
+ `fast`, `write_back`, and `mode` options are passed to
:func:`format_file_in_place` or :func:`format_stdin_to_stdout`.
"""
try:
)
finally:
shutdown(loop)
+ executor.shutdown()
async def schedule_formatting(
write_back: WriteBack,
mode: FileMode,
report: "Report",
- loop: BaseEventLoop,
+ loop: asyncio.AbstractEventLoop,
executor: Executor,
) -> None:
"""Run formatting of `sources` in parallel using the provided `executor`.
(Use ProcessPoolExecutors for actual parallelism.)
- `line_length`, `write_back`, `fast`, and `pyi` options are passed to
+ `write_back`, `fast`, and `mode` options are passed to
:func:`format_file_in_place`.
"""
cache: Cache = {}
If `write_back` is DIFF, write a diff to stdout. If it is YES, write reformatted
code to the file.
- `line_length` and `fast` options are passed to :func:`format_file_contents`.
+ `mode` and `fast` options are passed to :func:`format_file_contents`.
"""
if src.suffix == ".pyi":
mode = evolve(mode, is_pyi=True)
src_name = f"{src}\t{then} +0000"
dst_name = f"{src}\t{now} +0000"
diff_contents = diff(src_contents, dst_contents, src_name, dst_name)
- if lock:
- lock.acquire()
- try:
+
+ with lock or nullcontext():
f = io.TextIOWrapper(
sys.stdout.buffer,
encoding=encoding,
)
f.write(diff_contents)
f.detach()
- finally:
- if lock:
- lock.release()
+
return True
If `fast` is False, additionally confirm that the reformatted code is
valid by calling :func:`assert_equivalent` and :func:`assert_stable` on it.
- `line_length` is passed to :func:`format_str`.
+ `mode` is passed to :func:`format_str`.
"""
if src_contents.strip() == "":
raise NothingChanged
def format_str(src_contents: str, *, mode: FileMode) -> FileContent:
"""Reformat a string and return new contents.
- `line_length` determines how many characters per line are allowed.
+ `mode` determines formatting options, such as how many characters per line are
+ allowed.
"""
src_node = lib2to3_parse(src_contents.lstrip(), mode.target_versions)
dst_contents = []
token.DOUBLESTAR,
}
STARS = {token.STAR, token.DOUBLESTAR}
+VARARGS_SPECIALS = STARS | {token.SLASH}
VARARGS_PARENTS = {
syms.arglist,
syms.argument, # double star in arglist
"""Return True if there is an yet unmatched open bracket on the line."""
return bool(self.bracket_match)
- def max_delimiter_priority(self, exclude: Iterable[LeafID] = ()) -> int:
+ def max_delimiter_priority(self, exclude: Iterable[LeafID] = ()) -> Priority:
"""Return the highest priority of a delimiter found on the line.
Values are consistent with what `is_split_*_delimiter()` return.
"""
return max(v for k, v in self.delimiters.items() if k not in exclude)
- def delimiter_count_with_priority(self, priority: int = 0) -> int:
+ def delimiter_count_with_priority(self, priority: Priority = 0) -> int:
"""Return the number of delimiters with the given `priority`.
If no `priority` is passed, defaults to max priority on the line.
Leaf(token.DOT, ".") for _ in range(3)
]
+ @property
+ def is_collection_with_optional_trailing_comma(self) -> bool:
+ """Is this line a collection literal with a trailing comma that's optional?
+
+ Note that the trailing comma in a 1-tuple is not optional.
+ """
+ if not self.leaves or len(self.leaves) < 4:
+ return False
+ # Look for and address a trailing colon.
+ if self.leaves[-1].type == token.COLON:
+ closer = self.leaves[-2]
+ close_index = -2
+ else:
+ closer = self.leaves[-1]
+ close_index = -1
+ if closer.type not in CLOSING_BRACKETS or self.inside_brackets:
+ return False
+ if closer.type == token.RPAR:
+ # Tuples require an extra check, because if there's only
+ # one element in the tuple removing the comma unmakes the
+ # tuple.
+ #
+ # We also check for parens before looking for the trailing
+ # comma because in some cases (eg assigning a dict
+ # literal) the literal gets wrapped in temporary parens
+ # during parsing. This case is covered by the
+ # collections.py test data.
+ opener = closer.opening_bracket
+ for _open_index, leaf in enumerate(self.leaves):
+ if leaf is opener:
+ break
+ else:
+ # Couldn't find the matching opening paren, play it safe.
+ return False
+ commas = 0
+ comma_depth = self.leaves[close_index - 1].bracket_depth
+ for leaf in self.leaves[_open_index + 1 : close_index]:
+ if leaf.bracket_depth == comma_depth and leaf.type == token.COMMA:
+ commas += 1
+ if commas > 1:
+ # We haven't looked yet for the trailing comma because
+ # we might also have caught noop parens.
+ return self.leaves[close_index - 1].type == token.COMMA
+ elif commas == 1:
+ return False # it's either a one-tuple or didn't have a trailing comma
+ if self.leaves[close_index - 1].type in CLOSING_BRACKETS:
+ close_index -= 1
+ closer = self.leaves[close_index]
+ if closer.type == token.RPAR:
+ # TODO: this is a gut feeling. Will we ever see this?
+ return False
+ if self.leaves[close_index - 1].type != token.COMMA:
+ return False
+ return True
+
@property
def is_def(self) -> bool:
"""Is this a function definition? (Also returns True for async defs.)"""
return True
return False
- def contains_inner_type_comments(self) -> bool:
+ def contains_uncollapsable_type_comments(self) -> bool:
ignored_ids = set()
try:
last_leaf = self.leaves[-1]
ignored_ids.add(id(last_leaf))
- if last_leaf.type == token.COMMA:
- # When trailing commas are inserted by Black for consistency, comments
- # after the previous last element are not moved (they don't have to,
- # rendering will still be correct). So we ignore trailing commas.
+ if last_leaf.type == token.COMMA or (
+ last_leaf.type == token.RPAR and not last_leaf.value
+ ):
+ # When trailing commas or optional parens are inserted by Black for
+ # consistency, comments after the previous last element are not moved
+ # (they don't have to, rendering will still be correct). So we ignore
+ # trailing commas and invisible.
last_leaf = self.leaves[-2]
ignored_ids.add(id(last_leaf))
except IndexError:
return False
+ # A type comment is uncollapsable if it is attached to a leaf
+ # that isn't at the end of the line (since that could cause it
+ # to get associated to a different argument) or if there are
+ # comments before it (since that could cause it to get hidden
+ # behind a comment.
+ comment_seen = False
for leaf_id, comments in self.comments.items():
- if leaf_id in ignored_ids:
- continue
-
for comment in comments:
if is_type_comment(comment):
- return True
+ if leaf_id not in ignored_ids or comment_seen:
+ return True
+
+ comment_seen = True
+
+ return False
+
+ def contains_unsplittable_type_ignore(self) -> bool:
+ if not self.leaves:
+ return False
+
+ # If a 'type: ignore' is attached to the end of a line, we
+ # can't split the line, because we can't know which of the
+ # subexpressions the ignore was meant to apply to.
+ #
+ # We only want this to apply to actual physical lines from the
+ # original source, though: we don't want the presence of a
+ # 'type: ignore' at the end of a multiline expression to
+ # justify pushing it all onto one line. Thus we
+ # (unfortunately) need to check the actual source lines and
+ # only report an unsplittable 'type: ignore' if this line was
+ # one line in the original code.
+
+ # Grab the first and last line numbers, skipping generated leaves
+ first_line = next((l.lineno for l in self.leaves if l.lineno != 0), 0)
+ last_line = next((l.lineno for l in reversed(self.leaves) if l.lineno != 0), 0)
+
+ if first_line == last_line:
+ # We look at the last two leaves since a comma or an
+ # invisible paren could have been added at the end of the
+ # line.
+ for node in self.leaves[-2:]:
+ for comment in self.comments.get(id(node), []):
+ if is_type_comment(comment, " ignore"):
+ return True
return False
def maybe_remove_trailing_comma(self, closing: Leaf) -> bool:
"""Remove trailing comma if there is one and it's safe."""
+ if not (self.leaves and self.leaves[-1].type == token.COMMA):
+ return False
+ # We remove trailing commas only in the case of importing a
+ # single name from a module.
if not (
self.leaves
+ and self.is_import
+ and len(self.leaves) > 4
and self.leaves[-1].type == token.COMMA
and closing.type in CLOSING_BRACKETS
+ and self.leaves[-4].type == token.NAME
+ and (
+ # regular `from foo import bar,`
+ self.leaves[-4].value == "import"
+ # `from foo import (bar as baz,)
+ or (
+ len(self.leaves) > 6
+ and self.leaves[-6].value == "import"
+ and self.leaves[-3].value == "as"
+ )
+ # `from foo import bar as baz,`
+ or (
+ len(self.leaves) > 5
+ and self.leaves[-5].value == "import"
+ and self.leaves[-3].value == "as"
+ )
+ )
+ and closing.type == token.RPAR
):
return False
- if closing.type == token.RBRACE:
- self.remove_trailing_comma()
- return True
-
- if closing.type == token.RSQB:
- comma = self.leaves[-1]
- if comma.parent and comma.parent.type == syms.listmaker:
- self.remove_trailing_comma()
- return True
-
- # For parens let's check if it's safe to remove the comma.
- # Imports are always safe.
- if self.is_import:
- self.remove_trailing_comma()
- return True
-
- # Otherwise, if the trailing one is the only one, we might mistakenly
- # change a tuple into a different type by removing the comma.
- depth = closing.bracket_depth + 1
- commas = 0
- opening = closing.opening_bracket
- for _opening_index, leaf in enumerate(self.leaves):
- if leaf is opening:
- break
-
- else:
- return False
-
- for leaf in self.leaves[_opening_index + 1 :]:
- if leaf is closing:
- break
-
- bracket_depth = leaf.bracket_depth
- if bracket_depth == depth and leaf.type == token.COMMA:
- commas += 1
- if leaf.parent and leaf.parent.type == syms.arglist:
- commas += 1
- break
-
- if commas > 1:
- self.remove_trailing_comma()
- return True
-
- return False
+ self.remove_trailing_comma()
+ return True
def append_comment(self, comment: Leaf) -> bool:
"""Add an inline or standalone comment to the line."""
comment.prefix = ""
return False
- self.comments.setdefault(id(self.leaves[-1]), []).append(comment)
+ last_leaf = self.leaves[-1]
+ if (
+ last_leaf.type == token.RPAR
+ and not last_leaf.value
+ and last_leaf.parent
+ and len(list(last_leaf.parent.leaves())) <= 3
+ and not is_type_comment(comment)
+ ):
+ # Comments on an optional parens wrapping a single leaf should belong to
+ # the wrapped node except if it's a type comment. Pinning the comment like
+ # this avoids unstable formatting caused by comment migration.
+ if len(self.leaves) < 2:
+ comment.type = STANDALONE_COMMENT
+ comment.prefix = ""
+ return False
+ last_leaf = self.leaves[-2]
+ self.comments.setdefault(id(last_leaf), []).append(comment)
return True
def comments_after(self, leaf: Leaf) -> List[Leaf]:
lines (two on module-level).
"""
before, after = self._maybe_empty_lines(current_line)
- before -= self.previous_after
+ before = (
+ # Black should not insert empty lines at the beginning
+ # of the file
+ 0
+ if self.previous_line is None
+ else before - self.previous_after
+ )
self.previous_after = after
self.previous_line = current_line
return before, after
self.current_line.append(node)
yield from super().visit_default(node)
- def visit_atom(self, node: Node) -> Iterator[Line]:
- # Always make parentheses invisible around a single node, because it should
- # not be needed (except in the case of yield, where removing the parentheses
- # produces a SyntaxError).
- if (
- len(node.children) == 3
- and isinstance(node.children[0], Leaf)
- and node.children[0].type == token.LPAR
- and isinstance(node.children[2], Leaf)
- and node.children[2].type == token.RPAR
- and isinstance(node.children[1], Leaf)
- and not (
- node.children[1].type == token.NAME
- and node.children[1].value == "yield"
- )
- ):
- node.children[0].value = ""
- node.children[2].value = ""
- yield from super().visit_default(node)
-
def visit_INDENT(self, node: Node) -> Iterator[Line]:
"""Increase indentation level, maybe yield a line."""
# In blib2to3 INDENT never holds comments.
yield from self.line()
yield from self.visit_default(leaf)
+ def visit_factor(self, node: Node) -> Iterator[Line]:
+ """Force parentheses between a unary op and a binary power:
+
+ -2 ** 8 -> -(2 ** 8)
+ """
+ _operator, operand = node.children
+ if (
+ operand.type == syms.power
+ and len(operand.children) == 3
+ and operand.children[1].type == token.DOUBLESTAR
+ ):
+ lpar = Leaf(token.LPAR, "(")
+ rpar = Leaf(token.RPAR, ")")
+ index = operand.remove() or 0
+ node.insert_child(index, Node(syms.atom, [lpar, operand, rpar]))
+ yield from self.visit_default(node)
+
def __attrs_post_init__(self) -> None:
"""You are in a twisty little maze of passages."""
v = self.visit_stmt
# that, too.
return prevp.prefix
- elif prevp.type in STARS:
+ elif prevp.type in VARARGS_SPECIALS:
if is_vararg(prevp, within=VARARGS_PARENTS | UNPACKING_PARENTS):
return NO
if not prevp or prevp.type == token.LPAR:
return NO
- elif prev.type in {token.EQUAL} | STARS:
+ elif prev.type in {token.EQUAL} | VARARGS_SPECIALS:
return NO
elif p.type == syms.decorator:
return container
-def is_split_after_delimiter(leaf: Leaf, previous: Optional[Leaf] = None) -> int:
+def is_split_after_delimiter(leaf: Leaf, previous: Optional[Leaf] = None) -> Priority:
"""Return the priority of the `leaf` delimiter, given a line break after it.
The delimiter priorities returned here are from those delimiters that would
return 0
-def is_split_before_delimiter(leaf: Leaf, previous: Optional[Leaf] = None) -> int:
+def is_split_before_delimiter(leaf: Leaf, previous: Optional[Leaf] = None) -> Priority:
"""Return the priority of the `leaf` delimiter, given a line break before it.
The delimiter priorities returned here are from those delimiters that would
line_str = str(line).strip("\n")
if (
- not line.contains_inner_type_comments()
+ not line.contains_uncollapsable_type_comments()
and not line.should_explode
- and is_line_short_enough(line, line_length=line_length, line_str=line_str)
+ and not line.is_collection_with_optional_trailing_comma
+ and (
+ is_line_short_enough(line, line_length=line_length, line_str=line_str)
+ or line.contains_unsplittable_type_ignore()
+ )
):
yield line
return
# All splits failed, best effort split with no omits.
# This mostly happens to multiline strings that are by definition
# reported as not fitting a single line.
- yield from right_hand_split(line, line_length, features=features)
+ # line_length=1 is silly, but somehow produces better formatting
+ # than other things we've tried so far. See #762 and #781.
+ yield from right_hand_split(line, line_length=1, features=features)
if line.inside_brackets:
split_funcs = [delimiter_split, standalone_comment_split, rhs]
if leaves:
# Since body is a new indent level, remove spurious leading whitespace.
normalize_prefix(leaves[0], inside_brackets=True)
- # Ensure a trailing comma for imports, but be careful not to add one after
- # any comments.
- if original.is_import:
+ # Ensure a trailing comma for imports and standalone function arguments, but
+ # be careful not to add one after any comments or within type annotations.
+ no_commas = (
+ original.is_def
+ and opening_bracket.value == "("
+ and not any(l.type == token.COMMA for l in leaves)
+ )
+
+ if original.is_import or no_commas:
for i in range(len(leaves) - 1, -1, -1):
if leaves[i].type == STANDALONE_COMMENT:
continue
)
-def is_type_comment(leaf: Leaf) -> bool:
+def is_type_comment(leaf: Leaf, suffix: str = "") -> bool:
"""Return True if the given leaf is a special comment.
Only returns true for type comments for now."""
t = leaf.type
v = leaf.value
- return t in {token.COMMENT, t == STANDALONE_COMMENT} and v.startswith("# type:")
+ return t in {token.COMMENT, STANDALONE_COMMENT} and v.startswith("# type:" + suffix)
def normalize_prefix(leaf: Leaf, *, inside_brackets: bool) -> None:
def normalize_invisible_parens(node: Node, parens_after: Set[str]) -> None:
"""Make existing optional parentheses invisible or create new ones.
- `parens_after` is a set of string leaf values immeditely after which parens
+ `parens_after` is a set of string leaf values immediately after which parens
should be put.
Standardizes on visible parentheses for single-element tuples, and keeps
check_lpar = True
if check_lpar:
+ if is_walrus_assignment(child):
+ continue
if child.type == syms.atom:
- if maybe_make_parens_invisible_in_atom(child, parent=node):
+ # Determines if the underlying atom should be surrounded with
+ # invisible params - also makes parens invisible recursively
+ # within the atom and removes repeated invisible parens within
+ # the atom
+ should_surround_with_parens = maybe_make_parens_invisible_in_atom(
+ child, parent=node
+ )
+
+ if should_surround_with_parens:
lpar = Leaf(token.LPAR, "")
rpar = Leaf(token.RPAR, "")
index = child.remove() or 0
def maybe_make_parens_invisible_in_atom(node: LN, parent: LN) -> bool:
"""If it's safe, make the parens in the atom `node` invisible, recursively.
+ Additionally, remove repeated, adjacent invisible parens from the atom `node`
+ as they are redundant.
Returns whether the node should itself be wrapped in invisible parentheses.
first = node.children[0]
last = node.children[-1]
if first.type == token.LPAR and last.type == token.RPAR:
+ middle = node.children[1]
# make parentheses invisible
first.value = "" # type: ignore
last.value = "" # type: ignore
- if len(node.children) > 1:
- maybe_make_parens_invisible_in_atom(node.children[1], parent=parent)
+ maybe_make_parens_invisible_in_atom(middle, parent=parent)
+
+ if is_atom_with_invisible_parens(middle):
+ # Strip the invisible parens from `middle` by replacing
+ # it with the child in-between the invisible parens
+ middle.replace(middle.children[1])
+
return False
return True
+def is_atom_with_invisible_parens(node: LN) -> bool:
+ """Given a `LN`, determines whether it's an atom `node` with invisible
+ parens. Useful in dedupe-ing and normalizing parens.
+ """
+ if isinstance(node, Leaf) or node.type != syms.atom:
+ return False
+
+ first, last = node.children[0], node.children[-1]
+ return (
+ isinstance(first, Leaf)
+ and first.type == token.LPAR
+ and first.value == ""
+ and isinstance(last, Leaf)
+ and last.type == token.RPAR
+ and last.value == ""
+ )
+
+
def is_empty_tuple(node: LN) -> bool:
"""Return True if `node` holds an empty tuple."""
return (
)
+def unwrap_singleton_parenthesis(node: LN) -> Optional[LN]:
+ """Returns `wrapped` if `node` is of the shape ( wrapped ).
+
+ Parenthesis can be optional. Returns None otherwise"""
+ if len(node.children) != 3:
+ return None
+ lpar, wrapped, rpar = node.children
+ if not (lpar.type == token.LPAR and rpar.type == token.RPAR):
+ return None
+
+ return wrapped
+
+
def is_one_tuple(node: LN) -> bool:
"""Return True if `node` holds a tuple with one element, with or without parens."""
if node.type == syms.atom:
- if len(node.children) != 3:
- return False
-
- lpar, gexp, rpar = node.children
- if not (
- lpar.type == token.LPAR
- and gexp.type == syms.testlist_gexp
- and rpar.type == token.RPAR
- ):
+ gexp = unwrap_singleton_parenthesis(node)
+ if gexp is None or gexp.type != syms.testlist_gexp:
return False
return len(gexp.children) == 2 and gexp.children[1].type == token.COMMA
)
+def is_walrus_assignment(node: LN) -> bool:
+ """Return True iff `node` is of the shape ( test := test )"""
+ inner = unwrap_singleton_parenthesis(node)
+ return inner is not None and inner.type == syms.namedexpr_test
+
+
def is_yield(node: LN) -> bool:
"""Return True if `node` holds a `yield` or `yield from` expression."""
if node.type == syms.yield_expr:
extended iterable unpacking (PEP 3132) and additional unpacking
generalizations (PEP 448).
"""
- if leaf.type not in STARS or not leaf.parent:
+ if leaf.type not in VARARGS_SPECIALS or not leaf.parent:
return False
p = leaf.parent
)
-def max_delimiter_priority_in_atom(node: LN) -> int:
+def max_delimiter_priority_in_atom(node: LN) -> Priority:
"""Return maximum delimiter priority inside `node`.
This is specific to atoms with contents contained in a pair of parentheses.
"""Make sure parentheses are visible.
They could be invisible as part of some statements (see
- :func:`normalize_invible_parens` and :func:`visit_import_from`).
+ :func:`normalize_invisible_parens` and :func:`visit_import_from`).
"""
if leaf.type == token.LPAR:
leaf.value = "("
Currently looking for:
- f-strings;
- - underscores in numeric literals; and
- - trailing commas after * or ** in function signatures and calls.
+ - underscores in numeric literals;
+ - trailing commas after * or ** in function signatures and calls;
+ - positional only arguments in function signatures and lambdas;
"""
features: Set[Feature] = set()
for n in node.pre_order():
if "_" in n.value: # type: ignore
features.add(Feature.NUMERIC_UNDERSCORES)
+ elif n.type == token.SLASH:
+ if n.parent and n.parent.type in {syms.typedargslist, syms.arglist}:
+ features.add(Feature.POS_ONLY_ARGUMENTS)
+
+ elif n.type == token.COLONEQUAL:
+ features.add(Feature.ASSIGNMENT_EXPRESSIONS)
+
elif (
n.type in {syms.typedargslist, syms.arglist}
and n.children
return imports
+@lru_cache()
+def get_gitignore(root: Path) -> PathSpec:
+ """ Return a PathSpec matching gitignore content if present."""
+ gitignore = root / ".gitignore"
+ if not gitignore.is_file():
+ return PathSpec.from_lines("gitwildmatch", [])
+ else:
+ return PathSpec.from_lines("gitwildmatch", gitignore.open())
+
+
def gen_python_files_in_dir(
path: Path,
root: Path,
include: Pattern[str],
exclude: Pattern[str],
report: "Report",
+ gitignore: PathSpec,
) -> Iterator[Path]:
"""Generate all files under `path` whose paths are not excluded by the
`exclude` regex, but are included by the `include` regex.
"""
assert root.is_absolute(), f"INTERNAL ERROR: `root` must be absolute but is {root}"
for child in path.iterdir():
+ # First ignore files matching .gitignore
+ if gitignore.match_file(child.as_posix()):
+ report.path_ignored(child, f"matches the .gitignore file content")
+ continue
+
+ # Then ignore with `exclude` option.
try:
normalized_path = "/" + child.resolve().relative_to(root).as_posix()
+ except OSError as e:
+ report.path_ignored(child, f"cannot be read because {e}")
+ continue
except ValueError:
if child.is_symlink():
report.path_ignored(
if child.is_dir():
normalized_path += "/"
+
exclude_match = exclude.search(normalized_path)
if exclude_match and exclude_match.group(0):
report.path_ignored(child, f"matches the --exclude regular expression")
continue
if child.is_dir():
- yield from gen_python_files_in_dir(child, root, include, exclude, report)
+ yield from gen_python_files_in_dir(
+ child, root, include, exclude, report, gitignore
+ )
elif child.is_file():
include_match = include.search(normalized_path)
return ", ".join(report) + "."
-def parse_ast(src: str) -> Union[ast3.AST, ast27.AST]:
- for feature_version in (7, 6):
- try:
- return ast3.parse(src, feature_version=feature_version)
- except SyntaxError:
- continue
+def parse_ast(src: str) -> Union[ast.AST, ast3.AST, ast27.AST]:
+ filename = "<unknown>"
+ if sys.version_info >= (3, 8):
+ # TODO: support Python 4+ ;)
+ for minor_version in range(sys.version_info[1], 4, -1):
+ try:
+ return ast.parse(src, filename, feature_version=(3, minor_version))
+ except SyntaxError:
+ continue
+ else:
+ for feature_version in (7, 6):
+ try:
+ return ast3.parse(src, filename, feature_version=feature_version)
+ except SyntaxError:
+ continue
return ast27.parse(src)
+def _fixup_ast_constants(
+ node: Union[ast.AST, ast3.AST, ast27.AST]
+) -> Union[ast.AST, ast3.AST, ast27.AST]:
+ """Map ast nodes deprecated in 3.8 to Constant."""
+ # casts are required until this is released:
+ # https://github.com/python/typeshed/pull/3142
+ if isinstance(node, (ast.Str, ast3.Str, ast27.Str, ast.Bytes, ast3.Bytes)):
+ return cast(ast.AST, ast.Constant(value=node.s))
+ elif isinstance(node, (ast.Num, ast3.Num, ast27.Num)):
+ return cast(ast.AST, ast.Constant(value=node.n))
+ elif isinstance(node, (ast.NameConstant, ast3.NameConstant)):
+ return cast(ast.AST, ast.Constant(value=node.value))
+ return node
+
+
def assert_equivalent(src: str, dst: str) -> None:
"""Raise AssertionError if `src` and `dst` aren't equivalent."""
- import traceback
-
- def _v(node: Union[ast3.AST, ast27.AST], depth: int = 0) -> Iterator[str]:
+ def _v(node: Union[ast.AST, ast3.AST, ast27.AST], depth: int = 0) -> Iterator[str]:
"""Simple visitor generating strings to compare ASTs by content."""
+
+ node = _fixup_ast_constants(node)
+
yield f"{' ' * depth}{node.__class__.__name__}("
for field in sorted(node._fields):
# TypeIgnore has only one field 'lineno' which breaks this comparison
- if isinstance(node, (ast3.TypeIgnore, ast27.TypeIgnore)):
+ type_ignore_classes = (ast3.TypeIgnore, ast27.TypeIgnore)
+ if sys.version_info >= (3, 8):
+ type_ignore_classes += (ast.TypeIgnore,)
+ if isinstance(node, type_ignore_classes):
break
- # Ignore str kind which is case sensitive / and ignores unicode_literals
- if isinstance(node, (ast3.Str, ast27.Str, ast3.Bytes)) and field == "kind":
- continue
-
try:
value = getattr(node, field)
except AttributeError:
# parentheses and they change the AST.
if (
field == "targets"
- and isinstance(node, (ast3.Delete, ast27.Delete))
- and isinstance(item, (ast3.Tuple, ast27.Tuple))
+ and isinstance(node, (ast.Delete, ast3.Delete, ast27.Delete))
+ and isinstance(item, (ast.Tuple, ast3.Tuple, ast27.Tuple))
):
for item in item.elts:
yield from _v(item, depth + 2)
- elif isinstance(item, (ast3.AST, ast27.AST)):
+ elif isinstance(item, (ast.AST, ast3.AST, ast27.AST)):
yield from _v(item, depth + 2)
- elif isinstance(value, (ast3.AST, ast27.AST)):
+ elif isinstance(value, (ast.AST, ast3.AST, ast27.AST)):
yield from _v(value, depth + 2)
else:
log = dump_to_file("".join(traceback.format_tb(exc.__traceback__)), dst)
raise AssertionError(
f"INTERNAL ERROR: Black produced invalid code: {exc}. "
- f"Please report a bug on https://github.com/python/black/issues. "
+ f"Please report a bug on https://github.com/psf/black/issues. "
f"This invalid output might be helpful: {log}"
) from None
raise AssertionError(
f"INTERNAL ERROR: Black produced code that is not equivalent to "
f"the source. "
- f"Please report a bug on https://github.com/python/black/issues. "
+ f"Please report a bug on https://github.com/psf/black/issues. "
f"This diff might be helpful: {log}"
) from None
raise AssertionError(
f"INTERNAL ERROR: Black produced different code on the second pass "
f"of the formatter. "
- f"Please report a bug on https://github.com/python/black/issues. "
+ f"Please report a bug on https://github.com/psf/black/issues. "
f"This diff might be helpful: {log}"
) from None
def dump_to_file(*output: str) -> str:
"""Dump `output` to a temporary file. Return path to the file."""
- import tempfile
-
with tempfile.NamedTemporaryFile(
mode="w", prefix="blk_", suffix=".log", delete=False, encoding="utf8"
) as f:
return f.name
+@contextmanager
+def nullcontext() -> Iterator[None]:
+ """Return an empty context manager.
+
+ To be used like `nullcontext` in Python 3.7.
+ """
+ yield
+
+
def diff(a: str, b: str, a_name: str, b_name: str) -> str:
"""Return a unified diff string between strings `a` and `b`."""
import difflib
task.cancel()
-def shutdown(loop: BaseEventLoop) -> None:
+def shutdown(loop: asyncio.AbstractEventLoop) -> None:
"""Cancel all pending tasks on `loop`, wait for them, and close the loop."""
try:
if sys.version_info[:2] >= (3, 7):
"""
if "\n" in regex:
regex = "(?x)" + regex
- return re.compile(regex)
+ compiled: Pattern[str] = re.compile(regex)
+ return compiled
def enumerate_reversed(sequence: Sequence[T]) -> Iterator[Tuple[Index, T]]:
if "\n" in leaf.value:
return # Multiline strings, we can't continue.
- comment: Optional[Leaf]
for comment in line.comments_after(leaf):
length += len(comment.value)
with cache_file.open("rb") as fobj:
try:
cache: Cache = pickle.load(fobj)
- except pickle.UnpicklingError:
+ except (pickle.UnpicklingError, ValueError):
return {}
return cache
CACHE_DIR.mkdir(parents=True, exist_ok=True)
new_cache = {**cache, **{src.resolve(): get_cache_info(src) for src in sources}}
with tempfile.NamedTemporaryFile(dir=str(cache_file.parent), delete=False) as f:
- pickle.dump(new_cache, f, protocol=pickle.HIGHEST_PROTOCOL)
+ pickle.dump(new_cache, f, protocol=4)
os.replace(f.name, cache_file)
except OSError:
pass