from dataclasses import dataclass, field, replace
import click
import toml
-from typed_ast import ast3, ast27
+
+try:
+ from typed_ast import ast3, ast27
+except ImportError:
+ if sys.version_info < (3, 8):
+ print(
+ "The typed_ast package is not installed.\n"
+ "You can install it with `python3 -m pip install typed-ast`.",
+ file=sys.stderr,
+ )
+ sys.exit(1)
+ else:
+ ast3 = ast27 = ast
+
from pathspec import PathSpec
# lib2to3 fork
import colorama # noqa: F401
DEFAULT_LINE_LENGTH = 88
-DEFAULT_EXCLUDES = r"/(\.direnv|\.eggs|\.git|\.hg|\.mypy_cache|\.nox|\.tox|\.venv|\.svn|_build|buck-out|build|dist)/" # noqa: B950
+DEFAULT_EXCLUDES = r"/(\.direnv|\.eggs|\.git|\.hg|\.mypy_cache|\.nox|\.tox|\.venv|venv|\.svn|_build|buck-out|build|dist)/" # noqa: B950
DEFAULT_INCLUDES = r"\.pyi?$"
CACHE_DIR = Path(user_cache_dir("black", version=__version__))
STDIN_PLACEHOLDER = "__BLACK_STDIN_FILENAME__"
target_versions: Set[TargetVersion] = field(default_factory=set)
line_length: int = DEFAULT_LINE_LENGTH
string_normalization: bool = True
+ is_pyi: bool = False
magic_trailing_comma: bool = True
experimental_string_processing: bool = False
- is_pyi: bool = False
def get_cache_key(self) -> str:
if self.target_versions:
str(self.line_length),
str(int(self.string_normalization)),
str(int(self.is_pyi)),
+ str(int(self.magic_trailing_comma)),
+ str(int(self.experimental_string_processing)),
]
return ".".join(parts)
return all(feature in VERSION_TO_FEATURES[version] for version in target_versions)
-def find_pyproject_toml(path_search_start: Iterable[str]) -> Optional[str]:
+def find_pyproject_toml(path_search_start: Tuple[str, ...]) -> Optional[str]:
"""Find the absolute filepath to a pyproject.toml if it exists"""
path_project_root = find_project_root(path_search_start)
path_pyproject_toml = path_project_root / "pyproject.toml"
- return str(path_pyproject_toml) if path_pyproject_toml.is_file() else None
+ if path_pyproject_toml.is_file():
+ return str(path_pyproject_toml)
+
+ try:
+ path_user_pyproject_toml = find_user_pyproject_toml()
+ return (
+ str(path_user_pyproject_toml)
+ if path_user_pyproject_toml.is_file()
+ else None
+ )
+ except PermissionError as e:
+ # We do not have access to the user-level config directory, so ignore it.
+ err(f"Ignoring user configuration directory due to {e!r}")
+ return None
def parse_pyproject_toml(path_config: str) -> Dict[str, Any]:
return [TargetVersion[val.upper()] for val in v]
+def validate_regex(
+ ctx: click.Context,
+ param: click.Parameter,
+ value: Optional[str],
+) -> Optional[Pattern]:
+ try:
+ return re_compile_maybe_verbose(value) if value is not None else None
+ except re.error:
+ raise click.BadParameter("Not a valid regular expression")
+
+
@click.command(context_settings=dict(help_option_names=["-h", "--help"]))
@click.option("-c", "--code", type=str, help="Format the code passed in as a string.")
@click.option(
"--check",
is_flag=True,
help=(
- "Don't write the files back, just return the status. Return code 0 means"
- " nothing would change. Return code 1 means some files would be reformatted."
+ "Don't write the files back, just return the status. Return code 0 means"
+ " nothing would change. Return code 1 means some files would be reformatted."
" Return code 123 means there was an internal error."
),
)
"--include",
type=str,
default=DEFAULT_INCLUDES,
+ callback=validate_regex,
help=(
"A regular expression that matches files and directories that should be"
- " included on recursive searches. An empty value means all files are included"
- " regardless of the name. Use forward slashes for directories on all platforms"
- " (Windows, too). Exclusions are calculated first, inclusions later."
+ " included on recursive searches. An empty value means all files are included"
+ " regardless of the name. Use forward slashes for directories on all platforms"
+ " (Windows, too). Exclusions are calculated first, inclusions later."
),
show_default=True,
)
"--exclude",
type=str,
default=DEFAULT_EXCLUDES,
+ callback=validate_regex,
help=(
"A regular expression that matches files and directories that should be"
- " excluded on recursive searches. An empty value means no paths are excluded."
- " Use forward slashes for directories on all platforms (Windows, too). "
+ " excluded on recursive searches. An empty value means no paths are excluded."
+ " Use forward slashes for directories on all platforms (Windows, too)."
" Exclusions are calculated first, inclusions later."
),
show_default=True,
)
+@click.option(
+ "--extend-exclude",
+ type=str,
+ callback=validate_regex,
+ help=(
+ "Like --exclude, but adds additional files and directories on top of the"
+ " excluded ones. (Useful if you simply want to add to the default)"
+ ),
+)
@click.option(
"--force-exclude",
type=str,
+ callback=validate_regex,
help=(
"Like --exclude, but files and directories matching this regex will be "
"excluded even when they are passed explicitly as arguments."
is_flag=True,
help=(
"Also emit messages to stderr about files that were not changed or were ignored"
- " due to --exclude=."
+ " due to exclusion patterns."
),
)
@click.version_option(version=__version__)
experimental_string_processing: bool,
quiet: bool,
verbose: bool,
- include: str,
- exclude: str,
- force_exclude: Optional[str],
+ include: Pattern,
+ exclude: Pattern,
+ extend_exclude: Optional[Pattern],
+ force_exclude: Optional[Pattern],
stdin_filename: Optional[str],
src: Tuple[str, ...],
config: Optional[str],
verbose=verbose,
include=include,
exclude=exclude,
+ extend_exclude=extend_exclude,
force_exclude=force_exclude,
report=report,
stdin_filename=stdin_filename,
src: Tuple[str, ...],
quiet: bool,
verbose: bool,
- include: str,
- exclude: str,
- force_exclude: Optional[str],
+ include: Pattern[str],
+ exclude: Pattern[str],
+ extend_exclude: Optional[Pattern[str]],
+ force_exclude: Optional[Pattern[str]],
report: "Report",
stdin_filename: Optional[str],
) -> Set[Path]:
"""Compute the set of files to be formatted."""
- try:
- include_regex = re_compile_maybe_verbose(include)
- except re.error:
- err(f"Invalid regular expression for include given: {include!r}")
- ctx.exit(2)
- try:
- exclude_regex = re_compile_maybe_verbose(exclude)
- except re.error:
- err(f"Invalid regular expression for exclude given: {exclude!r}")
- ctx.exit(2)
- try:
- force_exclude_regex = (
- re_compile_maybe_verbose(force_exclude) if force_exclude else None
- )
- except re.error:
- err(f"Invalid regular expression for force_exclude given: {force_exclude!r}")
- ctx.exit(2)
root = find_project_root(src)
sources: Set[Path] = set()
normalized_path = "/" + normalized_path
# Hard-exclude any files that matches the `--force-exclude` regex.
- if force_exclude_regex:
- force_exclude_match = force_exclude_regex.search(normalized_path)
+ if force_exclude:
+ force_exclude_match = force_exclude.search(normalized_path)
else:
force_exclude_match = None
if force_exclude_match and force_exclude_match.group(0):
gen_python_files(
p.iterdir(),
root,
- include_regex,
- exclude_regex,
- force_exclude_regex,
+ include,
+ exclude,
+ extend_exclude,
+ force_exclude,
report,
gitignore,
)
is_stdin = False
if is_stdin:
+ if src.suffix == ".pyi":
+ mode = replace(mode, is_pyi=True)
if format_stdin_to_stdout(fast=fast, write_back=write_back, mode=mode):
changed = Changed.YES
else:
except (ImportError, OSError):
# we arrive here if the underlying system does not support multi-processing
# like in AWS Lambda or Termux, in which case we gracefully fallback to
- # a ThreadPollExecutor with just a single worker (more workers would not do us
+ # a ThreadPoolExecutor with just a single worker (more workers would not do us
# any good due to the Global Interpreter Lock)
executor = ThreadPoolExecutor(max_workers=1)
): src
for src in sorted(sources)
}
- pending: Iterable["asyncio.Future[bool]"] = tasks.keys()
+ pending = tasks.keys()
try:
loop.add_signal_handler(signal.SIGINT, cancel, pending)
loop.add_signal_handler(signal.SIGTERM, cancel, pending)
dst_name = f"{src}\t{now} +0000"
diff_contents = diff(src_contents, dst_contents, src_name, dst_name)
- if write_back == write_back.COLOR_DIFF:
+ if write_back == WriteBack.COLOR_DIFF:
diff_contents = color_diff(diff_contents)
with lock or nullcontext():
if not fast:
assert_equivalent(src_contents, dst_contents)
- assert_stable(src_contents, dst_contents, mode=mode)
+
+ # Forced second pass to work around optional trailing commas (becoming
+ # forced trailing commas on pass 2) interacting differently with optional
+ # parentheses. Admittedly ugly.
+ dst_contents_pass2 = format_str(dst_contents, mode=mode)
+ if dst_contents != dst_contents_pass2:
+ dst_contents = dst_contents_pass2
+ assert_equivalent(src_contents, dst_contents, pass_num=2)
+ assert_stable(src_contents, dst_contents, mode=mode)
+ # Note: no need to explicitly call `assert_stable` if `dst_contents` was
+ # the same as `dst_contents_pass2`.
return dst_contents
comments: Dict[LeafID, List[Leaf]] = field(default_factory=dict)
bracket_tracker: BracketTracker = field(default_factory=BracketTracker)
inside_brackets: bool = False
- should_split: bool = False
+ should_split_rhs: bool = False
magic_trailing_comma: Optional[Leaf] = None
def append(self, leaf: Leaf, preformatted: bool = False) -> None:
mode=self.mode,
depth=self.depth,
inside_brackets=self.inside_brackets,
- should_split=self.should_split,
+ should_split_rhs=self.should_split_rhs,
magic_trailing_comma=self.magic_trailing_comma,
)
def visit_simple_stmt(self, node: Node) -> Iterator[Line]:
"""Visit a statement without nested statements."""
+ if first_child_is_arith(node):
+ wrap_in_parentheses(node, node.children[0], visible=False)
is_suite_like = node.parent and node.parent.type in STATEMENT
if is_suite_like:
if self.mode.is_pyi and is_stub_body(node):
# We're ignoring docstrings with backslash newline escapes because changing
# indentation of those changes the AST representation of the code.
prefix = get_string_prefix(leaf.value)
- lead_len = len(prefix) + 3
- tail_len = -3
- indent = " " * 4 * self.current_line.depth
- docstring = fix_docstring(leaf.value[lead_len:tail_len], indent)
+ docstring = leaf.value[len(prefix) :] # Remove the prefix
+ quote_char = docstring[0]
+ # A natural way to remove the outer quotes is to do:
+ # docstring = docstring.strip(quote_char)
+ # but that breaks on """""x""" (which is '""x').
+ # So we actually need to remove the first character and the next two
+ # characters but only if they are the same as the first.
+ quote_len = 1 if docstring[1] != quote_char else 3
+ docstring = docstring[quote_len:-quote_len]
+
+ if is_multiline_string(leaf):
+ indent = " " * 4 * self.current_line.depth
+ docstring = fix_docstring(docstring, indent)
+ else:
+ docstring = docstring.strip()
+
if docstring:
- if leaf.value[lead_len - 1] == docstring[0]:
+ # Add some padding if the docstring starts / ends with a quote mark.
+ if docstring[0] == quote_char:
docstring = " " + docstring
- if leaf.value[tail_len + 1] == docstring[-1]:
- docstring = docstring + " "
- leaf.value = leaf.value[0:lead_len] + docstring + leaf.value[tail_len:]
+ if docstring[-1] == quote_char:
+ docstring += " "
+ if docstring[-1] == "\\":
+ backslash_count = len(docstring) - len(docstring.rstrip("\\"))
+ if backslash_count % 2:
+ # Odd number of tailing backslashes, add some padding to
+ # avoid escaping the closing string quote.
+ docstring += " "
+ else:
+ # Add some padding if the docstring is empty.
+ docstring = " "
+
+ # We could enforce triple quotes at this point.
+ quote = quote_char * quote_len
+ leaf.value = prefix + quote + docstring + quote
yield from self.visit_default(leaf)
if content[0] == "#":
content = content[1:]
+ NON_BREAKING_SPACE = " "
+ if (
+ content
+ and content[0] == NON_BREAKING_SPACE
+ and not content.lstrip().startswith("type:")
+ ):
+ content = " " + content[1:] # Replace NBSP by a simple space
if content and content[0] not in " !:#'%":
content = " " + content
return "#" + content
transformers: List[Transformer]
if (
not line.contains_uncollapsable_type_comments()
- and not line.should_split
+ and not line.should_split_rhs
and not line.magic_trailing_comma
and (
is_line_short_enough(line, line_length=mode.line_length, line_str=line_str)
mode=line.mode,
depth=line.depth + 1,
inside_brackets=True,
- should_split=line.should_split,
+ should_split_rhs=line.should_split_rhs,
magic_trailing_comma=line.magic_trailing_comma,
)
string_leaf = Leaf(token.STRING, string_value)
for comment_after in original.comments_after(leaf):
result.append(comment_after, preformatted=True)
if is_body and should_split_line(result, opening_bracket):
- result.should_split = True
+ result.should_split_rhs = True
return result
def format_hex(text: str) -> str:
"""
- Formats a hexadecimal string like "0x12b3"
-
- Uses lowercase because of similarity between "B" and "8", which
- can cause security issues.
- see: https://github.com/psf/black/issues/1692
+ Formats a hexadecimal string like "0x12B3"
"""
-
before, after = text[:2], text[2:]
- return f"{before}{after.lower()}"
+ return f"{before}{after.upper()}"
def format_scientific_notation(text: str) -> str:
check_lpar = True
if check_lpar:
- if is_walrus_assignment(child):
- pass
-
- elif child.type == syms.atom:
+ if child.type == syms.atom:
if maybe_make_parens_invisible_in_atom(child, parent=node):
wrap_in_parentheses(node, child, visible=False)
elif is_one_tuple(child):
Returns whether the node should itself be wrapped in invisible parentheses.
"""
+
if (
node.type != syms.atom
or is_empty_tuple(node)
):
return False
+ if is_walrus_assignment(node):
+ if parent.type in [
+ syms.annassign,
+ syms.expr_stmt,
+ syms.assert_stmt,
+ syms.return_stmt,
+ # these ones aren't useful to end users, but they do please fuzzers
+ syms.for_stmt,
+ syms.del_stmt,
+ ]:
+ return False
+
first = node.children[0]
last = node.children[-1]
if first.type == token.LPAR and last.type == token.RPAR:
return wrapped
+def first_child_is_arith(node: Node) -> bool:
+ """Whether first child is an arithmetic or a binary arithmetic expression"""
+ expr_types = {
+ syms.arith_expr,
+ syms.shift_expr,
+ syms.xor_expr,
+ syms.and_expr,
+ }
+ return bool(node.children and node.children[0].type in expr_types)
+
+
def wrap_in_parentheses(parent: Node, child: LN, *, visible: bool = True) -> None:
"""Wrap `child` in parentheses.
and node.children[0].type == token.DOT
and node.children[1].type == token.NAME
)
+ # last trailer can be an argument-less parentheses pair
+ or (
+ last
+ and len(node.children) == 2
+ and node.children[0].type == token.LPAR
+ and node.children[1].type == token.RPAR
+ )
# last trailer can be arguments
or (
last
@lru_cache()
def get_gitignore(root: Path) -> PathSpec:
- """ Return a PathSpec matching gitignore content if present."""
+ """Return a PathSpec matching gitignore content if present."""
gitignore = root / ".gitignore"
lines: List[str] = []
if gitignore.is_file():
return normalized_path
+def path_is_excluded(
+ normalized_path: str,
+ pattern: Optional[Pattern[str]],
+) -> bool:
+ match = pattern.search(normalized_path) if pattern else None
+ return bool(match and match.group(0))
+
+
def gen_python_files(
paths: Iterable[Path],
root: Path,
include: Optional[Pattern[str]],
exclude: Pattern[str],
+ extend_exclude: Optional[Pattern[str]],
force_exclude: Optional[Pattern[str]],
report: "Report",
gitignore: PathSpec,
) -> Iterator[Path]:
"""Generate all files under `path` whose paths are not excluded by the
- `exclude_regex` or `force_exclude` regexes, but are included by the `include` regex.
+ `exclude_regex`, `extend_exclude`, or `force_exclude` regexes,
+ but are included by the `include` regex.
Symbolic links pointing outside of the `root` directory are ignored.
report.path_ignored(child, "matches the .gitignore file content")
continue
- # Then ignore with `--exclude` and `--force-exclude` options.
+ # Then ignore with `--exclude` `--extend-exclude` and `--force-exclude` options.
normalized_path = "/" + normalized_path
if child.is_dir():
normalized_path += "/"
- exclude_match = exclude.search(normalized_path) if exclude else None
- if exclude_match and exclude_match.group(0):
+ if path_is_excluded(normalized_path, exclude):
report.path_ignored(child, "matches the --exclude regular expression")
continue
- force_exclude_match = (
- force_exclude.search(normalized_path) if force_exclude else None
- )
- if force_exclude_match and force_exclude_match.group(0):
+ if path_is_excluded(normalized_path, extend_exclude):
+ report.path_ignored(
+ child, "matches the --extend-exclude regular expression"
+ )
+ continue
+
+ if path_is_excluded(normalized_path, force_exclude):
report.path_ignored(child, "matches the --force-exclude regular expression")
continue
root,
include,
exclude,
+ extend_exclude,
force_exclude,
report,
gitignore,
@lru_cache()
-def find_project_root(srcs: Iterable[str]) -> Path:
+def find_project_root(srcs: Tuple[str, ...]) -> Path:
"""Return a directory containing .git, .hg, or pyproject.toml.
That directory will be a common parent of all files and directories
return directory
+@lru_cache()
+def find_user_pyproject_toml() -> Path:
+ r"""Return the path to the top-level user configuration for black.
+
+ This looks for ~\.black on Windows and ~/.config/black on Linux and other
+ Unix systems.
+ """
+ if sys.platform == "win32":
+ # Windows
+ user_config_path = Path.home() / ".black"
+ else:
+ config_root = os.environ.get("XDG_CONFIG_HOME", "~/.config")
+ user_config_path = Path(config_root).expanduser() / "black"
+ return user_config_path.resolve()
+
+
@dataclass
class Report:
"""Provides a reformatting counter. Can be rendered with `str(report)`."""
return ast3.parse(src, filename, feature_version=feature_version)
except SyntaxError:
continue
-
+ if ast27.__name__ == "ast":
+ raise SyntaxError(
+ "The requested source code has invalid Python 3 syntax.\n"
+ "If you are trying to format Python 2 files please reinstall Black"
+ " with the 'python2' extra: `python3 -m pip install black[python2]`."
+ )
return ast27.parse(src)
# Constant strings may be indented across newlines, if they are
# docstrings; fold spaces after newlines when comparing. Similarly,
# trailing and leading space may be removed.
+ # Note that when formatting Python 2 code, at least with Windows
+ # line-endings, docstrings can end up here as bytes instead of
+ # str so make sure that we handle both cases.
if (
isinstance(node, ast.Constant)
and field == "value"
- and isinstance(value, str)
+ and isinstance(value, (str, bytes))
):
- normalized = re.sub(r" *\n[ \t]*", "\n", value).strip()
+ lineend = "\n" if isinstance(value, str) else b"\n"
+ # To normalize, we strip any leading and trailing space from
+ # each line...
+ stripped = [line.strip() for line in value.splitlines()]
+ normalized = lineend.join(stripped) # type: ignore[attr-defined]
+ # ...and remove any blank lines at the beginning and end of
+ # the whole string
+ normalized = normalized.strip()
else:
normalized = value
yield f"{' ' * (depth+2)}{normalized!r}, # {value.__class__.__name__}"
yield f"{' ' * depth}) # /{node.__class__.__name__}"
-def assert_equivalent(src: str, dst: str) -> None:
+def assert_equivalent(src: str, dst: str, *, pass_num: int = 1) -> None:
"""Raise AssertionError if `src` and `dst` aren't equivalent."""
try:
src_ast = parse_ast(src)
except Exception as exc:
log = dump_to_file("".join(traceback.format_tb(exc.__traceback__)), dst)
raise AssertionError(
- f"INTERNAL ERROR: Black produced invalid code: {exc}. Please report a bug"
- " on https://github.com/psf/black/issues. This invalid output might be"
- f" helpful: {log}"
+ f"INTERNAL ERROR: Black produced invalid code on pass {pass_num}: {exc}. "
+ "Please report a bug on https://github.com/psf/black/issues. "
+ f"This invalid output might be helpful: {log}"
) from None
src_ast_str = "\n".join(_stringify_ast(src_ast))
log = dump_to_file(diff(src_ast_str, dst_ast_str, "src", "dst"))
raise AssertionError(
"INTERNAL ERROR: Black produced code that is not equivalent to the"
- " source. Please report a bug on https://github.com/psf/black/issues. "
- f" This diff might be helpful: {log}"
+ f" source on pass {pass_num}. Please report a bug on "
+ f"https://github.com/psf/black/issues. This diff might be helpful: {log}"
) from None
@mypyc_attr(patchable=True)
-def dump_to_file(*output: str) -> str:
+def dump_to_file(*output: str, ensure_final_newline: bool = True) -> str:
"""Dump `output` to a temporary file. Return path to the file."""
with tempfile.NamedTemporaryFile(
mode="w", prefix="blk_", suffix=".log", delete=False, encoding="utf8"
) as f:
for lines in output:
f.write(lines)
- if lines and lines[-1] != "\n":
+ if ensure_final_newline and lines and lines[-1] != "\n":
f.write("\n")
return f.name
"""Return a unified diff string between strings `a` and `b`."""
import difflib
- a_lines = [line + "\n" for line in a.splitlines()]
- b_lines = [line + "\n" for line in b.splitlines()]
- return "".join(
- difflib.unified_diff(a_lines, b_lines, fromfile=a_name, tofile=b_name, n=5)
- )
+ a_lines = [line for line in a.splitlines(keepends=True)]
+ b_lines = [line for line in b.splitlines(keepends=True)]
+ diff_lines = []
+ for line in difflib.unified_diff(
+ a_lines, b_lines, fromfile=a_name, tofile=b_name, n=5
+ ):
+ # Work around https://bugs.python.org/issue2142
+ # See https://www.gnu.org/software/diffutils/manual/html_node/Incomplete-Lines.html
+ if line[-1] == "\n":
+ diff_lines.append(line)
+ else:
+ diff_lines.append(line + "\n")
+ diff_lines.append("\\ No newline at end of file\n")
+ return "".join(diff_lines)
def cancel(tasks: Iterable["asyncio.Task[Any]"]) -> None:
def is_docstring(leaf: Leaf) -> bool:
- if not is_multiline_string(leaf):
- # For the purposes of docstring re-indentation, we don't need to do anything
- # with single-line docstrings.
- return False
-
if prev_siblings_are(
leaf.parent, [None, token.NEWLINE, token.INDENT, syms.simple_stmt]
):