]>
git.madduck.net Git - etc/vim.git/blobdiff - black.py
madduck's git repository
Every one of the projects in this repository is available at the canonical
URL git://git.madduck.net/madduck/pub/<projectpath> — see
each project's metadata for the exact URL.
All patches and comments are welcome. Please squash your changes to logical
commits before using git-format-patch and git-send-email to
patches@ git. madduck. net .
If you'd read over the Git project's submission guidelines and adhered to them,
I'd be especially grateful.
SSH access, as well as push access can be individually
arranged .
If you use my repositories frequently, consider adding the following
snippet to ~/.gitconfig and using the third clone URL listed for each
project:
[url "git://git.madduck.net/madduck/"]
insteadOf = madduck:
import pickle
from asyncio.base_events import BaseEventLoop
from concurrent.futures import Executor, ProcessPoolExecutor
import pickle
from asyncio.base_events import BaseEventLoop
from concurrent.futures import Executor, ProcessPoolExecutor
+from enum import Enum, Flag
from functools import partial, wraps
import keyword
import logging
from functools import partial, wraps
import keyword
import logging
from blib2to3.pgen2.parse import ParseError
from blib2to3.pgen2.parse import ParseError
DEFAULT_LINE_LENGTH = 88
CACHE_DIR = Path(user_cache_dir("black", version=__version__))
DEFAULT_LINE_LENGTH = 88
CACHE_DIR = Path(user_cache_dir("black", version=__version__))
+class FileMode(Flag):
+ AUTO_DETECT = 0
+ PYTHON36 = 1
+ PYI = 2
+ NO_STRING_NORMALIZATION = 4
+
+
@click.command()
@click.option(
"-l",
@click.command()
@click.option(
"-l",
"**kwargs. [default: per-file auto-detection]"
),
)
"**kwargs. [default: per-file auto-detection]"
),
)
+@click.option(
+ "-S",
+ "--skip-string-normalization",
+ is_flag=True,
+ help="Don't normalize string quotes or prefixes.",
+)
@click.version_option(version=__version__)
@click.argument(
"src",
@click.version_option(version=__version__)
@click.argument(
"src",
fast: bool,
pyi: bool,
py36: bool,
fast: bool,
pyi: bool,
py36: bool,
+ skip_string_normalization: bool,
quiet: bool,
src: List[str],
) -> None:
quiet: bool,
src: List[str],
) -> None:
write_back = WriteBack.DIFF
else:
write_back = WriteBack.YES
write_back = WriteBack.DIFF
else:
write_back = WriteBack.YES
+ mode = FileMode.AUTO_DETECT
+ if py36:
+ mode |= FileMode.PYTHON36
+ if pyi:
+ mode |= FileMode.PYI
+ if skip_string_normalization:
+ mode |= FileMode.NO_STRING_NORMALIZATION
report = Report(check=check, quiet=quiet)
if len(sources) == 0:
out("No paths given. Nothing to do 😴")
report = Report(check=check, quiet=quiet)
if len(sources) == 0:
out("No paths given. Nothing to do 😴")
src=sources[0],
line_length=line_length,
fast=fast,
src=sources[0],
line_length=line_length,
fast=fast,
sources=sources,
line_length=line_length,
fast=fast,
sources=sources,
line_length=line_length,
fast=fast,
report=report,
loop=loop,
executor=executor,
report=report,
loop=loop,
executor=executor,
src: Path,
line_length: int,
fast: bool,
src: Path,
line_length: int,
fast: bool,
- pyi: bool,
- py36: bool,
report: "Report",
) -> None:
"""Reformat a single file under `src` without spawning child processes.
report: "Report",
) -> None:
"""Reformat a single file under `src` without spawning child processes.
changed = Changed.NO
if not src.is_file() and str(src) == "-":
if format_stdin_to_stdout(
changed = Changed.NO
if not src.is_file() and str(src) == "-":
if format_stdin_to_stdout(
- line_length=line_length,
- fast=fast,
- is_pyi=pyi,
- force_py36=py36,
- write_back=write_back,
+ line_length=line_length, fast=fast, write_back=write_back, mode=mode
):
changed = Changed.YES
else:
cache: Cache = {}
if write_back != WriteBack.DIFF:
):
changed = Changed.YES
else:
cache: Cache = {}
if write_back != WriteBack.DIFF:
- cache = read_cache(line_length, pyi, py36 )
+ cache = read_cache(line_length, mode )
src = src.resolve()
if src in cache and cache[src] == get_cache_info(src):
changed = Changed.CACHED
src = src.resolve()
if src in cache and cache[src] == get_cache_info(src):
changed = Changed.CACHED
src,
line_length=line_length,
fast=fast,
src,
line_length=line_length,
fast=fast,
- force_pyi=pyi,
- force_py36=py36,
):
changed = Changed.YES
if write_back == WriteBack.YES and changed is not Changed.NO:
):
changed = Changed.YES
if write_back == WriteBack.YES and changed is not Changed.NO:
- write_cache(cache, [src], line_length, pyi, py36 )
+ write_cache(cache, [src], line_length, mode )
report.done(src, changed)
except Exception as exc:
report.failed(src, str(exc))
report.done(src, changed)
except Exception as exc:
report.failed(src, str(exc))
sources: List[Path],
line_length: int,
fast: bool,
sources: List[Path],
line_length: int,
fast: bool,
- pyi: bool,
- py36: bool,
report: "Report",
loop: BaseEventLoop,
executor: Executor,
report: "Report",
loop: BaseEventLoop,
executor: Executor,
"""
cache: Cache = {}
if write_back != WriteBack.DIFF:
"""
cache: Cache = {}
if write_back != WriteBack.DIFF:
- cache = read_cache(line_length, pyi, py36 )
+ cache = read_cache(line_length, mode )
sources, cached = filter_cached(cache, sources)
for src in cached:
report.done(src, Changed.CACHED)
sources, cached = filter_cached(cache, sources)
for src in cached:
report.done(src, Changed.CACHED)
lock,
): src
for src in sorted(sources)
lock,
): src
for src in sorted(sources)
if cancelled:
await asyncio.gather(*cancelled, loop=loop, return_exceptions=True)
if write_back == WriteBack.YES and formatted:
if cancelled:
await asyncio.gather(*cancelled, loop=loop, return_exceptions=True)
if write_back == WriteBack.YES and formatted:
- write_cache(cache, formatted, line_length, pyi, py36 )
+ write_cache(cache, formatted, line_length, mode )
def format_file_in_place(
src: Path,
line_length: int,
fast: bool,
def format_file_in_place(
src: Path,
line_length: int,
fast: bool,
- force_pyi: bool = False,
- force_py36: bool = False,
write_back: WriteBack = WriteBack.NO,
write_back: WriteBack = WriteBack.NO,
+ mode: FileMode = FileMode.AUTO_DETECT,
lock: Any = None, # multiprocessing.Manager().Lock() is some crazy proxy
) -> bool:
"""Format file under `src` path. Return True if changed.
lock: Any = None, # multiprocessing.Manager().Lock() is some crazy proxy
) -> bool:
"""Format file under `src` path. Return True if changed.
If `write_back` is True, write reformatted code back to stdout.
`line_length` and `fast` options are passed to :func:`format_file_contents`.
"""
If `write_back` is True, write reformatted code back to stdout.
`line_length` and `fast` options are passed to :func:`format_file_contents`.
"""
- is_pyi = force_pyi or src.suffix == ".pyi"
-
+ if src.suffix == ".pyi":
+ mode |= FileMode.PYI
with tokenize.open(src) as src_buffer:
src_contents = src_buffer.read()
try:
dst_contents = format_file_contents(
with tokenize.open(src) as src_buffer:
src_contents = src_buffer.read()
try:
dst_contents = format_file_contents(
- src_contents,
- line_length=line_length,
- fast=fast,
- is_pyi=is_pyi,
- force_py36=force_py36,
+ src_contents, line_length=line_length, fast=fast, mode=mode
)
except NothingChanged:
return False
)
except NothingChanged:
return False
def format_stdin_to_stdout(
line_length: int,
fast: bool,
def format_stdin_to_stdout(
line_length: int,
fast: bool,
- is_pyi: bool = False,
- force_py36: bool = False,
write_back: WriteBack = WriteBack.NO,
write_back: WriteBack = WriteBack.NO,
+ mode: FileMode = FileMode.AUTO_DETECT,
) -> bool:
"""Format file on stdin. Return True if changed.
) -> bool:
"""Format file on stdin. Return True if changed.
src = sys.stdin.read()
dst = src
try:
src = sys.stdin.read()
dst = src
try:
- dst = format_file_contents(
- src,
- line_length=line_length,
- fast=fast,
- is_pyi=is_pyi,
- force_py36=force_py36,
- )
+ dst = format_file_contents(src, line_length=line_length, fast=fast, mode=mode)
return True
except NothingChanged:
return True
except NothingChanged:
*,
line_length: int,
fast: bool,
*,
line_length: int,
fast: bool,
- is_pyi: bool = False,
- force_py36: bool = False,
+ mode: FileMode = FileMode.AUTO_DETECT,
) -> FileContent:
"""Reformat contents a file and return new contents.
) -> FileContent:
"""Reformat contents a file and return new contents.
if src_contents.strip() == "":
raise NothingChanged
if src_contents.strip() == "":
raise NothingChanged
- dst_contents = format_str(
- src_contents, line_length=line_length, is_pyi=is_pyi, force_py36=force_py36
- )
+ dst_contents = format_str(src_contents, line_length=line_length, mode=mode)
if src_contents == dst_contents:
raise NothingChanged
if not fast:
assert_equivalent(src_contents, dst_contents)
if src_contents == dst_contents:
raise NothingChanged
if not fast:
assert_equivalent(src_contents, dst_contents)
- assert_stable(
- src_contents,
- dst_contents,
- line_length=line_length,
- is_pyi=is_pyi,
- force_py36=force_py36,
- )
+ assert_stable(src_contents, dst_contents, line_length=line_length, mode=mode)
return dst_contents
def format_str(
return dst_contents
def format_str(
- src_contents: str,
- line_length: int,
- *,
- is_pyi: bool = False,
- force_py36: bool = False,
+ src_contents: str, line_length: int, *, mode: FileMode = FileMode.AUTO_DETECT
) -> FileContent:
"""Reformat a string and return new contents.
) -> FileContent:
"""Reformat a string and return new contents.
src_node = lib2to3_parse(src_contents)
dst_contents = ""
future_imports = get_future_imports(src_node)
src_node = lib2to3_parse(src_contents)
dst_contents = ""
future_imports = get_future_imports(src_node)
- elt = EmptyLineTracker(is_pyi=is_pyi)
- py36 = force_py36 or is_python36(src_node)
+ is_pyi = bool(mode & FileMode.PYI)
+ py36 = bool(mode & FileMode.PYTHON36) or is_python36(src_node)
+ normalize_strings = not bool(mode & FileMode.NO_STRING_NORMALIZATION)
- remove_u_prefix=py36 or "unicode_literals" in future_imports, is_pyi=is_pyi
+ remove_u_prefix=py36 or "unicode_literals" in future_imports,
+ is_pyi=is_pyi,
+ normalize_strings=normalize_strings,
+ elt = EmptyLineTracker(is_pyi=is_pyi)
empty_line = Line()
after = 0
for current_line in lines.visit(src_node):
empty_line = Line()
after = 0
for current_line in lines.visit(src_node):
and self.leaves[3].value == ")"
)
and self.leaves[3].value == ")"
)
+ @property
+ def is_triple_quoted_string(self) -> bool:
+ """Is the line a triple quoted string?"""
+ return (
+ bool(self)
+ and self.leaves[0].type == token.STRING
+ and self.leaves[0].value.startswith(('"""', "'''"))
+ )
+
def contains_standalone_comments(self, depth_limit: int = sys.maxsize) -> bool:
"""If so, needs to be split before emitting."""
for leaf in self.leaves:
def contains_standalone_comments(self, depth_limit: int = sys.maxsize) -> bool:
"""If so, needs to be split before emitting."""
for leaf in self.leaves:
the prefix of the first leaf consists of optional newlines. Those newlines
are consumed by `maybe_empty_lines()` and included in the computation.
"""
the prefix of the first leaf consists of optional newlines. Those newlines
are consumed by `maybe_empty_lines()` and included in the computation.
"""
is_pyi: bool = False
previous_line: Optional[Line] = None
previous_after: int = 0
is_pyi: bool = False
previous_line: Optional[Line] = None
previous_after: int = 0
if self.previous_line.is_decorator:
return 0, 0
if self.previous_line.is_decorator:
return 0, 0
+ if self.previous_line.depth < current_line.depth and (
+ self.previous_line.is_class or self.previous_line.is_def
+ ):
+ return 0, 0
+
if (
self.previous_line.is_comment
and self.previous_line.depth == current_line.depth
if (
self.previous_line.is_comment
and self.previous_line.depth == current_line.depth
):
return (before or 1), 0
):
return (before or 1), 0
+ if (
+ self.previous_line
+ and self.previous_line.is_class
+ and current_line.is_triple_quoted_string
+ ):
+ return before, 1
+
Note: destroys the tree it's visiting by mutating prefixes of its leaves
in ways that will no longer stringify to valid Python code on the tree.
"""
Note: destroys the tree it's visiting by mutating prefixes of its leaves
in ways that will no longer stringify to valid Python code on the tree.
"""
+ normalize_strings: bool = True
current_line: Line = Factory(Line)
remove_u_prefix: bool = False
current_line: Line = Factory(Line)
remove_u_prefix: bool = False
else:
normalize_prefix(node, inside_brackets=any_open_brackets)
else:
normalize_prefix(node, inside_brackets=any_open_brackets)
- if node.type == token.STRING:
+ if self.normalize_strings and node.type == token.STRING:
normalize_string_prefix(node, remove_u_prefix=self.remove_u_prefix)
normalize_string_quotes(node)
if node.type not in WHITESPACE:
normalize_string_prefix(node, remove_u_prefix=self.remove_u_prefix)
normalize_string_quotes(node)
if node.type not in WHITESPACE:
@dataclass
class Report:
"""Provides a reformatting counter. Can be rendered with `str(report)`."""
@dataclass
class Report:
"""Provides a reformatting counter. Can be rendered with `str(report)`."""
check: bool = False
quiet: bool = False
change_count: int = 0
check: bool = False
quiet: bool = False
change_count: int = 0
- src: str, dst: str, line_length: int, is_pyi: bool = False, force_py36: bool = False
+ src: str, dst: str, line_length: int, mode: FileMode = FileMode.AUTO_DETECT
) -> None:
"""Raise AssertionError if `dst` reformats differently the second time."""
) -> None:
"""Raise AssertionError if `dst` reformats differently the second time."""
- newdst = format_str(
- dst, line_length=line_length, is_pyi=is_pyi, force_py36=force_py36
- )
+ newdst = format_str(dst, line_length=line_length, mode=mode)
if dst != newdst:
log = dump_to_file(
diff(src, dst, "source", "first pass"),
if dst != newdst:
log = dump_to_file(
diff(src, dst, "source", "first pass"),
-def get_cache_file(line_length: int, pyi: bool = False, py36: bool = False) -> Path:
+def get_cache_file(line_length: int, mode: FileMode) -> Path:
+ pyi = bool(mode & FileMode.PYI)
+ py36 = bool(mode & FileMode.PYTHON36)
return (
CACHE_DIR
/ f"cache.{line_length}{'.pyi' if pyi else ''}{'.py36' if py36 else ''}.pickle"
)
return (
CACHE_DIR
/ f"cache.{line_length}{'.pyi' if pyi else ''}{'.py36' if py36 else ''}.pickle"
)
-def read_cache(line_length: int, pyi: bool = False, py36: bool = Fals e) -> Cache:
+def read_cache(line_length: int, mode: FileMod e) -> Cache:
"""Read the cache if it exists and is well formed.
If it is not well formed, the call to write_cache later should resolve the issue.
"""
"""Read the cache if it exists and is well formed.
If it is not well formed, the call to write_cache later should resolve the issue.
"""
- cache_file = get_cache_file(line_length, pyi, py36 )
+ cache_file = get_cache_file(line_length, mode )
if not cache_file.exists():
return {}
if not cache_file.exists():
return {}
- cache: Cache,
- sources: List[Path],
- line_length: int,
- pyi: bool = False,
- py36: bool = False,
+ cache: Cache, sources: List[Path], line_length: int, mode: FileMode
) -> None:
"""Update the cache file."""
) -> None:
"""Update the cache file."""
- cache_file = get_cache_file(line_length, pyi, py36 )
+ cache_file = get_cache_file(line_length, mode )
try:
if not CACHE_DIR.exists():
CACHE_DIR.mkdir(parents=True)
try:
if not CACHE_DIR.exists():
CACHE_DIR.mkdir(parents=True)