import pickle
from asyncio.base_events import BaseEventLoop
from concurrent.futures import Executor, ProcessPoolExecutor
-from enum import Enum
+from enum import Enum, Flag
from functools import partial, wraps
import keyword
import logging
Type,
TypeVar,
Union,
+ cast,
)
from appdirs import user_cache_dir
from blib2to3.pgen2.parse import ParseError
-__version__ = "18.4a6"
+__version__ = "18.5b1"
DEFAULT_LINE_LENGTH = 88
+CACHE_DIR = Path(user_cache_dir("black", version=__version__))
+
# types
-syms = pygram.python_symbols
FileContent = str
Encoding = str
Depth = int
out = partial(click.secho, bold=True, err=True)
err = partial(click.secho, fg="red", err=True)
+pygram.initialize(CACHE_DIR)
+syms = pygram.python_symbols
+
class NothingChanged(UserWarning):
"""Raised by :func:`format_file` when reformatted code is the same as source."""
YES = 2
+class FileMode(Flag):
+ # Bit flags describing how input should be treated. The CLI entry point
+ # starts from AUTO_DETECT and ORs in PYTHON36 for `--py36` and PYI for
+ # `--pyi`; `format_file_in_place` also ORs in PYI for a `.pyi` suffix.
+ # Consumers test individual bits with `bool(mode & FileMode.X)`.
+ # AUTO_DETECT is the empty value: no flag set.
+ AUTO_DETECT = 0
+ PYTHON36 = 1
+ PYI = 2
+
+
@click.command()
@click.option(
"-l",
"silence those with 2>/dev/null."
),
)
+@click.option(
+ "--pyi",
+ is_flag=True,
+ help=(
+ "Consider all input files typing stubs regardless of file extension "
+ "(useful when piping source on standard input)."
+ ),
+)
+@click.option(
+ "--py36",
+ is_flag=True,
+ help=(
+ "Allow using Python 3.6-only syntax on all input files. This will put "
+ "trailing commas in function signatures and calls also after *args and "
+ "**kwargs. [default: per-file auto-detection]"
+ ),
+)
@click.version_option(version=__version__)
@click.argument(
"src",
check: bool,
diff: bool,
fast: bool,
+ pyi: bool,
+ py36: bool,
quiet: bool,
src: List[str],
) -> None:
write_back = WriteBack.DIFF
else:
write_back = WriteBack.YES
+ mode = FileMode.AUTO_DETECT
+ if py36:
+ mode |= FileMode.PYTHON36
+ if pyi:
+ mode |= FileMode.PYI
report = Report(check=check, quiet=quiet)
if len(sources) == 0:
out("No paths given. Nothing to do 😴")
return
elif len(sources) == 1:
- reformat_one(sources[0], line_length, fast, write_back, report)
+ reformat_one(
+ src=sources[0],
+ line_length=line_length,
+ fast=fast,
+ write_back=write_back,
+ mode=mode,
+ report=report,
+ )
else:
loop = asyncio.get_event_loop()
executor = ProcessPoolExecutor(max_workers=os.cpu_count())
try:
loop.run_until_complete(
schedule_formatting(
- sources, line_length, fast, write_back, report, loop, executor
+ sources=sources,
+ line_length=line_length,
+ fast=fast,
+ write_back=write_back,
+ mode=mode,
+ report=report,
+ loop=loop,
+ executor=executor,
)
)
finally:
def reformat_one(
- src: Path, line_length: int, fast: bool, write_back: WriteBack, report: "Report"
+ src: Path,
+ line_length: int,
+ fast: bool,
+ write_back: WriteBack,
+ mode: FileMode,
+ report: "Report",
) -> None:
"""Reformat a single file under `src` without spawning child processes.
If `quiet` is True, non-error messages are not output. `line_length`,
- `write_back`, and `fast` options are passed to :func:`format_file_in_place`.
+ `write_back`, `fast` and `mode` options are passed to
+ :func:`format_file_in_place` or :func:`format_stdin_to_stdout`.
"""
try:
changed = Changed.NO
if not src.is_file() and str(src) == "-":
if format_stdin_to_stdout(
- line_length=line_length, fast=fast, write_back=write_back
+ line_length=line_length, fast=fast, write_back=write_back, mode=mode
):
changed = Changed.YES
else:
cache: Cache = {}
if write_back != WriteBack.DIFF:
- cache = read_cache(line_length)
+ cache = read_cache(line_length, mode)
src = src.resolve()
if src in cache and cache[src] == get_cache_info(src):
changed = Changed.CACHED
if changed is not Changed.CACHED and format_file_in_place(
- src, line_length=line_length, fast=fast, write_back=write_back
+ src,
+ line_length=line_length,
+ fast=fast,
+ write_back=write_back,
+ mode=mode,
):
changed = Changed.YES
if write_back == WriteBack.YES and changed is not Changed.NO:
- write_cache(cache, [src], line_length)
+ write_cache(cache, [src], line_length, mode)
report.done(src, changed)
except Exception as exc:
report.failed(src, str(exc))
line_length: int,
fast: bool,
write_back: WriteBack,
+ mode: FileMode,
report: "Report",
loop: BaseEventLoop,
executor: Executor,
(Use ProcessPoolExecutors for actual parallelism.)
- `line_length`, `write_back`, and `fast` options are passed to
+ `line_length`, `write_back`, `fast`, and `mode` options are passed to
:func:`format_file_in_place`.
"""
cache: Cache = {}
if write_back != WriteBack.DIFF:
- cache = read_cache(line_length)
+ cache = read_cache(line_length, mode)
sources, cached = filter_cached(cache, sources)
for src in cached:
report.done(src, Changed.CACHED)
lock = manager.Lock()
tasks = {
loop.run_in_executor(
- executor, format_file_in_place, src, line_length, fast, write_back, lock
+ executor,
+ format_file_in_place,
+ src,
+ line_length,
+ fast,
+ write_back,
+ mode,
+ lock,
): src
for src in sorted(sources)
}
if cancelled:
await asyncio.gather(*cancelled, loop=loop, return_exceptions=True)
if write_back == WriteBack.YES and formatted:
- write_cache(cache, formatted, line_length)
+ write_cache(cache, formatted, line_length, mode)
def format_file_in_place(
line_length: int,
fast: bool,
write_back: WriteBack = WriteBack.NO,
+ mode: FileMode = FileMode.AUTO_DETECT,
lock: Any = None, # multiprocessing.Manager().Lock() is some crazy proxy
) -> bool:
"""Format file under `src` path. Return True if changed.
If `write_back` is True, write reformatted code back to stdout.
`line_length` and `fast` options are passed to :func:`format_file_contents`.
"""
- is_pyi = src.suffix == ".pyi"
-
+ if src.suffix == ".pyi":
+ mode |= FileMode.PYI
with tokenize.open(src) as src_buffer:
src_contents = src_buffer.read()
try:
dst_contents = format_file_contents(
- src_contents, line_length=line_length, fast=fast, is_pyi=is_pyi
+ src_contents, line_length=line_length, fast=fast, mode=mode
)
except NothingChanged:
return False
def format_stdin_to_stdout(
- line_length: int, fast: bool, write_back: WriteBack = WriteBack.NO
+ line_length: int,
+ fast: bool,
+ write_back: WriteBack = WriteBack.NO,
+ mode: FileMode = FileMode.AUTO_DETECT,
) -> bool:
"""Format file on stdin. Return True if changed.
If `write_back` is True, write reformatted code back to stdout.
- `line_length` and `fast` arguments are passed to :func:`format_file_contents`.
+ `line_length`, `fast`, and `mode` arguments are passed to
+ :func:`format_file_contents`.
"""
src = sys.stdin.read()
dst = src
try:
- dst = format_file_contents(src, line_length=line_length, fast=fast)
+ dst = format_file_contents(src, line_length=line_length, fast=fast, mode=mode)
return True
except NothingChanged:
def format_file_contents(
- src_contents: str, *, line_length: int, fast: bool, is_pyi: bool = False
+ src_contents: str,
+ *,
+ line_length: int,
+ fast: bool,
+ mode: FileMode = FileMode.AUTO_DETECT,
) -> FileContent:
"""Reformat contents a file and return new contents.
if src_contents.strip() == "":
raise NothingChanged
- dst_contents = format_str(src_contents, line_length=line_length, is_pyi=is_pyi)
+ dst_contents = format_str(src_contents, line_length=line_length, mode=mode)
if src_contents == dst_contents:
raise NothingChanged
if not fast:
assert_equivalent(src_contents, dst_contents)
- assert_stable(
- src_contents, dst_contents, line_length=line_length, is_pyi=is_pyi
- )
+ assert_stable(src_contents, dst_contents, line_length=line_length, mode=mode)
return dst_contents
def format_str(
- src_contents: str, line_length: int, *, is_pyi: bool = False
+ src_contents: str, line_length: int, *, mode: FileMode = FileMode.AUTO_DETECT
) -> FileContent:
"""Reformat a string and return new contents.
src_node = lib2to3_parse(src_contents)
dst_contents = ""
future_imports = get_future_imports(src_node)
- elt = EmptyLineTracker(is_pyi=is_pyi)
- py36 = is_python36(src_node)
+ is_pyi = bool(mode & FileMode.PYI)
+ py36 = bool(mode & FileMode.PYTHON36) or is_python36(src_node)
lines = LineGenerator(
remove_u_prefix=py36 or "unicode_literals" in future_imports, is_pyi=is_pyi
)
+ elt = EmptyLineTracker(is_pyi=is_pyi)
empty_line = Line()
after = 0
for current_line in lines.visit(src_node):
and second_leaf.value == "def"
)
- @property
- def is_flow_control(self) -> bool:
- """Is this line a flow control statement?
-
- Those are `return`, `raise`, `break`, and `continue`.
- """
- return (
- bool(self)
- and self.leaves[0].type == token.NAME
- and self.leaves[0].value in FLOW_CONTROL
- )
-
- @property
- def is_yield(self) -> bool:
- """Is this line a yield statement?"""
- return (
- bool(self)
- and self.leaves[0].type == token.NAME
- and self.leaves[0].value == "yield"
- )
-
@property
def is_class_paren_empty(self) -> bool:
"""Is this a class with no base classes but using parentheses?
and self.leaves[3].value == ")"
)
+ @property
+ def is_triple_quoted_string(self) -> bool:
+ """Is the line a triple quoted string?"""
+ # True only when the line is non-empty, its first leaf is a STRING token,
+ # and that leaf's value begins with three double or three single quotes.
+ # NOTE(review): a value that begins with a string-prefix letter would not
+ # match `startswith` here -- confirm whether that is intended.
+ return (
+ bool(self)
+ and self.leaves[0].type == token.STRING
+ and self.leaves[0].value.startswith(('"""', "'''"))
+ )
+
def contains_standalone_comments(self, depth_limit: int = sys.maxsize) -> bool:
"""If so, needs to be split before emitting."""
for leaf in self.leaves:
the prefix of the first leaf consists of optional newlines. Those newlines
are consumed by `maybe_empty_lines()` and included in the computation.
"""
+
is_pyi: bool = False
previous_line: Optional[Line] = None
previous_after: int = 0
"""Return the number of extra empty lines before and after the `current_line`.
This is for separating `def`, `async def` and `class` with extra empty
- lines (two on module-level), as well as providing an extra empty line
- after flow control keywords to make them more prominent.
+ lines (two on module-level).
"""
if isinstance(current_line, UnformattedLines):
return 0, 0
if self.previous_line.is_decorator:
return 0, 0
+ if self.previous_line.depth < current_line.depth and (
+ self.previous_line.is_class or self.previous_line.is_def
+ ):
+ return 0, 0
+
if (
self.previous_line.is_comment
and self.previous_line.depth == current_line.depth
):
return (before or 1), 0
+ if (
+ self.previous_line
+ and self.previous_line.is_class
+ and current_line.is_triple_quoted_string
+ ):
+ return before, 1
+
return before, 0
Note: destroys the tree it's visiting by mutating prefixes of its leaves
in ways that will no longer stringify to valid Python code on the tree.
"""
+
is_pyi: bool = False
current_line: Line = Factory(Line)
remove_u_prefix: bool = False
yield from self.line()
yield from self.visit(child)
- def visit_import_from(self, node: Node) -> Iterator[Line]:
- """Visit import_from and maybe put invisible parentheses.
-
- This is separate from `visit_stmt` because import statements don't
- support arbitrary atoms and thus handling of parentheses is custom.
- """
- check_lpar = False
- for index, child in enumerate(node.children):
- if check_lpar:
- if child.type == token.LPAR:
- # make parentheses invisible
- child.value = "" # type: ignore
- node.children[-1].value = "" # type: ignore
- else:
- # insert invisible parentheses
- node.insert_child(index, Leaf(token.LPAR, ""))
- node.append_child(Leaf(token.RPAR, ""))
- break
-
- check_lpar = (
- child.type == token.NAME and child.value == "import" # type: ignore
- )
-
- for child in node.children:
- yield from self.visit(child)
-
def visit_SEMI(self, leaf: Leaf) -> Iterator[Line]:
"""Remove a semicolon and put the other statement on a separate line."""
yield from self.line()
self.visit_classdef = partial(v, keywords={"class"}, parens=Ø)
self.visit_expr_stmt = partial(v, keywords=Ø, parens=ASSIGNMENTS)
self.visit_return_stmt = partial(v, keywords={"return"}, parens={"return"})
+ self.visit_import_from = partial(v, keywords=Ø, parens={"import"})
self.visit_async_funcdef = self.visit_async_stmt
self.visit_decorated = self.visit_decorators
return 0
-def generate_comments(leaf: Leaf) -> Iterator[Leaf]:
+def generate_comments(leaf: LN) -> Iterator[Leaf]:
"""Clean the prefix of the `leaf` and generate comments from it, if any.
Comments in lib2to3 are shoved into the whitespace prefix. This happens
def rhs(line: Line, py36: bool = False) -> Iterator[Line]:
+ # Try each candidate `omit` set in turn and accept the first split whose
+ # first resulting line fits within `line_length` (taken from the enclosing
+ # scope).
for omit in generate_trailers_to_omit(line, line_length):
- lines = list(right_hand_split(line, py36, omit=omit))
+ lines = list(right_hand_split(line, line_length, py36, omit=omit))
if is_line_short_enough(lines[0], line_length=line_length):
yield from lines
return
# All splits failed, best effort split with no omits.
+ # This mostly happens to multiline strings that are by definition
+ # reported as not fitting a single line.
- yield from right_hand_split(line, py36)
+ # `right_hand_split` now takes `line_length` as its second positional
+ # parameter; pass it explicitly so `py36` is not consumed as the length.
+ yield from right_hand_split(line, line_length, py36=py36)
if line.inside_brackets:
def right_hand_split(
- line: Line, py36: bool = False, omit: Collection[LeafID] = ()
+ line: Line, line_length: int, py36: bool = False, omit: Collection[LeafID] = ()
) -> Iterator[Line]:
"""Split line into many lines, starting with the last matching bracket pair.
and not line.is_import
):
omit = {id(closing_bracket), *omit}
- delimiter_count = body.bracket_tracker.delimiter_count_with_priority()
- if (
- delimiter_count == 0
- or delimiter_count == 1
- and (
- body.leaves[0].type in OPENING_BRACKETS
- or body.leaves[-1].type in CLOSING_BRACKETS
- )
- ):
+ if can_omit_invisible_parens(body, line_length):
try:
- yield from right_hand_split(line, py36=py36, omit=omit)
+ yield from right_hand_split(line, line_length, py36=py36, omit=omit)
return
except CannotSplit:
pass
Standardizes on visible parentheses for single-element tuples, and keeps
existing visible parentheses for other tuples and generator expressions.
"""
+ try:
+ list(generate_comments(node))
+ except FormatOff:
+ return # This `node` has a prefix with `# fmt: off`, don't mess with parens.
+
check_lpar = False
- for child in list(node.children):
+ for index, child in enumerate(list(node.children)):
if check_lpar:
if child.type == syms.atom:
maybe_make_parens_invisible_in_atom(child)
# wrap child in visible parentheses
lpar = Leaf(token.LPAR, "(")
rpar = Leaf(token.RPAR, ")")
- index = child.remove() or 0
+ child.remove()
node.insert_child(index, Node(syms.atom, [lpar, child, rpar]))
+ elif node.type == syms.import_from:
+ # "import from" nodes store parentheses directly as part of
+ # the statement
+ if child.type == token.LPAR:
+ # make parentheses invisible
+ child.value = "" # type: ignore
+ node.children[-1].value = "" # type: ignore
+ elif child.type != token.STAR:
+ # insert invisible parentheses
+ node.insert_child(index, Leaf(token.LPAR, ""))
+ node.append_child(Leaf(token.RPAR, ""))
+ break
+
elif not (isinstance(child, Leaf) and is_multiline_string(child)):
# wrap child in invisible parentheses
lpar = Leaf(token.LPAR, "")
def should_explode(line: Line, opening_bracket: Leaf) -> bool:
"""Should `line` immediately be split with `delimiter_split()` after RHS?"""
- return bool(
+ # Only an opening bracket whose parent is an atom or an import_from node
+ # qualifies; anything else is rejected up front.
+ if not (
opening_bracket.parent
and opening_bracket.parent.type in {syms.atom, syms.import_from}
and opening_bracket.value in "[{("
- and line.bracket_tracker.delimiters
- and line.bracket_tracker.max_delimiter_priority() == COMMA_PRIORITY
- )
+ ):
+ return False
+
+ # A trailing comma is excluded from the priority computation below.
+ # IndexError covers an empty `line.leaves`; ValueError is presumably what
+ # `max_delimiter_priority()` raises when no delimiter is left -- TODO confirm.
+ try:
+ last_leaf = line.leaves[-1]
+ exclude = {id(last_leaf)} if last_leaf.type == token.COMMA else set()
+ max_priority = line.bracket_tracker.max_delimiter_priority(exclude=exclude)
+ except (IndexError, ValueError):
+ return False
+
+ return max_priority == COMMA_PRIORITY
def is_python36(node: Node) -> bool:
closing_bracket = None
optional_brackets: Set[LeafID] = set()
inner_brackets: Set[LeafID] = set()
- for index, leaf in enumerate_reversed(line.leaves):
- length += len(leaf.prefix) + len(leaf.value)
+ for index, leaf, leaf_length in enumerate_with_length(line, reversed=True):
+ length += leaf_length
if length > line_length:
break
- comment: Optional[Leaf]
- for comment in line.comments_after(leaf, index):
- if "\n" in comment.prefix:
- break # Oops, standalone comment!
-
- length += len(comment.value)
- else:
- comment = None
- if comment is not None:
- break # There was a standalone comment, we can't continue.
+ has_inline_comment = leaf_length > len(leaf.value) + len(leaf.prefix)
+ if leaf.type == STANDALONE_COMMENT or has_inline_comment:
+ break
optional_brackets.discard(id(leaf))
if opening_bracket:
@dataclass
class Report:
"""Provides a reformatting counter. Can be rendered with `str(report)`."""
+
check: bool = False
quiet: bool = False
change_count: int = 0
) from None
-def assert_stable(src: str, dst: str, line_length: int, is_pyi: bool = False) -> None:
+def assert_stable(
+ src: str, dst: str, line_length: int, mode: FileMode = FileMode.AUTO_DETECT
+) -> None:
"""Raise AssertionError if `dst` reformats differently the second time."""
- newdst = format_str(dst, line_length=line_length, is_pyi=is_pyi)
+ newdst = format_str(dst, line_length=line_length, mode=mode)
if dst != newdst:
log = dump_to_file(
diff(src, dst, "source", "first pass"),
index -= 1
+def enumerate_with_length(
+ line: Line, reversed: bool = False
+) -> Iterator[Tuple[Index, Leaf, int]]:
+ """Return an enumeration of leaves with their length.
+
+ Stops prematurely on multiline strings and standalone comments.
+ """
+ # NOTE(review): the only stop condition visible in this function is a
+ # newline inside `leaf.value`; whether that also covers standalone comments
+ # depends on how those leaves store their value -- confirm.
+ # The `reversed` parameter shadows the builtin of the same name inside this
+ # function; it only selects which enumeration helper is used.
+ op = cast(
+ Callable[[Sequence[Leaf]], Iterator[Tuple[Index, Leaf]]],
+ enumerate_reversed if reversed else enumerate,
+ )
+ for index, leaf in op(line.leaves):
+ # Own length of the leaf: its prefix plus its value.
+ length = len(leaf.prefix) + len(leaf.value)
+ if "\n" in leaf.value:
+ return # Multiline strings, we can't continue.
+
+ comment: Optional[Leaf]
+ # Comments reported after this leaf add the length of their value too,
+ # so the yielded length can exceed the leaf's own length.
+ for comment in line.comments_after(leaf, index):
+ length += len(comment.value)
+
+ yield index, leaf, length
+
+
def is_line_short_enough(line: Line, *, line_length: int, line_str: str = "") -> bool:
"""Return True if `line` is no longer than `line_length`.
)
-CACHE_DIR = Path(user_cache_dir("black", version=__version__))
+def can_omit_invisible_parens(line: Line, line_length: int) -> bool:
+ """Does `line` have a shape safe to reformat without optional parens around it?
+ Returns True for only a subset of potentially nice looking formattings but
+ the point is to not return false positives that end up producing lines that
+ are too long.
+ """
+ bt = line.bracket_tracker
+ if not bt.delimiters:
+ # Without delimiters the optional parentheses are useless.
+ return True
-def get_cache_file(line_length: int) -> Path:
- return CACHE_DIR / f"cache.{line_length}.pickle"
+ max_priority = bt.max_delimiter_priority()
+ if bt.delimiter_count_with_priority(max_priority) > 1:
+ # With more than one delimiter of a kind the optional parentheses read better.
+ return False
+
+ if max_priority == DOT_PRIORITY:
+ # A single stranded method call doesn't require optional parentheses.
+ return True
+
+ assert len(line.leaves) >= 2, "Stranded delimiter"
+
+ first = line.leaves[0]
+ second = line.leaves[1]
+ penultimate = line.leaves[-2]
+ last = line.leaves[-1]
+
+ # With a single delimiter, omit if the expression starts or ends with
+ # a bracket.
+ if first.type in OPENING_BRACKETS and second.type not in CLOSING_BRACKETS:
+ remainder = False
+ # Seed with 4 columns per depth level (presumably the indentation width).
+ length = 4 * line.depth
+ for _index, leaf, leaf_length in enumerate_with_length(line):
+ if leaf.type in CLOSING_BRACKETS and leaf.opening_bracket is first:
+ remainder = True
+ if remainder:
+ length += leaf_length
+ if length > line_length:
+ break
+
+ if leaf.type in OPENING_BRACKETS:
+ # There are brackets we can further split on.
+ remainder = False
+
+ else:
+ # for/else: this branch runs only when the loop did not `break`.
+ # checked the entire string and line length wasn't exceeded
+ # `enumerate_with_length` can stop early, so also require that the last
+ # index seen belongs to the final leaf.
+ if len(line.leaves) == _index + 1:
+ return True
+ # Note: we are not returning False here because a line might have *both*
+ # a leading opening bracket and a trailing closing bracket. If the
+ # opening bracket doesn't match our rule, maybe the closing will.
-def read_cache(line_length: int) -> Cache:
+ if (
+ last.type == token.RPAR
+ or last.type == token.RBRACE
+ or (
+ # don't use indexing for omitting optional parentheses;
+ # it looks weird
+ last.type == token.RSQB
+ and last.parent
+ and last.parent.type != syms.trailer
+ )
+ ):
+ if penultimate.type in OPENING_BRACKETS:
+ # Empty brackets don't help.
+ return False
+
+ if is_multiline_string(first):
+ # Additional wrapping of a multiline string in this situation is
+ # unnecessary.
+ return True
+
+ length = 4 * line.depth
+ seen_other_brackets = False
+ for _index, leaf, leaf_length in enumerate_with_length(line):
+ length += leaf_length
+ if leaf is last.opening_bracket:
+ if seen_other_brackets or length <= line_length:
+ return True
+
+ elif leaf.type in OPENING_BRACKETS:
+ # There are brackets we can further split on.
+ seen_other_brackets = True
+
+ return False
+
+
+def get_cache_file(line_length: int, mode: FileMode) -> Path:
+ """Return the cache file path under `CACHE_DIR` for these settings.
+
+ The file name encodes `line_length` and, when set in `mode`, the PYI and
+ PYTHON36 flags (in that order), e.g. `cache.88.pyi.py36.pickle`, so each
+ combination of settings uses its own file.
+ """
+ pyi = bool(mode & FileMode.PYI)
+ py36 = bool(mode & FileMode.PYTHON36)
+ return (
+ CACHE_DIR
+ / f"cache.{line_length}{'.pyi' if pyi else ''}{'.py36' if py36 else ''}.pickle"
+ )
+
+
+def read_cache(line_length: int, mode: FileMode) -> Cache:
"""Read the cache if it exists and is well formed.
If it is not well formed, the call to write_cache later should resolve the issue.
"""
- cache_file = get_cache_file(line_length)
+ cache_file = get_cache_file(line_length, mode)
if not cache_file.exists():
return {}
return todo, done
-def write_cache(cache: Cache, sources: List[Path], line_length: int) -> None:
+def write_cache(
+ cache: Cache, sources: List[Path], line_length: int, mode: FileMode
+) -> None:
"""Update the cache file."""
- cache_file = get_cache_file(line_length)
+ cache_file = get_cache_file(line_length, mode)
try:
if not CACHE_DIR.exists():
CACHE_DIR.mkdir(parents=True)