from concurrent.futures import Executor, ProcessPoolExecutor
from enum import Enum, Flag
from functools import partial, wraps
+import io
import keyword
import logging
from multiprocessing import Manager
py36=py36, pyi=pyi, skip_string_normalization=skip_string_normalization
)
report = Report(check=check, quiet=quiet, verbose=verbose)
- sources: List[Path] = []
+ sources: Set[Path] = set()
try:
include_regex = re.compile(include)
except re.error:
for s in src:
p = Path(s)
if p.is_dir():
- sources.extend(
+ sources.update(
gen_python_files_in_dir(p, root, include_regex, exclude_regex, report)
)
elif p.is_file() or s == "-":
# if a file was explicitly given, we don't care about its extension
- sources.append(p)
+ sources.add(p)
else:
err(f"invalid path: {s}")
if len(sources) == 0:
- out("No paths given. Nothing to do 😴")
+ if verbose or not quiet:
+ out("No paths given. Nothing to do 😴")
ctx.exit(0)
return
elif len(sources) == 1:
reformat_one(
- src=sources[0],
+ src=sources.pop(),
line_length=line_length,
fast=fast,
write_back=write_back,
)
finally:
shutdown(loop)
- if not quiet:
+ if verbose or not quiet:
out("All done! ✨ 🍰 ✨")
click.echo(str(report))
ctx.exit(report.return_code)
async def schedule_formatting(
- sources: List[Path],
+ sources: Set[Path],
line_length: int,
fast: bool,
write_back: WriteBack,
if write_back != WriteBack.DIFF:
cache = read_cache(line_length, mode)
sources, cached = filter_cached(cache, sources)
- for src in cached:
+ for src in sorted(cached):
report.done(src, Changed.CACHED)
cancelled = []
formatted = []
"""
if src.suffix == ".pyi":
mode |= FileMode.PYI
- with tokenize.open(src) as src_buffer:
- src_contents = src_buffer.read()
+
+ with open(src, "rb") as buf:
+ newline, encoding, src_contents = prepare_input(buf.read())
try:
dst_contents = format_file_contents(
src_contents, line_length=line_length, fast=fast, mode=mode
return False
if write_back == write_back.YES:
- with open(src, "w", encoding=src_buffer.encoding) as f:
+ with open(src, "w", encoding=encoding, newline=newline) as f:
f.write(dst_contents)
elif write_back == write_back.DIFF:
src_name = f"{src} (original)"
if lock:
lock.acquire()
try:
- sys.stdout.write(diff_contents)
+ f = io.TextIOWrapper(
+ sys.stdout.buffer,
+ encoding=encoding,
+ newline=newline,
+ write_through=True,
+ )
+ f.write(diff_contents)
+ f.detach()
finally:
if lock:
lock.release()
`line_length`, `fast`, `is_pyi`, and `force_py36` arguments are passed to
:func:`format_file_contents`.
"""
- src = sys.stdin.read()
+ newline, encoding, src = prepare_input(sys.stdin.buffer.read())
dst = src
try:
dst = format_file_contents(src, line_length=line_length, fast=fast, mode=mode)
finally:
if write_back == WriteBack.YES:
- sys.stdout.write(dst)
+ f = io.TextIOWrapper(
+ sys.stdout.buffer,
+ encoding=encoding,
+ newline=newline,
+ write_through=True,
+ )
+ f.write(dst)
+ f.detach()
elif write_back == WriteBack.DIFF:
src_name = "<stdin> (original)"
dst_name = "<stdin> (formatted)"
- sys.stdout.write(diff(src, dst, src_name, dst_name))
+ f = io.TextIOWrapper(
+ sys.stdout.buffer,
+ encoding=encoding,
+ newline=newline,
+ write_through=True,
+ )
+ f.write(diff(src, dst, src_name, dst_name))
+ f.detach()
def format_file_contents(
return dst_contents
+def prepare_input(src: bytes) -> Tuple[str, str, str]:
+    """Analyze `src` and return a tuple of (newline, encoding, decoded_contents).
+
+    `encoding` is whatever `tokenize.detect_encoding` reports for `src`.
+    `newline` is CRLF if the first line of `src` ends with CRLF, otherwise LF
+    (LF is also used for empty input, which has no first line to inspect).
+    `decoded_contents` is decoded with universal newlines (i.e. only LF).
+    """
+    srcbuf = io.BytesIO(src)
+    encoding, lines = tokenize.detect_encoding(srcbuf.readline)
+    if not lines:
+        # Empty (or BOM-only) input: no line was read, so `lines[0]` would raise.
+        return "\n", encoding, ""
+    newline = "\r\n" if lines[0].endswith(b"\r\n") else "\n"
+    # `detect_encoding` already read from `srcbuf`; rewind before decoding it all.
+    srcbuf.seek(0)
+    with io.TextIOWrapper(srcbuf, encoding) as tiow:
+        return newline, encoding, tiow.read()
+
+
GRAMMARS = [
pygram.python_grammar_no_print_statement_no_exec_statement,
pygram.python_grammar_no_print_statement,
"""Given a string with source, return the lib2to3 Node."""
grammar = pygram.python_grammar_no_print_statement
if src_txt[-1] != "\n":
- nl = "\r\n" if "\r\n" in src_txt[:1024] else "\n"
- src_txt += nl
+ src_txt += "\n"
for grammar in GRAMMARS:
drv = driver.Driver(grammar, pytree.convert)
try:
"""
assert root.is_absolute(), f"INTERNAL ERROR: `root` must be absolute but is {root}"
for child in path.iterdir():
- normalized_path = child.resolve().relative_to(root).as_posix()
+ normalized_path = "/" + child.resolve().relative_to(root).as_posix()
if child.is_dir():
normalized_path += "/"
exclude_match = exclude.search(normalized_path)
return stat.st_mtime, stat.st_size
-def filter_cached(
- cache: Cache, sources: Iterable[Path]
-) -> Tuple[List[Path], List[Path]]:
- """Split a list of paths into two.
+def filter_cached(cache: Cache, sources: Iterable[Path]) -> Tuple[Set[Path], Set[Path]]:
+ """Split an iterable of paths in `sources` into two sets.
- The first list contains paths of files that modified on disk or are not in the
- cache. The other list contains paths to non-modified files.
+ The first contains paths of files that were modified on disk or are not in the
+ cache. The other contains paths to non-modified files. Every path is resolved
+ before the cache lookup, so both returned sets hold resolved paths.
"""
- todo, done = [], []
+ todo, done = set(), set()
for src in sources:
src = src.resolve()
+ # A file counts as unchanged only if its cache entry equals its current info.
if cache.get(src) != get_cache_info(src):
- todo.append(src)
+ todo.add(src)
else:
- done.append(src)
+ done.add(src)
return todo, done
def write_cache(
- cache: Cache, sources: List[Path], line_length: int, mode: FileMode
+ cache: Cache, sources: Iterable[Path], line_length: int, mode: FileMode
) -> None:
"""Update the cache file."""
cache_file = get_cache_file(line_length, mode)