return
elif len(sources) == 1:
- return_code = run_single_file_mode(
- line_length, check, fast, quiet, write_back, sources[0]
- )
+ return_code = reformat_one(sources[0], line_length, fast, quiet, write_back)
else:
- return_code = run_multi_file_mode(line_length, fast, quiet, write_back, sources)
+ loop = asyncio.get_event_loop()
+ executor = ProcessPoolExecutor(max_workers=os.cpu_count())
+ return_code = 1
+ try:
+ return_code = loop.run_until_complete(
+ schedule_formatting(
+ sources, line_length, write_back, fast, quiet, loop, executor
+ )
+ )
+ finally:
+ shutdown(loop)
ctx.exit(return_code)
-def run_single_file_mode(
- line_length: int,
- check: bool,
- fast: bool,
- quiet: bool,
- write_back: WriteBack,
- src: Path,
+def reformat_one(
+ src: Path, line_length: int, fast: bool, quiet: bool, write_back: WriteBack
) -> int:
- report = Report(check=check, quiet=quiet)
+ """Reformat a single file under `src` without spawning child processes.
+
+ If `quiet` is True, non-error messages are not output. `line_length`,
+ `write_back`, and `fast` options are passed to :func:`format_file_in_place`.
+ """
+ report = Report(check=write_back is WriteBack.NO, quiet=quiet)
try:
+ changed = Changed.NO
if not src.is_file() and str(src) == "-":
- changed = format_stdin_to_stdout(
+ if format_stdin_to_stdout(
line_length=line_length, fast=fast, write_back=write_back
- )
+ ):
+ changed = Changed.YES
else:
- changed = Changed.NO
cache: Cache = {}
if write_back != WriteBack.DIFF:
cache = read_cache()
src = src.resolve()
if src in cache and cache[src] == get_cache_info(src):
changed = Changed.CACHED
- if changed is not Changed.CACHED:
- changed = format_file_in_place(
+ if (
+ changed is not Changed.CACHED
+ and format_file_in_place(
src, line_length=line_length, fast=fast, write_back=write_back
)
+ ):
+ changed = Changed.YES
if write_back != WriteBack.DIFF and changed is not Changed.NO:
write_cache(cache, [src])
report.done(src, changed)
return report.return_code
-def run_multi_file_mode(
- line_length: int,
- fast: bool,
- quiet: bool,
- write_back: WriteBack,
- sources: List[Path],
-) -> int:
- loop = asyncio.get_event_loop()
- executor = ProcessPoolExecutor(max_workers=os.cpu_count())
- return_code = 1
- try:
- return_code = loop.run_until_complete(
- schedule_formatting(
- sources, line_length, write_back, fast, quiet, loop, executor
- )
- )
- finally:
- shutdown(loop)
- return return_code
-
-
async def schedule_formatting(
sources: List[Path],
line_length: int,
for src in sources
}
_task_values = list(tasks.values())
- loop.add_signal_handler(signal.SIGINT, cancel, _task_values)
- loop.add_signal_handler(signal.SIGTERM, cancel, _task_values)
+ try:
+ loop.add_signal_handler(signal.SIGINT, cancel, _task_values)
+ loop.add_signal_handler(signal.SIGTERM, cancel, _task_values)
+ except NotImplementedError:
+ # There are no good alternatives for these on Windows
+ pass
await asyncio.wait(_task_values)
for src, task in tasks.items():
if not task.done():
report.failed(src, str(task.exception()))
else:
formatted.append(src)
- report.done(src, task.result())
+ report.done(src, Changed.YES if task.result() else Changed.NO)
if cancelled:
await asyncio.gather(*cancelled, loop=loop, return_exceptions=True)
fast: bool,
write_back: WriteBack = WriteBack.NO,
lock: Any = None, # multiprocessing.Manager().Lock() is some crazy proxy
-) -> Changed:
+) -> bool:
"""Format file under `src` path. Return True if changed.
If `write_back` is True, write reformatted code back to stdout.
src_contents, line_length=line_length, fast=fast
)
except NothingChanged:
- return Changed.NO
+ return False
if write_back == write_back.YES:
with open(src, "w", encoding=src_buffer.encoding) as f:
finally:
if lock:
lock.release()
- return Changed.YES
+ return True
def format_stdin_to_stdout(
line_length: int, fast: bool, write_back: WriteBack = WriteBack.NO
-) -> Changed:
+) -> bool:
"""Format file on stdin. Return True if changed.
If `write_back` is True, write reformatted code back to stdout.
dst = src
try:
dst = format_file_contents(src, line_length=line_length, fast=fast)
- return Changed.YES
+ return True
except NothingChanged:
- return Changed.NO
+ return False
finally:
if write_back == WriteBack.YES:
bracket_match: Dict[Tuple[Depth, NodeType], Leaf] = Factory(dict)
delimiters: Dict[LeafID, Priority] = Factory(dict)
previous: Optional[Leaf] = None
+ _for_loop_variable: bool = False
+ _lambda_arguments: bool = False
def mark(self, leaf: Leaf) -> None:
"""Mark `leaf` with bracket-related metadata. Keep track of delimiters.
if leaf.type == token.COMMENT:
return
+ self.maybe_decrement_after_for_loop_variable(leaf)
+ self.maybe_decrement_after_lambda_arguments(leaf)
if leaf.type in CLOSING_BRACKETS:
self.depth -= 1
opening_bracket = self.bracket_match.pop((self.depth, leaf.type))
self.bracket_match[self.depth, BRACKET[leaf.type]] = leaf
self.depth += 1
self.previous = leaf
+ self.maybe_increment_lambda_arguments(leaf)
+ self.maybe_increment_for_loop_variable(leaf)
def any_open_brackets(self) -> bool:
"""Return True if there is an yet unmatched open bracket on the line."""
def max_delimiter_priority(self, exclude: Iterable[LeafID] = ()) -> int:
"""Return the highest priority of a delimiter found on the line.
- Values are consistent with what `is_delimiter()` returns.
+ Values are consistent with what `is_split_*_delimiter()` return.
Raises ValueError on no delimiters.
"""
return max(v for k, v in self.delimiters.items() if k not in exclude)
+ def maybe_increment_for_loop_variable(self, leaf: Leaf) -> bool:
+ """In a for loop, or comprehension, the variables are often unpacks.
+
+ To avoid splitting on the comma in this situation, increase the depth of
+ tokens between `for` and `in`.
+ """
+ if leaf.type == token.NAME and leaf.value == "for":
+ self.depth += 1
+ self._for_loop_variable = True
+ return True
+
+ return False
+
+ def maybe_decrement_after_for_loop_variable(self, leaf: Leaf) -> bool:
+ """See `maybe_increment_for_loop_variable` above for explanation."""
+ if self._for_loop_variable and leaf.type == token.NAME and leaf.value == "in":
+ self.depth -= 1
+ self._for_loop_variable = False
+ return True
+
+ return False
+
+ def maybe_increment_lambda_arguments(self, leaf: Leaf) -> bool:
+ """In a lambda expression, there might be more than one argument.
+
+ To avoid splitting on the comma in this situation, increase the depth of
+ tokens between `lambda` and `:`.
+ """
+ if leaf.type == token.NAME and leaf.value == "lambda":
+ self.depth += 1
+ self._lambda_arguments = True
+ return True
+
+ return False
+
+ def maybe_decrement_after_lambda_arguments(self, leaf: Leaf) -> bool:
+ """See `maybe_increment_lambda_arguments` above for explanation."""
+ if self._lambda_arguments and leaf.type == token.COLON:
+ self.depth -= 1
+ self._lambda_arguments = False
+ return True
+
+ return False
+
@dataclass
class Line:
comments: List[Tuple[Index, Leaf]] = Factory(list)
bracket_tracker: BracketTracker = Factory(BracketTracker)
inside_brackets: bool = False
- has_for: bool = False
- _for_loop_variable: bool = False
def append(self, leaf: Leaf, preformatted: bool = False) -> None:
"""Add a new `leaf` to the end of the line.
# imports, for which we only preserve newlines.
leaf.prefix += whitespace(leaf)
if self.inside_brackets or not preformatted:
- self.maybe_decrement_after_for_loop_variable(leaf)
self.bracket_tracker.mark(leaf)
self.maybe_remove_trailing_comma(leaf)
- self.maybe_increment_for_loop_variable(leaf)
if not self.append_comment(leaf):
self.leaves.append(leaf)
return False
- def maybe_increment_for_loop_variable(self, leaf: Leaf) -> bool:
- """In a for loop, or comprehension, the variables are often unpacks.
-
- To avoid splitting on the comma in this situation, increase the depth of
- tokens between `for` and `in`.
- """
- if leaf.type == token.NAME and leaf.value == "for":
- self.has_for = True
- self.bracket_tracker.depth += 1
- self._for_loop_variable = True
- return True
-
- return False
-
- def maybe_decrement_after_for_loop_variable(self, leaf: Leaf) -> bool:
- """See `maybe_increment_for_loop_variable` above for explanation."""
- if self._for_loop_variable and leaf.type == token.NAME and leaf.value == "in":
- self.bracket_tracker.depth -= 1
- self._for_loop_variable = False
- return True
-
- return False
-
def append_comment(self, comment: Leaf) -> bool:
"""Add an inline or standalone comment to the line."""
if (
if p.type in {syms.parameters, syms.arglist}:
# untyped function signatures or calls
- if t == token.RPAR:
- return NO
-
if not prev or prev.type != token.COMMA:
return NO
elif p.type == syms.varargslist:
# lambdas
- if t == token.RPAR:
- return NO
-
if prev and prev.type != token.COMMA:
return NO
# dots, but not the first one.
return NO
- elif (
- p.type == syms.listmaker
- or p.type == syms.testlist_gexp
- or p.type == syms.subscriptlist
- ):
- # list interior, including unpacking
- if not prev:
- return NO
-
elif p.type == syms.dictsetmaker:
- # dict and set interior, including unpacking
- if not prev:
- return NO
-
- if prev.type == token.DOUBLESTAR:
+ # dict unpacking
+ if prev and prev.type == token.DOUBLESTAR:
return NO
elif p.type in {syms.factor, syms.star_expr}:
return 0
-def is_delimiter(leaf: Leaf, previous: Leaf = None) -> int:
- """Return the priority of the `leaf` delimiter. Return 0 if not delimiter.
-
- Higher numbers are higher priority.
- """
- return max(
- is_split_before_delimiter(leaf, previous),
- is_split_after_delimiter(leaf, previous),
- )
-
-
def generate_comments(leaf: Leaf) -> Iterator[Leaf]:
"""Clean the prefix of the `leaf` and generate comments from it, if any.
def max_delimiter_priority_in_atom(node: LN) -> int:
+ """Return maximum delimiter priority inside `node`.
+
+ This is specific to atoms with contents contained in a pair of parentheses.
+ If `node` isn't an atom or there are no enclosing parentheses, returns 0.
+ """
if node.type != syms.atom:
return 0