from multiprocessing import Manager, freeze_support
import os
from pathlib import Path
+from pathspec.patterns.gitwildmatch import GitWildMatchPatternError
import regex as re
import signal
import sys
# Legacy name, left for integrations.
FileMode = Mode
+DEFAULT_WORKERS = os.cpu_count()
+
def read_pyproject_toml(
ctx: click.Context, param: click.Parameter, value: Optional[str]
except (OSError, ValueError) as e:
raise click.FileError(
filename=value, hint=f"Error reading configuration file: {e}"
- )
+ ) from None
if not config:
return None
try:
return re_compile_maybe_verbose(value) if value is not None else None
except re.error:
- raise click.BadParameter("Not a valid regular expression")
+ raise click.BadParameter("Not a valid regular expression") from None
@click.command(context_settings=dict(help_option_names=["-h", "--help"]))
"editors that rely on using stdin."
),
)
+@click.option(
+ "-W",
+ "--workers",
+ type=click.IntRange(min=1),
+ default=DEFAULT_WORKERS,
+ show_default=True,
+ help="Number of parallel workers",
+)
@click.option(
"-q",
"--quiet",
extend_exclude: Optional[Pattern],
force_exclude: Optional[Pattern],
stdin_filename: Optional[str],
+ workers: int,
src: Tuple[str, ...],
config: Optional[str],
) -> None:
content=code, fast=fast, write_back=write_back, mode=mode, report=report
)
else:
- sources = get_sources(
- ctx=ctx,
- src=src,
- quiet=quiet,
- verbose=verbose,
- include=include,
- exclude=exclude,
- extend_exclude=extend_exclude,
- force_exclude=force_exclude,
- report=report,
- stdin_filename=stdin_filename,
- )
+ try:
+ sources = get_sources(
+ ctx=ctx,
+ src=src,
+ quiet=quiet,
+ verbose=verbose,
+ include=include,
+ exclude=exclude,
+ extend_exclude=extend_exclude,
+ force_exclude=force_exclude,
+ report=report,
+ stdin_filename=stdin_filename,
+ )
+ except GitWildMatchPatternError:
+ ctx.exit(1)
path_empty(
sources,
write_back=write_back,
mode=mode,
report=report,
+ workers=workers,
)
if verbose or not quiet:
def reformat_many(
- sources: Set[Path], fast: bool, write_back: WriteBack, mode: Mode, report: "Report"
+ sources: Set[Path],
+ fast: bool,
+ write_back: WriteBack,
+ mode: Mode,
+ report: "Report",
+ workers: Optional[int],
) -> None:
"""Reformat multiple files using a ProcessPoolExecutor."""
executor: Executor
loop = asyncio.get_event_loop()
- worker_count = os.cpu_count()
+ worker_count = workers if workers is not None else DEFAULT_WORKERS
if sys.platform == "win32":
# Work around https://bugs.python.org/issue26903
worker_count = min(worker_count, 60)
except NothingChanged:
return False
except JSONDecodeError:
- raise ValueError(f"File '{src}' cannot be parsed as valid Jupyter notebook.")
+ raise ValueError(
+ f"File '{src}' cannot be parsed as valid Jupyter notebook."
+ ) from None
if write_back == WriteBack.YES:
with open(src, "w", encoding=encoding, newline=newline) as f:
try:
masked_src, replacements = mask_cell(src_without_trailing_semicolon)
except SyntaxError:
- raise NothingChanged
+ raise NothingChanged from None
masked_dst = format_str(masked_src, mode=mode)
if not fast:
check_stability_and_equivalence(masked_src, masked_dst, mode=mode)
)
dst = dst.rstrip("\n")
if dst == src:
- raise NothingChanged
+ raise NothingChanged from None
return dst
"""
language = nb.get("metadata", {}).get("language_info", {}).get("name", None)
if language is not None and language != "python":
- raise NothingChanged
+ raise NothingChanged from None
def format_ipynb_string(src_contents: str, *, fast: bool, mode: Mode) -> FileContent:
"""Format Jupyter notebook.
Operate cell-by-cell, only on code cells, only for Python notebooks.
- If the ``.ipynb`` originally had a trailing newline, it'll be preseved.
+ If the ``.ipynb`` originally had a trailing newline, it'll be preserved.
"""
trailing_newline = src_contents[-1] == "\n"
modified = False
src_ast = parse_ast(src)
except Exception as exc:
raise AssertionError(
- "cannot use --safe with this file; failed to parse source file. AST"
- f" error message: {exc}"
- )
+ "cannot use --safe with this file; failed to parse source file."
+ ) from exc
try:
dst_ast = parse_ast(dst)