import pickle
from asyncio.base_events import BaseEventLoop
from concurrent.futures import Executor, ProcessPoolExecutor
-from enum import Enum
+from enum import Enum, Flag
from functools import partial, wraps
import keyword
import logging
from blib2to3.pgen2.parse import ParseError
-__version__ = "18.5b0"
+__version__ = "18.5b1"
DEFAULT_LINE_LENGTH = 88
+DEFAULT_EXCLUDES = (
+ r"/(\.git|\.hg|\.mypy_cache|\.tox|\.venv|_build|buck-out|build|dist)/"
+)
+DEFAULT_INCLUDES = r"\.pyi?$"
CACHE_DIR = Path(user_cache_dir("black", version=__version__))
YES = 2
+class FileMode(Flag):
+ # Bit flags describing how input should be formatted. Callers combine them
+ # with `|` and consumers test them with `&`.
+ # No flag set: nothing is forced on the input.
+ AUTO_DETECT = 0
+ # Set by `--py36`: treat the source as Python 3.6+ without inspecting it.
+ PYTHON36 = 1
+ # Set by `--pyi`, and for files with a `.pyi` suffix: format as a typing stub.
+ PYI = 2
+ # Set by `-S` / `--skip-string-normalization`: leave string quotes and
+ # prefixes untouched.
+ NO_STRING_NORMALIZATION = 4
+
+
@click.command()
@click.option(
"-l",
help="How many character per line to allow.",
show_default=True,
)
+@click.option(
+ "--py36",
+ is_flag=True,
+ help=(
+ "Allow using Python 3.6-only syntax on all input files. This will put "
+ "trailing commas in function signatures and calls also after *args and "
+ "**kwargs. [default: per-file auto-detection]"
+ ),
+)
+@click.option(
+ "--pyi",
+ is_flag=True,
+ help=(
+ "Format all input files like typing stubs regardless of file extension "
+ "(useful when piping source on standard input)."
+ ),
+)
+@click.option(
+ "-S",
+ "--skip-string-normalization",
+ is_flag=True,
+ help="Don't normalize string quotes or prefixes.",
+)
@click.option(
"--check",
is_flag=True,
is_flag=True,
help="If --fast given, skip temporary sanity checks. [default: --safe]",
)
+@click.option(
+ "--include",
+ type=str,
+ default=DEFAULT_INCLUDES,
+ help=(
+ "A regular expression that matches files and directories that should be "
+ "included on recursive searches. An empty value means all files are "
+ "included regardless of the name. Use forward slashes for directories on "
+ "all platforms (Windows, too). Exclusions are calculated first, inclusions "
+ "later."
+ ),
+ show_default=True,
+)
+@click.option(
+ "--exclude",
+ type=str,
+ default=DEFAULT_EXCLUDES,
+ help=(
+ "A regular expression that matches files and directories that should be "
+ "excluded on recursive searches. An empty value means no paths are excluded. "
+ "Use forward slashes for directories on all platforms (Windows, too). "
+ "Exclusions are calculated first, inclusions later."
+ ),
+ show_default=True,
+)
@click.option(
"-q",
"--quiet",
check: bool,
diff: bool,
fast: bool,
+ pyi: bool,
+ py36: bool,
+ skip_string_normalization: bool,
quiet: bool,
+ include: str,
+ exclude: str,
src: List[str],
) -> None:
"""The uncompromising code formatter."""
sources: List[Path] = []
+ try:
+ include_regex = re.compile(include)
+ except re.error:
+ err(f"Invalid regular expression for include given: {include!r}")
+ ctx.exit(2)
+ try:
+ exclude_regex = re.compile(exclude)
+ except re.error:
+ err(f"Invalid regular expression for exclude given: {exclude!r}")
+ ctx.exit(2)
+ root = find_project_root(src)
for s in src:
p = Path(s)
if p.is_dir():
- sources.extend(gen_python_files_in_dir(p))
- elif p.is_file():
+ sources.extend(
+ gen_python_files_in_dir(p, root, include_regex, exclude_regex)
+ )
+ elif p.is_file() or s == "-":
# if a file was explicitly given, we don't care about its extension
sources.append(p)
- elif s == "-":
- sources.append(Path("-"))
else:
err(f"invalid path: {s}")
write_back = WriteBack.DIFF
else:
write_back = WriteBack.YES
+ mode = FileMode.AUTO_DETECT
+ if py36:
+ mode |= FileMode.PYTHON36
+ if pyi:
+ mode |= FileMode.PYI
+ if skip_string_normalization:
+ mode |= FileMode.NO_STRING_NORMALIZATION
report = Report(check=check, quiet=quiet)
if len(sources) == 0:
out("No paths given. Nothing to do 😴")
return
elif len(sources) == 1:
- reformat_one(sources[0], line_length, fast, write_back, report)
+ reformat_one(
+ src=sources[0],
+ line_length=line_length,
+ fast=fast,
+ write_back=write_back,
+ mode=mode,
+ report=report,
+ )
else:
loop = asyncio.get_event_loop()
executor = ProcessPoolExecutor(max_workers=os.cpu_count())
try:
loop.run_until_complete(
schedule_formatting(
- sources, line_length, fast, write_back, report, loop, executor
+ sources=sources,
+ line_length=line_length,
+ fast=fast,
+ write_back=write_back,
+ mode=mode,
+ report=report,
+ loop=loop,
+ executor=executor,
)
)
finally:
def reformat_one(
- src: Path, line_length: int, fast: bool, write_back: WriteBack, report: "Report"
+ src: Path,
+ line_length: int,
+ fast: bool,
+ write_back: WriteBack,
+ mode: FileMode,
+ report: "Report",
) -> None:
"""Reformat a single file under `src` without spawning child processes.
If `quiet` is True, non-error messages are not output. `line_length`,
- `write_back`, and `fast` options are passed to :func:`format_file_in_place`.
+ `write_back`, `fast` and `mode` options are passed to
+ :func:`format_file_in_place` or :func:`format_stdin_to_stdout`.
"""
try:
changed = Changed.NO
if not src.is_file() and str(src) == "-":
if format_stdin_to_stdout(
- line_length=line_length, fast=fast, write_back=write_back
+ line_length=line_length, fast=fast, write_back=write_back, mode=mode
):
changed = Changed.YES
else:
cache: Cache = {}
if write_back != WriteBack.DIFF:
- cache = read_cache(line_length)
- src = src.resolve()
- if src in cache and cache[src] == get_cache_info(src):
+ cache = read_cache(line_length, mode)
+ res_src = src.resolve()
+ if res_src in cache and cache[res_src] == get_cache_info(res_src):
changed = Changed.CACHED
if changed is not Changed.CACHED and format_file_in_place(
- src, line_length=line_length, fast=fast, write_back=write_back
+ src,
+ line_length=line_length,
+ fast=fast,
+ write_back=write_back,
+ mode=mode,
):
changed = Changed.YES
if write_back == WriteBack.YES and changed is not Changed.NO:
- write_cache(cache, [src], line_length)
+ write_cache(cache, [src], line_length, mode)
report.done(src, changed)
except Exception as exc:
report.failed(src, str(exc))
line_length: int,
fast: bool,
write_back: WriteBack,
+ mode: FileMode,
report: "Report",
loop: BaseEventLoop,
executor: Executor,
(Use ProcessPoolExecutors for actual parallelism.)
- `line_length`, `write_back`, and `fast` options are passed to
+ `line_length`, `write_back`, `fast`, and `mode` options are passed to
:func:`format_file_in_place`.
"""
cache: Cache = {}
if write_back != WriteBack.DIFF:
- cache = read_cache(line_length)
+ cache = read_cache(line_length, mode)
sources, cached = filter_cached(cache, sources)
for src in cached:
report.done(src, Changed.CACHED)
lock = manager.Lock()
tasks = {
loop.run_in_executor(
- executor, format_file_in_place, src, line_length, fast, write_back, lock
+ executor,
+ format_file_in_place,
+ src,
+ line_length,
+ fast,
+ write_back,
+ mode,
+ lock,
): src
for src in sorted(sources)
}
if cancelled:
await asyncio.gather(*cancelled, loop=loop, return_exceptions=True)
if write_back == WriteBack.YES and formatted:
- write_cache(cache, formatted, line_length)
+ write_cache(cache, formatted, line_length, mode)
def format_file_in_place(
line_length: int,
fast: bool,
write_back: WriteBack = WriteBack.NO,
+ mode: FileMode = FileMode.AUTO_DETECT,
lock: Any = None, # multiprocessing.Manager().Lock() is some crazy proxy
) -> bool:
"""Format file under `src` path. Return True if changed.
If `write_back` is True, write reformatted code back to stdout.
`line_length` and `fast` options are passed to :func:`format_file_contents`.
"""
- is_pyi = src.suffix == ".pyi"
-
+ if src.suffix == ".pyi":
+ mode |= FileMode.PYI
with tokenize.open(src) as src_buffer:
src_contents = src_buffer.read()
try:
dst_contents = format_file_contents(
- src_contents, line_length=line_length, fast=fast, is_pyi=is_pyi
+ src_contents, line_length=line_length, fast=fast, mode=mode
)
except NothingChanged:
return False
def format_stdin_to_stdout(
- line_length: int, fast: bool, write_back: WriteBack = WriteBack.NO
+ line_length: int,
+ fast: bool,
+ write_back: WriteBack = WriteBack.NO,
+ mode: FileMode = FileMode.AUTO_DETECT,
) -> bool:
"""Format file on stdin. Return True if changed.
If `write_back` is True, write reformatted code back to stdout.
- `line_length` and `fast` arguments are passed to :func:`format_file_contents`.
+ `line_length`, `fast`, and `mode` arguments are passed to
+ :func:`format_file_contents`.
"""
src = sys.stdin.read()
dst = src
try:
- dst = format_file_contents(src, line_length=line_length, fast=fast)
+ dst = format_file_contents(src, line_length=line_length, fast=fast, mode=mode)
return True
except NothingChanged:
def format_file_contents(
- src_contents: str, *, line_length: int, fast: bool, is_pyi: bool = False
+ src_contents: str,
+ *,
+ line_length: int,
+ fast: bool,
+ mode: FileMode = FileMode.AUTO_DETECT,
) -> FileContent:
"""Reformat contents a file and return new contents.
if src_contents.strip() == "":
raise NothingChanged
- dst_contents = format_str(src_contents, line_length=line_length, is_pyi=is_pyi)
+ dst_contents = format_str(src_contents, line_length=line_length, mode=mode)
if src_contents == dst_contents:
raise NothingChanged
if not fast:
assert_equivalent(src_contents, dst_contents)
- assert_stable(
- src_contents, dst_contents, line_length=line_length, is_pyi=is_pyi
- )
+ assert_stable(src_contents, dst_contents, line_length=line_length, mode=mode)
return dst_contents
def format_str(
- src_contents: str, line_length: int, *, is_pyi: bool = False
+ src_contents: str, line_length: int, *, mode: FileMode = FileMode.AUTO_DETECT
) -> FileContent:
"""Reformat a string and return new contents.
src_node = lib2to3_parse(src_contents)
dst_contents = ""
future_imports = get_future_imports(src_node)
- elt = EmptyLineTracker(is_pyi=is_pyi)
- py36 = is_python36(src_node)
+ is_pyi = bool(mode & FileMode.PYI)
+ py36 = bool(mode & FileMode.PYTHON36) or is_python36(src_node)
+ normalize_strings = not bool(mode & FileMode.NO_STRING_NORMALIZATION)
lines = LineGenerator(
- remove_u_prefix=py36 or "unicode_literals" in future_imports, is_pyi=is_pyi
+ remove_u_prefix=py36 or "unicode_literals" in future_imports,
+ is_pyi=is_pyi,
+ normalize_strings=normalize_strings,
)
+ elt = EmptyLineTracker(is_pyi=is_pyi)
empty_line = Line()
after = 0
for current_line in lines.visit(src_node):
and second_leaf.value == "def"
)
- @property
- def is_flow_control(self) -> bool:
- """Is this line a flow control statement?
-
- Those are `return`, `raise`, `break`, and `continue`.
- """
- return (
- bool(self)
- and self.leaves[0].type == token.NAME
- and self.leaves[0].value in FLOW_CONTROL
- )
-
- @property
- def is_yield(self) -> bool:
- """Is this line a yield statement?"""
- return (
- bool(self)
- and self.leaves[0].type == token.NAME
- and self.leaves[0].value == "yield"
- )
-
@property
def is_class_paren_empty(self) -> bool:
"""Is this a class with no base classes but using parentheses?
and self.leaves[3].value == ")"
)
+ @property
+ def is_triple_quoted_string(self) -> bool:
+ """Is the line a triple quoted string?
+
+ True only when the line is truthy (`bool(self)`) and its first leaf is a
+ STRING token whose value starts with three double quotes or three single
+ quotes.
+ """
+ # NOTE(review): a literal carrying a prefix (e.g. `r` or `b`) would not
+ # match if the prefix is part of `leaf.value` -- confirm this is intended.
+ return (
+ bool(self)
+ and self.leaves[0].type == token.STRING
+ and self.leaves[0].value.startswith(('"""', "'''"))
+ )
+
def contains_standalone_comments(self, depth_limit: int = sys.maxsize) -> bool:
"""If so, needs to be split before emitting."""
for leaf in self.leaves:
the prefix of the first leaf consists of optional newlines. Those newlines
are consumed by `maybe_empty_lines()` and included in the computation.
"""
+
is_pyi: bool = False
previous_line: Optional[Line] = None
previous_after: int = 0
"""Return the number of extra empty lines before and after the `current_line`.
This is for separating `def`, `async def` and `class` with extra empty
- lines (two on module-level), as well as providing an extra empty line
- after flow control keywords to make them more prominent.
+ lines (two on module-level).
"""
if isinstance(current_line, UnformattedLines):
return 0, 0
if self.previous_line.is_decorator:
return 0, 0
+ if self.previous_line.depth < current_line.depth and (
+ self.previous_line.is_class or self.previous_line.is_def
+ ):
+ return 0, 0
+
if (
self.previous_line.is_comment
and self.previous_line.depth == current_line.depth
):
return (before or 1), 0
+ if (
+ self.previous_line
+ and self.previous_line.is_class
+ and current_line.is_triple_quoted_string
+ ):
+ return before, 1
+
return before, 0
Note: destroys the tree it's visiting by mutating prefixes of its leaves
in ways that will no longer stringify to valid Python code on the tree.
"""
+
is_pyi: bool = False
+ normalize_strings: bool = True
current_line: Line = Factory(Line)
remove_u_prefix: bool = False
else:
normalize_prefix(node, inside_brackets=any_open_brackets)
- if node.type == token.STRING:
+ if self.normalize_strings and node.type == token.STRING:
normalize_string_prefix(node, remove_u_prefix=self.remove_u_prefix)
normalize_string_quotes(node)
if node.type not in WHITESPACE:
yield from self.line()
yield from self.visit(child)
- def visit_import_from(self, node: Node) -> Iterator[Line]:
- """Visit import_from and maybe put invisible parentheses.
-
- This is separate from `visit_stmt` because import statements don't
- support arbitrary atoms and thus handling of parentheses is custom.
- """
- check_lpar = False
- for index, child in enumerate(node.children):
- if check_lpar:
- if child.type == token.LPAR:
- # make parentheses invisible
- child.value = "" # type: ignore
- node.children[-1].value = "" # type: ignore
- else:
- # insert invisible parentheses
- node.insert_child(index, Leaf(token.LPAR, ""))
- node.append_child(Leaf(token.RPAR, ""))
- break
-
- check_lpar = (
- child.type == token.NAME and child.value == "import" # type: ignore
- )
-
- for child in node.children:
- yield from self.visit(child)
-
def visit_SEMI(self, leaf: Leaf) -> Iterator[Line]:
"""Remove a semicolon and put the other statement on a separate line."""
yield from self.line()
self.visit_classdef = partial(v, keywords={"class"}, parens=Ø)
self.visit_expr_stmt = partial(v, keywords=Ø, parens=ASSIGNMENTS)
self.visit_return_stmt = partial(v, keywords={"return"}, parens={"return"})
+ self.visit_import_from = partial(v, keywords=Ø, parens={"import"})
self.visit_async_funcdef = self.visit_async_stmt
self.visit_decorated = self.visit_decorators
return # This `node` has a prefix with `# fmt: off`, don't mess with parens.
check_lpar = False
- for child in list(node.children):
+ for index, child in enumerate(list(node.children)):
if check_lpar:
if child.type == syms.atom:
maybe_make_parens_invisible_in_atom(child)
# wrap child in visible parentheses
lpar = Leaf(token.LPAR, "(")
rpar = Leaf(token.RPAR, ")")
- index = child.remove() or 0
+ child.remove()
node.insert_child(index, Node(syms.atom, [lpar, child, rpar]))
+ elif node.type == syms.import_from:
+ # "import from" nodes store parentheses directly as part of
+ # the statement
+ if child.type == token.LPAR:
+ # make parentheses invisible
+ child.value = "" # type: ignore
+ node.children[-1].value = "" # type: ignore
+ elif child.type != token.STAR:
+ # insert invisible parentheses
+ node.insert_child(index, Leaf(token.LPAR, ""))
+ node.append_child(Leaf(token.RPAR, ""))
+ break
+
elif not (isinstance(child, Leaf) and is_multiline_string(child)):
# wrap child in invisible parentheses
lpar = Leaf(token.LPAR, "")
if length > line_length:
break
- if leaf.type == STANDALONE_COMMENT:
+ has_inline_comment = leaf_length > len(leaf.value) + len(leaf.prefix)
+ if leaf.type == STANDALONE_COMMENT or has_inline_comment:
break
optional_brackets.discard(id(leaf))
return imports
-PYTHON_EXTENSIONS = {".py", ".pyi"}
-BLACKLISTED_DIRECTORIES = {
- "build",
- "buck-out",
- "dist",
- "_build",
- ".git",
- ".hg",
- ".mypy_cache",
- ".tox",
- ".venv",
-}
-
-
-def gen_python_files_in_dir(path: Path) -> Iterator[Path]:
- """Generate all files under `path` which aren't under BLACKLISTED_DIRECTORIES
- and have one of the PYTHON_EXTENSIONS.
+def gen_python_files_in_dir(
+ path: Path, root: Path, include: Pattern[str], exclude: Pattern[str]
+) -> Iterator[Path]:
+ """Generate all files under `path` whose paths are not excluded by the
+ `exclude` regex, but are included by the `include` regex.
"""
+ assert root.is_absolute(), f"INTERNAL ERROR: `root` must be absolute but is {root}"
for child in path.iterdir():
+ normalized_path = child.resolve().relative_to(root).as_posix()
if child.is_dir():
- if child.name in BLACKLISTED_DIRECTORIES:
- continue
+ normalized_path += "/"
+ exclude_match = exclude.search(normalized_path)
+ if exclude_match and exclude_match.group(0):
+ continue
+
+ if child.is_dir():
+ yield from gen_python_files_in_dir(child, root, include, exclude)
+
+ elif child.is_file():
+ include_match = include.search(normalized_path)
+ if include_match:
+ yield child
+
+
+def find_project_root(srcs: List[str]) -> Path:
+ """Return a directory containing .git, .hg, or pyproject.toml.
+
+ That directory can be one of the directories passed in `srcs` or their
+ common parent.
+
+ If no directory in the tree contains a marker that would specify it's the
+ project root, the root of the file system is returned.
+ """
+ if not srcs:
+ return Path("/").resolve()
+
+ common_base = min(Path(src).resolve() for src in srcs)
+ if common_base.is_dir():
+ # Append a fake file so `parents` below returns `common_base_dir`, too.
+ common_base /= "fake-file"
+ for directory in common_base.parents:
+ if (directory / ".git").is_dir():
+ return directory
- yield from gen_python_files_in_dir(child)
+ if (directory / ".hg").is_dir():
+ return directory
- elif child.is_file() and child.suffix in PYTHON_EXTENSIONS:
- yield child
+ if (directory / "pyproject.toml").is_file():
+ return directory
+
+ return directory
@dataclass
class Report:
"""Provides a reformatting counter. Can be rendered with `str(report)`."""
+
check: bool = False
quiet: bool = False
change_count: int = 0
) from None
-def assert_stable(src: str, dst: str, line_length: int, is_pyi: bool = False) -> None:
+def assert_stable(
+ src: str, dst: str, line_length: int, mode: FileMode = FileMode.AUTO_DETECT
+) -> None:
"""Raise AssertionError if `dst` reformats differently the second time."""
- newdst = format_str(dst, line_length=line_length, is_pyi=is_pyi)
+ newdst = format_str(dst, line_length=line_length, mode=mode)
if dst != newdst:
log = dump_to_file(
diff(src, dst, "source", "first pass"),
comment: Optional[Leaf]
for comment in line.comments_after(leaf, index):
- if "\n" in comment.prefix:
- return # Oops, standalone comment!
-
length += len(comment.value)
yield index, leaf, length
return False
-def get_cache_file(line_length: int) -> Path:
- return CACHE_DIR / f"cache.{line_length}.pickle"
+def get_cache_file(line_length: int, mode: FileMode) -> Path:
+    """Return the path of the cache file for `line_length` and `mode`.
+
+    Every mode flag that is set contributes its own suffix to the file name,
+    so runs that use different flags never share a cache.  The names produced
+    for the `PYI` and `PYTHON36` flags alone are unchanged.
+    """
+    # String normalization changes the formatted output, so it needs its own
+    # cache as well: otherwise a file cached by a `-S` run would be skipped by
+    # a later run without `-S` even though its strings were never normalized.
+    suffix = "".join(
+        name
+        for flag, name in (
+            (FileMode.PYI, ".pyi"),
+            (FileMode.PYTHON36, ".py36"),
+            (FileMode.NO_STRING_NORMALIZATION, ".nosn"),
+        )
+        if mode & flag
+    )
+    return CACHE_DIR / f"cache.{line_length}{suffix}.pickle"
-def read_cache(line_length: int) -> Cache:
+def read_cache(line_length: int, mode: FileMode) -> Cache:
"""Read the cache if it exists and is well formed.
If it is not well formed, the call to write_cache later should resolve the issue.
"""
- cache_file = get_cache_file(line_length)
+ cache_file = get_cache_file(line_length, mode)
if not cache_file.exists():
return {}
return todo, done
-def write_cache(cache: Cache, sources: List[Path], line_length: int) -> None:
+def write_cache(
+ cache: Cache, sources: List[Path], line_length: int, mode: FileMode
+) -> None:
"""Update the cache file."""
- cache_file = get_cache_file(line_length)
+ cache_file = get_cache_file(line_length, mode)
try:
if not CACHE_DIR.exists():
CACHE_DIR.mkdir(parents=True)