import tokenize
import sys
from typing import (
- Dict, Generic, Iterable, Iterator, List, Optional, Set, Tuple, TypeVar, Union
+ Dict, Generic, Iterable, Iterator, List, Optional, Set, Tuple, Type, TypeVar, Union
)
from attr import dataclass, Factory
from blib2to3.pgen2 import driver, token
from blib2to3.pgen2.parse import ParseError
-__version__ = "18.3a3"
+__version__ = "18.3a4"
DEFAULT_LINE_LENGTH = 88
# types
syms = pygram.python_symbols
class NothingChanged(UserWarning):
- """Raised by `format_file` when the reformatted code is the same as source."""
+ """Raised by `format_file()` when the reformatted code is the same as source."""
class CannotSplit(Exception):
"""
+class FormatError(Exception):
+ """Base exception for `# fmt: on` and `# fmt: off` handling.
+
+ `consumed` is a character offset counted line by line (each line plus its
+ newline) up to and including the line that holds the format control
+ comment. The helpers below use it to slice `leaf.prefix`.
+ """
+
+ def __init__(self, consumed: int) -> None:
+ """Stores `consumed` both as the exception argument and as an attribute."""
+ super().__init__(consumed)
+ self.consumed = consumed
+
+ def trim_prefix(self, leaf: Leaf) -> None:
+ """Drops the first `consumed` characters from `leaf.prefix` in place."""
+ leaf.prefix = leaf.prefix[self.consumed:]
+
+ def leaf_from_consumed(self, leaf: Leaf) -> Leaf:
+ """Returns a new NEWLINE Leaf built from the consumed part of the prefix.
+
+ `leaf` itself is not modified; call `trim_prefix()` to drop that part.
+ """
+ unformatted_prefix = leaf.prefix[:self.consumed]
+ return Leaf(token.NEWLINE, unformatted_prefix)
+
+
+class FormatOn(FormatError):
+ """Found a comment like `# fmt: on` in the file.
+
+ Raised when the normalized comment text is exactly `# fmt: on` or
+ `# yapf: enable`.
+ """
+
+
+class FormatOff(FormatError):
+ """Found a comment like `# fmt: off` in the file.
+
+ Raised when the normalized comment text is exactly `# fmt: off` or
+ `# yapf: disable`.
+ """
+
+
@click.command()
@click.option(
'-l',
loop: BaseEventLoop,
executor: Executor,
) -> int:
+ """Run formatting of `sources` in parallel using the provided `executor`.
+
+ (Use ProcessPoolExecutors for actual parallelism.)
+
+ `line_length`, `write_back`, and `fast` options are passed to
+ :func:`format_file_in_place`.
+ """
tasks = {
src: loop.run_in_executor(
executor, format_file_in_place, src, line_length, fast, write_back
def format_file_in_place(
src: Path, line_length: int, fast: bool, write_back: bool = False
) -> bool:
- """Format the file and rewrite if changed. Return True if changed."""
+ """Format file under `src` path. Return True if changed.
+
+ If `write_back` is True, write reformatted code back to the file.
+ `line_length` and `fast` options are passed to :func:`format_file_contents`.
+ """
with tokenize.open(src) as src_buffer:
src_contents = src_buffer.read()
try:
def format_stdin_to_stdout(
line_length: int, fast: bool, write_back: bool = False
) -> bool:
- """Format file on stdin and pipe output to stdout. Return True if changed."""
+ """Format file on stdin. Return True if changed.
+
+ If `write_back` is True, write reformatted code back to stdout.
+ `line_length` and `fast` arguments are passed to :func:`format_file_contents`.
+ """
contents = sys.stdin.read()
try:
contents = format_file_contents(contents, line_length=line_length, fast=fast)
def format_file_contents(
src_contents: str, line_length: int, fast: bool
) -> FileContent:
- """Reformats a file and returns its contents and encoding."""
+ """Reformats a file and returns its contents and encoding.
+
+ If `fast` is False, additionally confirm that the reformatted code is
+ valid by calling :func:`assert_equivalent` and :func:`assert_stable` on it.
+ `line_length` is passed to :func:`format_str`.
+ """
if src_contents.strip() == '':
raise NothingChanged
def format_str(src_contents: str, line_length: int) -> FileContent:
- """Reformats a string and returns new contents."""
+ """Reformats a string and returns new contents.
+
+ `line_length` determines how many characters per line are allowed.
+ """
src_node = lib2to3_parse(src_contents)
dst_contents = ""
lines = LineGenerator()
class Visitor(Generic[T]):
- """Basic lib2to3 visitor that yields things on visiting."""
+ """Basic lib2to3 visitor that yields things of type `T` on `visit()`."""
def visit(self, node: LN) -> Iterator[T]:
+ """Main method to visit `node` and its children.
+
+ It tries to find a `visit_*()` method for the given `node.type`, like
+ `visit_simple_stmt` for Node objects or `visit_INDENT` for Leaf objects.
+ If no dedicated `visit_*()` method is found, chooses `visit_default()`
+ instead.
+
+ Then yields objects of type `T` from the selected visitor.
+ """
if node.type < 256:
name = token.tok_name[node.type]
else:
yield from getattr(self, f'visit_{name}', self.visit_default)(node)
def visit_default(self, node: LN) -> Iterator[T]:
+ """Default `visit_*()` implementation. Recurses to children of `node`."""
if isinstance(node, Node):
for child in node.children:
yield from self.visit(child)
@dataclass
class BracketTracker:
+ """Keeps track of brackets on a line."""
+
depth: int = 0
bracket_match: Dict[Tuple[Depth, NodeType], Leaf] = Factory(dict)
delimiters: Dict[LeafID, Priority] = Factory(dict)
previous: Optional[Leaf] = None
def mark(self, leaf: Leaf) -> None:
+ """Marks `leaf` with bracket-related metadata. Keeps track of delimiters.
+
+ All leaves receive an int `bracket_depth` field that stores how deep
+ within brackets a given leaf is. 0 means there are no enclosing brackets
+ that started on this line.
+
+ If a leaf is itself a closing bracket, it receives an `opening_bracket`
+ field that it forms a pair with. This is a one-directional link to
+ avoid reference cycles.
+
+ If a leaf is a delimiter (a token on which Black can split the line if
+ needed) and it's on depth 0, its `id()` is stored in the tracker's
+ `delimiters` field.
+ """
if leaf.type == token.COMMENT:
return
"""Returns True if there is a yet unmatched open bracket on the line."""
return bool(self.bracket_match)
- def max_priority(self, exclude: Iterable[LeafID] = ()) -> int:
+ def max_delimiter_priority(self, exclude: Iterable[LeafID] = ()) -> int:
"""Returns the highest priority of a delimiter found on the line.
Values are consistent with what `is_delimiter()` returns.
@dataclass
class Line:
+ """Holds leaves and comments. Can be printed with `str(line)`."""
+
depth: int = 0
leaves: List[Leaf] = Factory(list)
comments: Dict[LeafID, Leaf] = Factory(dict)
_for_loop_variable: bool = False
def append(self, leaf: Leaf, preformatted: bool = False) -> None:
+ """Add a new `leaf` to the end of the line.
+
+ Unless `preformatted` is True, the `leaf` will receive a new consistent
+ whitespace prefix and metadata applied by :class:`BracketTracker`.
+ Trailing commas may be removed, and unpacked for-loop variables are
+ demoted from being delimiters.
+
+ Inline comments are put aside.
+ """
has_value = leaf.value.strip()
if not has_value:
return
@property
def is_comment(self) -> bool:
+ """Is this line a standalone comment?"""
return bool(self) and self.leaves[0].type == STANDALONE_COMMENT
@property
def is_decorator(self) -> bool:
+ """Is this line a decorator?"""
return bool(self) and self.leaves[0].type == token.AT
@property
def is_import(self) -> bool:
+ """Is this an import line?"""
return bool(self) and is_import(self.leaves[0])
@property
def is_class(self) -> bool:
+ """Is this a class definition?"""
return (
bool(self)
and self.leaves[0].type == token.NAME
@property
def is_def(self) -> bool:
- """Also returns True for async defs."""
+ """Is this a function definition? (Also returns True for async defs.)"""
try:
first_leaf = self.leaves[0]
except IndexError:
@property
def is_flow_control(self) -> bool:
+ """Is this a flow control statement?
+
+ Those are `return`, `raise`, `break`, and `continue`.
+ """
return (
bool(self)
and self.leaves[0].type == token.NAME
@property
def is_yield(self) -> bool:
+ """Is this a yield statement?"""
return (
bool(self)
and self.leaves[0].type == token.NAME
)
def maybe_remove_trailing_comma(self, closing: Leaf) -> bool:
+ """Remove trailing comma if there is one and it's safe."""
if not (
self.leaves
and self.leaves[-1].type == token.COMMA
return False
def maybe_decrement_after_for_loop_variable(self, leaf: Leaf) -> bool:
- # See `maybe_increment_for_loop_variable` above for explanation.
+ """See `maybe_increment_for_loop_variable` above for explanation."""
if self._for_loop_variable and leaf.type == token.NAME and leaf.value == 'in':
self.bracket_tracker.depth -= 1
self._for_loop_variable = False
return self.append_comment(comment)
def append_comment(self, comment: Leaf) -> bool:
+ """Add an inline comment to the line."""
if comment.type != token.COMMENT:
return False
return True
def last_non_delimiter(self) -> Leaf:
+ """Returns the last non-delimiter on the line. Raises LookupError otherwise."""
for i in range(len(self.leaves)):
last = self.leaves[-i - 1]
if not is_delimiter(last):
raise LookupError("No non-delimiters found")
def __str__(self) -> str:
+ """Render the line."""
if not self:
return '\n'
return res + '\n'
def __bool__(self) -> bool:
+ """Returns True if the line has leaves or comments."""
return bool(self.leaves or self.comments)
+class UnformattedLines(Line):
+ """Just like :class:`Line` but stores lines which aren't reformatted.
+
+ Leaves are kept verbatim (prefix included) and rendered back by plain
+ concatenation in `__str__()`. The `Line` hooks that would alter leaves are
+ overridden below to do nothing.
+ """
+
+ def append(self, leaf: Leaf, preformatted: bool = True) -> None:
+ """Just add a new `leaf` to the end of the lines.
+
+ The `preformatted` argument is ignored.
+
+ Keeps track of indentation `depth`, which is useful when the user
+ says `# fmt: on`. Otherwise, doesn't do anything with the `leaf`.
+
+ If `generate_comments(leaf)` raises :exc:`FormatOn`, only the consumed
+ part of the prefix is stored (as a separate leaf) and the exception is
+ re-raised; `leaf` itself is NOT appended in that case.
+ """
+ try:
+ # The comments themselves are discarded; the generator is exhausted
+ # only so that it can raise FormatOn.
+ # NOTE(review): generate_comments() can also raise FormatOff (e.g. a
+ # second `# fmt: off` while formatting is already off). It is not
+ # caught here nor in `visit_unformatted()` -- confirm this is intended.
+ list(generate_comments(leaf))
+ except FormatOn as f_on:
+ self.leaves.append(f_on.leaf_from_consumed(leaf))
+ raise
+
+ self.leaves.append(leaf)
+ if leaf.type == token.INDENT:
+ self.depth += 1
+ elif leaf.type == token.DEDENT:
+ self.depth -= 1
+
+ def append_comment(self, comment: Leaf) -> bool:
+ """Not implemented in this class. Always raises NotImplementedError."""
+ raise NotImplementedError("Unformatted lines don't store comments separately.")
+
+ def maybe_remove_trailing_comma(self, closing: Leaf) -> bool:
+ """Does nothing and returns False."""
+ return False
+
+ def maybe_increment_for_loop_variable(self, leaf: Leaf) -> bool:
+ """Does nothing and returns False."""
+ return False
+
+ def maybe_adapt_standalone_comment(self, comment: Leaf) -> bool:
+ """Does nothing and returns False."""
+ return False
+
+ def __str__(self) -> str:
+ """Renders unformatted lines from leaves which were added with `append()`.
+
+ `depth` is not used for indentation in this case.
+ Returns a single newline when there is nothing to render; otherwise no
+ trailing newline is added to the concatenated leaves.
+ """
+ if not self:
+ return '\n'
+
+ res = ''
+ for leaf in self.leaves:
+ res += str(leaf)
+ return res
+
+
@dataclass
class EmptyLineTracker:
"""Provides a stateful method that returns the number of potential extra
(two on module-level), as well as providing an extra empty line after flow
control keywords to make them more prominent.
"""
+ if isinstance(current_line, UnformattedLines):
+ return 0, 0
+
before, after = self._maybe_empty_lines(current_line)
before -= self.previous_after
self.previous_after = after
def _maybe_empty_lines(self, current_line: Line) -> Tuple[int, int]:
max_allowed = 1
- if current_line.is_comment and current_line.depth == 0:
+ if current_line.depth == 0:
max_allowed = 2
if current_line.leaves:
# Consume the first leaf's extra newlines.
first_leaf = current_line.leaves[0]
before = first_leaf.prefix.count('\n')
- before = min(before, max(before, max_allowed))
+ before = min(before, max_allowed)
first_leaf.prefix = ''
else:
before = 0
"""
current_line: Line = Factory(Line)
- def line(self, indent: int = 0) -> Iterator[Line]:
+ def line(self, indent: int = 0, type: Type[Line] = Line) -> Iterator[Line]:
"""Generate a line.
If the line is empty, only emit if it makes sense.
If any lines were generated, set up a new current_line.
"""
if not self.current_line:
- self.current_line.depth += indent
+ if self.current_line.__class__ == type:
+ self.current_line.depth += indent
+ else:
+ self.current_line = type(depth=self.current_line.depth + indent)
return # Line is empty, don't emit. Creating a new one unnecessary.
complete_line = self.current_line
- self.current_line = Line(depth=complete_line.depth + indent)
+ self.current_line = type(depth=complete_line.depth + indent)
yield complete_line
+ def visit(self, node: LN) -> Iterator[Line]:
+ """Main method to start the visit process. Yields :class:`Line` objects.
+
+ While `current_line` is an :class:`UnformattedLines` instance, nodes are
+ routed to `visit_unformatted()`; otherwise the regular `visit_*()` lookup
+ of the base class is used.
+ """
+ if isinstance(self.current_line, UnformattedLines):
+ # File contained `# fmt: off`
+ yield from self.visit_unformatted(node)
+
+ else:
+ yield from super().visit(node)
+
def visit_default(self, node: LN) -> Iterator[Line]:
+ """Default `visit_*()` implementation. Recurses to children of `node`."""
if isinstance(node, Leaf):
any_open_brackets = self.current_line.bracket_tracker.any_open_brackets()
- for comment in generate_comments(node):
- if any_open_brackets:
- # any comment within brackets is subject to splitting
- self.current_line.append(comment)
- elif comment.type == token.COMMENT:
- # regular trailing comment
- self.current_line.append(comment)
- yield from self.line()
-
- else:
- # regular standalone comment
- yield from self.line()
-
- self.current_line.append(comment)
- yield from self.line()
-
- normalize_prefix(node, inside_brackets=any_open_brackets)
- if node.type not in WHITESPACE:
- self.current_line.append(node)
+ try:
+ for comment in generate_comments(node):
+ if any_open_brackets:
+ # any comment within brackets is subject to splitting
+ self.current_line.append(comment)
+ elif comment.type == token.COMMENT:
+ # regular trailing comment
+ self.current_line.append(comment)
+ yield from self.line()
+
+ else:
+ # regular standalone comment
+ yield from self.line()
+
+ self.current_line.append(comment)
+ yield from self.line()
+
+ except FormatOff as f_off:
+ f_off.trim_prefix(node)
+ yield from self.line(type=UnformattedLines)
+ yield from self.visit(node)
+
+ except FormatOn as f_on:
+ # This only happens here if somebody says "fmt: on" multiple
+ # times in a row.
+ f_on.trim_prefix(node)
+ yield from self.visit_default(node)
+
+ else:
+ normalize_prefix(node, inside_brackets=any_open_brackets)
+ if node.type not in WHITESPACE:
+ self.current_line.append(node)
yield from super().visit_default(node)
def visit_INDENT(self, node: Node) -> Iterator[Line]:
+ """Increases indentation level, maybe yields a line."""
+ # In blib2to3 INDENT never holds comments.
yield from self.line(+1)
yield from self.visit_default(node)
def visit_DEDENT(self, node: Node) -> Iterator[Line]:
+ """Decreases indentation level, maybe yields a line."""
+ # DEDENT has no value. Additionally, in blib2to3 it never holds comments.
yield from self.line(-1)
def visit_stmt(self, node: Node, keywords: Set[str]) -> Iterator[Line]:
- """Visit a statement.
+ """Visits a statement.
- The relevant Python language keywords for this statement are NAME leaves
- within it.
+ This implementation is shared for `if`, `while`, `for`, `try`, `except`,
+ `def`, `with`, and `class`.
+
+ The relevant Python language `keywords` for a given statement will be NAME
+ leaves within it. This method puts those on a separate line.
"""
for child in node.children:
if child.type == token.NAME and child.value in keywords: # type: ignore
yield from self.visit(child)
def visit_simple_stmt(self, node: Node) -> Iterator[Line]:
- """A statement without nested statements."""
+ """Visits a statement without nested statements."""
is_suite_like = node.parent and node.parent.type in STATEMENT
if is_suite_like:
yield from self.line(+1)
yield from self.visit_default(node)
def visit_async_stmt(self, node: Node) -> Iterator[Line]:
+ """Visits `async def`, `async for`, `async with`."""
yield from self.line()
children = iter(node.children)
yield from self.visit(child)
def visit_decorators(self, node: Node) -> Iterator[Line]:
+ """Visits decorators."""
for child in node.children:
yield from self.line()
yield from self.visit(child)
def visit_SEMI(self, leaf: Leaf) -> Iterator[Line]:
+ """Semicolons are always removed.
+
+ Statements between them are put on separate lines.
+ """
yield from self.line()
def visit_ENDMARKER(self, leaf: Leaf) -> Iterator[Line]:
+ """End of file.
+
+ Process outstanding comments and end with a newline.
+ """
yield from self.visit_default(leaf)
yield from self.line()
+ def visit_unformatted(self, node: LN) -> Iterator[Line]:
+ """Used when file contained a `# fmt: off`.
+
+ Nodes are only recursed into; leaves are appended verbatim to the
+ current line. When appending raises :exc:`FormatOn`, the unformatted
+ lines collected so far are emitted and the same leaf is visited again.
+ """
+ if isinstance(node, Node):
+ for child in node.children:
+ yield from self.visit(child)
+
+ else:
+ try:
+ self.current_line.append(node)
+ except FormatOn as f_on:
+ # Drop the part of the prefix up to and including `# fmt: on`.
+ f_on.trim_prefix(node)
+ # `line()` defaults to `type=Line`, so `current_line` becomes a
+ # regular Line again and the re-visit below is formatted normally.
+ yield from self.line()
+ yield from self.visit(node)
+
def __attrs_post_init__(self) -> None:
"""You are in a twisty little maze of passages."""
v = self.visit_stmt
if '#' not in p:
return
+ consumed = 0
nlines = 0
for index, line in enumerate(p.split('\n')):
+ consumed += len(line) + 1 # adding the length of the split '\n'
line = line.lstrip()
if not line:
nlines += 1
comment_type = token.COMMENT # simple trailing comment
else:
comment_type = STANDALONE_COMMENT
- yield Leaf(comment_type, make_comment(line), prefix='\n' * nlines)
+ comment = make_comment(line)
+ yield Leaf(comment_type, comment, prefix='\n' * nlines)
+
+ if comment in {'# fmt: on', '# yapf: enable'}:
+ raise FormatOn(consumed)
+
+ if comment in {'# fmt: off', '# yapf: disable'}:
+ raise FormatOff(consumed)
nlines = 0
def make_comment(content: str) -> str:
+ """Returns a consistently formatted comment from the given `content` string.
+
+ All comments (except for "##", "#!", "#:") should have a single space between
+ the hash sign and the content.
+
+ If `content` didn't start with a hash sign, one is provided.
+ """
content = content.rstrip()
if not content:
return '#'
If `py36` is True, splitting may generate syntax that is only compatible
with Python 3.6 and later.
"""
+ if isinstance(line, UnformattedLines):
+ yield line
+ return
+
line_str = str(line).strip('\n')
if len(line_str) <= line_length and '\n' not in line_str:
yield line
def left_hand_split(line: Line, py36: bool = False) -> Iterator[Line]:
- """Split line into many lines, starting with the first matching bracket pair.
+ """Splits line into many lines, starting with the first matching bracket pair.
Note: this usually looks weird, only use this for function definitions.
Prefer RHS otherwise.
comment_after = line.comments.get(id(leaf))
if comment_after:
result.append(comment_after, preformatted=True)
- split_succeeded_or_raise(head, body, tail)
+ bracket_split_succeeded_or_raise(head, body, tail)
for result in (head, body, tail):
if result:
yield result
def right_hand_split(line: Line, py36: bool = False) -> Iterator[Line]:
- """Split line into many lines, starting with the last matching bracket pair."""
+ """Splits line into many lines, starting with the last matching bracket pair."""
head = Line(depth=line.depth)
body = Line(depth=line.depth + 1, inside_brackets=True)
tail = Line(depth=line.depth)
comment_after = line.comments.get(id(leaf))
if comment_after:
result.append(comment_after, preformatted=True)
- split_succeeded_or_raise(head, body, tail)
+ bracket_split_succeeded_or_raise(head, body, tail)
for result in (head, body, tail):
if result:
yield result
-def split_succeeded_or_raise(head: Line, body: Line, tail: Line) -> None:
+def bracket_split_succeeded_or_raise(head: Line, body: Line, tail: Line) -> None:
+ """Raise :exc:`CannotSplit` if the last left- or right-hand split failed.
+
+ Do nothing otherwise.
+
+ A left- or right-hand split is based on a pair of brackets. Content before
+ (and including) the opening bracket is left on one line, content inside the
+ brackets is put on a separate line, and finally content starting with and
+ following the closing bracket is put on a separate line.
+
+ Those are called `head`, `body`, and `tail`, respectively. If the split
+ produced the same line (all content in `head`) or ended up with an empty `body`
+ and the `tail` is just the closing bracket, then it's considered failed.
+ """
tail_len = len(str(tail).strip())
if not body:
if tail_len == 0:
def delimiter_split(line: Line, py36: bool = False) -> Iterator[Line]:
- """Split according to delimiters of the highest priority.
+ """Splits according to delimiters of the highest priority.
This kind of split doesn't increase indentation.
If `py36` is True, the split will add trailing commas also in function
- signatures that contain * and **.
+ signatures that contain `*` and `**`.
"""
try:
last_leaf = line.leaves[-1]
delimiters = line.bracket_tracker.delimiters
try:
- delimiter_priority = line.bracket_tracker.max_priority(exclude={id(last_leaf)})
+ delimiter_priority = line.bracket_tracker.max_delimiter_priority(
+ exclude={id(last_leaf)}
+ )
except ValueError:
raise CannotSplit("No delimiters found")
def normalize_prefix(leaf: Leaf, *, inside_brackets: bool) -> None:
- """Leave existing extra newlines if not `inside_brackets`.
+ """Leaves existing extra newlines if not `inside_brackets`.
- Remove everything else. Note: don't use backslashes for formatting or
+ Removes everything else. Note: don't use backslashes for formatting or
you'll lose your voting rights.
"""
if not inside_brackets:
def gen_python_files_in_dir(path: Path) -> Iterator[Path]:
+ """Generates all files under `path` which aren't under BLACKLISTED_DIRECTORIES
+ and have one of the PYTHON_EXTENSIONS.
+ """
for child in path.iterdir():
if child.is_dir():
if child.name in BLACKLISTED_DIRECTORIES:
@dataclass
class Report:
- """Provides a reformatting counter."""
+ """Provides a reformatting counter. Can be rendered with `str(report)`."""
check: bool = False
change_count: int = 0
same_count: int = 0