+import ast
import asyncio
-from asyncio.base_events import BaseEventLoop
from concurrent.futures import Executor, ProcessPoolExecutor
+from contextlib import contextmanager
from datetime import datetime
from enum import Enum
from functools import lru_cache, partial, wraps
# set for every version of python.
ASYNC_IDENTIFIERS = 6
ASYNC_KEYWORDS = 7
+ ASSIGNMENT_EXPRESSIONS = 8
+ POS_ONLY_ARGUMENTS = 9
VERSION_TO_FEATURES: Dict[TargetVersion, Set[Feature]] = {
Feature.TRAILING_COMMA_IN_CALL,
Feature.TRAILING_COMMA_IN_DEF,
Feature.ASYNC_KEYWORDS,
+ Feature.ASSIGNMENT_EXPRESSIONS,
+ Feature.POS_ONLY_ARGUMENTS,
},
}
)
finally:
shutdown(loop)
+ executor.shutdown()
async def schedule_formatting(
write_back: WriteBack,
mode: FileMode,
report: "Report",
- loop: BaseEventLoop,
+ loop: asyncio.AbstractEventLoop,
executor: Executor,
) -> None:
"""Run formatting of `sources` in parallel using the provided `executor`.
(Use ProcessPoolExecutors for actual parallelism.)
- `line_length`, `write_back`, `fast`, and `pyi` options are passed to
+ `write_back`, `fast`, and `mode` options are passed to
:func:`format_file_in_place`.
"""
cache: Cache = {}
src_name = f"{src}\t{then} +0000"
dst_name = f"{src}\t{now} +0000"
diff_contents = diff(src_contents, dst_contents, src_name, dst_name)
- if lock:
- lock.acquire()
- try:
+
+ with lock or nullcontext():
f = io.TextIOWrapper(
sys.stdout.buffer,
encoding=encoding,
)
f.write(diff_contents)
f.detach()
- finally:
- if lock:
- lock.release()
+
return True
token.DOUBLESTAR,
}
STARS = {token.STAR, token.DOUBLESTAR}
+VARARGS_SPECIALS = STARS | {token.SLASH}
VARARGS_PARENTS = {
syms.arglist,
syms.argument, # double star in arglist
try:
last_leaf = self.leaves[-1]
ignored_ids.add(id(last_leaf))
- if last_leaf.type == token.COMMA:
- # When trailing commas are inserted by Black for consistency, comments
- # after the previous last element are not moved (they don't have to,
- # rendering will still be correct). So we ignore trailing commas.
+ if last_leaf.type == token.COMMA or (
+ last_leaf.type == token.RPAR and not last_leaf.value
+ ):
+ # When trailing commas or optional parens are inserted by Black for
+ # consistency, comments after the previous last element are not moved
+ # (they don't have to, rendering will still be correct). So we ignore
+ # trailing commas and invisible.
last_leaf = self.leaves[-2]
ignored_ids.add(id(last_leaf))
except IndexError:
bracket_depth = leaf.bracket_depth
if bracket_depth == depth and leaf.type == token.COMMA:
commas += 1
- if leaf.parent and leaf.parent.type == syms.arglist:
+ if leaf.parent and leaf.parent.type in {
+ syms.arglist,
+ syms.typedargslist,
+ }:
commas += 1
break
comment.prefix = ""
return False
- self.comments.setdefault(id(self.leaves[-1]), []).append(comment)
+ last_leaf = self.leaves[-1]
+ if (
+ last_leaf.type == token.RPAR
+ and not last_leaf.value
+ and last_leaf.parent
+ and len(list(last_leaf.parent.leaves())) <= 3
+ and not is_type_comment(comment)
+ ):
+ # Comments on an optional parens wrapping a single leaf should belong to
+ # the wrapped node except if it's a type comment. Pinning the comment like
+ # this avoids unstable formatting caused by comment migration.
+ if len(self.leaves) < 2:
+ comment.type = STANDALONE_COMMENT
+ comment.prefix = ""
+ return False
+ last_leaf = self.leaves[-2]
+ self.comments.setdefault(id(last_leaf), []).append(comment)
return True
def comments_after(self, leaf: Leaf) -> List[Leaf]:
lines (two on module-level).
"""
before, after = self._maybe_empty_lines(current_line)
- before -= self.previous_after
+ before = (
+ # Black should not insert empty lines at the beginning
+ # of the file
+ 0
+ if self.previous_line is None
+ else before - self.previous_after
+ )
self.previous_after = after
self.previous_line = current_line
return before, after
node.children[2].value = ""
yield from super().visit_default(node)
+ def visit_factor(self, node: Node) -> Iterator[Line]:
+ """Force parentheses between a unary op and a binary power:
+
+ -2 ** 8 -> -(2 ** 8)
+ """
+ child = node.children[1]
+ if child.type == syms.power and len(child.children) == 3:
+ lpar = Leaf(token.LPAR, "(")
+ rpar = Leaf(token.RPAR, ")")
+ index = child.remove() or 0
+ node.insert_child(index, Node(syms.atom, [lpar, child, rpar]))
+ yield from self.visit_default(node)
+
def visit_INDENT(self, node: Node) -> Iterator[Line]:
"""Increase indentation level, maybe yield a line."""
# In blib2to3 INDENT never holds comments.
# that, too.
return prevp.prefix
- elif prevp.type in STARS:
+ elif prevp.type in VARARGS_SPECIALS:
if is_vararg(prevp, within=VARARGS_PARENTS | UNPACKING_PARENTS):
return NO
if not prevp or prevp.type == token.LPAR:
return NO
- elif prev.type in {token.EQUAL} | STARS:
+ elif prev.type in {token.EQUAL} | VARARGS_SPECIALS:
return NO
elif p.type == syms.decorator:
if leaves:
# Since body is a new indent level, remove spurious leading whitespace.
normalize_prefix(leaves[0], inside_brackets=True)
- # Ensure a trailing comma for imports, but be careful not to add one after
- # any comments.
- if original.is_import:
+ # Ensure a trailing comma for imports and standalone function arguments, but
+ # be careful not to add one after any comments.
+ no_commas = original.is_def and not any(
+ l.type == token.COMMA for l in leaves
+ )
+
+ if original.is_import or no_commas:
for i in range(len(leaves) - 1, -1, -1):
if leaves[i].type == STANDALONE_COMMENT:
continue
check_lpar = True
if check_lpar:
+ if is_walrus_assignment(child):
+ continue
if child.type == syms.atom:
if maybe_make_parens_invisible_in_atom(child, parent=node):
lpar = Leaf(token.LPAR, "")
# make parentheses invisible
first.value = "" # type: ignore
last.value = "" # type: ignore
- if len(node.children) > 1:
- maybe_make_parens_invisible_in_atom(node.children[1], parent=parent)
+ maybe_make_parens_invisible_in_atom(node.children[1], parent=parent)
return False
return True
)
+def unwrap_singleton_parenthesis(node: LN) -> Optional[LN]:
+ """Returns `wrapped` if `node` is of the shape ( wrapped ).
+
+ Parenthesis can be optional. Returns None otherwise"""
+ if len(node.children) != 3:
+ return None
+ lpar, wrapped, rpar = node.children
+ if not (lpar.type == token.LPAR and rpar.type == token.RPAR):
+ return None
+
+ return wrapped
+
+
def is_one_tuple(node: LN) -> bool:
"""Return True if `node` holds a tuple with one element, with or without parens."""
if node.type == syms.atom:
- if len(node.children) != 3:
- return False
-
- lpar, gexp, rpar = node.children
- if not (
- lpar.type == token.LPAR
- and gexp.type == syms.testlist_gexp
- and rpar.type == token.RPAR
- ):
+ gexp = unwrap_singleton_parenthesis(node)
+ if gexp is None or gexp.type != syms.testlist_gexp:
return False
return len(gexp.children) == 2 and gexp.children[1].type == token.COMMA
)
+def is_walrus_assignment(node: LN) -> bool:
+ """Return True iff `node` is of the shape ( test := test )"""
+ inner = unwrap_singleton_parenthesis(node)
+ return inner is not None and inner.type == syms.namedexpr_test
+
+
def is_yield(node: LN) -> bool:
"""Return True if `node` holds a `yield` or `yield from` expression."""
if node.type == syms.yield_expr:
extended iterable unpacking (PEP 3132) and additional unpacking
generalizations (PEP 448).
"""
- if leaf.type not in STARS or not leaf.parent:
+ if leaf.type not in VARARGS_SPECIALS or not leaf.parent:
return False
p = leaf.parent
"""Make sure parentheses are visible.
They could be invisible as part of some statements (see
- :func:`normalize_invible_parens` and :func:`visit_import_from`).
+ :func:`normalize_invisible_parens` and :func:`visit_import_from`).
"""
if leaf.type == token.LPAR:
leaf.value = "("
Currently looking for:
- f-strings;
- - underscores in numeric literals; and
- - trailing commas after * or ** in function signatures and calls.
+ - underscores in numeric literals;
+ - trailing commas after * or ** in function signatures and calls;
+ - positional only arguments in function signatures and lambdas;
"""
features: Set[Feature] = set()
for n in node.pre_order():
if "_" in n.value: # type: ignore
features.add(Feature.NUMERIC_UNDERSCORES)
+ elif n.type == token.SLASH:
+ if n.parent and n.parent.type in {syms.typedargslist, syms.arglist}:
+ features.add(Feature.POS_ONLY_ARGUMENTS)
+
+ elif n.type == token.COLONEQUAL:
+ features.add(Feature.ASSIGNMENT_EXPRESSIONS)
+
elif (
n.type in {syms.typedargslist, syms.arglist}
and n.children
return ", ".join(report) + "."
-def parse_ast(src: str) -> Union[ast3.AST, ast27.AST]:
- for feature_version in (7, 6):
- try:
- return ast3.parse(src, feature_version=feature_version)
- except SyntaxError:
- continue
+def parse_ast(src: str) -> Union[ast.AST, ast3.AST, ast27.AST]:
+ filename = "<unknown>"
+ if sys.version_info >= (3, 8):
+ # TODO: support Python 4+ ;)
+ for minor_version in range(sys.version_info[1], 4, -1):
+ try:
+ return ast.parse(src, filename, feature_version=(3, minor_version))
+ except SyntaxError:
+ continue
+ else:
+ for feature_version in (7, 6):
+ try:
+ return ast3.parse(src, filename, feature_version=feature_version)
+ except SyntaxError:
+ continue
return ast27.parse(src)
+def _fixup_ast_constants(
+ node: Union[ast.AST, ast3.AST, ast27.AST]
+) -> Union[ast.AST, ast3.AST, ast27.AST]:
+ """Map ast nodes deprecated in 3.8 to Constant."""
+ # casts are required until this is released:
+ # https://github.com/python/typeshed/pull/3142
+ if isinstance(node, (ast.Str, ast3.Str, ast27.Str, ast.Bytes, ast3.Bytes)):
+ return cast(ast.AST, ast.Constant(value=node.s))
+ elif isinstance(node, (ast.Num, ast3.Num, ast27.Num)):
+ return cast(ast.AST, ast.Constant(value=node.n))
+ elif isinstance(node, (ast.NameConstant, ast3.NameConstant)):
+ return cast(ast.AST, ast.Constant(value=node.value))
+ return node
+
+
def assert_equivalent(src: str, dst: str) -> None:
"""Raise AssertionError if `src` and `dst` aren't equivalent."""
- def _v(node: Union[ast3.AST, ast27.AST], depth: int = 0) -> Iterator[str]:
+ def _v(node: Union[ast.AST, ast3.AST, ast27.AST], depth: int = 0) -> Iterator[str]:
"""Simple visitor generating strings to compare ASTs by content."""
+
+ node = _fixup_ast_constants(node)
+
yield f"{' ' * depth}{node.__class__.__name__}("
for field in sorted(node._fields):
# TypeIgnore has only one field 'lineno' which breaks this comparison
- if isinstance(node, (ast3.TypeIgnore, ast27.TypeIgnore)):
+ type_ignore_classes = (ast3.TypeIgnore, ast27.TypeIgnore)
+ if sys.version_info >= (3, 8):
+ type_ignore_classes += (ast.TypeIgnore,)
+ if isinstance(node, type_ignore_classes):
break
- # Ignore str kind which is case sensitive / and ignores unicode_literals
- if isinstance(node, (ast3.Str, ast27.Str, ast3.Bytes)) and field == "kind":
- continue
-
try:
value = getattr(node, field)
except AttributeError:
# parentheses and they change the AST.
if (
field == "targets"
- and isinstance(node, (ast3.Delete, ast27.Delete))
- and isinstance(item, (ast3.Tuple, ast27.Tuple))
+ and isinstance(node, (ast.Delete, ast3.Delete, ast27.Delete))
+ and isinstance(item, (ast.Tuple, ast3.Tuple, ast27.Tuple))
):
for item in item.elts:
yield from _v(item, depth + 2)
- elif isinstance(item, (ast3.AST, ast27.AST)):
+ elif isinstance(item, (ast.AST, ast3.AST, ast27.AST)):
yield from _v(item, depth + 2)
- elif isinstance(value, (ast3.AST, ast27.AST)):
+ elif isinstance(value, (ast.AST, ast3.AST, ast27.AST)):
yield from _v(value, depth + 2)
else:
log = dump_to_file("".join(traceback.format_tb(exc.__traceback__)), dst)
raise AssertionError(
f"INTERNAL ERROR: Black produced invalid code: {exc}. "
- f"Please report a bug on https://github.com/python/black/issues. "
+ f"Please report a bug on https://github.com/psf/black/issues. "
f"This invalid output might be helpful: {log}"
) from None
raise AssertionError(
f"INTERNAL ERROR: Black produced code that is not equivalent to "
f"the source. "
- f"Please report a bug on https://github.com/python/black/issues. "
+ f"Please report a bug on https://github.com/psf/black/issues. "
f"This diff might be helpful: {log}"
) from None
raise AssertionError(
f"INTERNAL ERROR: Black produced different code on the second pass "
f"of the formatter. "
- f"Please report a bug on https://github.com/python/black/issues. "
+ f"Please report a bug on https://github.com/psf/black/issues. "
f"This diff might be helpful: {log}"
) from None
return f.name
+@contextmanager
+def nullcontext() -> Iterator[None]:
+ """Return context manager that does nothing.
+ Similar to `nullcontext` from python 3.7"""
+ yield
+
+
def diff(a: str, b: str, a_name: str, b_name: str) -> str:
"""Return a unified diff string between strings `a` and `b`."""
import difflib
task.cancel()
-def shutdown(loop: BaseEventLoop) -> None:
+def shutdown(loop: asyncio.AbstractEventLoop) -> None:
"""Cancel all pending tasks on `loop`, wait for them, and close the loop."""
try:
if sys.version_info[:2] >= (3, 7):