import sys
import tempfile
import tokenize
+import traceback
from typing import (
Any,
Callable,
from attr import dataclass, evolve, Factory
import click
import toml
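+# typed_ast can parse both Python 2 (ast27) and Python 3 (ast3) source, unlike the builtin ast module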
+from typed_ast import ast3, ast27
# lib2to3 fork
from blib2to3.pytree import Node, Leaf, type_repr
NUMERIC_UNDERSCORES = 3
TRAILING_COMMA_IN_CALL = 4
TRAILING_COMMA_IN_DEF = 5
+ # The following two feature-flags are mutually exclusive, and exactly one should be
+ # set for every version of Python.
+ ASYNC_IDENTIFIERS = 6
+ ASYNC_KEYWORDS = 7
VERSION_TO_FEATURES: Dict[TargetVersion, Set[Feature]] = {
- TargetVersion.PY27: set(),
- TargetVersion.PY33: {Feature.UNICODE_LITERALS},
- TargetVersion.PY34: {Feature.UNICODE_LITERALS},
- TargetVersion.PY35: {Feature.UNICODE_LITERALS, Feature.TRAILING_COMMA_IN_CALL},
+ TargetVersion.PY27: {Feature.ASYNC_IDENTIFIERS},
+ TargetVersion.PY33: {Feature.UNICODE_LITERALS, Feature.ASYNC_IDENTIFIERS},
+ TargetVersion.PY34: {Feature.UNICODE_LITERALS, Feature.ASYNC_IDENTIFIERS},
+ TargetVersion.PY35: {
+ Feature.UNICODE_LITERALS,
+ Feature.TRAILING_COMMA_IN_CALL,
+ Feature.ASYNC_IDENTIFIERS,
+ },
TargetVersion.PY36: {
Feature.UNICODE_LITERALS,
Feature.F_STRINGS,
Feature.NUMERIC_UNDERSCORES,
Feature.TRAILING_COMMA_IN_CALL,
Feature.TRAILING_COMMA_IN_DEF,
+ Feature.ASYNC_IDENTIFIERS,
},
TargetVersion.PY37: {
Feature.UNICODE_LITERALS,
Feature.NUMERIC_UNDERSCORES,
Feature.TRAILING_COMMA_IN_CALL,
Feature.TRAILING_COMMA_IN_DEF,
+ Feature.ASYNC_KEYWORDS,
},
TargetVersion.PY38: {
Feature.UNICODE_LITERALS,
Feature.NUMERIC_UNDERSCORES,
Feature.TRAILING_COMMA_IN_CALL,
Feature.TRAILING_COMMA_IN_DEF,
+ Feature.ASYNC_KEYWORDS,
},
}
"--quiet",
is_flag=True,
help=(
- "Don't emit non-error messages to stderr. Errors are still emitted, "
+ "Don't emit non-error messages to stderr. Errors are still emitted; "
"silence those with 2>/dev/null."
),
)
)
if verbose or not quiet:
- bang = "💥 💔 💥" if report.return_code else "✨ 🍰 ✨"
- out(f"All done! {bang}")
+ out("Oh no! 💥 💔 💥" if report.return_code else "All done! ✨ 🍰 ✨")
click.secho(str(report), err=True)
ctx.exit(report.return_code)
) -> None:
"""Reformat a single file under `src` without spawning child processes.
- If `quiet` is True, non-error messages are not output. `line_length`,
- `write_back`, `fast` and `pyi` options are passed to
+ `fast`, `write_back`, and `mode` options are passed to
:func:`format_file_in_place` or :func:`format_stdin_to_stdout`.
"""
try:
If `write_back` is DIFF, write a diff to stdout. If it is YES, write reformatted
code to the file.
- `line_length` and `fast` options are passed to :func:`format_file_contents`.
+ `mode` and `fast` options are passed to :func:`format_file_contents`.
"""
if src.suffix == ".pyi":
mode = evolve(mode, is_pyi=True)
If `fast` is False, additionally confirm that the reformatted code is
valid by calling :func:`assert_equivalent` and :func:`assert_stable` on it.
- `line_length` is passed to :func:`format_str`.
+ `mode` is passed to :func:`format_str`.
"""
if src_contents.strip() == "":
raise NothingChanged
def format_str(src_contents: str, *, mode: FileMode) -> FileContent:
"""Reformat a string and return new contents.
- `line_length` determines how many characters per line are allowed.
+ `mode` determines formatting options, such as how many characters per line are
+ allowed.
"""
src_node = lib2to3_parse(src_contents.lstrip(), mode.target_versions)
- dst_contents = ""
+ dst_contents = []
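+ # Accumulate output chunks in a list and join them once at the end, instead of
+ # repeatedly concatenating onto a string.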
future_imports = get_future_imports(src_node)
if mode.target_versions:
versions = mode.target_versions
}
for current_line in lines.visit(src_node):
for _ in range(after):
- dst_contents += str(empty_line)
+ dst_contents.append(str(empty_line))
before, after = elt.maybe_empty_lines(current_line)
for _ in range(before):
- dst_contents += str(empty_line)
+ dst_contents.append(str(empty_line))
for line in split_line(
current_line, line_length=mode.line_length, features=split_line_features
):
- dst_contents += str(line)
- return dst_contents
+ dst_contents.append(str(line))
+ return "".join(dst_contents)
def decode_bytes(src: bytes) -> Tuple[FileContent, Encoding, NewLine]:
if not target_versions:
# No target_version specified, so try all grammars.
return [
+ # Python 3.7+
+ pygram.python_grammar_no_print_statement_no_exec_statement_async_keywords,
+ # Python 3.0-3.6
pygram.python_grammar_no_print_statement_no_exec_statement,
+ # Python 2.7 with future print_function import
pygram.python_grammar_no_print_statement,
+ # Python 2.7
pygram.python_grammar,
]
elif all(version.is_python2() for version in target_versions):
# Python 2-only code, so try Python 2 grammars.
- return [pygram.python_grammar_no_print_statement, pygram.python_grammar]
+ return [
+ # Python 2.7 with future print_function import
+ pygram.python_grammar_no_print_statement,
+ # Python 2.7
+ pygram.python_grammar,
+ ]
else:
# Python 3-compatible code, so only try Python 3 grammar.
- return [pygram.python_grammar_no_print_statement_no_exec_statement]
+ grammars = []
+ # If we have to parse both, try to parse async as a keyword first
+ if not supports_feature(target_versions, Feature.ASYNC_IDENTIFIERS):
+ # Python 3.7+
+ grammars.append(
+ pygram.python_grammar_no_print_statement_no_exec_statement_async_keywords # noqa: B950
+ )
+ if not supports_feature(target_versions, Feature.ASYNC_KEYWORDS):
+ # Python 3.0-3.6
+ grammars.append(pygram.python_grammar_no_print_statement_no_exec_statement)
+ # At least one of the above branches must have been taken, because every Python
+ # version has exactly one of the two 'ASYNC_*' flags
+ return grammars
def lib2to3_parse(src_txt: str, target_versions: Iterable[TargetVersion] = ()) -> Node:
"""Return True if there is an yet unmatched open bracket on the line."""
return bool(self.bracket_match)
- def max_delimiter_priority(self, exclude: Iterable[LeafID] = ()) -> int:
+ def max_delimiter_priority(self, exclude: Iterable[LeafID] = ()) -> Priority:
"""Return the highest priority of a delimiter found on the line.
Values are consistent with what `is_split_*_delimiter()` return.
"""
return max(v for k, v in self.delimiters.items() if k not in exclude)
- def delimiter_count_with_priority(self, priority: int = 0) -> int:
+ def delimiter_count_with_priority(self, priority: Priority = 0) -> int:
"""Return the number of delimiters with the given `priority`.
If no `priority` is passed, defaults to max priority on the line.
self.current_line.append(node)
yield from super().visit_default(node)
+ def visit_atom(self, node: Node) -> Iterator[Line]:
+ # Always make parentheses invisible around a single node, because they should
+ # not be needed (except in the case of yield, where removing the parentheses
+ # produces a SyntaxError).
+ if (
+ len(node.children) == 3
+ and isinstance(node.children[0], Leaf)
+ and node.children[0].type == token.LPAR
+ and isinstance(node.children[2], Leaf)
+ and node.children[2].type == token.RPAR
+ and isinstance(node.children[1], Leaf)
+ and not (
+ node.children[1].type == token.NAME
+ and node.children[1].value == "yield"
+ )
+ ):
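+ # An empty value renders the parentheses invisible: the leaves stay in the
+ # tree but no characters are emitted for them.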
+ node.children[0].value = ""
+ node.children[2].value = ""
+ yield from super().visit_default(node)
+
def visit_INDENT(self, node: Node) -> Iterator[Line]:
"""Increase indentation level, maybe yield a line."""
# In blib2to3 INDENT never holds comments.
return container
-def is_split_after_delimiter(leaf: Leaf, previous: Optional[Leaf] = None) -> int:
+def is_split_after_delimiter(leaf: Leaf, previous: Optional[Leaf] = None) -> Priority:
"""Return the priority of the `leaf` delimiter, given a line break after it.
The delimiter priorities returned here are from those delimiters that would
return 0
-def is_split_before_delimiter(leaf: Leaf, previous: Optional[Leaf] = None) -> int:
+def is_split_before_delimiter(leaf: Leaf, previous: Optional[Leaf] = None) -> Priority:
"""Return the priority of the `leaf` delimiter, given a line break before it.
The delimiter priorities returned here are from those delimiters that would
new_body = sub_twice(escaped_orig_quote, rf"\1\2{orig_quote}", new_body)
new_body = sub_twice(unescaped_new_quote, rf"\1\\{new_quote}", new_body)
if "f" in prefix.casefold():
- matches = re.findall(r"[^{]\{(.*?)\}[^}]", new_body)
+ matches = re.findall(
+ r"""
+ (?:[^{]|^)\{ # start of the string or a non-{ followed by a single {
+ ([^{].*?) # contents of the brackets, except when they begin with {{
+ \}(?:[^}]|$) # a } followed by the end of the string or a non-}
+ """,
+ new_body,
+ re.VERBOSE,
+ )
for m in matches:
if "\\" in str(m):
# Do not introduce backslashes in interpolated expressions
def normalize_invisible_parens(node: Node, parens_after: Set[str]) -> None:
"""Make existing optional parentheses invisible or create new ones.
- `parens_after` is a set of string leaf values immeditely after which parens
+ `parens_after` is a set of string leaf values immediately after which parens
should be put.
Standardizes on visible parentheses for single-element tuples, and keeps
)
-def max_delimiter_priority_in_atom(node: LN) -> int:
+def max_delimiter_priority_in_atom(node: LN) -> Priority:
"""Return maximum delimiter priority inside `node`.
This is specific to atoms with contents contained in a pair of parentheses.
return ", ".join(report) + "."
+def parse_ast(src: str) -> Union[ast3.AST, ast27.AST]:
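+ # Try the Python 3 parser first, preferring the newest feature_version that
+ # accepts the source.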
+ for feature_version in (7, 6):
+ try:
+ return ast3.parse(src, feature_version=feature_version)
+ except SyntaxError:
+ continue
+
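+ # None of the Python 3 feature versions parsed the source; fall back to the
+ # Python 2 parser.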
+ return ast27.parse(src)
+
+
def assert_equivalent(src: str, dst: str) -> None:
"""Raise AssertionError if `src` and `dst` aren't equivalent."""
- import ast
- import traceback
-
- def _v(node: ast.AST, depth: int = 0) -> Iterator[str]:
+ def _v(node: Union[ast3.AST, ast27.AST], depth: int = 0) -> Iterator[str]:
"""Simple visitor generating strings to compare ASTs by content."""
yield f"{' ' * depth}{node.__class__.__name__}("
for field in sorted(node._fields):
+ # TypeIgnore has only one field, 'lineno', which breaks this comparison
+ if isinstance(node, (ast3.TypeIgnore, ast27.TypeIgnore)):
+ break
+
+ # Ignore the str 'kind' field, which is case sensitive and ignores unicode_literals
+ if isinstance(node, (ast3.Str, ast27.Str, ast3.Bytes)) and field == "kind":
+ continue
+
try:
value = getattr(node, field)
except AttributeError:
# parentheses and they change the AST.
if (
field == "targets"
- and isinstance(node, ast.Delete)
- and isinstance(item, ast.Tuple)
+ and isinstance(node, (ast3.Delete, ast27.Delete))
+ and isinstance(item, (ast3.Tuple, ast27.Tuple))
):
for item in item.elts:
yield from _v(item, depth + 2)
- elif isinstance(item, ast.AST):
+ elif isinstance(item, (ast3.AST, ast27.AST)):
yield from _v(item, depth + 2)
- elif isinstance(value, ast.AST):
+ elif isinstance(value, (ast3.AST, ast27.AST)):
yield from _v(value, depth + 2)
else:
yield f"{' ' * depth}) # /{node.__class__.__name__}"
try:
- src_ast = ast.parse(src)
+ src_ast = parse_ast(src)
except Exception as exc:
- major, minor = sys.version_info[:2]
raise AssertionError(
- f"cannot use --safe with this file; failed to parse source file "
- f"with Python {major}.{minor}'s builtin AST. Re-run with --fast "
- f"or stop using deprecated Python 2 syntax. AST error message: {exc}"
+ f"cannot use --safe with this file; failed to parse source file. "
+ f"AST error message: {exc}"
)
try:
- dst_ast = ast.parse(dst)
+ dst_ast = parse_ast(dst)
except Exception as exc:
log = dump_to_file("".join(traceback.format_tb(exc.__traceback__)), dst)
raise AssertionError(
def dump_to_file(*output: str) -> str:
"""Dump `output` to a temporary file. Return path to the file."""
- import tempfile
-
with tempfile.NamedTemporaryFile(
mode="w", prefix="blk_", suffix=".log", delete=False, encoding="utf8"
) as f:
if "\n" in leaf.value:
return # Multiline strings, we can't continue.
- comment: Optional[Leaf]
for comment in line.comments_after(leaf):
length += len(comment.value)