import traceback
from contextlib import contextmanager
from dataclasses import replace
-from datetime import datetime
+from datetime import datetime, timezone
from enum import Enum
from json.decoder import JSONDecodeError
from pathlib import Path
import click
from click.core import ParameterSource
from mypy_extensions import mypyc_attr
+from pathspec import PathSpec
from pathspec.patterns.gitwildmatch import GitWildMatchPatternError
from _black_version import version as __version__
-from black.cache import Cache, get_cache_info, read_cache, write_cache
+from black.cache import Cache
from black.comments import normalize_fmt_off
from black.const import (
DEFAULT_EXCLUDES,
otherwise.
"""
if not value:
- value = find_pyproject_toml(ctx.params.get("src", ()))
+ value = find_pyproject_toml(
+ ctx.params.get("src", ()), ctx.params.get("stdin_filename", None)
+ )
if value is None:
return None
"target-version", "Config key target-version must be a list"
)
+ exclude = config.get("exclude")
+ if exclude is not None and not isinstance(exclude, str):
+ raise click.BadOptionUsage("exclude", "Config key exclude must be a string")
+
+ extend_exclude = config.get("extend_exclude")
+ if extend_exclude is not None and not isinstance(extend_exclude, str):
+ raise click.BadOptionUsage(
+ "extend-exclude", "Config key extend-exclude must be a string"
+ )
+
default_map: Dict[str, Any] = {}
if ctx.default_map:
default_map.update(ctx.default_map)
callback=target_version_option_callback,
multiple=True,
help=(
- "Python versions that should be supported by Black's output. [default: per-file"
- " auto-detection]"
+ "Python versions that should be supported by Black's output. By default, Black"
+ " will try to infer this from the project metadata in pyproject.toml. If this"
+ " does not yield conclusive results, Black will use per-file auto-detection."
),
)
@click.option(
multiple=True,
help=(
"When processing Jupyter Notebooks, add the given magic to the list"
- f" of known python-magics ({', '.join(PYTHON_CELL_MAGICS)})."
+ f" of known python-magics ({', '.join(sorted(PYTHON_CELL_MAGICS))})."
" Useful for formatting cells with custom python magics."
),
default=[],
@click.option(
"--stdin-filename",
type=str,
+ is_eager=True,
help=(
"The name of the file when passing it through stdin. Useful to make "
"sure Black will respect --force-exclude option on some "
"--workers",
type=click.IntRange(min=1),
default=None,
- help="Number of parallel workers [default: number of CPUs in the system]",
+ help=(
+ "Number of parallel workers [default: BLACK_NUM_WORKERS environment variable "
+ "or number of CPUs in the system]"
+ ),
)
@click.option(
"-q",
fg="blue",
)
- normalized = [
- (source, source)
- if source == "-"
- else (normalize_path_maybe_ignore(Path(source), root), source)
- for source in src
- ]
- srcs_string = ", ".join(
- [
- f'"{_norm}"'
- if _norm
- else f'\033[31m"{source} (skipping - invalid)"\033[34m'
- for _norm, source in normalized
- ]
- )
- out(f"Sources to be formatted: {srcs_string}", fg="blue")
-
if config:
config_source = ctx.get_parameter_source("config")
user_level_config = str(find_user_pyproject_toml())
out("Using configuration from project root.", fg="blue")
else:
out(f"Using configuration in '{config}'.", fg="blue")
+ if ctx.default_map:
+ for param, value in ctx.default_map.items():
+ out(f"{param}: {value}")
error_msg = "Oh no! 💥 💔 💥"
if (
content=code, fast=fast, write_back=write_back, mode=mode, report=report
)
else:
+ assert root is not None # root is only None if code is not None
try:
sources = get_sources(
- ctx=ctx,
+ root=root,
src=src,
quiet=quiet,
verbose=verbose,
def get_sources(
*,
- ctx: click.Context,
+ root: Path,
src: Tuple[str, ...],
quiet: bool,
verbose: bool,
) -> Set[Path]:
"""Compute the set of files to be formatted."""
sources: Set[Path] = set()
- root = ctx.obj["root"]
+
+ using_default_exclude = exclude is None
+ exclude = re_compile_maybe_verbose(DEFAULT_EXCLUDES) if exclude is None else exclude
+ gitignore: Optional[Dict[Path, PathSpec]] = None
+ root_gitignore = get_gitignore(root)
for s in src:
if s == "-" and stdin_filename:
is_stdin = False
if is_stdin or p.is_file():
- normalized_path = normalize_path_maybe_ignore(p, ctx.obj["root"], report)
+ normalized_path: Optional[str] = normalize_path_maybe_ignore(
+ p, root, report
+ )
if normalized_path is None:
+ if verbose:
+ out(f'Skipping invalid source: "{normalized_path}"', fg="red")
continue
+ if verbose:
+ out(f'Found input source: "{normalized_path}"', fg="blue")
normalized_path = "/" + normalized_path
# Hard-exclude any files that matches the `--force-exclude` regex.
p = Path(f"{STDIN_PLACEHOLDER}{str(p)}")
if p.suffix == ".ipynb" and not jupyter_dependencies_are_installed(
- verbose=verbose, quiet=quiet
+ warn=verbose or not quiet
):
continue
sources.add(p)
elif p.is_dir():
- if exclude is None:
- exclude = re_compile_maybe_verbose(DEFAULT_EXCLUDES)
- gitignore = get_gitignore(root)
- p_gitignore = get_gitignore(p)
- # No need to use p's gitignore if it is identical to root's gitignore
- # (i.e. root and p point to the same directory).
- if gitignore != p_gitignore:
- gitignore += p_gitignore
- else:
- gitignore = None
+ p_relative = normalize_path_maybe_ignore(p, root, report)
+ assert p_relative is not None
+ p = root / p_relative
+ if verbose:
+ out(f'Found input source directory: "{p}"', fg="blue")
+
+ if using_default_exclude:
+ gitignore = {
+ root: root_gitignore,
+ p: get_gitignore(p),
+ }
sources.update(
gen_python_files(
p.iterdir(),
- ctx.obj["root"],
+ root,
include,
exclude,
extend_exclude,
)
)
elif s == "-":
+ if verbose:
+ out("Found input source stdin", fg="blue")
sources.add(p)
else:
err(f"invalid path: {s}")
+
return sources
if format_stdin_to_stdout(fast=fast, write_back=write_back, mode=mode):
changed = Changed.YES
else:
- cache: Cache = {}
+ cache = Cache.read(mode)
if write_back not in (WriteBack.DIFF, WriteBack.COLOR_DIFF):
- cache = read_cache(mode)
- res_src = src.resolve()
- res_src_s = str(res_src)
- if res_src_s in cache and cache[res_src_s] == get_cache_info(res_src):
+ if not cache.is_changed(src):
changed = Changed.CACHED
if changed is not Changed.CACHED and format_file_in_place(
src, fast=fast, write_back=write_back, mode=mode
if (write_back is WriteBack.YES and changed is not Changed.CACHED) or (
write_back is WriteBack.CHECK and changed is Changed.NO
):
- write_cache(cache, [src], mode)
+ cache.write([src])
report.done(src, changed)
except Exception as exc:
if report.verbose:
elif src.suffix == ".ipynb":
mode = replace(mode, is_ipynb=True)
- then = datetime.utcfromtimestamp(src.stat().st_mtime)
+ then = datetime.fromtimestamp(src.stat().st_mtime, timezone.utc)
header = b""
with open(src, "rb") as buf:
if mode.skip_source_first_line:
with open(src, "w", encoding=encoding, newline=newline) as f:
f.write(dst_contents)
elif write_back in (WriteBack.DIFF, WriteBack.COLOR_DIFF):
- now = datetime.utcnow()
- src_name = f"{src}\t{then} +0000"
- dst_name = f"{src}\t{now} +0000"
+ now = datetime.now(timezone.utc)
+ src_name = f"{src}\t{then}"
+ dst_name = f"{src}\t{now}"
if mode.is_ipynb:
diff_contents = ipynb_diff(src_contents, dst_contents, src_name, dst_name)
else:
write a diff to stdout. The `mode` argument is passed to
:func:`format_file_contents`.
"""
- then = datetime.utcnow()
+ then = datetime.now(timezone.utc)
if content is None:
src, encoding, newline = decode_bytes(sys.stdin.buffer.read())
dst += "\n"
f.write(dst)
elif write_back in (WriteBack.DIFF, WriteBack.COLOR_DIFF):
- now = datetime.utcnow()
- src_name = f"STDIN\t{then} +0000"
- dst_name = f"STDOUT\t{now} +0000"
+ now = datetime.now(timezone.utc)
+ src_name = f"STDIN\t{then}"
+ dst_name = f"STDOUT\t{now}"
d = diff(src, dst, src_name, dst_name)
if write_back == WriteBack.COLOR_DIFF:
d = color_diff(d)
valid by calling :func:`assert_equivalent` and :func:`assert_stable` on it.
`mode` is passed to :func:`format_str`.
"""
- if not src_contents.strip():
- raise NothingChanged
-
if mode.is_ipynb:
dst_contents = format_ipynb_string(src_contents, fast=fast, mode=mode)
else:
Operate cell-by-cell, only on code cells, only for Python notebooks.
If the ``.ipynb`` originally had a trailing newline, it'll be preserved.
"""
+ if not src_contents:
+ raise NothingChanged
+
trailing_newline = src_contents[-1] == "\n"
modified = False
nb = json.loads(src_contents)
future_imports = get_future_imports(src_node)
versions = detect_target_versions(src_node, future_imports=future_imports)
- normalize_fmt_off(src_node, preview=mode.preview)
- lines = LineGenerator(mode=mode)
+ context_manager_features = {
+ feature
+ for feature in {Feature.PARENTHESIZED_CONTEXT_MANAGERS}
+ if supports_feature(versions, feature)
+ }
+ normalize_fmt_off(src_node)
+ lines = LineGenerator(mode=mode, features=context_manager_features)
elt = EmptyLineTracker(mode=mode)
split_line_features = {
feature
dst_contents = []
for block in dst_blocks:
dst_contents.extend(block.all_lines())
+ if not dst_contents:
+ # Use decode_bytes to retrieve the correct source newline (CRLF or LF),
+ # and check if normalized_content has more than one line
+ normalized_content, _, newline = decode_bytes(src_contents.encode("utf-8"))
+ if "\n" in normalized_content:
+ return newline
+ return ""
return "".join(dst_contents)
- relaxed decorator syntax;
- usage of __future__ flags (annotations);
- print / exec statements;
+ - parenthesized context managers;
+ - match statements;
+ - except* clause;
+ - variadic generics;
"""
features: Set[Feature] = set()
if future_imports:
):
features.add(Feature.ANN_ASSIGN_EXTENDED_RHS)
+ elif (
+ n.type == syms.with_stmt
+ and len(n.children) > 2
+ and n.children[1].type == syms.atom
+ ):
+ atom_children = n.children[1].children
+ if (
+ len(atom_children) == 3
+ and atom_children[0].type == token.LPAR
+ and atom_children[1].type == syms.testlist_gexp
+ and atom_children[2].type == token.RPAR
+ ):
+ features.add(Feature.PARENTHESIZED_CONTEXT_MANAGERS)
+
+ elif n.type == syms.match_stmt:
+ features.add(Feature.PATTERN_MATCHING)
+
elif (
n.type == syms.except_clause
and len(n.children) >= 2
):
features.add(Feature.VARIADIC_GENERICS)
+ elif n.type in (syms.type_stmt, syms.typeparams):
+ features.add(Feature.TYPE_PARAMS)
+
return features
yield
-def patch_click() -> None:
- """Make Click not crash on Python 3.6 with LANG=C.
-
- On certain misconfigured environments, Python 3 selects the ASCII encoding as the
- default which restricts paths that it can access during the lifetime of the
- application. Click refuses to work in this scenario by raising a RuntimeError.
-
- In case of Black the likelihood that non-ASCII characters are going to be used in
- file paths is minimal since it's Python source code. Moreover, this crash was
- spurious on Python 3.7 thanks to PEP 538 and PEP 540.
- """
- modules: List[Any] = []
- try:
- from click import core
- except ImportError:
- pass
- else:
- modules.append(core)
- try:
- # Removed in Click 8.1.0 and newer; we keep this around for users who have
- # older versions installed.
- from click import _unicodefun # type: ignore
- except ImportError:
- pass
- else:
- modules.append(_unicodefun)
-
- for module in modules:
- if hasattr(module, "_verify_python3_env"):
- module._verify_python3_env = lambda: None
- if hasattr(module, "_verify_python_env"):
- module._verify_python_env = lambda: None
-
-
def patched_main() -> None:
# PyInstaller patches multiprocessing to need freeze_support() even in non-Windows
# environments so just assume we always need to call it if frozen.
freeze_support()
- patch_click()
main()