from blib2to3.pgen2 import driver, token
from blib2to3.pgen2.parse import ParseError
-__version__ = "18.4a2"
+__version__ = "18.4a3"
DEFAULT_LINE_LENGTH = 88
+
# types
syms = pygram.python_symbols
FileContent = str
write_back = WriteBack.DIFF
else:
write_back = WriteBack.YES
+ report = Report(check=check, quiet=quiet)
if len(sources) == 0:
ctx.exit(0)
return
elif len(sources) == 1:
- return_code = reformat_one(
- sources[0], line_length, fast, quiet, write_back, check
- )
+ reformat_one(sources[0], line_length, fast, write_back, report)
else:
loop = asyncio.get_event_loop()
executor = ProcessPoolExecutor(max_workers=os.cpu_count())
- return_code = 1
try:
- return_code = loop.run_until_complete(
+ loop.run_until_complete(
schedule_formatting(
- sources, line_length, write_back, fast, quiet, loop, executor, check
+ sources, line_length, fast, write_back, report, loop, executor
)
)
finally:
shutdown(loop)
- ctx.exit(return_code)
+ if not quiet:
+ out("All done! ✨ 🍰 ✨")
+ click.echo(str(report))
+ ctx.exit(report.return_code)
def reformat_one(
- src: Path,
- line_length: int,
- fast: bool,
- quiet: bool,
- write_back: WriteBack,
- check: bool,
-) -> int:
+ src: Path, line_length: int, fast: bool, write_back: WriteBack, report: "Report"
+) -> None:
"""Reformat a single file under `src` without spawning child processes.
- If `quiet` is True, non-error messages are not output. `line_length`,
+ The outcome is recorded on `report` rather than returned. `line_length`,
`write_back`, and `fast` options are passed to :func:`format_file_in_place`.
"""
- report = Report(check=check, quiet=quiet)
try:
changed = Changed.NO
if not src.is_file() and str(src) == "-":
else:
cache: Cache = {}
if write_back != WriteBack.DIFF:
- cache = read_cache()
+ cache = read_cache(line_length)
src = src.resolve()
if src in cache and cache[src] == get_cache_info(src):
changed = Changed.CACHED
):
changed = Changed.YES
if write_back != WriteBack.DIFF and changed is not Changed.NO:
- write_cache(cache, [src])
+ write_cache(cache, [src], line_length)
report.done(src, changed)
except Exception as exc:
report.failed(src, str(exc))
- return report.return_code
async def schedule_formatting(
sources: List[Path],
line_length: int,
- write_back: WriteBack,
fast: bool,
- quiet: bool,
+ write_back: WriteBack,
+ report: "Report",
loop: BaseEventLoop,
executor: Executor,
- check: bool,
-) -> int:
+) -> None:
"""Run formatting of `sources` in parallel using the provided `executor`.
(Use ProcessPoolExecutors for actual parallelism.)
`line_length`, `write_back`, and `fast` options are passed to
:func:`format_file_in_place`.
"""
- report = Report(check=check, quiet=quiet)
cache: Cache = {}
if write_back != WriteBack.DIFF:
- cache = read_cache()
+ cache = read_cache(line_length)
sources, cached = filter_cached(cache, sources)
for src in cached:
report.done(src, Changed.CACHED)
if cancelled:
await asyncio.gather(*cancelled, loop=loop, return_exceptions=True)
- elif not quiet:
- out("All done! ✨ 🍰 ✨")
- if not quiet:
- click.echo(str(report))
-
if write_back != WriteBack.DIFF and formatted:
- write_cache(cache, formatted)
-
- return report.return_code
+ write_cache(cache, formatted, line_length)
def format_file_in_place(
with open(src, "w", encoding=src_buffer.encoding) as f:
f.write(dst_contents)
elif write_back == write_back.DIFF:
- src_name = f"{src.name} (original)"
- dst_name = f"{src.name} (formatted)"
+ src_name = f"{src} (original)"
+ dst_name = f"{src} (formatted)"
diff_contents = diff(src_contents, dst_contents, src_name, dst_name)
if lock:
lock.acquire()
GRAMMARS = [
pygram.python_grammar_no_print_statement_no_exec_statement,
pygram.python_grammar_no_print_statement,
- pygram.python_grammar_no_exec_statement,
pygram.python_grammar,
]
}
COMPREHENSION_PRIORITY = 20
COMMA_PRIORITY = 10
+TERNARY_PRIORITY = 7
LOGIC_PRIORITY = 5
STRING_PRIORITY = 4
COMPARATOR_PRIORITY = 3
# Don't insert empty lines before the first line in the file.
return 0, 0
- if self.previous_line and self.previous_line.is_decorator:
- # Don't insert empty lines between decorators.
+ if self.previous_line.is_decorator:
+ return 0, 0
+
+ if (
+ self.previous_line.is_comment
+ and self.previous_line.depth == current_line.depth
+ and before == 0
+ ):
return 0, 0
newlines = 2
newlines -= 1
return newlines, 0
- if current_line.is_flow_control:
- return before, 1
-
if (
self.previous_line
and self.previous_line.is_import
):
return (before or 1), 0
- if (
- self.previous_line
- and self.previous_line.is_yield
- and (not current_line.is_yield or depth != self.previous_line.depth)
- ):
- return (before or 1), 0
-
return before, 0
def visit_DEDENT(self, node: Node) -> Iterator[Line]:
"""Decrease indentation level, maybe yield a line."""
- # DEDENT has no value. Additionally, in blib2to3 it never holds comments.
+ # The current line might still wait for trailing comments. At DEDENT time
+ # there won't be any (they would be prefixes on the preceding NEWLINE).
+ # Emit the line then.
+ yield from self.line()
+
+ # While DEDENT has no value, its prefix may contain standalone comments
+ # that belong to the current indentation level. Get 'em.
+ yield from self.visit_default(node)
+
+ # Finally, emit the dedent.
yield from self.line(-1)
def visit_stmt(
):
return COMPREHENSION_PRIORITY
+ if (
+ leaf.type == token.NAME
+ and leaf.value in {"if", "else"}
+ and leaf.parent
+ and leaf.parent.type == syms.test
+ ):
+ return TERNARY_PRIORITY
+
if leaf.type == token.NAME and leaf.value in LOGIC_OPERATORS and leaf.parent:
return LOGIC_PRIORITY
split_funcs: List[SplitFunc]
if line.is_def:
split_funcs = [left_hand_split]
+ elif line.is_import:
+ split_funcs = [explode_split]
elif line.inside_brackets:
split_funcs = [delimiter_split, standalone_comment_split, right_hand_split]
else:
yield current_line
+def explode_split(
+ line: Line, py36: bool = False, omit: Collection[LeafID] = ()
+) -> Iterator[Line]:
+ """Split by rightmost bracket and immediately split contents by a delimiter.
+
+ `line`, `py36` and `omit` are forwarded to :func:`right_hand_split`. If that
+ produces exactly three lines, the first and last are yielded unchanged and
+ the middle one is split further with :func:`delimiter_split`. Any other
+ result is yielded as is.
+ """
+ new_lines = list(right_hand_split(line, py36, omit))
+ # Only a three-line result is exploded further; anything else passes through.
+ if len(new_lines) != 3:
+ yield from new_lines
+ return
+
+ yield new_lines[0]
+
+ # Split the middle line by delimiter. Note that `omit` is not forwarded here.
+ try:
+ yield from delimiter_split(new_lines[1], py36)
+ except CannotSplit:
+ # Fall back to the unsplit middle line.
+ # NOTE(review): assumes delimiter_split raises CannotSplit before yielding
+ # anything; otherwise part of the line would be emitted twice -- confirm.
+ yield new_lines[1]
+
+ yield new_lines[2]
+
+
def is_import(leaf: Leaf) -> bool:
"""Return True if the given leaf starts an import statement."""
p = leaf.parent
CACHE_DIR = Path(user_cache_dir("black", version=__version__))
-CACHE_FILE = CACHE_DIR / "cache.pickle"
-def read_cache() -> Cache:
+def get_cache_file(line_length: int) -> Path:
+ """Return the path of the cache file under CACHE_DIR for `line_length`."""
+ return CACHE_DIR / f"cache.{line_length}.pickle"
+
+
+def read_cache(line_length: int) -> Cache:
"""Read the cache if it exists and is well formed.
If it is not well formed, the call to write_cache later should resolve the issue.
"""
- if not CACHE_FILE.exists():
+ cache_file = get_cache_file(line_length)
+ if not cache_file.exists():
return {}
- with CACHE_FILE.open("rb") as fobj:
+ with cache_file.open("rb") as fobj:
try:
cache: Cache = pickle.load(fobj)
except pickle.UnpicklingError:
return todo, done
-def write_cache(cache: Cache, sources: List[Path]) -> None:
+def write_cache(cache: Cache, sources: List[Path], line_length: int) -> None:
"""Update the cache file."""
+ # The cache file is chosen per line length (see get_cache_file).
+ cache_file = get_cache_file(line_length)
try:
if not CACHE_DIR.exists():
CACHE_DIR.mkdir(parents=True)
+ # Entries for `sources` override existing entries with the same resolved path.
new_cache = {**cache, **{src.resolve(): get_cache_info(src) for src in sources}}
- with CACHE_FILE.open("wb") as fobj:
+ with cache_file.open("wb") as fobj:
pickle.dump(new_cache, fobj, protocol=pickle.HIGHEST_PROTOCOL)
except OSError:
+ # Best effort: an OSError while creating the directory or writing is ignored.
pass