+import asyncio
+import logging
+import sys
+from typing import Any, Iterable
+
+from black.output import err
+
+
+def maybe_install_uvloop() -> None:
+ """If our environment has uvloop installed we use it.
+
+ This is called only from command-line entry points to avoid
+ interfering with the parent process if Black is used as a library.
+
+ """
+ try:
+ import uvloop
+
+ uvloop.install()
+ except ImportError:
+ pass
+
+
+def cancel(tasks: Iterable["asyncio.Task[Any]"]) -> None:
+ """asyncio signal handler that cancels all `tasks` and reports to stderr."""
+ err("Aborted!")
+ for task in tasks:
+ task.cancel()
+
+
+def shutdown(loop: asyncio.AbstractEventLoop) -> None:
+ """Cancel all pending tasks on `loop`, wait for them, and close the loop."""
+ try:
+ if sys.version_info[:2] >= (3, 7):
+ all_tasks = asyncio.all_tasks
+ else:
+ all_tasks = asyncio.Task.all_tasks
+ # This part is borrowed from asyncio/runners.py in Python 3.7b2.
+ to_cancel = [task for task in all_tasks(loop) if not task.done()]
+ if not to_cancel:
+ return
+
+ for task in to_cancel:
+ task.cancel()
+ loop.run_until_complete(
+ asyncio.gather(*to_cancel, loop=loop, return_exceptions=True)
+ )
+ finally:
+ # `concurrent.futures.Future` objects cannot be cancelled once they
+ # are already running. There might be some when the `shutdown()` happened.
+ # Silence their logger's spew about the event loop being closed.
+ cf_logger = logging.getLogger("concurrent.futures")
+ cf_logger.setLevel(logging.CRITICAL)
+ loop.close()