import asyncio
-import pickle
from asyncio.base_events import BaseEventLoop
from concurrent.futures import Executor, ProcessPoolExecutor
+from datetime import datetime
from enum import Enum, Flag
-from functools import partial, wraps
+from functools import lru_cache, partial, wraps
+import io
import keyword
import logging
from multiprocessing import Manager
import os
from pathlib import Path
+import pickle
import re
-import tokenize
import signal
import sys
+import tokenize
from typing import (
Any,
Callable,
from appdirs import user_cache_dir
from attr import dataclass, Factory
import click
+import toml
# lib2to3 fork
from blib2to3.pytree import Node, Leaf, type_repr
from blib2to3.pgen2.parse import ParseError
-__version__ = "18.5b1"
+__version__ = "18.6b2"
DEFAULT_LINE_LENGTH = 88
DEFAULT_EXCLUDES = (
r"/(\.git|\.hg|\.mypy_cache|\.tox|\.venv|_build|buck-out|build|dist)/"
# types
FileContent = str
Encoding = str
+NewLine = str
Depth = int
NodeType = int
LeafID = int
YES = 1
DIFF = 2
+    @classmethod
+    def from_configuration(cls, *, check: bool, diff: bool) -> "WriteBack":
+        """Pick the write-back mode implied by the `check` and `diff` settings.
+
+        `diff` takes precedence and selects `DIFF`; `check` without `diff`
+        selects `NO`; with neither set `YES` is returned.
+        """
+        if diff:
+            return cls.DIFF
+
+        return cls.NO if check else cls.YES
+
class Changed(Enum):
NO = 0
PYI = 2
NO_STRING_NORMALIZATION = 4
+    @classmethod
+    def from_configuration(
+        cls, *, py36: bool, pyi: bool, skip_string_normalization: bool
+    ) -> "FileMode":
+        """Combine the boolean settings into a single `FileMode` value.
+
+        Starts from `AUTO_DETECT` and ORs in `PYTHON36`, `PYI` and
+        `NO_STRING_NORMALIZATION` for each corresponding truthy argument.
+        """
+        requested = (
+            (py36, cls.PYTHON36),
+            (pyi, cls.PYI),
+            (skip_string_normalization, cls.NO_STRING_NORMALIZATION),
+        )
+        mode = cls.AUTO_DETECT
+        for enabled, flag in requested:
+            if enabled:
+                mode |= flag
+        return mode
+
+
+def read_pyproject_toml(
+    ctx: click.Context, param: click.Parameter, value: Union[str, int, bool, None]
+) -> Optional[str]:
+    """Inject Black configuration from "pyproject.toml" into defaults in `ctx`.
+
+    Used as the eager callback of the `--config` option.  When no explicit path
+    is given, "pyproject.toml" is looked up in the project root computed from
+    the "src" values found in `ctx.params`.
+
+    Returns the path to a successfully found and read configuration file, None
+    otherwise (no file found, or no non-empty `[tool.black]` table in it).
+
+    Raises `click.FileError` if the file cannot be read or parsed.
+    """
+    assert not isinstance(value, (int, bool)), "Invalid parameter type passed"
+    if not value:
+        root = find_project_root(ctx.params.get("src", ()))
+        path = root / "pyproject.toml"
+        if not path.is_file():
+            return None
+
+        value = str(path)
+
+    try:
+        pyproject_toml = toml.load(value)
+    except (toml.TomlDecodeError, OSError) as e:
+        # `FileError(filename, hint)` has the same signature in every click
+        # release, whereas the positional arguments of `BadOptionUsage` changed
+        # in click 7 (the option name comes first there).  Chain the cause so
+        # the original traceback is not lost.
+        raise click.FileError(
+            filename=value, hint=f"Error reading configuration file: {e}"
+        ) from e
+
+    config = pyproject_toml.get("tool", {}).get("black", {})
+    if not config:
+        return None
+
+    if ctx.default_map is None:
+        ctx.default_map = {}
+    # Keys may be spelled like command line options ("--line-length" or
+    # "line-length"); normalize them to parameter names ("line_length").
+    ctx.default_map.update(  # type: ignore  # bad types in .pyi
+        {k.replace("--", "").replace("-", "_"): v for k, v in config.items()}
+    )
+    return value
+
-@click.command()
+@click.command(context_settings=dict(help_option_names=["-h", "--help"]))
@click.option(
"-l",
"--line-length",
"silence those with 2>/dev/null."
),
)
+@click.option(
+ "-v",
+ "--verbose",
+ is_flag=True,
+ help=(
+ "Also emit messages to stderr about files that were not changed or were "
+ "ignored due to --exclude=."
+ ),
+)
@click.version_option(version=__version__)
@click.argument(
"src",
type=click.Path(
exists=True, file_okay=True, dir_okay=True, readable=True, allow_dash=True
),
+ is_eager=True,
+)
+@click.option(
+ "--config",
+ type=click.Path(
+ exists=False, file_okay=True, dir_okay=False, readable=True, allow_dash=False
+ ),
+ is_eager=True,
+ callback=read_pyproject_toml,
+ help="Read configuration from PATH.",
)
@click.pass_context
def main(
py36: bool,
skip_string_normalization: bool,
quiet: bool,
+ verbose: bool,
include: str,
exclude: str,
- src: List[str],
+ src: Tuple[str],
+ config: Optional[str],
) -> None:
"""The uncompromising code formatter."""
- sources: List[Path] = []
+ write_back = WriteBack.from_configuration(check=check, diff=diff)
+ mode = FileMode.from_configuration(
+ py36=py36, pyi=pyi, skip_string_normalization=skip_string_normalization
+ )
+ if config and verbose:
+ out(f"Using configuration from {config}.", bold=False, fg="blue")
try:
- include_regex = re.compile(include)
+ include_regex = re_compile_maybe_verbose(include)
except re.error:
err(f"Invalid regular expression for include given: {include!r}")
ctx.exit(2)
try:
- exclude_regex = re.compile(exclude)
+ exclude_regex = re_compile_maybe_verbose(exclude)
except re.error:
err(f"Invalid regular expression for exclude given: {exclude!r}")
ctx.exit(2)
+ report = Report(check=check, quiet=quiet, verbose=verbose)
+ root = find_project_root(src)
+ sources: Set[Path] = set()
for s in src:
p = Path(s)
if p.is_dir():
- sources.extend(gen_python_files_in_dir(p, include_regex, exclude_regex))
- elif p.is_file():
+ sources.update(
+ gen_python_files_in_dir(p, root, include_regex, exclude_regex, report)
+ )
+ elif p.is_file() or s == "-":
# if a file was explicitly given, we don't care about its extension
- sources.append(p)
- elif s == "-":
- sources.append(Path("-"))
+ sources.add(p)
else:
err(f"invalid path: {s}")
-
- if check and not diff:
- write_back = WriteBack.NO
- elif diff:
- write_back = WriteBack.DIFF
- else:
- write_back = WriteBack.YES
- mode = FileMode.AUTO_DETECT
- if py36:
- mode |= FileMode.PYTHON36
- if pyi:
- mode |= FileMode.PYI
- if skip_string_normalization:
- mode |= FileMode.NO_STRING_NORMALIZATION
- report = Report(check=check, quiet=quiet)
if len(sources) == 0:
- out("No paths given. Nothing to do 😴")
+ if verbose or not quiet:
+ out("No paths given. Nothing to do 😴")
ctx.exit(0)
- return
- elif len(sources) == 1:
+ if len(sources) == 1:
reformat_one(
- src=sources[0],
+ src=sources.pop(),
line_length=line_length,
fast=fast,
write_back=write_back,
)
finally:
shutdown(loop)
- if not quiet:
- out("All done! ✨ 🍰 ✨")
- click.echo(str(report))
+ if verbose or not quiet:
+ bang = "💥 💔 💥" if report.return_code else "✨ 🍰 ✨"
+ out(f"All done! {bang}")
+ click.secho(str(report), err=True)
ctx.exit(report.return_code)
cache: Cache = {}
if write_back != WriteBack.DIFF:
cache = read_cache(line_length, mode)
- src = src.resolve()
- if src in cache and cache[src] == get_cache_info(src):
+ res_src = src.resolve()
+ if res_src in cache and cache[res_src] == get_cache_info(res_src):
changed = Changed.CACHED
if changed is not Changed.CACHED and format_file_in_place(
src,
async def schedule_formatting(
- sources: List[Path],
+ sources: Set[Path],
line_length: int,
fast: bool,
write_back: WriteBack,
if write_back != WriteBack.DIFF:
cache = read_cache(line_length, mode)
sources, cached = filter_cached(cache, sources)
- for src in cached:
+ for src in sorted(cached):
report.done(src, Changed.CACHED)
cancelled = []
formatted = []
"""
if src.suffix == ".pyi":
mode |= FileMode.PYI
- with tokenize.open(src) as src_buffer:
- src_contents = src_buffer.read()
+
+ then = datetime.utcfromtimestamp(src.stat().st_mtime)
+ with open(src, "rb") as buf:
+ src_contents, encoding, newline = decode_bytes(buf.read())
try:
dst_contents = format_file_contents(
src_contents, line_length=line_length, fast=fast, mode=mode
return False
if write_back == write_back.YES:
- with open(src, "w", encoding=src_buffer.encoding) as f:
+ with open(src, "w", encoding=encoding, newline=newline) as f:
f.write(dst_contents)
elif write_back == write_back.DIFF:
- src_name = f"{src} (original)"
- dst_name = f"{src} (formatted)"
+ now = datetime.utcnow()
+ src_name = f"{src}\t{then} +0000"
+ dst_name = f"{src}\t{now} +0000"
diff_contents = diff(src_contents, dst_contents, src_name, dst_name)
if lock:
lock.acquire()
try:
- sys.stdout.write(diff_contents)
+ f = io.TextIOWrapper(
+ sys.stdout.buffer,
+ encoding=encoding,
+ newline=newline,
+ write_through=True,
+ )
+ f.write(diff_contents)
+ f.detach()
finally:
if lock:
lock.release()
`line_length`, `fast`, `is_pyi`, and `force_py36` arguments are passed to
:func:`format_file_contents`.
"""
- src = sys.stdin.read()
+ then = datetime.utcnow()
+ src, encoding, newline = decode_bytes(sys.stdin.buffer.read())
dst = src
try:
dst = format_file_contents(src, line_length=line_length, fast=fast, mode=mode)
return False
finally:
+ f = io.TextIOWrapper(
+ sys.stdout.buffer, encoding=encoding, newline=newline, write_through=True
+ )
if write_back == WriteBack.YES:
- sys.stdout.write(dst)
+ f.write(dst)
elif write_back == WriteBack.DIFF:
- src_name = "<stdin> (original)"
- dst_name = "<stdin> (formatted)"
- sys.stdout.write(diff(src, dst, src_name, dst_name))
+ now = datetime.utcnow()
+ src_name = f"STDIN\t{then} +0000"
+ dst_name = f"STDOUT\t{now} +0000"
+ f.write(diff(src, dst, src_name, dst_name))
+ f.detach()
def format_file_contents(
return dst_contents
+def decode_bytes(src: bytes) -> Tuple[FileContent, Encoding, NewLine]:
+    """Decode `src` and return a tuple of (decoded_contents, encoding, newline).
+
+    The encoding is detected with `tokenize.detect_encoding`.  `newline` is CRLF
+    when the first line ends with CRLF and LF otherwise (also for empty input).
+    `decoded_contents` is read with universal newlines (i.e. only contains LF).
+    """
+    raw = io.BytesIO(src)
+    encoding, first_lines = tokenize.detect_encoding(raw.readline)
+    if not first_lines:
+        return "", encoding, "\n"
+
+    newline = "\r\n" if first_lines[0].endswith(b"\r\n") else "\n"
+    # `detect_encoding` consumed the first lines; rewind before decoding.
+    raw.seek(0)
+    with io.TextIOWrapper(raw, encoding) as reader:
+        contents = reader.read()
+    return contents, encoding, newline
+
+
GRAMMARS = [
pygram.python_grammar_no_print_statement_no_exec_statement,
pygram.python_grammar_no_print_statement,
def lib2to3_parse(src_txt: str) -> Node:
"""Given a string with source, return the lib2to3 Node."""
grammar = pygram.python_grammar_no_print_statement
- if src_txt[-1] != "\n":
- nl = "\r\n" if "\r\n" in src_txt[:1024] else "\n"
- src_txt += nl
+ if src_txt[-1:] != "\n":
+ src_txt += "\n"
for grammar in GRAMMARS:
drv = driver.Driver(grammar, pytree.convert)
try:
syms.dictsetmaker,
syms.listmaker,
syms.testlist_gexp,
+ syms.testlist_star_expr,
}
TEST_DESCENDANTS = {
syms.test,
return False
+    def contains_multiline_strings(self) -> bool:
+        """Return True if at least one leaf of this line is a multiline string."""
+        return any(is_multiline_string(leaf) for leaf in self.leaves)
+
def maybe_remove_trailing_comma(self, closing: Leaf) -> bool:
"""Remove trailing comma if there is one and it's safe."""
if not (
Provide a non-negative leaf `_index` to speed up the function.
"""
+ if not self.comments:
+ return
+
if _index == -1:
for _index, _leaf in enumerate(self.leaves):
if leaf is _leaf:
def is_complex_subscript(self, leaf: Leaf) -> bool:
"""Return True iff `leaf` is part of a slice with non-trivial exprs."""
- open_lsqb = (
- leaf if leaf.type == token.LSQB else self.bracket_tracker.get_open_lsqb()
- )
+ open_lsqb = self.bracket_tracker.get_open_lsqb()
if open_lsqb is None:
return False
subscript_start = open_lsqb.next_sibling
- if (
- isinstance(subscript_start, Node)
- and subscript_start.type == syms.subscriptlist
- ):
- subscript_start = child_towards(subscript_start, leaf)
+
+ if isinstance(subscript_start, Node):
+ if subscript_start.type == syms.listmaker:
+ return False
+
+ if subscript_start.type == syms.subscriptlist:
+ subscript_start = child_towards(subscript_start, leaf)
return subscript_start is not None and any(
n.type in TEST_DESCENDANTS for n in subscript_start.pre_order()
)
elif prevp.type == token.EQUAL and prevp_parent.type == syms.argument:
return NO
- elif t == token.NAME or t == token.NUMBER:
+ elif t in {token.NAME, token.NUMBER, token.STRING}:
return NO
elif p.type == syms.import_from:
result.append(leaf, preformatted=True)
for comment_after in line.comments_after(leaf):
result.append(comment_after, preformatted=True)
- bracket_split_succeeded_or_raise(head, body, tail)
assert opening_bracket and closing_bracket
+ body.should_explode = should_explode(body, opening_bracket)
+ bracket_split_succeeded_or_raise(head, body, tail)
if (
+ # the body shouldn't be exploded
+ not body.should_explode
# the opening bracket is an optional paren
- opening_bracket.type == token.LPAR
+ and opening_bracket.type == token.LPAR
and not opening_bracket.value
# the closing bracket is an optional paren
and closing_bracket.type == token.RPAR
and not closing_bracket.value
- # there are no standalone comments in the body
- and not line.contains_standalone_comments(0)
- # and it's not an import (optional parens are the only thing we can split
- # on in this case; attempting a split without them is a waste of time)
+ # it's not an import (optional parens are the only thing we can split on
+ # in this case; attempting a split without them is a waste of time)
and not line.is_import
+ # there are no standalone comments in the body
+ and not body.contains_standalone_comments(0)
+ # and we can actually remove the parens
+ and can_omit_invisible_parens(body, line_length)
):
omit = {id(closing_bracket), *omit}
- if can_omit_invisible_parens(body, line_length):
- try:
- yield from right_hand_split(line, line_length, py36=py36, omit=omit)
- return
- except CannotSplit:
- pass
+ try:
+ yield from right_hand_split(line, line_length, py36=py36, omit=omit)
+ return
+
+ except CannotSplit:
+ if not (
+ can_be_split(body)
+ or is_line_short_enough(body, line_length=line_length)
+ ):
+ raise CannotSplit(
+ "Splitting failed, body is still too long and can't be split."
+ )
+
+ elif head.contains_multiline_strings() or tail.contains_multiline_strings():
+ raise CannotSplit(
+ "The current optional pair of parentheses is bound to fail to "
+ "satisfy the splitting algorithm because the head or the tail "
+ "contains multiline strings which by definition never fit one "
+ "line."
+ )
ensure_visible(opening_bracket)
ensure_visible(closing_bracket)
- body.should_explode = should_explode(body, opening_bracket)
for result in (head, body, tail):
if result:
yield result
prefix = leaf.value[:first_quote_pos]
unescaped_new_quote = re.compile(rf"(([^\\]|^)(\\\\)*){new_quote}")
- escaped_new_quote = re.compile(rf"([^\\]|^)\\(\\\\)*{new_quote}")
- escaped_orig_quote = re.compile(rf"([^\\]|^)\\(\\\\)*{orig_quote}")
+ escaped_new_quote = re.compile(rf"([^\\]|^)\\((?:\\\\)*){new_quote}")
+ escaped_orig_quote = re.compile(rf"([^\\]|^)\\((?:\\\\)*){orig_quote}")
body = leaf.value[first_quote_pos + len(orig_quote) : -len(orig_quote)]
if "r" in prefix.casefold():
if unescaped_new_quote.search(body):
# Do not introduce or remove backslashes in raw strings
new_body = body
else:
- # remove unnecessary quotes
+ # remove unnecessary escapes
new_body = sub_twice(escaped_new_quote, rf"\1\2{new_quote}", body)
if body != new_body:
- # Consider the string without unnecessary quotes as the original
+ # Consider the string without unnecessary escapes as the original
body = new_body
leaf.value = f"{prefix}{orig_quote}{body}{orig_quote}"
new_body = sub_twice(escaped_orig_quote, rf"\1\2{orig_quote}", new_body)
new_body = sub_twice(unescaped_new_quote, rf"\1\\{new_quote}", new_body)
- if new_quote == '"""' and new_body[-1] == '"':
+ if "f" in prefix.casefold():
+ matches = re.findall(r"[^{]\{(.*?)\}[^}]", new_body)
+ for m in matches:
+ if "\\" in str(m):
+ # Do not introduce backslashes in interpolated expressions
+ return
+ if new_quote == '"""' and new_body[-1:] == '"':
# edge case:
new_body = new_body[:-1] + '\\"'
orig_escape_count = body.count("\\")
def maybe_make_parens_invisible_in_atom(node: LN) -> bool:
- """If it's safe, make the parens in the atom `node` invisible, recusively."""
+ """If it's safe, make the parens in the atom `node` invisible, recursively."""
if (
node.type != syms.atom
or is_empty_tuple(node)
def gen_python_files_in_dir(
- path: Path, include: Pattern[str], exclude: Pattern[str]
+ path: Path,
+ root: Path,
+ include: Pattern[str],
+ exclude: Pattern[str],
+ report: "Report",
) -> Iterator[Path]:
"""Generate all files under `path` whose paths are not excluded by the
`exclude` regex, but are included by the `include` regex.
+
+ Symbolic links pointing outside of the root directory are ignored.
+
+ `report` is where output about exclusions goes.
"""
+ assert root.is_absolute(), f"INTERNAL ERROR: `root` must be absolute but is {root}"
for child in path.iterdir():
- normalized_path = child.resolve().as_posix()
+ try:
+ normalized_path = "/" + child.resolve().relative_to(root).as_posix()
+ except ValueError:
+ if child.is_symlink():
+ report.path_ignored(
+ child,
+ "is a symbolic link that points outside of the root directory",
+ )
+ continue
+
+ raise
+
if child.is_dir():
normalized_path += "/"
exclude_match = exclude.search(normalized_path)
if exclude_match and exclude_match.group(0):
+ report.path_ignored(child, f"matches the --exclude regular expression")
continue
if child.is_dir():
- yield from gen_python_files_in_dir(child, include, exclude)
+ yield from gen_python_files_in_dir(child, root, include, exclude, report)
elif child.is_file():
include_match = include.search(normalized_path)
yield child
+@lru_cache()
+def find_project_root(srcs: Iterable[str]) -> Path:
+    """Return a directory containing .git, .hg, or pyproject.toml.
+
+    That directory can be one of the directories passed in `srcs` or their
+    common parent.
+
+    If no directory in the tree contains a marker that would specify it's the
+    project root, the root of the file system is returned.
+
+    `srcs` must be hashable (e.g. a tuple) because results are memoized with
+    `lru_cache`.
+    """
+    if not srcs:
+        return Path("/").resolve()
+
+    # For every source collect the directories that contain it: all of its
+    # parents, plus the source itself when it is a directory.
+    containing_dirs = []
+    for src in srcs:
+        path = Path(src).resolve()
+        candidates = set(path.parents)
+        if path.is_dir():
+            candidates.add(path)
+        containing_dirs.append(candidates)
+
+    # The common base is the deepest directory shared by *all* sources.  Taking
+    # `min()` of the paths only picks the lexicographically smallest source,
+    # whose ancestors may not contain the other sources at all.
+    shared = set.intersection(*containing_dirs)
+    if not shared:
+        # Nothing in common (e.g. sources on different drives).
+        return Path("/").resolve()
+
+    common_base = max(shared, key=lambda directory: len(directory.parts))
+    for directory in (common_base, *common_base.parents):
+        # `.git` is a regular file in linked worktrees and submodules.
+        if (directory / ".git").exists():
+            return directory
+
+        if (directory / ".hg").is_dir():
+            return directory
+
+        if (directory / "pyproject.toml").is_file():
+            return directory
+
+    # No marker found: `directory` is now the top-most ancestor.
+    return directory
+
+
@dataclass
class Report:
"""Provides a reformatting counter. Can be rendered with `str(report)`."""
check: bool = False
quiet: bool = False
+ verbose: bool = False
change_count: int = 0
same_count: int = 0
failure_count: int = 0
"""Increment the counter for successful reformatting. Write out a message."""
if changed is Changed.YES:
reformatted = "would reformat" if self.check else "reformatted"
- if not self.quiet:
+ if self.verbose or not self.quiet:
out(f"{reformatted} {src}")
self.change_count += 1
else:
- if not self.quiet:
+ if self.verbose:
if changed is Changed.NO:
msg = f"{src} already well formatted, good job."
else:
err(f"error: cannot format {src}: {message}")
self.failure_count += 1
+    def path_ignored(self, path: Path, message: str) -> None:
+        """Report that `path` was skipped for the reason given in `message`.
+
+        Nothing is written unless the report is verbose.
+        """
+        if not self.verbose:
+            return
+
+        out(f"{path} ignored: {message}", bold=False)
+
@property
def return_code(self) -> int:
"""Return the exit code that the app should use.
return regex.sub(replacement, regex.sub(replacement, original))
+def re_compile_maybe_verbose(regex: str) -> Pattern[str]:
+    """Compile `regex`, enabling verbose mode for multi-line patterns.
+
+    A pattern containing a newline gets the inline `(?x)` flag prepended so it
+    may be laid out over several lines; any other pattern is compiled unchanged.
+    """
+    is_multiline = "\n" in regex
+    source = f"(?x){regex}" if is_multiline else regex
+    return re.compile(source)
+
+
def enumerate_reversed(sequence: Sequence[T]) -> Iterator[Tuple[Index, T]]:
"""Like `reversed(enumerate(sequence))` if that were possible."""
index = len(sequence) - 1
)
+def can_be_split(line: Line) -> bool:
+    """Return False if the line cannot be split *for sure*.
+
+    This is not an exhaustive search but a cheap heuristic that we can use to
+    avoid some unfortunate formattings (mostly around wrapping unsplittable code
+    in unnecessary parentheses).
+
+    Lines with fewer than two leaves are reported as unsplittable.  Lines that
+    start with a string immediately followed by a dot are scanned backwards and
+    rejected under the conditions commented below.  Every other line is
+    reported as possibly splittable.
+    """
+    leaves = line.leaves
+    if len(leaves) < 2:
+        return False
+
+    # Only lines that begin with a string followed by a dot get the scan.
+    if leaves[0].type == token.STRING and leaves[1].type == token.DOT:
+        call_count = 0
+        dot_count = 0
+        # NOTE(review): `next` shadows the builtin and is never reassigned in
+        # the loop, so every check below compares against the *last* leaf of
+        # the line rather than the leaf following `leaf` -- confirm intent.
+        next = leaves[-1]
+        # Walk from the second-to-last leaf back to the first one.
+        for leaf in leaves[-2::-1]:
+            if leaf.type in OPENING_BRACKETS:
+                # An opening bracket is only accepted (and counted) when
+                # `next` is a closing bracket.
+                if next.type not in CLOSING_BRACKETS:
+                    return False
+
+                call_count += 1
+            elif leaf.type == token.DOT:
+                dot_count += 1
+            elif leaf.type == token.NAME:
+                # A name is only accepted when `next` is a dot or an opening
+                # bracket.
+                if not (next.type == token.DOT or next.type in OPENING_BRACKETS):
+                    return False
+
+            elif leaf.type not in CLOSING_BRACKETS:
+                # Any leaf other than a bracket, dot or name: give up.
+                return False
+
+            # More than one dot together with more than one counted bracket
+            # pair is reported as unsplittable.
+            if dot_count > 1 and call_count > 1:
+                return False
+
+    return True
+
+
def can_omit_invisible_parens(line: Line, line_length: int) -> bool:
"""Does `line` have a shape safe to reformat without optional parens around it?
def get_cache_file(line_length: int, mode: FileMode) -> Path:
- pyi = bool(mode & FileMode.PYI)
- py36 = bool(mode & FileMode.PYTHON36)
- return (
- CACHE_DIR
- / f"cache.{line_length}{'.pyi' if pyi else ''}{'.py36' if py36 else ''}.pickle"
- )
+ """Return the path of the cache file used for `line_length` and `mode`.
+
+ The file lives in `CACHE_DIR` and its name embeds `line_length` and `mode.value`.
+ """
+ return CACHE_DIR / f"cache.{line_length}.{mode.value}.pickle"
def read_cache(line_length: int, mode: FileMode) -> Cache:
return stat.st_mtime, stat.st_size
-def filter_cached(
- cache: Cache, sources: Iterable[Path]
-) -> Tuple[List[Path], List[Path]]:
- """Split a list of paths into two.
+def filter_cached(cache: Cache, sources: Iterable[Path]) -> Tuple[Set[Path], Set[Path]]:
+ """Split an iterable of paths in `sources` into two sets.
- The first list contains paths of files that modified on disk or are not in the
- cache. The other list contains paths to non-modified files.
+ The first contains resolved paths of files that were modified on disk or are
+ not in the cache. The other contains resolved paths to non-modified files.
"""
- todo, done = [], []
+ todo, done = set(), set()
for src in sources:
src = src.resolve()
if cache.get(src) != get_cache_info(src):
- todo.append(src)
+ todo.add(src)
else:
- done.append(src)
+ done.add(src)
return todo, done
def write_cache(
- cache: Cache, sources: List[Path], line_length: int, mode: FileMode
+ cache: Cache, sources: Iterable[Path], line_length: int, mode: FileMode
) -> None:
"""Update the cache file."""
cache_file = get_cache_file(line_length, mode)