]>
git.madduck.net Git - etc/vim.git/blobdiff - black.py
madduck's git repository
Every one of the projects in this repository is available at the canonical
URL git://git.madduck.net/madduck/pub/<projectpath> — see
each project's metadata for the exact URL.
All patches and comments are welcome. Please squash your changes to logical
commits before using git-format-patch and git-send-email to
patches@ git. madduck. net .
If you'd read over the Git project's submission guidelines and adhered to them,
I'd be especially grateful.
SSH access, as well as push access can be individually
arranged .
If you use my repositories frequently, consider adding the following
snippet to ~/.gitconfig and using the third clone URL listed for each
project:
[url "git://git.madduck.net/madduck/"]
insteadOf = madduck:
from asyncio.base_events import BaseEventLoop
from concurrent.futures import Executor, ProcessPoolExecutor
from asyncio.base_events import BaseEventLoop
from concurrent.futures import Executor, ProcessPoolExecutor
+from datetime import datetime
from enum import Enum, Flag
from functools import partial, wraps
import io
from enum import Enum, Flag
from functools import partial, wraps
import io
from multiprocessing import Manager
import os
from pathlib import Path
from multiprocessing import Manager
import os
from pathlib import Path
from typing import (
Any,
Callable,
from typing import (
Any,
Callable,
# types
FileContent = str
Encoding = str
# types
FileContent = str
Encoding = str
Depth = int
NodeType = int
LeafID = int
Depth = int
NodeType = int
LeafID = int
py36=py36, pyi=pyi, skip_string_normalization=skip_string_normalization
)
report = Report(check=check, quiet=quiet, verbose=verbose)
py36=py36, pyi=pyi, skip_string_normalization=skip_string_normalization
)
report = Report(check=check, quiet=quiet, verbose=verbose)
- sources: List[Path] = []
+ sources: Set[Path] = set()
try:
include_regex = re.compile(include)
except re.error:
try:
include_regex = re.compile(include)
except re.error:
for s in src:
p = Path(s)
if p.is_dir():
for s in src:
p = Path(s)
if p.is_dir():
gen_python_files_in_dir(p, root, include_regex, exclude_regex, report)
)
elif p.is_file() or s == "-":
# if a file was explicitly given, we don't care about its extension
gen_python_files_in_dir(p, root, include_regex, exclude_regex, report)
)
elif p.is_file() or s == "-":
# if a file was explicitly given, we don't care about its extension
else:
err(f"invalid path: {s}")
if len(sources) == 0:
else:
err(f"invalid path: {s}")
if len(sources) == 0:
elif len(sources) == 1:
reformat_one(
elif len(sources) == 1:
reformat_one(
line_length=line_length,
fast=fast,
write_back=write_back,
line_length=line_length,
fast=fast,
write_back=write_back,
)
finally:
shutdown(loop)
)
finally:
shutdown(loop)
- if verbose or not quiet:
- out("All done! ✨ 🍰 ✨")
- click.echo(str(report))
+ if verbose or not quiet:
+ out("All done! ✨ 🍰 ✨")
+ click.echo(str(report))
ctx.exit(report.return_code)
ctx.exit(report.return_code)
async def schedule_formatting(
async def schedule_formatting(
line_length: int,
fast: bool,
write_back: WriteBack,
line_length: int,
fast: bool,
write_back: WriteBack,
if write_back != WriteBack.DIFF:
cache = read_cache(line_length, mode)
sources, cached = filter_cached(cache, sources)
if write_back != WriteBack.DIFF:
cache = read_cache(line_length, mode)
sources, cached = filter_cached(cache, sources)
+ for src in sorted(cached) :
report.done(src, Changed.CACHED)
cancelled = []
formatted = []
report.done(src, Changed.CACHED)
cancelled = []
formatted = []
if src.suffix == ".pyi":
mode |= FileMode.PYI
if src.suffix == ".pyi":
mode |= FileMode.PYI
+ then = datetime.utcfromtimestamp(src.stat().st_mtime)
with open(src, "rb") as buf:
with open(src, "rb") as buf:
- newline, encoding, src_contents = prepare_input (buf.read())
+ src_contents, encoding, newline = decode_bytes (buf.read())
try:
dst_contents = format_file_contents(
src_contents, line_length=line_length, fast=fast, mode=mode
try:
dst_contents = format_file_contents(
src_contents, line_length=line_length, fast=fast, mode=mode
with open(src, "w", encoding=encoding, newline=newline) as f:
f.write(dst_contents)
elif write_back == write_back.DIFF:
with open(src, "w", encoding=encoding, newline=newline) as f:
f.write(dst_contents)
elif write_back == write_back.DIFF:
- src_name = f"{src} (original)"
- dst_name = f"{src} (formatted)"
+ now = datetime.utcnow()
+ src_name = f"{src}\t{then} +0000"
+ dst_name = f"{src}\t{now} +0000"
diff_contents = diff(src_contents, dst_contents, src_name, dst_name)
if lock:
lock.acquire()
diff_contents = diff(src_contents, dst_contents, src_name, dst_name)
if lock:
lock.acquire()
`line_length`, `fast`, `is_pyi`, and `force_py36` arguments are passed to
:func:`format_file_contents`.
"""
`line_length`, `fast`, `is_pyi`, and `force_py36` arguments are passed to
:func:`format_file_contents`.
"""
- newline, encoding, src = prepare_input(sys.stdin.buffer.read())
+ then = datetime.utcnow()
+ src, encoding, newline = decode_bytes(sys.stdin.buffer.read())
dst = src
try:
dst = format_file_contents(src, line_length=line_length, fast=fast, mode=mode)
dst = src
try:
dst = format_file_contents(src, line_length=line_length, fast=fast, mode=mode)
+ f = io.TextIOWrapper(
+ sys.stdout.buffer, encoding=encoding, newline=newline, write_through=True
+ )
if write_back == WriteBack.YES:
if write_back == WriteBack.YES:
- f = io.TextIOWrapper(
- sys.stdout.buffer,
- encoding=encoding,
- newline=newline,
- write_through=True,
- )
elif write_back == WriteBack.DIFF:
elif write_back == WriteBack.DIFF:
- src_name = "<stdin> (original)"
- dst_name = "<stdin> (formatted)"
- f = io.TextIOWrapper(
- sys.stdout.buffer,
- encoding=encoding,
- newline=newline,
- write_through=True,
- )
+ now = datetime.utcnow()
+ src_name = f"STDIN\t{then} +0000"
+ dst_name = f"STDOUT\t{now} +0000"
f.write(diff(src, dst, src_name, dst_name))
f.write(diff(src, dst, src_name, dst_name))
def format_file_contents(
def format_file_contents(
-def prepare_input(src: bytes) -> Tuple[str, str, str ]:
- """Analyze `src` and return a tuple of (newline, encoding, decoded_contents)
+def decode_bytes(src: bytes) -> Tuple[FileContent, Encoding, NewLine ]:
+ """Return a tuple of (decoded_contents, encoding, newline).
- Where `newline` is either CRLF or LF, and `decoded_contents` is decoded with
- universal newlines (i.e. only LF).
+ `newline` is either CRLF or LF but `decoded_contents` is decoded with
+ universal newlines (i.e. only contains LF).
"""
srcbuf = io.BytesIO(src)
encoding, lines = tokenize.detect_encoding(srcbuf.readline)
newline = "\r\n" if b"\r\n" == lines[0][-2:] else "\n"
srcbuf.seek(0)
"""
srcbuf = io.BytesIO(src)
encoding, lines = tokenize.detect_encoding(srcbuf.readline)
newline = "\r\n" if b"\r\n" == lines[0][-2:] else "\n"
srcbuf.seek(0)
- return newline, encoding, io.TextIOWrapper(srcbuf, encoding).read()
+ with io.TextIOWrapper(srcbuf, encoding) as tiow:
+ return tiow.read(), encoding, newline
return stat.st_mtime, stat.st_size
return stat.st_mtime, stat.st_size
-def filter_cached(
- cache: Cache, sources: Iterable[Path]
-) -> Tuple[List[Path], List[Path]]:
- """Split a list of paths into two.
+def filter_cached(cache: Cache, sources: Iterable[Path]) -> Tuple[Set[Path], Set[Path]]:
+ """Split an iterable of paths in `sources` into two sets.
- The first list contains paths of files that modified on disk or are not in the
- cache. The other list contains paths to non-modified files.
+ The first contains paths of files that modified on disk or are not in the
+ cache. The other contains paths to non-modified files.
+ todo, done = set(), set()
for src in sources:
src = src.resolve()
if cache.get(src) != get_cache_info(src):
for src in sources:
src = src.resolve()
if cache.get(src) != get_cache_info(src):
return todo, done
def write_cache(
return todo, done
def write_cache(
- cache: Cache, sources: List [Path], line_length: int, mode: FileMode
+ cache: Cache, sources: Iterable [Path], line_length: int, mode: FileMode
) -> None:
"""Update the cache file."""
cache_file = get_cache_file(line_length, mode)
) -> None:
"""Update the cache file."""
cache_file = get_cache_file(line_length, mode)