import asyncio
-import pickle
from asyncio.base_events import BaseEventLoop
from concurrent.futures import Executor, ProcessPoolExecutor
+from datetime import datetime
from enum import Enum, Flag
from functools import partial, wraps
import io
from multiprocessing import Manager
import os
from pathlib import Path
+import pickle
import re
-import tokenize
import signal
import sys
+import tokenize
from typing import (
Any,
Callable,
# types
FileContent = str
Encoding = str
+NewLine = str
Depth = int
NodeType = int
LeafID = int
)
finally:
shutdown(loop)
- if verbose or not quiet:
- out("All done! ✨ 🍰 ✨")
- click.echo(str(report))
+ if verbose or not quiet:
+ out("All done! ✨ 🍰 ✨")
+ click.echo(str(report))
ctx.exit(report.return_code)
if src.suffix == ".pyi":
mode |= FileMode.PYI
+ then = datetime.utcfromtimestamp(src.stat().st_mtime)
with open(src, "rb") as buf:
- newline, encoding, src_contents = prepare_input(buf.read())
+ src_contents, encoding, newline = decode_bytes(buf.read())
try:
dst_contents = format_file_contents(
src_contents, line_length=line_length, fast=fast, mode=mode
with open(src, "w", encoding=encoding, newline=newline) as f:
f.write(dst_contents)
elif write_back == write_back.DIFF:
- src_name = f"{src} (original)"
- dst_name = f"{src} (formatted)"
+ now = datetime.utcnow()
+ src_name = f"{src}\t{then} +0000"
+ dst_name = f"{src}\t{now} +0000"
diff_contents = diff(src_contents, dst_contents, src_name, dst_name)
if lock:
lock.acquire()
`line_length`, `fast`, `is_pyi`, and `force_py36` arguments are passed to
:func:`format_file_contents`.
"""
- newline, encoding, src = prepare_input(sys.stdin.buffer.read())
+ then = datetime.utcnow()
+ src, encoding, newline = decode_bytes(sys.stdin.buffer.read())
dst = src
try:
dst = format_file_contents(src, line_length=line_length, fast=fast, mode=mode)
return False
finally:
+ f = io.TextIOWrapper(
+ sys.stdout.buffer, encoding=encoding, newline=newline, write_through=True
+ )
if write_back == WriteBack.YES:
- f = io.TextIOWrapper(
- sys.stdout.buffer,
- encoding=encoding,
- newline=newline,
- write_through=True,
- )
f.write(dst)
- f.detach()
elif write_back == WriteBack.DIFF:
- src_name = "<stdin> (original)"
- dst_name = "<stdin> (formatted)"
- f = io.TextIOWrapper(
- sys.stdout.buffer,
- encoding=encoding,
- newline=newline,
- write_through=True,
- )
+ now = datetime.utcnow()
+ src_name = f"STDIN\t{then} +0000"
+ dst_name = f"STDOUT\t{now} +0000"
f.write(diff(src, dst, src_name, dst_name))
- f.detach()
+ f.detach()
def format_file_contents(
return dst_contents
-def prepare_input(src: bytes) -> Tuple[str, str, str]:
-    """Analyze `src` and return a tuple of (newline, encoding, decoded_contents)
+def decode_bytes(src: bytes) -> Tuple[FileContent, Encoding, NewLine]:
+    """Return a tuple of (decoded_contents, encoding, newline).
-    Where `newline` is either CRLF or LF, and `decoded_contents` is decoded with
-    universal newlines (i.e. only LF).
+    `newline` is either CRLF or LF but `decoded_contents` is decoded with
+    universal newlines (i.e. only contains LF).
"""
srcbuf = io.BytesIO(src)
encoding, lines = tokenize.detect_encoding(srcbuf.readline)
+    # `detect_encoding` read no lines at all: return empty contents and
+    # default the newline to LF.
+    if not lines:
+        return "", encoding, "\n"
+
+    # Only the ending of the first line read is inspected to choose between
+    # CRLF and LF.
newline = "\r\n" if b"\r\n" == lines[0][-2:] else "\n"
+    # Rewind so the lines already consumed by `detect_encoding` are decoded too.
srcbuf.seek(0)
-    return newline, encoding, io.TextIOWrapper(srcbuf, encoding).read()
+    with io.TextIOWrapper(srcbuf, encoding) as tiow:
+        return tiow.read(), encoding, newline
GRAMMARS = [
def lib2to3_parse(src_txt: str) -> Node:
"""Given a string with source, return the lib2to3 Node."""
grammar = pygram.python_grammar_no_print_statement
- if src_txt[-1] != "\n":
+ if src_txt[-1:] != "\n":
src_txt += "\n"
for grammar in GRAMMARS:
drv = driver.Driver(grammar, pytree.convert)
syms.dictsetmaker,
syms.listmaker,
syms.testlist_gexp,
+ syms.testlist_star_expr,
}
TEST_DESCENDANTS = {
syms.test,
return False
+    def contains_multiline_strings(self) -> bool:
+        """Return True if any leaf in `self.leaves` is a multiline string."""
+        for leaf in self.leaves:
+            if is_multiline_string(leaf):
+                return True
+
+        return False
+
def maybe_remove_trailing_comma(self, closing: Leaf) -> bool:
"""Remove trailing comma if there is one and it's safe."""
if not (
result.append(leaf, preformatted=True)
for comment_after in line.comments_after(leaf):
result.append(comment_after, preformatted=True)
- bracket_split_succeeded_or_raise(head, body, tail)
assert opening_bracket and closing_bracket
+ body.should_explode = should_explode(body, opening_bracket)
+ bracket_split_succeeded_or_raise(head, body, tail)
if (
+ # the body shouldn't be exploded
+ not body.should_explode
# the opening bracket is an optional paren
- opening_bracket.type == token.LPAR
+ and opening_bracket.type == token.LPAR
and not opening_bracket.value
# the closing bracket is an optional paren
and closing_bracket.type == token.RPAR
and not closing_bracket.value
- # there are no standalone comments in the body
- and not line.contains_standalone_comments(0)
- # and it's not an import (optional parens are the only thing we can split
- # on in this case; attempting a split without them is a waste of time)
+ # it's not an import (optional parens are the only thing we can split on
+ # in this case; attempting a split without them is a waste of time)
and not line.is_import
+ # there are no standalone comments in the body
+ and not body.contains_standalone_comments(0)
+ # and we can actually remove the parens
+ and can_omit_invisible_parens(body, line_length)
):
omit = {id(closing_bracket), *omit}
- if can_omit_invisible_parens(body, line_length):
- try:
- yield from right_hand_split(line, line_length, py36=py36, omit=omit)
- return
- except CannotSplit:
- pass
+ try:
+ yield from right_hand_split(line, line_length, py36=py36, omit=omit)
+ return
+
+ except CannotSplit:
+ if not (
+ can_be_split(body)
+ or is_line_short_enough(body, line_length=line_length)
+ ):
+ raise CannotSplit(
+ "Splitting failed, body is still too long and can't be split."
+ )
+
+ elif head.contains_multiline_strings() or tail.contains_multiline_strings():
+ raise CannotSplit(
+ "The current optional pair of parentheses is bound to fail to "
+                    "satisfy the splitting algorithm because the head or the tail "
+ "contains multiline strings which by definition never fit one "
+ "line."
+ )
ensure_visible(opening_bracket)
ensure_visible(closing_bracket)
- body.should_explode = should_explode(body, opening_bracket)
for result in (head, body, tail):
if result:
yield result
)
+def can_be_split(line: Line) -> bool:
+    """Return False if the line cannot be split *for sure*.
+
+    This is not an exhaustive search but a cheap heuristic that we can use to
+    avoid some unfortunate formattings (mostly around wrapping unsplittable code
+    in unnecessary parentheses).
+    """
+    leaves = line.leaves
+    # A line with fewer than two leaves is always reported as unsplittable.
+    if len(leaves) < 2:
+        return False
+
+    # Only lines that begin with a string literal immediately followed by a dot
+    # are inspected further; any other line is reported as splittable below.
+    if leaves[0].type == token.STRING and leaves[1].type == token.DOT:
+        call_count = 0
+        dot_count = 0
+        # NOTE(review): `next` shadows the builtin and is not reassigned inside
+        # the loop, so every check below compares against the last leaf --
+        # confirm this is intended.
+        next = leaves[-1]
+        # Walk the leaves right-to-left, starting at the second-to-last one.
+        for leaf in leaves[-2::-1]:
+            if leaf.type in OPENING_BRACKETS:
+                if next.type not in CLOSING_BRACKETS:
+                    return False
+
+                # Every opening bracket that passes the check is counted here.
+                call_count += 1
+            elif leaf.type == token.DOT:
+                dot_count += 1
+            elif leaf.type == token.NAME:
+                if not (next.type == token.DOT or next.type in OPENING_BRACKETS):
+                    return False
+
+            # Any leaf other than a bracket, dot or name ends the search.
+            elif leaf.type not in CLOSING_BRACKETS:
+                return False
+
+            # More than one dot together with more than one counted bracket:
+            # report the line as unsplittable.
+            if dot_count > 1 and call_count > 1:
+                return False
+
+    return True
+
+
def can_omit_invisible_parens(line: Line, line_length: int) -> bool:
"""Does `line` have a shape safe to reformat without optional parens around it?