X-Git-Url: https://git.madduck.net/etc/vim.git/blobdiff_plain/9afffacaa0e5ac911f9feacb916bc48473dcb117..7aa37ea0adf864baf3ef3dfbcfaf5ff1ff780250:/src/black/concurrency.py diff --git a/src/black/concurrency.py b/src/black/concurrency.py index 24f67b6..55c96b6 100644 --- a/src/black/concurrency.py +++ b/src/black/concurrency.py @@ -1,9 +1,27 @@ +""" +Formatting many files at once via multiprocessing. Contains entrypoint and utilities. + +NOTE: this module is only imported if we need to format several files at once. +""" + import asyncio import logging +import os +import signal import sys -from typing import Any, Iterable +import traceback +from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor +from multiprocessing import Manager +from pathlib import Path +from typing import Any, Iterable, Optional, Set + +from mypy_extensions import mypyc_attr +from black import WriteBack, format_file_in_place +from black.cache import Cache +from black.mode import Mode from black.output import err +from black.report import Changed, Report def maybe_install_uvloop() -> None: @@ -11,7 +29,6 @@ def maybe_install_uvloop() -> None: This is called only from command-line entry points to avoid interfering with the parent process if Black is used as a library. - """ try: import uvloop @@ -31,23 +48,14 @@ def cancel(tasks: Iterable["asyncio.Task[Any]"]) -> None: def shutdown(loop: asyncio.AbstractEventLoop) -> None: """Cancel all pending tasks on `loop`, wait for them, and close the loop.""" try: - if sys.version_info[:2] >= (3, 7): - all_tasks = asyncio.all_tasks - else: - all_tasks = asyncio.Task.all_tasks # This part is borrowed from asyncio/runners.py in Python 3.7b2. - to_cancel = [task for task in all_tasks(loop) if not task.done()] + to_cancel = [task for task in asyncio.all_tasks(loop) if not task.done()] if not to_cancel: return for task in to_cancel: task.cancel() - if sys.version_info >= (3, 7): - loop.run_until_complete(asyncio.gather(*to_cancel, return_exceptions=True)) - else: - loop.run_until_complete( - asyncio.gather(*to_cancel, loop=loop, return_exceptions=True) - ) + loop.run_until_complete(asyncio.gather(*to_cancel, return_exceptions=True)) finally: # `concurrent.futures.Future` objects cannot be cancelled once they # are already running. There might be some when the `shutdown()` happened. @@ -55,3 +63,128 @@ def shutdown(loop: asyncio.AbstractEventLoop) -> None: cf_logger = logging.getLogger("concurrent.futures") cf_logger.setLevel(logging.CRITICAL) loop.close() + + +# diff-shades depends on being to monkeypatch this function to operate. I know it's +# not ideal, but this shouldn't cause any issues ... hopefully. ~ichard26 +@mypyc_attr(patchable=True) +def reformat_many( + sources: Set[Path], + fast: bool, + write_back: WriteBack, + mode: Mode, + report: Report, + workers: Optional[int], +) -> None: + """Reformat multiple files using a ProcessPoolExecutor.""" + maybe_install_uvloop() + + executor: Executor + if workers is None: + workers = int(os.environ.get("BLACK_NUM_WORKERS", 0)) + workers = workers or os.cpu_count() or 1 + if sys.platform == "win32": + # Work around https://bugs.python.org/issue26903 + workers = min(workers, 60) + try: + executor = ProcessPoolExecutor(max_workers=workers) + except (ImportError, NotImplementedError, OSError): + # we arrive here if the underlying system does not support multi-processing + # like in AWS Lambda or Termux, in which case we gracefully fallback to + # a ThreadPoolExecutor with just a single worker (more workers would not do us + # any good due to the Global Interpreter Lock) + executor = ThreadPoolExecutor(max_workers=1) + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + loop.run_until_complete( + schedule_formatting( + sources=sources, + fast=fast, + write_back=write_back, + mode=mode, + report=report, + loop=loop, + executor=executor, + ) + ) + finally: + try: + shutdown(loop) + finally: + asyncio.set_event_loop(None) + if executor is not None: + executor.shutdown() + + +async def schedule_formatting( + sources: Set[Path], + fast: bool, + write_back: WriteBack, + mode: Mode, + report: "Report", + loop: asyncio.AbstractEventLoop, + executor: "Executor", +) -> None: + """Run formatting of `sources` in parallel using the provided `executor`. + + (Use ProcessPoolExecutors for actual parallelism.) + + `write_back`, `fast`, and `mode` options are passed to + :func:`format_file_in_place`. + """ + cache = Cache.read(mode) + if write_back not in (WriteBack.DIFF, WriteBack.COLOR_DIFF): + sources, cached = cache.filtered_cached(sources) + for src in sorted(cached): + report.done(src, Changed.CACHED) + if not sources: + return + + cancelled = [] + sources_to_cache = [] + lock = None + if write_back in (WriteBack.DIFF, WriteBack.COLOR_DIFF): + # For diff output, we need locks to ensure we don't interleave output + # from different processes. + manager = Manager() + lock = manager.Lock() + tasks = { + asyncio.ensure_future( + loop.run_in_executor( + executor, format_file_in_place, src, fast, mode, write_back, lock + ) + ): src + for src in sorted(sources) + } + pending = tasks.keys() + try: + loop.add_signal_handler(signal.SIGINT, cancel, pending) + loop.add_signal_handler(signal.SIGTERM, cancel, pending) + except NotImplementedError: + # There are no good alternatives for these on Windows. + pass + while pending: + done, _ = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED) + for task in done: + src = tasks.pop(task) + if task.cancelled(): + cancelled.append(task) + elif exc := task.exception(): + if report.verbose: + traceback.print_exception(type(exc), exc, exc.__traceback__) + report.failed(src, str(exc)) + else: + changed = Changed.YES if task.result() else Changed.NO + # If the file was written back or was successfully checked as + # well-formatted, store this information in the cache. + if write_back is WriteBack.YES or ( + write_back is WriteBack.CHECK and changed is Changed.NO + ): + sources_to_cache.append(src) + report.done(src, changed) + if cancelled: + await asyncio.gather(*cancelled, return_exceptions=True) + if sources_to_cache: + cache.write(sources_to_cache)