]>
git.madduck.net Git - etc/vim.git/blobdiff - src/black/__init__.py
madduck's git repository
Every one of the projects in this repository is available at the canonical
URL git://git.madduck.net/madduck/pub/<projectpath> — see
each project's metadata for the exact URL.
All patches and comments are welcome. Please squash your changes to logical
commits before using git-format-patch and git-send-email to
patches@ git. madduck. net .
If you'd read over the Git project's submission guidelines and adhered to them,
I'd be especially grateful.
SSH access, as well as push access can be individually
arranged .
If you use my repositories frequently, consider adding the following
snippet to ~/.gitconfig and using the third clone URL listed for each
project:
[url "git://git.madduck.net/madduck/"]
insteadOf = madduck:
from multiprocessing import Manager, freeze_support
import os
from pathlib import Path
from multiprocessing import Manager, freeze_support
import os
from pathlib import Path
+from pathspec.patterns.gitwildmatch import GitWildMatchPatternError
import regex as re
import signal
import sys
import regex as re
import signal
import sys
# Legacy name, left for integrations.
FileMode = Mode
# Legacy name, left for integrations.
FileMode = Mode
+DEFAULT_WORKERS = os.cpu_count()
+
def read_pyproject_toml(
ctx: click.Context, param: click.Parameter, value: Optional[str]
def read_pyproject_toml(
ctx: click.Context, param: click.Parameter, value: Optional[str]
except (OSError, ValueError) as e:
raise click.FileError(
filename=value, hint=f"Error reading configuration file: {e}"
except (OSError, ValueError) as e:
raise click.FileError(
filename=value, hint=f"Error reading configuration file: {e}"
if not config:
return None
if not config:
return None
ctx: click.Context,
param: click.Parameter,
value: Optional[str],
ctx: click.Context,
param: click.Parameter,
value: Optional[str],
+) -> Optional[Pattern[str] ]:
try:
return re_compile_maybe_verbose(value) if value is not None else None
except re.error:
try:
return re_compile_maybe_verbose(value) if value is not None else None
except re.error:
- raise click.BadParameter("Not a valid regular expression")
+ raise click.BadParameter("Not a valid regular expression") from None
@click.command(context_settings=dict(help_option_names=["-h", "--help"]))
@click.command(context_settings=dict(help_option_names=["-h", "--help"]))
"editors that rely on using stdin."
),
)
"editors that rely on using stdin."
),
)
+@click.option(
+ "-W",
+ "--workers",
+ type=click.IntRange(min=1),
+ default=DEFAULT_WORKERS,
+ show_default=True,
+ help="Number of parallel workers",
+)
@click.option(
"-q",
"--quiet",
@click.option(
"-q",
"--quiet",
quiet: bool,
verbose: bool,
required_version: str,
quiet: bool,
verbose: bool,
required_version: str,
- include: Pattern,
- exclude: Optional[Pattern],
- extend_exclude: Optional[Pattern],
- force_exclude: Optional[Pattern],
+ include: Pattern[str] ,
+ exclude: Optional[Pattern[str] ],
+ extend_exclude: Optional[Pattern[str] ],
+ force_exclude: Optional[Pattern[str] ],
stdin_filename: Optional[str],
stdin_filename: Optional[str],
src: Tuple[str, ...],
config: Optional[str],
) -> None:
src: Tuple[str, ...],
config: Optional[str],
) -> None:
content=code, fast=fast, write_back=write_back, mode=mode, report=report
)
else:
content=code, fast=fast, write_back=write_back, mode=mode, report=report
)
else:
- sources = get_sources(
- ctx=ctx,
- src=src,
- quiet=quiet,
- verbose=verbose,
- include=include,
- exclude=exclude,
- extend_exclude=extend_exclude,
- force_exclude=force_exclude,
- report=report,
- stdin_filename=stdin_filename,
- )
+ try:
+ sources = get_sources(
+ ctx=ctx,
+ src=src,
+ quiet=quiet,
+ verbose=verbose,
+ include=include,
+ exclude=exclude,
+ extend_exclude=extend_exclude,
+ force_exclude=force_exclude,
+ report=report,
+ stdin_filename=stdin_filename,
+ )
+ except GitWildMatchPatternError:
+ ctx.exit(1)
write_back=write_back,
mode=mode,
report=report,
write_back=write_back,
mode=mode,
report=report,
)
if verbose or not quiet:
)
if verbose or not quiet:
- sources: Set[Path], fast: bool, write_back: WriteBack, mode: Mode, report: "Report"
+ sources: Set[Path],
+ fast: bool,
+ write_back: WriteBack,
+ mode: Mode,
+ report: "Report",
+ workers: Optional[int],
) -> None:
"""Reformat multiple files using a ProcessPoolExecutor."""
executor: Executor
loop = asyncio.get_event_loop()
) -> None:
"""Reformat multiple files using a ProcessPoolExecutor."""
executor: Executor
loop = asyncio.get_event_loop()
- worker_count = os.cpu_count()
+ worker_count = workers if workers is not None else DEFAULT_WORKERS
if sys.platform == "win32":
# Work around https://bugs.python.org/issue26903
worker_count = min(worker_count, 60)
if sys.platform == "win32":
# Work around https://bugs.python.org/issue26903
worker_count = min(worker_count, 60)
except NothingChanged:
return False
except JSONDecodeError:
except NothingChanged:
return False
except JSONDecodeError:
- raise ValueError(f"File '{src}' cannot be parsed as valid Jupyter notebook.")
+ raise ValueError(
+ f"File '{src}' cannot be parsed as valid Jupyter notebook."
+ ) from None
if write_back == WriteBack.YES:
with open(src, "w", encoding=encoding, newline=newline) as f:
if write_back == WriteBack.YES:
with open(src, "w", encoding=encoding, newline=newline) as f:
try:
masked_src, replacements = mask_cell(src_without_trailing_semicolon)
except SyntaxError:
try:
masked_src, replacements = mask_cell(src_without_trailing_semicolon)
except SyntaxError:
+ raise NothingChanged from None
masked_dst = format_str(masked_src, mode=mode)
if not fast:
check_stability_and_equivalence(masked_src, masked_dst, mode=mode)
masked_dst = format_str(masked_src, mode=mode)
if not fast:
check_stability_and_equivalence(masked_src, masked_dst, mode=mode)
)
dst = dst.rstrip("\n")
if dst == src:
)
dst = dst.rstrip("\n")
if dst == src:
+ raise NothingChanged from None
"""
language = nb.get("metadata", {}).get("language_info", {}).get("name", None)
if language is not None and language != "python":
"""
language = nb.get("metadata", {}).get("language_info", {}).get("name", None)
if language is not None and language != "python":
+ raise NothingChanged from None
def format_ipynb_string(src_contents: str, *, fast: bool, mode: Mode) -> FileContent:
"""Format Jupyter notebook.
Operate cell-by-cell, only on code cells, only for Python notebooks.
def format_ipynb_string(src_contents: str, *, fast: bool, mode: Mode) -> FileContent:
"""Format Jupyter notebook.
Operate cell-by-cell, only on code cells, only for Python notebooks.
- If the ``.ipynb`` originally had a trailing newline, it'll be preseved.
+ If the ``.ipynb`` originally had a trailing newline, it'll be preser ved.
"""
trailing_newline = src_contents[-1] == "\n"
modified = False
"""
trailing_newline = src_contents[-1] == "\n"
modified = False
features.add(Feature.NUMERIC_UNDERSCORES)
elif n.type == token.SLASH:
features.add(Feature.NUMERIC_UNDERSCORES)
elif n.type == token.SLASH:
- if n.parent and n.parent.type in {syms.typedargslist, syms.arglist}:
+ if n.parent and n.parent.type in {
+ syms.typedargslist,
+ syms.arglist,
+ syms.varargslist,
+ }:
features.add(Feature.POS_ONLY_ARGUMENTS)
elif n.type == token.COLONEQUAL:
features.add(Feature.POS_ONLY_ARGUMENTS)
elif n.type == token.COLONEQUAL:
src_ast = parse_ast(src)
except Exception as exc:
raise AssertionError(
src_ast = parse_ast(src)
except Exception as exc:
raise AssertionError(
- "cannot use --safe with this file; failed to parse source file. AST"
- f" error message: {exc}"
- )
+ "cannot use --safe with this file; failed to parse source file."
+ ) from exc
try:
dst_ast = parse_ast(dst)
try:
dst_ast = parse_ast(dst)
"""
try:
from click import core
"""
try:
from click import core
- from click import _unicodefun # type: ignore
+ from click import _unicodefun
except ModuleNotFoundError:
return
except ModuleNotFoundError:
return