Timestamp = float
FileSize = int
CacheInfo = Tuple[Timestamp, FileSize]
-Cache = Dict[Path, CacheInfo]
+Cache = Dict[str, CacheInfo]
out = partial(click.secho, bold=True, err=True)
err = partial(click.secho, fg="red", err=True)
if write_back not in (WriteBack.DIFF, WriteBack.COLOR_DIFF):
cache = read_cache(mode)
res_src = src.resolve()
- if res_src in cache and cache[res_src] == get_cache_info(res_src):
+ res_src_s = str(res_src)
+ if res_src_s in cache and cache[res_src_s] == get_cache_info(res_src):
changed = Changed.CACHED
if changed is not Changed.CACHED and format_file_in_place(
src, fast=fast, write_back=write_back, mode=mode
dst_name = f"{src}\t{now} +0000"
diff_contents = diff(src_contents, dst_contents, src_name, dst_name)
- if write_back == write_back.COLOR_DIFF:
+ if write_back == WriteBack.COLOR_DIFF:
diff_contents = color_diff(diff_contents)
with lock or nullcontext():
comments: Dict[LeafID, List[Leaf]] = field(default_factory=dict)
bracket_tracker: BracketTracker = field(default_factory=BracketTracker)
inside_brackets: bool = False
- should_explode: bool = False
+ should_split_rhs: bool = False
+ magic_trailing_comma: Optional[Leaf] = None
def append(self, leaf: Leaf, preformatted: bool = False) -> None:
"""Add a new `leaf` to the end of the line.
self.bracket_tracker.mark(leaf)
if self.mode.magic_trailing_comma:
if self.has_magic_trailing_comma(leaf):
- self.should_explode = True
+ self.magic_trailing_comma = leaf
elif self.has_magic_trailing_comma(leaf, ensure_removable=True):
self.remove_trailing_comma()
if not self.append_comment(leaf):
mode=self.mode,
depth=self.depth,
inside_brackets=self.inside_brackets,
- should_explode=self.should_explode,
+ should_split_rhs=self.should_split_rhs,
+ magic_trailing_comma=self.magic_trailing_comma,
)
def __str__(self) -> str:
FMT_OFF = {"# fmt: off", "# fmt:off", "# yapf: disable"}
+FMT_SKIP = {"# fmt: skip", "# fmt:skip"}
+FMT_PASS = {*FMT_OFF, *FMT_SKIP}
FMT_ON = {"# fmt: on", "# fmt:on", "# yapf: enable"}
consumed = 0
nlines = 0
ignored_lines = 0
- for index, line in enumerate(prefix.split("\n")):
+ for index, line in enumerate(re.split("\r?\n", prefix)):
consumed += len(line) + 1 # adding the length of the split '\n'
line = line.lstrip()
if not line:
transformers: List[Transformer]
if (
not line.contains_uncollapsable_type_comments()
- and not line.should_explode
+ and not line.should_split_rhs
+ and not line.magic_trailing_comma
and (
is_line_short_enough(line, line_length=mode.line_length, line_str=line_str)
or line.contains_unsplittable_type_ignore()
mode=line.mode,
depth=line.depth + 1,
inside_brackets=True,
- should_explode=line.should_explode,
+ should_split_rhs=line.should_split_rhs,
+ magic_trailing_comma=line.magic_trailing_comma,
)
string_leaf = Leaf(token.STRING, string_value)
insert_str_child(string_leaf)
result.append(leaf, preformatted=True)
for comment_after in original.comments_after(leaf):
result.append(comment_after, preformatted=True)
- if is_body and should_split_body_explode(result, opening_bracket):
- result.should_explode = True
+ if is_body and should_split_line(result, opening_bracket):
+ result.should_split_rhs = True
return result
for leaf in node.leaves():
previous_consumed = 0
for comment in list_comments(leaf.prefix, is_endmarker=False):
- if comment.value in FMT_OFF:
- # We only want standalone comments. If there's no previous leaf or
- # the previous leaf is indentation, it's a standalone comment in
- # disguise.
- if comment.type != STANDALONE_COMMENT:
- prev = preceding_leaf(leaf)
- if prev and prev.type not in WHITESPACE:
+ if comment.value not in FMT_PASS:
+ previous_consumed = comment.consumed
+ continue
+ # We only want standalone comments. If there's no previous leaf or
+ # the previous leaf is indentation, it's a standalone comment in
+ # disguise.
+ if comment.value in FMT_PASS and comment.type != STANDALONE_COMMENT:
+ prev = preceding_leaf(leaf)
+ if prev:
+ if comment.value in FMT_OFF and prev.type not in WHITESPACE:
+ continue
+ if comment.value in FMT_SKIP and prev.type in WHITESPACE:
continue
- ignored_nodes = list(generate_ignored_nodes(leaf))
- if not ignored_nodes:
- continue
-
- first = ignored_nodes[0] # Can be a container node with the `leaf`.
- parent = first.parent
- prefix = first.prefix
- first.prefix = prefix[comment.consumed :]
- hidden_value = (
- comment.value + "\n" + "".join(str(n) for n in ignored_nodes)
- )
- if hidden_value.endswith("\n"):
- # That happens when one of the `ignored_nodes` ended with a NEWLINE
- # leaf (possibly followed by a DEDENT).
- hidden_value = hidden_value[:-1]
- first_idx: Optional[int] = None
- for ignored in ignored_nodes:
- index = ignored.remove()
- if first_idx is None:
- first_idx = index
- assert parent is not None, "INTERNAL ERROR: fmt: on/off handling (1)"
- assert first_idx is not None, "INTERNAL ERROR: fmt: on/off handling (2)"
- parent.insert_child(
- first_idx,
- Leaf(
- STANDALONE_COMMENT,
- hidden_value,
- prefix=prefix[:previous_consumed] + "\n" * comment.newlines,
- ),
- )
- return True
+ ignored_nodes = list(generate_ignored_nodes(leaf, comment))
+ if not ignored_nodes:
+ continue
- previous_consumed = comment.consumed
+ first = ignored_nodes[0] # Can be a container node with the `leaf`.
+ parent = first.parent
+ prefix = first.prefix
+ first.prefix = prefix[comment.consumed :]
+ hidden_value = "".join(str(n) for n in ignored_nodes)
+ if comment.value in FMT_OFF:
+ hidden_value = comment.value + "\n" + hidden_value
+ if comment.value in FMT_SKIP:
+ hidden_value += " " + comment.value
+ if hidden_value.endswith("\n"):
+ # That happens when one of the `ignored_nodes` ended with a NEWLINE
+ # leaf (possibly followed by a DEDENT).
+ hidden_value = hidden_value[:-1]
+ first_idx: Optional[int] = None
+ for ignored in ignored_nodes:
+ index = ignored.remove()
+ if first_idx is None:
+ first_idx = index
+ assert parent is not None, "INTERNAL ERROR: fmt: on/off handling (1)"
+ assert first_idx is not None, "INTERNAL ERROR: fmt: on/off handling (2)"
+ parent.insert_child(
+ first_idx,
+ Leaf(
+ STANDALONE_COMMENT,
+ hidden_value,
+ prefix=prefix[:previous_consumed] + "\n" * comment.newlines,
+ ),
+ )
+ return True
return False
-def generate_ignored_nodes(leaf: Leaf) -> Iterator[LN]:
+def generate_ignored_nodes(leaf: Leaf, comment: ProtoComment) -> Iterator[LN]:
"""Starting from the container of `leaf`, generate all leaves until `# fmt: on`.
+ If comment is skip, returns leaf only.
Stops at the end of the block.
"""
container: Optional[LN] = container_of(leaf)
+ if comment.value in FMT_SKIP:
+ prev_sibling = leaf.prev_sibling
+ if comment.value in leaf.prefix and prev_sibling is not None:
+ leaf.prefix = leaf.prefix.replace(comment.value, "")
+ siblings = [prev_sibling]
+ while (
+ "\n" not in prev_sibling.prefix
+ and prev_sibling.prev_sibling is not None
+ ):
+ prev_sibling = prev_sibling.prev_sibling
+ siblings.insert(0, prev_sibling)
+ for sibling in siblings:
+ yield sibling
+ elif leaf.parent is not None:
+ yield leaf.parent
+ return
while container is not None and container.type != token.ENDMARKER:
if is_fmt_on(container):
return
leaf.value = ")"
-def should_split_body_explode(line: Line, opening_bracket: Leaf) -> bool:
+def should_split_line(line: Line, opening_bracket: Leaf) -> bool:
"""Should `line` be immediately split with `delimiter_split()` after RHS?"""
if not (opening_bracket.parent and opening_bracket.value in "[{("):
"""
omit: Set[LeafID] = set()
- if not line.should_explode:
+ if not line.magic_trailing_comma:
yield omit
length = 4 * line.depth
elif leaf.type in CLOSING_BRACKETS:
prev = line.leaves[index - 1] if index > 0 else None
if (
- line.should_explode
- and prev
+ prev
and prev.type == token.COMMA
and not is_one_tuple_between(
leaf.opening_bracket, leaf, line.leaves
yield omit
if (
- line.should_explode
- and prev
+ prev
and prev.type == token.COMMA
and not is_one_tuple_between(leaf.opening_bracket, leaf, line.leaves)
):
@mypyc_attr(patchable=True)
-def dump_to_file(*output: str) -> str:
+def dump_to_file(*output: str, ensure_final_newline: bool = True) -> str:
"""Dump `output` to a temporary file. Return path to the file."""
with tempfile.NamedTemporaryFile(
mode="w", prefix="blk_", suffix=".log", delete=False, encoding="utf8"
) as f:
for lines in output:
f.write(lines)
- if lines and lines[-1] != "\n":
+ if ensure_final_newline and lines and lines[-1] != "\n":
f.write("\n")
return f.name
"""Return a unified diff string between strings `a` and `b`."""
import difflib
- a_lines = [line + "\n" for line in a.splitlines()]
- b_lines = [line + "\n" for line in b.splitlines()]
- return "".join(
- difflib.unified_diff(a_lines, b_lines, fromfile=a_name, tofile=b_name, n=5)
- )
+ a_lines = [line for line in a.splitlines(keepends=True)]
+ b_lines = [line for line in b.splitlines(keepends=True)]
+ diff_lines = []
+ for line in difflib.unified_diff(
+ a_lines, b_lines, fromfile=a_name, tofile=b_name, n=5
+ ):
+ # Work around https://bugs.python.org/issue2142
+ # See https://www.gnu.org/software/diffutils/manual/html_node/Incomplete-Lines.html
+ if line[-1] == "\n":
+ diff_lines.append(line)
+ else:
+ diff_lines.append(line + "\n")
+ diff_lines.append("\\ No newline at end of file\n")
+ return "".join(diff_lines)
def cancel(tasks: Iterable["asyncio.Task[Any]"]) -> None:
penultimate = line.leaves[-2]
last = line.leaves[-1]
- if line.should_explode:
+ if line.magic_trailing_comma:
try:
penultimate, last = last_two_except(line.leaves, omit=omit_on_explode)
except LookupError:
# unnecessary.
return True
- if line.should_explode and penultimate.type == token.COMMA:
+ if line.magic_trailing_comma and penultimate.type == token.COMMA:
# The rightmost non-omitted bracket pair is the one we want to explode on.
return True
"""
todo, done = set(), set()
for src in sources:
- src = src.resolve()
- if cache.get(src) != get_cache_info(src):
+ res_src = src.resolve()
+ if cache.get(str(res_src)) != get_cache_info(res_src):
todo.add(src)
else:
done.add(src)
cache_file = get_cache_file(mode)
try:
CACHE_DIR.mkdir(parents=True, exist_ok=True)
- new_cache = {**cache, **{src.resolve(): get_cache_info(src) for src in sources}}
+ new_cache = {
+ **cache,
+ **{str(src.resolve()): get_cache_info(src) for src in sources},
+ }
with tempfile.NamedTemporaryFile(dir=str(cache_file.parent), delete=False) as f:
pickle.dump(new_cache, f, protocol=4)
os.replace(f.name, cache_file)