import asyncio
import logging
+import os
import signal
import sys
+import traceback
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor
from multiprocessing import Manager
from pathlib import Path
from mypy_extensions import mypyc_attr
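+# DEFAULT_WORKERS and the function-based cache helpers are no longer imported:
+# the worker count is resolved in reformat_many() below and caching now lives
+# on the Cache class.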
-from black import DEFAULT_WORKERS, WriteBack, format_file_in_place
-from black.cache import Cache, filter_cached, read_cache, write_cache
+from black import WriteBack, format_file_in_place
+from black.cache import Cache
from black.mode import Mode
from black.output import err
from black.report import Changed, Report
def shutdown(loop: asyncio.AbstractEventLoop) -> None:
"""Cancel all pending tasks on `loop`, wait for them, and close the loop."""
try:
- if sys.version_info[:2] >= (3, 7):
- all_tasks = asyncio.all_tasks
- else:
- all_tasks = asyncio.Task.all_tasks
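+ # `asyncio.all_tasks` exists on every Python version still supported here;
+ # the `asyncio.Task.all_tasks` fallback it replaces was removed in Python 3.9.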
# This part is borrowed from asyncio/runners.py in Python 3.7b2.
- to_cancel = [task for task in all_tasks(loop) if not task.done()]
+ to_cancel = [task for task in asyncio.all_tasks(loop) if not task.done()]
if not to_cancel:
return
for task in to_cancel:
task.cancel()
- if sys.version_info >= (3, 7):
- loop.run_until_complete(asyncio.gather(*to_cancel, return_exceptions=True))
- else:
- loop.run_until_complete(
- asyncio.gather(*to_cancel, loop=loop, return_exceptions=True)
- )
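+ # The `loop=` argument to `asyncio.gather` was deprecated in Python 3.8 and
+ # removed in 3.10, so a single unconditional call is enough.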
+ loop.run_until_complete(asyncio.gather(*to_cancel, return_exceptions=True))
finally:
# `concurrent.futures.Future` objects cannot be cancelled once they
# are already running. There might be some when the `shutdown()` happened.
maybe_install_uvloop()
executor: Executor
- worker_count = workers if workers is not None else DEFAULT_WORKERS
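+ # Worker count resolution: an explicit `workers` argument wins, then the
+ # BLACK_NUM_WORKERS environment variable, then os.cpu_count(), else 1.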
+ if workers is None:
+ workers = int(os.environ.get("BLACK_NUM_WORKERS", 0))
+ workers = workers or os.cpu_count() or 1
if sys.platform == "win32":
# Work around https://bugs.python.org/issue26903
- assert worker_count is not None
- worker_count = min(worker_count, 60)
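+ # ProcessPoolExecutor refuses more than 61 workers on Windows, so cap at 60.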
+ workers = min(workers, 60)
try:
- executor = ProcessPoolExecutor(max_workers=worker_count)
+ executor = ProcessPoolExecutor(max_workers=workers)
except (ImportError, NotImplementedError, OSError):
# we arrive here if the underlying system does not support multi-processing
# like in AWS Lambda or Termux, in which case we gracefully fallback to
# a ThreadPoolExecutor with just a single worker (more workers would not do us
# any good due to the Global Interpreter Lock)
executor = ThreadPoolExecutor(max_workers=1)
`write_back`, `fast`, and `mode` options are passed to
:func:`format_file_in_place`.
"""
- cache: Cache = {}
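+ # The cache is an object now: Cache.read(mode) replaces read_cache(), and the
+ # filtering/writing helpers below become methods on it.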
+ cache = Cache.read(mode)
if write_back not in (WriteBack.DIFF, WriteBack.COLOR_DIFF):
- cache = read_cache(mode)
- sources, cached = filter_cached(cache, sources)
+ sources, cached = cache.filtered_cached(sources)
for src in sorted(cached):
report.done(src, Changed.CACHED)
if not sources:
src = tasks.pop(task)
if task.cancelled():
cancelled.append(task)
- elif task.exception():
- report.failed(src, str(task.exception()))
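+ # Bind the exception once via the walrus operator instead of calling
+ # task.exception() twice, and print the full traceback in verbose mode.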
+ elif exc := task.exception():
+ if report.verbose:
+ traceback.print_exception(type(exc), exc, exc.__traceback__)
+ report.failed(src, str(exc))
else:
changed = Changed.YES if task.result() else Changed.NO
# If the file was written back or was successfully checked as
# already well formatted, store this information in the cache.
if write_back is WriteBack.YES or (
    write_back is WriteBack.CHECK and changed is Changed.NO
):
sources_to_cache.append(src)
report.done(src, changed)
if cancelled:
- if sys.version_info >= (3, 7):
- await asyncio.gather(*cancelled, return_exceptions=True)
- else:
- await asyncio.gather(*cancelled, loop=loop, return_exceptions=True)
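+ # As in shutdown(): gather() no longer accepts a `loop` argument.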
+ await asyncio.gather(*cancelled, return_exceptions=True)
if sources_to_cache:
- write_cache(cache, sources_to_cache, mode)
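+ # The mode is already bound to the Cache instance, so write() only needs the
+ # sources to record.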
+ cache.write(sources_to_cache)