import asyncio import logging import sys from typing import Any, Iterable from black.output import err def maybe_install_uvloop() -> None: """If our environment has uvloop installed we use it. This is called only from command-line entry points to avoid interfering with the parent process if Black is used as a library. """ try: import uvloop uvloop.install() except ImportError: pass def cancel(tasks: Iterable["asyncio.Task[Any]"]) -> None: """asyncio signal handler that cancels all `tasks` and reports to stderr.""" err("Aborted!") for task in tasks: task.cancel() def shutdown(loop: asyncio.AbstractEventLoop) -> None: """Cancel all pending tasks on `loop`, wait for them, and close the loop.""" try: if sys.version_info[:2] >= (3, 7): all_tasks = asyncio.all_tasks else: all_tasks = asyncio.Task.all_tasks # This part is borrowed from asyncio/runners.py in Python 3.7b2. to_cancel = [task for task in all_tasks(loop) if not task.done()] if not to_cancel: return for task in to_cancel: task.cancel() if sys.version_info >= (3, 7): loop.run_until_complete(asyncio.gather(*to_cancel, return_exceptions=True)) else: loop.run_until_complete( asyncio.gather(*to_cancel, loop=loop, return_exceptions=True) ) finally: # `concurrent.futures.Future` objects cannot be cancelled once they # are already running. There might be some when the `shutdown()` happened. # Silence their logger's spew about the event loop being closed. cf_logger = logging.getLogger("concurrent.futures") cf_logger.setLevel(logging.CRITICAL) loop.close()