]> git.madduck.net Git - etc/vim.git/blob - src/black/cache.py

madduck's git repository

Every one of the projects in this repository is available at the canonical URL git://git.madduck.net/madduck/pub/<projectpath> — see each project's metadata for the exact URL.

All patches and comments are welcome. Please squash your changes to logical commits before using git-format-patch and git-send-email to patches@git.madduck.net. If you'd read over the Git project's submission guidelines and adhered to them, I'd be especially grateful.

SSH access, as well as push access can be individually arranged.

If you use my repositories frequently, consider adding the following snippet to ~/.gitconfig and using the third clone URL listed for each project:

[url "git://git.madduck.net/madduck/"]
  insteadOf = madduck:

Fix cache versioning when BLACK_CACHE_DIR is set (#3937)
[etc/vim.git] / src / black / cache.py
1 """Caching of formatted files with feature-based invalidation."""
2 import hashlib
3 import os
4 import pickle
5 import sys
6 import tempfile
7 from dataclasses import dataclass, field
8 from pathlib import Path
9 from typing import Dict, Iterable, NamedTuple, Set, Tuple
10
11 from platformdirs import user_cache_dir
12
13 from _black_version import version as __version__
14 from black.mode import Mode
15
16 if sys.version_info >= (3, 11):
17     from typing import Self
18 else:
19     from typing_extensions import Self
20
21
22 class FileData(NamedTuple):
23     st_mtime: float
24     st_size: int
25     hash: str
26
27
28 def get_cache_dir() -> Path:
29     """Get the cache directory used by black.
30
31     Users can customize this directory on all systems using `BLACK_CACHE_DIR`
32     environment variable. By default, the cache directory is the user cache directory
33     under the black application.
34
35     This result is immediately set to a constant `black.cache.CACHE_DIR` as to avoid
36     repeated calls.
37     """
38     # NOTE: Function mostly exists as a clean way to test getting the cache directory.
39     default_cache_dir = user_cache_dir("black")
40     cache_dir = Path(os.environ.get("BLACK_CACHE_DIR", default_cache_dir))
41     cache_dir = cache_dir / __version__
42     return cache_dir
43
44
45 CACHE_DIR = get_cache_dir()
46
47
48 def get_cache_file(mode: Mode) -> Path:
49     return CACHE_DIR / f"cache.{mode.get_cache_key()}.pickle"
50
51
52 @dataclass
53 class Cache:
54     mode: Mode
55     cache_file: Path
56     file_data: Dict[str, FileData] = field(default_factory=dict)
57
58     @classmethod
59     def read(cls, mode: Mode) -> Self:
60         """Read the cache if it exists and is well formed.
61
62         If it is not well formed, the call to write later should
63         resolve the issue.
64         """
65         cache_file = get_cache_file(mode)
66         if not cache_file.exists():
67             return cls(mode, cache_file)
68
69         with cache_file.open("rb") as fobj:
70             try:
71                 data: Dict[str, Tuple[float, int, str]] = pickle.load(fobj)
72                 file_data = {k: FileData(*v) for k, v in data.items()}
73             except (pickle.UnpicklingError, ValueError, IndexError):
74                 return cls(mode, cache_file)
75
76         return cls(mode, cache_file, file_data)
77
78     @staticmethod
79     def hash_digest(path: Path) -> str:
80         """Return hash digest for path."""
81
82         data = path.read_bytes()
83         return hashlib.sha256(data).hexdigest()
84
85     @staticmethod
86     def get_file_data(path: Path) -> FileData:
87         """Return file data for path."""
88
89         stat = path.stat()
90         hash = Cache.hash_digest(path)
91         return FileData(stat.st_mtime, stat.st_size, hash)
92
93     def is_changed(self, source: Path) -> bool:
94         """Check if source has changed compared to cached version."""
95         res_src = source.resolve()
96         old = self.file_data.get(str(res_src))
97         if old is None:
98             return True
99
100         st = res_src.stat()
101         if st.st_size != old.st_size:
102             return True
103         if int(st.st_mtime) != int(old.st_mtime):
104             new_hash = Cache.hash_digest(res_src)
105             if new_hash != old.hash:
106                 return True
107         return False
108
109     def filtered_cached(self, sources: Iterable[Path]) -> Tuple[Set[Path], Set[Path]]:
110         """Split an iterable of paths in `sources` into two sets.
111
112         The first contains paths of files that modified on disk or are not in the
113         cache. The other contains paths to non-modified files.
114         """
115         changed: Set[Path] = set()
116         done: Set[Path] = set()
117         for src in sources:
118             if self.is_changed(src):
119                 changed.add(src)
120             else:
121                 done.add(src)
122         return changed, done
123
124     def write(self, sources: Iterable[Path]) -> None:
125         """Update the cache file data and write a new cache file."""
126         self.file_data.update(
127             **{str(src.resolve()): Cache.get_file_data(src) for src in sources}
128         )
129         try:
130             CACHE_DIR.mkdir(parents=True, exist_ok=True)
131             with tempfile.NamedTemporaryFile(
132                 dir=str(self.cache_file.parent), delete=False
133             ) as f:
134                 # We store raw tuples in the cache because pickling NamedTuples
135                 # doesn't work with mypyc on Python 3.8, and because it's faster.
136                 data: Dict[str, Tuple[float, int, str]] = {
137                     k: (*v,) for k, v in self.file_data.items()
138                 }
139                 pickle.dump(data, f, protocol=4)
140             os.replace(f.name, self.cache_file)
141         except OSError:
142             pass