from attr import dataclass, evolve, Factory
import click
import toml
+from typed_ast import ast3, ast27
# lib2to3 fork
from blib2to3.pytree import Node, Leaf, type_repr
Priority = int
Index = int
LN = Union[Leaf, Node]
-SplitFunc = Callable[["Line", bool], Iterator["Line"]]
+SplitFunc = Callable[["Line", Collection["Feature"]], Iterator["Line"]]
Timestamp = float
FileSize = int
CacheInfo = Tuple[Timestamp, FileSize]
UNICODE_LITERALS = 1
F_STRINGS = 2
NUMERIC_UNDERSCORES = 3
- TRAILING_COMMA = 4
+ TRAILING_COMMA_IN_CALL = 4
+ TRAILING_COMMA_IN_DEF = 5
VERSION_TO_FEATURES: Dict[TargetVersion, Set[Feature]] = {
TargetVersion.PY27: set(),
TargetVersion.PY33: {Feature.UNICODE_LITERALS},
TargetVersion.PY34: {Feature.UNICODE_LITERALS},
- TargetVersion.PY35: {Feature.UNICODE_LITERALS, Feature.TRAILING_COMMA},
+ TargetVersion.PY35: {Feature.UNICODE_LITERALS, Feature.TRAILING_COMMA_IN_CALL},
TargetVersion.PY36: {
Feature.UNICODE_LITERALS,
Feature.F_STRINGS,
Feature.NUMERIC_UNDERSCORES,
- Feature.TRAILING_COMMA,
+ Feature.TRAILING_COMMA_IN_CALL,
+ Feature.TRAILING_COMMA_IN_DEF,
},
TargetVersion.PY37: {
Feature.UNICODE_LITERALS,
Feature.F_STRINGS,
Feature.NUMERIC_UNDERSCORES,
- Feature.TRAILING_COMMA,
+ Feature.TRAILING_COMMA_IN_CALL,
+ Feature.TRAILING_COMMA_IN_DEF,
},
TargetVersion.PY38: {
Feature.UNICODE_LITERALS,
Feature.F_STRINGS,
Feature.NUMERIC_UNDERSCORES,
- Feature.TRAILING_COMMA,
+ Feature.TRAILING_COMMA_IN_CALL,
+ Feature.TRAILING_COMMA_IN_DEF,
},
}
@click.command(context_settings=dict(help_option_names=["-h", "--help"]))
+@click.option("-c", "--code", type=str, help="Format the code passed in as a string.")
@click.option(
"-l",
"--line-length",
@click.pass_context
def main(
ctx: click.Context,
+ code: Optional[str],
line_length: int,
target_version: List[TargetVersion],
check: bool,
)
if config and verbose:
out(f"Using configuration from {config}.", bold=False, fg="blue")
+ if code is not None:
+ print(format_str(code, mode=mode))
+ ctx.exit(0)
try:
include_regex = re_compile_maybe_verbose(include)
except re.error:
report=report,
)
else:
- loop = asyncio.get_event_loop()
- executor = ProcessPoolExecutor(max_workers=os.cpu_count())
- try:
- loop.run_until_complete(
- schedule_formatting(
- sources=sources,
- fast=fast,
- write_back=write_back,
- mode=mode,
- report=report,
- loop=loop,
- executor=executor,
- )
- )
- finally:
- shutdown(loop)
+ reformat_many(
+ sources=sources, fast=fast, write_back=write_back, mode=mode, report=report
+ )
+
if verbose or not quiet:
bang = "💥 💔 💥" if report.return_code else "✨ 🍰 ✨"
out(f"All done! {bang}")
report.failed(src, str(exc))
+def reformat_many(
+ sources: Set[Path],
+ fast: bool,
+ write_back: WriteBack,
+ mode: FileMode,
+ report: "Report",
+) -> None:
+ """Reformat multiple files using a ProcessPoolExecutor."""
+ loop = asyncio.get_event_loop()
+ worker_count = os.cpu_count()
+ if sys.platform == "win32":
+ # Work around https://bugs.python.org/issue26903
+ worker_count = min(worker_count, 61)
+ executor = ProcessPoolExecutor(max_workers=worker_count)
+ try:
+ loop.run_until_complete(
+ schedule_formatting(
+ sources=sources,
+ fast=fast,
+ write_back=write_back,
+ mode=mode,
+ report=report,
+ loop=loop,
+ executor=executor,
+ )
+ )
+ finally:
+ shutdown(loop)
+
+
async def schedule_formatting(
sources: Set[Path],
fast: bool,
manager = Manager()
lock = manager.Lock()
tasks = {
- loop.run_in_executor(
- executor, format_file_in_place, src, fast, mode, write_back, lock
+ asyncio.ensure_future(
+ loop.run_in_executor(
+ executor, format_file_in_place, src, fast, mode, write_back, lock
+ )
): src
for src in sorted(sources)
}
- pending: Iterable[asyncio.Task] = tasks.keys()
+ pending: Iterable[asyncio.Future] = tasks.keys()
try:
loop.add_signal_handler(signal.SIGINT, cancel, pending)
loop.add_signal_handler(signal.SIGTERM, cancel, pending)
elt = EmptyLineTracker(is_pyi=mode.is_pyi)
empty_line = Line()
after = 0
+ split_line_features = {
+ feature
+ for feature in {Feature.TRAILING_COMMA_IN_CALL, Feature.TRAILING_COMMA_IN_DEF}
+ if supports_feature(versions, feature)
+ }
for current_line in lines.visit(src_node):
for _ in range(after):
dst_contents += str(empty_line)
for _ in range(before):
dst_contents += str(empty_line)
for line in split_line(
- current_line,
- line_length=mode.line_length,
- supports_trailing_commas=supports_feature(versions, Feature.TRAILING_COMMA),
+ current_line, line_length=mode.line_length, features=split_line_features
):
dst_contents += str(line)
return dst_contents
return tiow.read(), encoding, newline
-GRAMMARS = [
- pygram.python_grammar_no_print_statement_no_exec_statement,
- pygram.python_grammar_no_print_statement,
- pygram.python_grammar,
-]
-
-
def get_grammars(target_versions: Set[TargetVersion]) -> List[Grammar]:
if not target_versions:
- return GRAMMARS
- elif all(not version.is_python2() for version in target_versions):
- # Python 3-compatible code, so don't try Python 2 grammar
+ # No target_version specified, so try all grammars.
return [
pygram.python_grammar_no_print_statement_no_exec_statement,
pygram.python_grammar_no_print_statement,
+ pygram.python_grammar,
]
- else:
+ elif all(version.is_python2() for version in target_versions):
+ # Python 2-only code, so try Python 2 grammars.
return [pygram.python_grammar_no_print_statement, pygram.python_grammar]
+ else:
+ # Python 3-compatible code, so only try Python 3 grammar.
+ return [pygram.python_grammar_no_print_statement_no_exec_statement]
def lib2to3_parse(src_txt: str, target_versions: Iterable[TargetVersion] = ()) -> Node:
consumed = 0
nlines = 0
+ ignored_lines = 0
for index, line in enumerate(prefix.split("\n")):
consumed += len(line) + 1 # adding the length of the split '\n'
line = line.lstrip()
if not line:
nlines += 1
if not line.startswith("#"):
+ # Escaped newlines outside of a comment are not really newlines at
+ # all. We treat a single-line comment following an escaped newline
+ # as a simple trailing comment.
+ if line.endswith("\\"):
+ ignored_lines += 1
continue
- if index == 0 and not is_endmarker:
+ if index == ignored_lines and not is_endmarker:
comment_type = token.COMMENT # simple trailing comment
else:
comment_type = STANDALONE_COMMENT
line: Line,
line_length: int,
inner: bool = False,
- supports_trailing_commas: bool = False,
+ features: Collection[Feature] = (),
) -> Iterator[Line]:
"""Split a `line` into potentially many lines.
current `line`, possibly transitively. This means we can fallback to splitting
by delimiters if the LHS/RHS don't yield any results.
- If `supports_trailing_commas` is True, splitting may use the TRAILING_COMMA feature.
+ `features` are syntactical features that may be used in the output.
"""
if line.is_comment:
yield line
split_funcs = [left_hand_split]
else:
- def rhs(line: Line, supports_trailing_commas: bool = False) -> Iterator[Line]:
+ def rhs(line: Line, features: Collection[Feature]) -> Iterator[Line]:
for omit in generate_trailers_to_omit(line, line_length):
- lines = list(
- right_hand_split(
- line, line_length, supports_trailing_commas, omit=omit
- )
- )
+ lines = list(right_hand_split(line, line_length, features, omit=omit))
if is_line_short_enough(lines[0], line_length=line_length):
yield from lines
return
# All splits failed, best effort split with no omits.
# This mostly happens to multiline strings that are by definition
# reported as not fitting a single line.
- yield from right_hand_split(line, supports_trailing_commas)
+ yield from right_hand_split(line, line_length, features=features)
if line.inside_brackets:
split_funcs = [delimiter_split, standalone_comment_split, rhs]
# split altogether.
result: List[Line] = []
try:
- for l in split_func(line, supports_trailing_commas):
+ for l in split_func(line, features):
if str(l).strip("\n") == line_str:
raise CannotSplit("Split function returned an unchanged result")
result.extend(
split_line(
- l,
- line_length=line_length,
- inner=True,
- supports_trailing_commas=supports_trailing_commas,
+ l, line_length=line_length, inner=True, features=features
)
)
except CannotSplit:
yield line
-def left_hand_split(
- line: Line, supports_trailing_commas: bool = False
-) -> Iterator[Line]:
+def left_hand_split(line: Line, features: Collection[Feature] = ()) -> Iterator[Line]:
"""Split line into many lines, starting with the first matching bracket pair.
Note: this usually looks weird, only use this for function definitions.
def right_hand_split(
line: Line,
line_length: int,
- supports_trailing_commas: bool = False,
+ features: Collection[Feature] = (),
omit: Collection[LeafID] = (),
) -> Iterator[Line]:
"""Split line into many lines, starting with the last matching bracket pair.
):
omit = {id(closing_bracket), *omit}
try:
- yield from right_hand_split(
- line,
- line_length,
- supports_trailing_commas=supports_trailing_commas,
- omit=omit,
- )
+ yield from right_hand_split(line, line_length, features=features, omit=omit)
return
except CannotSplit:
if leaves:
# Since body is a new indent level, remove spurious leading whitespace.
normalize_prefix(leaves[0], inside_brackets=True)
- # Ensure a trailing comma when expected.
+ # Ensure a trailing comma for imports, but be careful not to add one after
+ # any comments.
if original.is_import:
- if leaves[-1].type != token.COMMA:
- leaves.append(Leaf(token.COMMA, ","))
+ for i in range(len(leaves) - 1, -1, -1):
+ if leaves[i].type == STANDALONE_COMMENT:
+ continue
+ elif leaves[i].type == token.COMMA:
+ break
+ else:
+ leaves.insert(i + 1, Leaf(token.COMMA, ","))
+ break
# Populate the line
for leaf in leaves:
result.append(leaf, preformatted=True)
"""
@wraps(split_func)
- def split_wrapper(
- line: Line, supports_trailing_commas: bool = False
- ) -> Iterator[Line]:
- for l in split_func(line, supports_trailing_commas):
+ def split_wrapper(line: Line, features: Collection[Feature] = ()) -> Iterator[Line]:
+ for l in split_func(line, features):
normalize_prefix(l.leaves[0], inside_brackets=True)
yield l
@dont_increase_indentation
-def delimiter_split(
- line: Line, supports_trailing_commas: bool = False
-) -> Iterator[Line]:
+def delimiter_split(line: Line, features: Collection[Feature] = ()) -> Iterator[Line]:
"""Split according to delimiters of the highest priority.
- If `supports_trailing_commas` is True, the split will add trailing commas
- also in function signatures that contain `*` and `**`.
+ If the appropriate Features are given, the split will add trailing commas
+ also in function signatures and calls that contain `*` and `**`.
"""
try:
last_leaf = line.leaves[-1]
yield from append_to_line(comment_after)
lowest_depth = min(lowest_depth, leaf.bracket_depth)
- if leaf.bracket_depth == lowest_depth and is_vararg(
- leaf, within=VARARGS_PARENTS
- ):
- trailing_comma_safe = trailing_comma_safe and supports_trailing_commas
+ if leaf.bracket_depth == lowest_depth:
+ if is_vararg(leaf, within={syms.typedargslist}):
+ trailing_comma_safe = (
+ trailing_comma_safe and Feature.TRAILING_COMMA_IN_DEF in features
+ )
+ elif is_vararg(leaf, within={syms.arglist, syms.argument}):
+ trailing_comma_safe = (
+ trailing_comma_safe and Feature.TRAILING_COMMA_IN_CALL in features
+ )
+
leaf_priority = bt.delimiters.get(id(leaf))
if leaf_priority == delimiter_priority:
yield current_line
@dont_increase_indentation
def standalone_comment_split(
- line: Line, supports_trailing_commas: bool = False
+ line: Line, features: Collection[Feature] = ()
) -> Iterator[Line]:
"""Split standalone comments from the rest of the line."""
if not line.contains_standalone_comments(0):
check_lpar = False
for index, child in enumerate(list(node.children)):
+ # Add parentheses around long tuple unpacking in assignments.
+ if (
+ index == 0
+ and isinstance(child, Node)
+ and child.type == syms.testlist_star_expr
+ ):
+ check_lpar = True
+
if check_lpar:
if child.type == syms.atom:
- if maybe_make_parens_invisible_in_atom(child):
+ if maybe_make_parens_invisible_in_atom(child, parent=node):
lpar = Leaf(token.LPAR, "")
rpar = Leaf(token.RPAR, "")
index = child.remove() or 0
lpar = Leaf(token.LPAR, "")
rpar = Leaf(token.RPAR, "")
index = child.remove() or 0
- node.insert_child(index, Node(syms.atom, [lpar, child, rpar]))
+ prefix = child.prefix
+ child.prefix = ""
+ new_child = Node(syms.atom, [lpar, child, rpar])
+ new_child.prefix = prefix
+ node.insert_child(index, new_child)
check_lpar = isinstance(child, Leaf) and child.value in parens_after
container = container.next_sibling
-def maybe_make_parens_invisible_in_atom(node: LN) -> bool:
+def maybe_make_parens_invisible_in_atom(node: LN, parent: LN) -> bool:
"""If it's safe, make the parens in the atom `node` invisible, recursively.
Returns whether the node should itself be wrapped in invisible parentheses.
node.type != syms.atom
or is_empty_tuple(node)
or is_one_tuple(node)
- or is_yield(node)
+ or (is_yield(node) and parent.type != syms.expr_stmt)
or max_delimiter_priority_in_atom(node) >= COMMA_PRIORITY
):
return False
first.value = "" # type: ignore
last.value = "" # type: ignore
if len(node.children) > 1:
- maybe_make_parens_invisible_in_atom(node.children[1])
+ maybe_make_parens_invisible_in_atom(node.children[1], parent=parent)
return False
return True
and n.children
and n.children[-1].type == token.COMMA
):
+ if n.type == syms.typedargslist:
+ feature = Feature.TRAILING_COMMA_IN_DEF
+ else:
+ feature = Feature.TRAILING_COMMA_IN_CALL
+
for ch in n.children:
if ch.type in STARS:
- features.add(Feature.TRAILING_COMMA)
+ features.add(feature)
if ch.type == syms.argument:
for argch in ch.children:
if argch.type in STARS:
- features.add(Feature.TRAILING_COMMA)
+ features.add(feature)
return features
elif child.type == syms.import_as_names:
yield from get_imports_from_children(child.children)
else:
- assert False, "Invalid syntax parsing imports"
+ raise AssertionError("Invalid syntax parsing imports")
for child in node.children:
if child.type != syms.simple_stmt:
return ", ".join(report) + "."
+def parse_ast(src: str) -> Union[ast3.AST, ast27.AST]:
+ for feature_version in (7, 6):
+ try:
+ return ast3.parse(src, feature_version=feature_version)
+ except SyntaxError:
+ continue
+
+ return ast27.parse(src)
+
+
def assert_equivalent(src: str, dst: str) -> None:
"""Raise AssertionError if `src` and `dst` aren't equivalent."""
- import ast
import traceback
- def _v(node: ast.AST, depth: int = 0) -> Iterator[str]:
+ def _v(node: Union[ast3.AST, ast27.AST], depth: int = 0) -> Iterator[str]:
"""Simple visitor generating strings to compare ASTs by content."""
yield f"{' ' * depth}{node.__class__.__name__}("
for field in sorted(node._fields):
+ # TypeIgnore has only one field 'lineno' which breaks this comparison
+ if isinstance(node, (ast3.TypeIgnore, ast27.TypeIgnore)):
+ break
+
+ # Ignore str kind which is case sensitive / and ignores unicode_literals
+ if isinstance(node, (ast3.Str, ast27.Str, ast3.Bytes)) and field == "kind":
+ continue
+
try:
value = getattr(node, field)
except AttributeError:
# parentheses and they change the AST.
if (
field == "targets"
- and isinstance(node, ast.Delete)
- and isinstance(item, ast.Tuple)
+ and isinstance(node, (ast3.Delete, ast27.Delete))
+ and isinstance(item, (ast3.Tuple, ast27.Tuple))
):
for item in item.elts:
yield from _v(item, depth + 2)
- elif isinstance(item, ast.AST):
+ elif isinstance(item, (ast3.AST, ast27.AST)):
yield from _v(item, depth + 2)
- elif isinstance(value, ast.AST):
+ elif isinstance(value, (ast3.AST, ast27.AST)):
yield from _v(value, depth + 2)
else:
yield f"{' ' * depth}) # /{node.__class__.__name__}"
try:
- src_ast = ast.parse(src)
+ src_ast = parse_ast(src)
except Exception as exc:
- major, minor = sys.version_info[:2]
raise AssertionError(
- f"cannot use --safe with this file; failed to parse source file "
- f"with Python {major}.{minor}'s builtin AST. Re-run with --fast "
- f"or stop using deprecated Python 2 syntax. AST error message: {exc}"
+ f"cannot use --safe with this file; failed to parse source file. "
+ f"AST error message: {exc}"
)
try:
- dst_ast = ast.parse(dst)
+ dst_ast = parse_ast(dst)
except Exception as exc:
log = dump_to_file("".join(traceback.format_tb(exc.__traceback__)), dst)
raise AssertionError(
f"INTERNAL ERROR: Black produced invalid code: {exc}. "
- f"Please report a bug on https://github.com/ambv/black/issues. "
+ f"Please report a bug on https://github.com/python/black/issues. "
f"This invalid output might be helpful: {log}"
) from None
raise AssertionError(
f"INTERNAL ERROR: Black produced code that is not equivalent to "
f"the source. "
- f"Please report a bug on https://github.com/ambv/black/issues. "
+ f"Please report a bug on https://github.com/python/black/issues. "
f"This diff might be helpful: {log}"
) from None
raise AssertionError(
f"INTERNAL ERROR: Black produced different code on the second pass "
f"of the formatter. "
- f"Please report a bug on https://github.com/ambv/black/issues. "
+ f"Please report a bug on https://github.com/python/black/issues. "
f"This diff might be helpful: {log}"
) from None