"""Caching of formatted files with feature-based invalidation."""
-
+import hashlib
import os
import pickle
+import sys
import tempfile
+from dataclasses import dataclass, field
from pathlib import Path
-from typing import Dict, Iterable, Set, Tuple
+from typing import Dict, Iterable, NamedTuple, Set, Tuple
from platformdirs import user_cache_dir
from _black_version import version as __version__
from black.mode import Mode
-# types
-Timestamp = float
-FileSize = int
-CacheInfo = Tuple[Timestamp, FileSize]
-Cache = Dict[str, CacheInfo]
+if sys.version_info >= (3, 11):
+ from typing import Self
+else:
+ from typing_extensions import Self
+
+
+class FileData(NamedTuple):
+ st_mtime: float
+ st_size: int
+ hash: str
def get_cache_dir() -> Path:
CACHE_DIR = get_cache_dir()
-def read_cache(mode: Mode) -> Cache:
- """Read the cache if it exists and is well formed.
-
- If it is not well formed, the call to write_cache later should resolve the issue.
- """
- cache_file = get_cache_file(mode)
- if not cache_file.exists():
- return {}
-
- with cache_file.open("rb") as fobj:
- try:
- cache: Cache = pickle.load(fobj)
- except (pickle.UnpicklingError, ValueError, IndexError):
- return {}
-
- return cache
-
-
def get_cache_file(mode: Mode) -> Path:
return CACHE_DIR / f"cache.{mode.get_cache_key()}.pickle"
-def get_cache_info(path: Path) -> CacheInfo:
- """Return the information used to check if a file is already formatted or not."""
- stat = path.stat()
- return stat.st_mtime, stat.st_size
-
-
-def filter_cached(cache: Cache, sources: Iterable[Path]) -> Tuple[Set[Path], Set[Path]]:
- """Split an iterable of paths in `sources` into two sets.
-
- The first contains paths of files that modified on disk or are not in the
- cache. The other contains paths to non-modified files.
- """
- todo, done = set(), set()
- for src in sources:
- res_src = src.resolve()
- if cache.get(str(res_src)) != get_cache_info(res_src):
- todo.add(src)
- else:
- done.add(src)
- return todo, done
-
-
-def write_cache(cache: Cache, sources: Iterable[Path], mode: Mode) -> None:
- """Update the cache file."""
- cache_file = get_cache_file(mode)
- try:
- CACHE_DIR.mkdir(parents=True, exist_ok=True)
- new_cache = {
- **cache,
- **{str(src.resolve()): get_cache_info(src) for src in sources},
- }
- with tempfile.NamedTemporaryFile(dir=str(cache_file.parent), delete=False) as f:
- pickle.dump(new_cache, f, protocol=4)
- os.replace(f.name, cache_file)
- except OSError:
- pass
+@dataclass
+class Cache:
+ mode: Mode
+ cache_file: Path
+ file_data: Dict[str, FileData] = field(default_factory=dict)
+
+ @classmethod
+ def read(cls, mode: Mode) -> Self:
+ """Read the cache if it exists and is well formed.
+
+ If it is not well formed, the call to write later should
+ resolve the issue.
+ """
+ cache_file = get_cache_file(mode)
+ if not cache_file.exists():
+ return cls(mode, cache_file)
+
+ with cache_file.open("rb") as fobj:
+ try:
+ file_data: Dict[str, FileData] = pickle.load(fobj)
+ except (pickle.UnpicklingError, ValueError, IndexError):
+ return cls(mode, cache_file)
+
+ return cls(mode, cache_file, file_data)
+
+ @staticmethod
+ def hash_digest(path: Path) -> str:
+ """Return hash digest for path."""
+
+ data = path.read_bytes()
+ return hashlib.sha256(data).hexdigest()
+
+ @staticmethod
+ def get_file_data(path: Path) -> FileData:
+ """Return file data for path."""
+
+ stat = path.stat()
+ hash = Cache.hash_digest(path)
+ return FileData(stat.st_mtime, stat.st_size, hash)
+
+ def is_changed(self, source: Path) -> bool:
+ """Check if source has changed compared to cached version."""
+ res_src = source.resolve()
+ old = self.file_data.get(str(res_src))
+ if old is None:
+ return True
+
+ st = res_src.stat()
+ if st.st_size != old.st_size:
+ return True
+ if int(st.st_mtime) != int(old.st_mtime):
+ new_hash = Cache.hash_digest(res_src)
+ if new_hash != old.hash:
+ return True
+ return False
+
+ def filtered_cached(self, sources: Iterable[Path]) -> Tuple[Set[Path], Set[Path]]:
+ """Split an iterable of paths in `sources` into two sets.
+
+        The first contains paths of files that were modified on disk or are not in the
+ cache. The other contains paths to non-modified files.
+ """
+ changed: Set[Path] = set()
+ done: Set[Path] = set()
+ for src in sources:
+ if self.is_changed(src):
+ changed.add(src)
+ else:
+ done.add(src)
+ return changed, done
+
+ def write(self, sources: Iterable[Path]) -> None:
+ """Update the cache file data and write a new cache file."""
+ self.file_data.update(
+ **{str(src.resolve()): Cache.get_file_data(src) for src in sources}
+ )
+ try:
+ CACHE_DIR.mkdir(parents=True, exist_ok=True)
+ with tempfile.NamedTemporaryFile(
+ dir=str(self.cache_file.parent), delete=False
+ ) as f:
+ pickle.dump(self.file_data, f, protocol=4)
+ os.replace(f.name, self.cache_file)
+ except OSError:
+ pass
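
For orientation, here is a minimal usage sketch of the new `Cache` API introduced above (illustrative only, not part of the patch; the `example.py` path is made up and error handling is omitted):

    from pathlib import Path

    from black.cache import Cache
    from black.mode import Mode

    src = Path("example.py").resolve()  # hypothetical file created just for this sketch
    src.write_text('print("hello")\n', encoding="utf-8")

    cache = Cache.read(Mode())  # loads cache.<key>.pickle if present, else starts empty
    changed, unchanged = cache.filtered_cached([src])
    # ... run the formatter over the paths in `changed` here ...
    cache.write(changed)  # records (st_mtime, st_size, sha256 digest) keyed by resolved path
    assert not cache.is_changed(src)

Note that `is_changed` treats a size mismatch as a definite change and only computes the SHA-256 digest when the whole-second mtimes differ, so the common unchanged case costs a single `stat()` call rather than re-hashing file contents.
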
import black.files
from black import Feature, TargetVersion
from black import re_compile_maybe_verbose as compile_pattern
-from black.cache import get_cache_dir, get_cache_file
+from black.cache import FileData, get_cache_dir, get_cache_file
from black.debug import DebugVisitor
from black.output import color_diff, diff
from black.report import Report
self.invokeBlack([str(path), "--pyi"])
actual = path.read_text(encoding="utf-8")
# verify cache with --pyi is separate
- pyi_cache = black.read_cache(pyi_mode)
- self.assertIn(str(path), pyi_cache)
- normal_cache = black.read_cache(DEFAULT_MODE)
- self.assertNotIn(str(path), normal_cache)
+ pyi_cache = black.Cache.read(pyi_mode)
+ assert not pyi_cache.is_changed(path)
+ normal_cache = black.Cache.read(DEFAULT_MODE)
+ assert normal_cache.is_changed(path)
self.assertFormatEqual(expected, actual)
black.assert_equivalent(contents, actual)
black.assert_stable(contents, actual, pyi_mode)
actual = path.read_text(encoding="utf-8")
self.assertEqual(actual, expected)
# verify cache with --pyi is separate
- pyi_cache = black.read_cache(pyi_mode)
- normal_cache = black.read_cache(reg_mode)
+ pyi_cache = black.Cache.read(pyi_mode)
+ normal_cache = black.Cache.read(reg_mode)
for path in paths:
- self.assertIn(str(path), pyi_cache)
- self.assertNotIn(str(path), normal_cache)
+ assert not pyi_cache.is_changed(path)
+ assert normal_cache.is_changed(path)
def test_pipe_force_pyi(self) -> None:
source, expected = read_data("miscellaneous", "force_pyi")
self.invokeBlack([str(path), *PY36_ARGS])
actual = path.read_text(encoding="utf-8")
# verify cache with --target-version is separate
- py36_cache = black.read_cache(py36_mode)
- self.assertIn(str(path), py36_cache)
- normal_cache = black.read_cache(reg_mode)
- self.assertNotIn(str(path), normal_cache)
+ py36_cache = black.Cache.read(py36_mode)
+ assert not py36_cache.is_changed(path)
+ normal_cache = black.Cache.read(reg_mode)
+ assert normal_cache.is_changed(path)
self.assertEqual(actual, expected)
@event_loop()
actual = path.read_text(encoding="utf-8")
self.assertEqual(actual, expected)
# verify cache with --target-version is separate
- pyi_cache = black.read_cache(py36_mode)
- normal_cache = black.read_cache(reg_mode)
+ pyi_cache = black.Cache.read(py36_mode)
+ normal_cache = black.Cache.read(reg_mode)
for path in paths:
- self.assertIn(str(path), pyi_cache)
- self.assertNotIn(str(path), normal_cache)
+ assert not pyi_cache.is_changed(path)
+ assert normal_cache.is_changed(path)
def test_pipe_force_py36(self) -> None:
source, expected = read_data("miscellaneous", "force_py36")
with cache_dir() as workspace:
cache_file = get_cache_file(mode)
cache_file.write_text("this is not a pickle", encoding="utf-8")
- assert black.read_cache(mode) == {}
+ assert black.Cache.read(mode).file_data == {}
src = (workspace / "test.py").resolve()
src.write_text("print('hello')", encoding="utf-8")
invokeBlack([str(src)])
- cache = black.read_cache(mode)
- assert str(src) in cache
+ cache = black.Cache.read(mode)
+ assert not cache.is_changed(src)
def test_cache_single_file_already_cached(self) -> None:
mode = DEFAULT_MODE
with cache_dir() as workspace:
src = (workspace / "test.py").resolve()
src.write_text("print('hello')", encoding="utf-8")
- black.write_cache({}, [src], mode)
+ cache = black.Cache.read(mode)
+ cache.write([src])
invokeBlack([str(src)])
assert src.read_text(encoding="utf-8") == "print('hello')"
one.write_text("print('hello')", encoding="utf-8")
two = (workspace / "two.py").resolve()
two.write_text("print('hello')", encoding="utf-8")
- black.write_cache({}, [one], mode)
+ cache = black.Cache.read(mode)
+ cache.write([one])
invokeBlack([str(workspace)])
assert one.read_text(encoding="utf-8") == "print('hello')"
assert two.read_text(encoding="utf-8") == 'print("hello")\n'
- cache = black.read_cache(mode)
- assert str(one) in cache
- assert str(two) in cache
+ cache = black.Cache.read(mode)
+ assert not cache.is_changed(one)
+ assert not cache.is_changed(two)
@pytest.mark.parametrize("color", [False, True], ids=["no-color", "with-color"])
def test_no_cache_when_writeback_diff(self, color: bool) -> None:
with cache_dir() as workspace:
src = (workspace / "test.py").resolve()
src.write_text("print('hello')", encoding="utf-8")
- with patch("black.read_cache") as read_cache, patch(
- "black.write_cache"
+ with patch.object(black.Cache, "read") as read_cache, patch.object(
+ black.Cache, "write"
) as write_cache:
cmd = [str(src), "--diff"]
if color:
invokeBlack(cmd)
cache_file = get_cache_file(mode)
assert cache_file.exists() is False
+ read_cache.assert_called_once()
write_cache.assert_not_called()
- read_cache.assert_not_called()
@pytest.mark.parametrize("color", [False, True], ids=["no-color", "with-color"])
@event_loop()
def test_read_cache_no_cachefile(self) -> None:
mode = DEFAULT_MODE
with cache_dir():
- assert black.read_cache(mode) == {}
+ assert black.Cache.read(mode).file_data == {}
def test_write_cache_read_cache(self) -> None:
mode = DEFAULT_MODE
with cache_dir() as workspace:
src = (workspace / "test.py").resolve()
src.touch()
- black.write_cache({}, [src], mode)
- cache = black.read_cache(mode)
- assert str(src) in cache
- assert cache[str(src)] == black.get_cache_info(src)
+ write_cache = black.Cache.read(mode)
+ write_cache.write([src])
+ read_cache = black.Cache.read(mode)
+ assert not read_cache.is_changed(src)
def test_filter_cached(self) -> None:
with TemporaryDirectory() as workspace:
uncached.touch()
cached.touch()
cached_but_changed.touch()
- cache = {
- str(cached): black.get_cache_info(cached),
- str(cached_but_changed): (0.0, 0),
- }
- todo, done = black.cache.filter_cached(
- cache, {uncached, cached, cached_but_changed}
- )
+ cache = black.Cache.read(DEFAULT_MODE)
+
+ orig_func = black.Cache.get_file_data
+
+ def wrapped_func(path: Path) -> FileData:
+ if path == cached:
+ return orig_func(path)
+ if path == cached_but_changed:
+ return FileData(0.0, 0, "")
+ raise AssertionError
+
+ with patch.object(black.Cache, "get_file_data", side_effect=wrapped_func):
+ cache.write([cached, cached_but_changed])
+ todo, done = cache.filtered_cached({uncached, cached, cached_but_changed})
assert todo == {uncached, cached_but_changed}
assert done == {cached}
+ def test_filter_cached_hash(self) -> None:
+ with TemporaryDirectory() as workspace:
+ path = Path(workspace)
+ src = (path / "test.py").resolve()
+ src.write_text("print('hello')", encoding="utf-8")
+ st = src.stat()
+ cache = black.Cache.read(DEFAULT_MODE)
+ cache.write([src])
+ cached_file_data = cache.file_data[str(src)]
+
+ todo, done = cache.filtered_cached([src])
+ assert todo == set()
+ assert done == {src}
+ assert cached_file_data.st_mtime == st.st_mtime
+
+ # Modify st_mtime
+ cached_file_data = cache.file_data[str(src)] = FileData(
+ cached_file_data.st_mtime - 1,
+ cached_file_data.st_size,
+ cached_file_data.hash,
+ )
+ todo, done = cache.filtered_cached([src])
+ assert todo == set()
+ assert done == {src}
+ assert cached_file_data.st_mtime < st.st_mtime
+ assert cached_file_data.st_size == st.st_size
+ assert cached_file_data.hash == black.Cache.hash_digest(src)
+
+ # Modify contents
+ src.write_text("print('hello world')", encoding="utf-8")
+ new_st = src.stat()
+ todo, done = cache.filtered_cached([src])
+ assert todo == {src}
+ assert done == set()
+ assert cached_file_data.st_mtime < new_st.st_mtime
+ assert cached_file_data.st_size != new_st.st_size
+ assert cached_file_data.hash != black.Cache.hash_digest(src)
+
def test_write_cache_creates_directory_if_needed(self) -> None:
mode = DEFAULT_MODE
with cache_dir(exists=False) as workspace:
assert not workspace.exists()
- black.write_cache({}, [], mode)
+ cache = black.Cache.read(mode)
+ cache.write([])
assert workspace.exists()
@event_loop()
clean = (workspace / "clean.py").resolve()
clean.write_text('print("hello")\n', encoding="utf-8")
invokeBlack([str(workspace)], exit_code=123)
- cache = black.read_cache(mode)
- assert str(failing) not in cache
- assert str(clean) in cache
+ cache = black.Cache.read(mode)
+ assert cache.is_changed(failing)
+ assert not cache.is_changed(clean)
def test_write_cache_write_fail(self) -> None:
mode = DEFAULT_MODE
- with cache_dir(), patch.object(Path, "open") as mock:
- mock.side_effect = OSError
- black.write_cache({}, [], mode)
+ with cache_dir():
+ cache = black.Cache.read(mode)
+ with patch.object(Path, "open") as mock:
+ mock.side_effect = OSError
+ cache.write([])
def test_read_cache_line_lengths(self) -> None:
mode = DEFAULT_MODE
with cache_dir() as workspace:
path = (workspace / "file.py").resolve()
path.touch()
- black.write_cache({}, [path], mode)
- one = black.read_cache(mode)
- assert str(path) in one
- two = black.read_cache(short_mode)
- assert str(path) not in two
+ cache = black.Cache.read(mode)
+ cache.write([path])
+ one = black.Cache.read(mode)
+ assert not one.is_changed(path)
+ two = black.Cache.read(short_mode)
+ assert two.is_changed(path)
def assert_collected_sources(