X-Git-Url: https://git.madduck.net/etc/vim.git/blobdiff_plain/e269f44b25737360e0dc65379f889dfa931dc68a..bb588073ab286a9f1f8d839ab2cebe13011dd22c:/src/black/concurrency.py?ds=sidebyside diff --git a/src/black/concurrency.py b/src/black/concurrency.py index d77ea40..55c96b6 100644 --- a/src/black/concurrency.py +++ b/src/black/concurrency.py @@ -6,8 +6,10 @@ NOTE: this module is only imported if we need to format several files at once. import asyncio import logging +import os import signal import sys +import traceback from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor from multiprocessing import Manager from pathlib import Path @@ -15,8 +17,8 @@ from typing import Any, Iterable, Optional, Set from mypy_extensions import mypyc_attr -from black import DEFAULT_WORKERS, WriteBack, format_file_in_place -from black.cache import Cache, filter_cached, read_cache, write_cache +from black import WriteBack, format_file_in_place +from black.cache import Cache from black.mode import Mode from black.output import err from black.report import Changed, Report @@ -46,23 +48,14 @@ def cancel(tasks: Iterable["asyncio.Task[Any]"]) -> None: def shutdown(loop: asyncio.AbstractEventLoop) -> None: """Cancel all pending tasks on `loop`, wait for them, and close the loop.""" try: - if sys.version_info[:2] >= (3, 7): - all_tasks = asyncio.all_tasks - else: - all_tasks = asyncio.Task.all_tasks # This part is borrowed from asyncio/runners.py in Python 3.7b2. - to_cancel = [task for task in all_tasks(loop) if not task.done()] + to_cancel = [task for task in asyncio.all_tasks(loop) if not task.done()] if not to_cancel: return for task in to_cancel: task.cancel() - if sys.version_info >= (3, 7): - loop.run_until_complete(asyncio.gather(*to_cancel, return_exceptions=True)) - else: - loop.run_until_complete( - asyncio.gather(*to_cancel, loop=loop, return_exceptions=True) - ) + loop.run_until_complete(asyncio.gather(*to_cancel, return_exceptions=True)) finally: # `concurrent.futures.Future` objects cannot be cancelled once they # are already running. There might be some when the `shutdown()` happened. @@ -87,13 +80,14 @@ def reformat_many( maybe_install_uvloop() executor: Executor - worker_count = workers if workers is not None else DEFAULT_WORKERS + if workers is None: + workers = int(os.environ.get("BLACK_NUM_WORKERS", 0)) + workers = workers or os.cpu_count() or 1 if sys.platform == "win32": # Work around https://bugs.python.org/issue26903 - assert worker_count is not None - worker_count = min(worker_count, 60) + workers = min(workers, 60) try: - executor = ProcessPoolExecutor(max_workers=worker_count) + executor = ProcessPoolExecutor(max_workers=workers) except (ImportError, NotImplementedError, OSError): # we arrive here if the underlying system does not support multi-processing # like in AWS Lambda or Termux, in which case we gracefully fallback to @@ -140,10 +134,9 @@ async def schedule_formatting( `write_back`, `fast`, and `mode` options are passed to :func:`format_file_in_place`. """ - cache: Cache = {} + cache = Cache.read(mode) if write_back not in (WriteBack.DIFF, WriteBack.COLOR_DIFF): - cache = read_cache(mode) - sources, cached = filter_cached(cache, sources) + sources, cached = cache.filtered_cached(sources) for src in sorted(cached): report.done(src, Changed.CACHED) if not sources: @@ -178,8 +171,10 @@ async def schedule_formatting( src = tasks.pop(task) if task.cancelled(): cancelled.append(task) - elif task.exception(): - report.failed(src, str(task.exception())) + elif exc := task.exception(): + if report.verbose: + traceback.print_exception(type(exc), exc, exc.__traceback__) + report.failed(src, str(exc)) else: changed = Changed.YES if task.result() else Changed.NO # If the file was written back or was successfully checked as @@ -190,9 +185,6 @@ async def schedule_formatting( sources_to_cache.append(src) report.done(src, changed) if cancelled: - if sys.version_info >= (3, 7): - await asyncio.gather(*cancelled, return_exceptions=True) - else: - await asyncio.gather(*cancelled, loop=loop, return_exceptions=True) + await asyncio.gather(*cancelled, return_exceptions=True) if sources_to_cache: - write_cache(cache, sources_to_cache, mode) + cache.write(sources_to_cache)