From ba61bfe3865d8a8fc29abe7f8a94740b618e80ba Mon Sep 17 00:00:00 2001 From: =?utf8?q?=C5=81ukasz=20Langa?= Date: Fri, 30 Mar 2018 19:31:05 -0700 Subject: [PATCH] Graceful shutdown in case of cancellation --- black.py | 43 ++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 40 insertions(+), 3 deletions(-) diff --git a/black.py b/black.py index 6499b22..82fe5d1 100644 --- a/black.py +++ b/black.py @@ -5,9 +5,11 @@ from asyncio.base_events import BaseEventLoop from concurrent.futures import Executor, ProcessPoolExecutor from functools import partial, wraps import keyword +import logging import os from pathlib import Path import tokenize +import signal import sys from typing import ( Callable, @@ -167,7 +169,7 @@ def main( ) ) finally: - loop.close() + shutdown(loop) ctx.exit(return_code) @@ -192,6 +194,9 @@ async def schedule_formatting( ) for src in sources } + _task_values = list(tasks.values()) + loop.add_signal_handler(signal.SIGINT, cancel, _task_values) + loop.add_signal_handler(signal.SIGTERM, cancel, _task_values) await asyncio.wait(tasks.values()) cancelled = [] report = Report(check=not write_back) @@ -200,13 +205,16 @@ async def schedule_formatting( report.failed(src, 'timed out, cancelling') task.cancel() cancelled.append(task) + elif task.cancelled(): + cancelled.append(task) elif task.exception(): report.failed(src, str(task.exception())) else: report.done(src, task.result()) if cancelled: - await asyncio.wait(cancelled, timeout=2) - out('All done! ✨ 🍰 ✨') + await asyncio.gather(*cancelled, loop=loop, return_exceptions=True) + else: + out('All done! ✨ 🍰 ✨') click.echo(str(report)) return report.return_code @@ -1986,5 +1994,34 @@ def diff(a: str, b: str, a_name: str, b_name: str) -> str: ) +def cancel(tasks: List[asyncio.Task]) -> None: + """asyncio signal handler that cancels all `tasks` and reports to stderr.""" + err("Aborted!") + for task in tasks: + task.cancel() + + +def shutdown(loop: BaseEventLoop) -> None: + """Cancel all pending tasks on `loop`, wait for them, and close the loop.""" + try: + # This part is borrowed from asyncio/runners.py in Python 3.7b2. + to_cancel = [task for task in asyncio.Task.all_tasks(loop) if not task.done()] + if not to_cancel: + return + + for task in to_cancel: + task.cancel() + loop.run_until_complete( + asyncio.gather(*to_cancel, loop=loop, return_exceptions=True) + ) + finally: + # `concurrent.futures.Future` objects cannot be cancelled once they + # are already running. There might be some when the `shutdown()` happened. + # Silence their logger's spew about the event loop being closed. + cf_logger = logging.getLogger("concurrent.futures") + cf_logger.setLevel(logging.CRITICAL) + loop.close() + + if __name__ == '__main__': main() -- 2.39.5