from multiprocessing import Manager
import os
from pathlib import Path
+import re
import tokenize
import signal
import sys
loop.add_signal_handler(signal.SIGTERM, cancel, _task_values)
await asyncio.wait(tasks.values())
cancelled = []
- report = Report(check=not write_back, quiet=quiet)
+ report = Report(check=write_back is WriteBack.NO, quiet=quiet)
for src, task in tasks.items():
if not task.done():
report.failed(src, "timed out, cancelling")
leaf.opening_bracket = opening_bracket
leaf.bracket_depth = self.depth
if self.depth == 0:
- after_delim = is_split_after_delimiter(leaf, self.previous)
- before_delim = is_split_before_delimiter(leaf, self.previous)
- if after_delim > before_delim:
- self.delimiters[id(leaf)] = after_delim
- elif before_delim > after_delim and self.previous is not None:
- self.delimiters[id(self.previous)] = before_delim
+ delim = is_split_before_delimiter(leaf, self.previous)
+ if delim and self.previous is not None:
+ self.delimiters[id(self.previous)] = delim
+ else:
+ delim = is_split_after_delimiter(leaf, self.previous)
+ if delim:
+ self.delimiters[id(leaf)] = delim
if leaf.type in OPENING_BRACKETS:
self.bracket_match[self.depth, BRACKET[leaf.type]] = leaf
self.depth += 1
if leaf.type == token.COMMA:
return COMMA_PRIORITY
- if (
- leaf.type in VARARGS
- and leaf.parent
- and leaf.parent.type in {syms.argument, syms.typedargslist}
- ):
- return MATH_PRIORITY
-
return 0
Higher numbers are higher priority.
"""
+ if (
+ leaf.type in VARARGS
+ and leaf.parent
+ and leaf.parent.type in {syms.argument, syms.typedargslist}
+ ):
+ # * and ** might also be MATH_OPERATORS but in this case they are not.
+ # Don't treat them as a delimiter.
+ return 0
+
if (
leaf.type in MATH_OPERATORS
and leaf.parent
if first_quote_pos == -1:
return # There's an internal error
+ prefix = leaf.value[:first_quote_pos]
body = leaf.value[first_quote_pos + len(orig_quote):-len(orig_quote)]
- new_body = body.replace(f"\\{orig_quote}", orig_quote).replace(
- new_quote, f"\\{new_quote}"
- )
+ unescaped_new_quote = re.compile(r"(([^\\]|^)(\\\\)*)" + new_quote)
+ escaped_orig_quote = re.compile(r"\\(\\\\)*" + orig_quote)
+ if "r" in prefix.casefold():
+ if unescaped_new_quote.search(body):
+ # There's at least one unescaped new_quote in this raw string
+ # so converting is impossible
+ return
+
+ # Do not introduce or remove backslashes in raw strings
+ new_body = body
+ else:
+ new_body = escaped_orig_quote.sub(f"\\1{orig_quote}", body)
+ new_body = unescaped_new_quote.sub(f"\\1\\\\{new_quote}", new_body)
if new_quote == '"""' and new_body[-1] == '"':
# edge case:
new_body = new_body[:-1] + '\\"'
if new_escape_count == orig_escape_count and orig_quote == '"':
return # Prefer double quotes
- prefix = leaf.value[:first_quote_pos]
leaf.value = f"{prefix}{new_quote}{new_body}{new_quote}"