git.madduck.net Git - etc/vim.git/blob - src/black/concurrency.py

madduck's git repository

Every one of the projects in this repository is available at the canonical URL git://git.madduck.net/madduck/pub/<projectpath> — see each project's metadata for the exact URL.

All patches and comments are welcome. Please squash your changes to logical commits before using git-format-patch and git-send-email to patches@git.madduck.net. If you'd read over the Git project's submission guidelines and adhered to them, I'd be especially grateful.

SSH access, as well as push access, can be individually arranged.

If you use my repositories frequently, consider adding the following snippet to ~/.gitconfig and using the third clone URL listed for each project:

[url "git://git.madduck.net/madduck/"]
  insteadOf = madduck:

10e288f4f93573ce6f6e599eb0da808f406b6de5
[etc/vim.git] / src / black / concurrency.py
1 """
2 Formatting many files at once via multiprocessing. Contains entrypoint and utilities.
3
4 NOTE: this module is only imported if we need to format several files at once.
5 """
6
7 import asyncio
8 import logging
9 import os
10 import signal
11 import sys
12 from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor
13 from multiprocessing import Manager
14 from pathlib import Path
15 from typing import Any, Iterable, Optional, Set
16
17 from mypy_extensions import mypyc_attr
18
19 from black import WriteBack, format_file_in_place
20 from black.cache import Cache, filter_cached, read_cache, write_cache
21 from black.mode import Mode
22 from black.output import err
23 from black.report import Changed, Report
24
25
def maybe_install_uvloop() -> None:
    """Switch asyncio over to uvloop when that package can be imported.

    Only command-line entry points are expected to call this, so that a
    program merely using Black as a library keeps its own event loop setup.
    When uvloop is not importable this function silently does nothing.
    """
    try:
        import uvloop

        uvloop.install()
    except ImportError:
        # uvloop is an optional extra; its absence is not an error.
        return
38
39
def cancel(tasks: Iterable["asyncio.Task[Any]"]) -> None:
    """Signal handler for asyncio: announce the abort on stderr, then cancel `tasks`.

    The message is emitted first; afterwards every task yielded by `tasks` has
    its `cancel()` method called, in iteration order.
    """
    err("Aborted!")
    for running_task in tasks:
        running_task.cancel()
45
46
def shutdown(loop: asyncio.AbstractEventLoop) -> None:
    """Cancel whatever is still pending on `loop`, drain it, then close `loop`.

    The loop is closed even if cancelling or draining raises. As a side effect
    the "concurrent.futures" logger is set to CRITICAL so that executor futures
    that were already running do not spam about the closed loop.
    """
    try:
        # `asyncio.all_tasks` only exists from Python 3.7 on; older versions
        # expose the same functionality as `asyncio.Task.all_tasks`.
        if sys.version_info[:2] < (3, 7):
            list_tasks = asyncio.Task.all_tasks
        else:
            list_tasks = asyncio.all_tasks

        # Same approach as asyncio/runners.py in Python 3.7b2.
        unfinished = [t for t in list_tasks(loop) if not t.done()]
        if unfinished:
            for t in unfinished:
                t.cancel()
            # Let every cancelled task run to completion; exceptions (including
            # the CancelledError itself) are collected instead of raised.
            drained = asyncio.gather(*unfinished, return_exceptions=True)
            loop.run_until_complete(drained)
    finally:
        # A `concurrent.futures.Future` that is already running cannot be
        # cancelled, and some may still be in flight at this point. Mute the
        # logger they would use to complain about the closed event loop.
        logging.getLogger("concurrent.futures").setLevel(logging.CRITICAL)
        loop.close()
69
70
# diff-shades depends on being able to monkeypatch this function to operate. I know
# it's not ideal, but this shouldn't cause any issues ... hopefully. ~ichard26
@mypyc_attr(patchable=True)
def reformat_many(
    sources: Set[Path],
    fast: bool,
    write_back: WriteBack,
    mode: Mode,
    report: Report,
    workers: Optional[int],
) -> None:
    """Reformat multiple files using a ProcessPoolExecutor.

    `sources`, `fast`, `write_back`, `mode` and `report` are handed on unchanged
    to :func:`schedule_formatting`. `workers` is the maximum number of worker
    processes; when it is None, `os.cpu_count()` is used (falling back to 1 when
    that is unknown). On Windows the count is capped at 60.

    A fresh event loop is created and installed for the duration of the call.
    On exit, whether or not formatting raised, the loop is shut down, the
    current event loop is reset to None and the executor is shut down.
    """
    maybe_install_uvloop()

    executor: Executor
    if workers is None:
        workers = os.cpu_count() or 1
    if sys.platform == "win32":
        # Work around https://bugs.python.org/issue26903
        workers = min(workers, 60)
    try:
        executor = ProcessPoolExecutor(max_workers=workers)
    except (ImportError, NotImplementedError, OSError):
        # we arrive here if the underlying system does not support multi-processing
        # like in AWS Lambda or Termux, in which case we gracefully fallback to
        # a ThreadPoolExecutor with just a single worker (more workers would not do us
        # any good due to the Global Interpreter Lock)
        executor = ThreadPoolExecutor(max_workers=1)

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(
            schedule_formatting(
                sources=sources,
                fast=fast,
                write_back=write_back,
                mode=mode,
                report=report,
                loop=loop,
                executor=executor,
            )
        )
    finally:
        try:
            shutdown(loop)
        finally:
            asyncio.set_event_loop(None)
            # `executor` is always bound by this point, so no None check is
            # needed. Shutting it down inside this `finally` releases the pool
            # even when `shutdown(loop)` itself raises.
            executor.shutdown()
121
122
async def schedule_formatting(
    sources: Set[Path],
    fast: bool,
    write_back: WriteBack,
    mode: Mode,
    report: "Report",
    loop: asyncio.AbstractEventLoop,
    executor: "Executor",
) -> None:
    """Format every file in `sources` concurrently on `executor`.

    (Real parallelism requires a ProcessPoolExecutor.)

    Each file is processed by :func:`format_file_in_place`, which receives the
    `fast`, `mode` and `write_back` options. Outcomes are recorded on `report`.
    Unless a diff was requested, files already present in the cache are skipped
    and reported as cached, and the cache is updated afterwards with the files
    that were written back or verified as already well-formatted.
    """
    wants_diff = write_back in (WriteBack.DIFF, WriteBack.COLOR_DIFF)

    cache: Cache = {}
    if not wants_diff:
        cache = read_cache(mode)
        sources, cached = filter_cached(cache, sources)
        for src in sorted(cached):
            report.done(src, Changed.CACHED)
    if not sources:
        return

    cancelled = []
    sources_to_cache = []
    lock = None
    if wants_diff:
        # Diff output from different processes must not interleave, so the
        # workers share a lock. `manager` stays bound to a local name on
        # purpose: a manager's server process is shut down once the manager
        # object is garbage collected, which would break the lock proxy.
        manager = Manager()
        lock = manager.Lock()

    tasks = {
        asyncio.ensure_future(
            loop.run_in_executor(
                executor, format_file_in_place, path, fast, mode, write_back, lock
            )
        ): path
        for path in sorted(sources)
    }
    # A live view: it shrinks as finished tasks are popped from `tasks`, so both
    # the loop below and the signal handlers only ever see unfinished work.
    pending = tasks.keys()
    try:
        for signum in (signal.SIGINT, signal.SIGTERM):
            loop.add_signal_handler(signum, cancel, pending)
    except NotImplementedError:
        # There are no good alternatives for these on Windows.
        pass

    while pending:
        finished, _ = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for task in finished:
            src = tasks.pop(task)
            if task.cancelled():
                cancelled.append(task)
                continue

            failure = task.exception()
            if failure:
                report.failed(src, str(failure))
                continue

            if task.result():
                changed = Changed.YES
            else:
                changed = Changed.NO
            # Remember the file in the cache when it was written back, or when
            # a check run confirmed it is already well-formatted.
            verified_clean = write_back is WriteBack.CHECK and changed is Changed.NO
            if write_back is WriteBack.YES or verified_clean:
                sources_to_cache.append(src)
            report.done(src, changed)

    if cancelled:
        await asyncio.gather(*cancelled, return_exceptions=True)
    if sources_to_cache:
        write_cache(cache, sources_to_cache, mode)