git.madduck.net Git - etc/vim.git/blob - src/black/cache.py

madduck's git repository

Every one of the projects in this repository is available at the canonical URL git://git.madduck.net/madduck/pub/<projectpath> — see each project's metadata for the exact URL.

All patches and comments are welcome. Please squash your changes to logical commits before using git-format-patch and git-send-email to patches@git.madduck.net. If you'd read over the Git project's submission guidelines and adhered to them, I'd be especially grateful.

SSH access, as well as push access, can be individually arranged.

If you use my repositories frequently, consider adding the following snippet to ~/.gitconfig and using the third clone URL listed for each project:

[url "git://git.madduck.net/madduck/"]
  insteadOf = madduck:

Treat raw strings like other docstrings (#3947)
[etc/vim.git] / src / black / cache.py
1 """Caching of formatted files with feature-based invalidation."""
2
3 import hashlib
4 import os
5 import pickle
6 import sys
7 import tempfile
8 from dataclasses import dataclass, field
9 from pathlib import Path
10 from typing import Dict, Iterable, NamedTuple, Set, Tuple
11
12 from platformdirs import user_cache_dir
13
14 from _black_version import version as __version__
15 from black.mode import Mode
16
17 if sys.version_info >= (3, 11):
18     from typing import Self
19 else:
20     from typing_extensions import Self
21
22
class FileData(NamedTuple):
    """Metadata recorded in the cache for one source file."""

    # Modification time, as reported by `Path.stat()`.
    st_mtime: float
    # Size in bytes, as reported by `Path.stat()`.
    st_size: int
    # SHA-256 hex digest of the file's contents.
    hash: str
27
28
def get_cache_dir() -> Path:
    """Return the version-specific directory where black keeps its cache.

    The base directory comes from the `BLACK_CACHE_DIR` environment variable when
    it is set, on any system. Otherwise it is the user cache directory for the
    "black" application. The running black version is appended as the final
    path component.

    The module stores the result once in the constant `black.cache.CACHE_DIR` so
    the lookup is not repeated.
    """
    # NOTE: Kept as a function mainly so the directory lookup is easy to test.
    fallback = user_cache_dir("black")
    base = os.environ.get("BLACK_CACHE_DIR", fallback)
    return Path(base) / __version__
44
45
# Computed once at import time so the cache directory lookup is not repeated.
CACHE_DIR = get_cache_dir()
47
48
def get_cache_file(mode: Mode) -> Path:
    """Return the path of the pickle cache file for `mode` inside `CACHE_DIR`."""
    cache_key = mode.get_cache_key()
    return CACHE_DIR / f"cache.{cache_key}.pickle"
51
52
@dataclass
class Cache:
    """Record of already-formatted files for one formatting mode.

    Each entry maps a resolved source path to the size, modification time and
    content hash recorded when the cache was last written, so that unchanged
    files can be skipped on a later run.
    """

    # Formatting mode this cache belongs to; it selects the cache file.
    mode: Mode
    # Pickle file backing this cache.
    cache_file: Path
    # Resolved source path (as a string) -> metadata recorded for that file.
    file_data: Dict[str, FileData] = field(default_factory=dict)

    @classmethod
    def read(cls, mode: Mode) -> Self:
        """Read the cache if it exists and is well formed.

        If it is missing or not well formed, an empty cache is returned; the
        call to write later should resolve the issue.
        """
        cache_file = get_cache_file(mode)
        if not cache_file.exists():
            return cls(mode, cache_file)

        # NOTE: the file is loaded with pickle, so the cache directory must only
        # ever point at a location the user trusts.
        with cache_file.open("rb") as fobj:
            try:
                data: Dict[str, Tuple[float, int, str]] = pickle.load(fobj)
                file_data = {k: FileData(*v) for k, v in data.items()}
            except (
                pickle.UnpicklingError,
                # Empty or truncated file, e.g. after an interrupted write.
                EOFError,
                # Payload that is not a dict, or refers to names that no longer
                # exist.
                AttributeError,
                ImportError,
                # Entry with the wrong number of fields for FileData.
                TypeError,
                ValueError,
                IndexError,
            ):
                return cls(mode, cache_file)

        return cls(mode, cache_file, file_data)

    @staticmethod
    def hash_digest(path: Path) -> str:
        """Return the SHA-256 hex digest of the contents of path."""
        data = path.read_bytes()
        return hashlib.sha256(data).hexdigest()

    @staticmethod
    def get_file_data(path: Path) -> FileData:
        """Return the modification time, size and content hash for path."""
        stat = path.stat()
        digest = Cache.hash_digest(path)
        return FileData(stat.st_mtime, stat.st_size, digest)

    def is_changed(self, source: Path) -> bool:
        """Check if source has changed compared to cached version.

        A file counts as changed when it is not in the cache, when its size
        differs, or when its modification time differs and its content hash no
        longer matches the recorded one.
        """
        res_src = source.resolve()
        old = self.file_data.get(str(res_src))
        if old is None:
            return True

        st = res_src.stat()
        if st.st_size != old.st_size:
            return True
        # Compare the full-precision timestamps: truncating them to whole
        # seconds would skip the hash check for a same-size edit made within
        # the same second and wrongly report the file as unchanged.
        if st.st_mtime != old.st_mtime:
            new_hash = Cache.hash_digest(res_src)
            if new_hash != old.hash:
                return True
        return False

    def filtered_cached(self, sources: Iterable[Path]) -> Tuple[Set[Path], Set[Path]]:
        """Split an iterable of paths in `sources` into two sets.

        The first contains paths of files that were modified on disk or are not
        in the cache. The other contains paths to non-modified files.
        """
        changed: Set[Path] = set()
        done: Set[Path] = set()
        for src in sources:
            if self.is_changed(src):
                changed.add(src)
            else:
                done.add(src)
        return changed, done

    def write(self, sources: Iterable[Path]) -> None:
        """Update the cache file data and write a new cache file.

        The data is first written to a temporary file next to the cache file and
        then moved into place with `os.replace`. Failing to write the cache file
        is not an error: any `OSError` raised while doing so is swallowed and
        the temporary file, if one was created, is removed.
        """
        self.file_data.update(
            {str(src.resolve()): Cache.get_file_data(src) for src in sources}
        )
        tmp_name = ""
        try:
            CACHE_DIR.mkdir(parents=True, exist_ok=True)
            with tempfile.NamedTemporaryFile(
                dir=str(self.cache_file.parent), delete=False
            ) as f:
                tmp_name = f.name
                # We store raw tuples in the cache because pickling NamedTuples
                # doesn't work with mypyc on Python 3.8, and because it's faster.
                data: Dict[str, Tuple[float, int, str]] = {
                    k: (*v,) for k, v in self.file_data.items()
                }
                pickle.dump(data, f, protocol=4)
            os.replace(tmp_name, self.cache_file)
        except OSError:
            # Best effort only, but do not leave the temporary file behind
            # (it was created with delete=False).
            if tmp_name:
                try:
                    os.unlink(tmp_name)
                except OSError:
                    pass