attrs = ">=17.4.0"
click = "*"
setuptools = ">=38.6.0"
+appdirs = "*"
[dev-packages]
pre-commit = "*"
{
"_meta": {
"hash": {
- "sha256": "40e1fcca5bf4adcd0e688675714c4b2a771009b6e2005a0f375de1a46a64c906"
+ "sha256": "b6412a09cc7dd70b0dcd83aa9f1ab659f4c0e2ba413060ab03f7ba4b064bebce"
},
"pipfile-spec": 6,
"requires": {},
]
},
"default": {
+ "appdirs": {
+ "hashes": [
+ "sha256:9e5896d1372858f8dd3344faf4e5014d21849c756c8d5701f78f8a103b372d92",
+ "sha256:d8b24664561d0d34ddfaec54636d502d7cea6e29c3eaf68f3df6180863e2166e"
+ ],
+ "index": "pypi",
+ "version": "==1.4.3"
+ },
"attrs": {
"hashes": [
"sha256:1c7960ccfd6a005cd9f7ba884e6316b5e430a3f1a6c37c5f87d8b43f83b54ec9",
},
"aspy.yaml": {
"hashes": [
- "sha256:6215f44900ff65f27dbd00a36b06a7926276436ed377320cfd4febd69bbd4a94",
- "sha256:be70cc0ccd1ee1d30f589f2403792eb2ffa7546470af0a17179541b13d8374df"
+ "sha256:c959530fab398e2391516bc8d5146489f9273b07d87dd8ba5e8b73406f7cc1fa",
+ "sha256:da95110d120a9168c9f43601b9cb732f006d8f193ee2c9b402c823026e4a9387"
],
- "version": "==1.0.0"
+ "version": "==1.1.0"
},
"attrs": {
"hashes": [
},
"cached-property": {
"hashes": [
- "sha256:6e6935ec62567fbcbc346fad84fcea9bc77e3547b7267e62bc5b7ed8e5438ae8",
- "sha256:a2fa0f89dd422f7e5dd992a4a3e0ce209d5d1e47a4db28fd0a7b5273ec8da3f0"
+ "sha256:67acb3ee8234245e8aea3784a492272239d9c4b487eba2fdcce9d75460d34520",
+ "sha256:bf093e640b7294303c7cc7ba3212f00b7a07d0416c1d923465995c9ef860a139"
],
- "version": "==1.4.0"
+ "version": "==1.4.2"
},
"certifi": {
"hashes": [
- "sha256:14131608ad2fd56836d33a71ee60fa1c82bc9d2c8d98b7bdbc631fe1b3cd1296",
- "sha256:edbc3f203427eef571f79a7692bb160a2b0f7ccaa31953e99bd17e307cf63f7d"
+ "sha256:13e698f54293db9f89122b0581843a782ad0934a4fe0172d2a980ba77fc61bb7",
+ "sha256:9fa520c1bacfb634fa7af20a76bcbd3d5fb390481724c597da32c719a7dca4b0"
],
- "version": "==2018.1.18"
+ "version": "==2018.4.16"
},
"cfgv": {
"hashes": [
},
"identify": {
"hashes": [
- "sha256:53be6ea950a5f40e13be2dd87e67413eb6879527b831333196ab2a54de38f499",
- "sha256:c0bfb29634e04cde8e54aee2d55aff9dad30d6ea1f3e9e3ce731934d78635aa1"
+ "sha256:5cbcc7fca1263bd87fc4dea1abfd7abbbc3807c9b9f09e6f999da6857a0fe35a",
+ "sha256:a42f6ed9c3ad02b187c8a17027bb9042a54f463d8e617ca208038a25ec69faa7"
],
- "version": "==1.0.8"
+ "version": "==1.0.13"
},
"idna": {
"hashes": [
},
"mypy": {
"hashes": [
- "sha256:3bd95a1369810f7693366911d85be9f0a0bd994f6cb7162b7a994e5ded90e3d9",
- "sha256:7247f9948d7cdaae9408a4ee1662a01853c24e668117b4419acf025b05fbe3ce"
+ "sha256:04bffef22377b3f56f96da2d032e5d0b2e8a9062a127afc008dc4b0e64cede7a",
+ "sha256:cdd3ddf96a2cc2e955bcc7b2a16b25e400f132393375b45a2d719c91ac1a8291"
],
"index": "pypi",
- "version": "==0.580"
+ "version": "==0.590"
},
"nodeenv": {
"hashes": [
},
"pycodestyle": {
"hashes": [
+ "sha256:1ec08a51c901dfe44921576ed6e4c1f5b7ecbad403f871397feedb5eb8e4fa14",
+ "sha256:5ff2fbcbab997895ba9ead77e1b38b3ebc2e5c3b8a6194ef918666e4c790a00e",
"sha256:682256a5b318149ca0d2a9185d365d8864a768a28db66a84a2ea946bcc426766",
"sha256:6c4245ade1edfad79c3446fadfc96b0de2759662dc29d07d80a6f27ad1ca6ba9"
],
},
"pytz": {
"hashes": [
- "sha256:07edfc3d4d2705a20a6e99d97f0c4b61c800b8232dc1c04d87e8554f130148dd",
- "sha256:3a47ff71597f821cd84a162e71593004286e5be07a340fd462f0d33a760782b5",
- "sha256:410bcd1d6409026fbaa65d9ed33bf6dd8b1e94a499e32168acfc7b332e4095c0",
- "sha256:5bd55c744e6feaa4d599a6cbd8228b4f8f9ba96de2c38d56f08e534b3c9edf0d",
- "sha256:61242a9abc626379574a166dc0e96a66cd7c3b27fc10868003fa210be4bff1c9",
- "sha256:887ab5e5b32e4d0c86efddd3d055c1f363cbaa583beb8da5e22d2fa2f64d51ef",
- "sha256:ba18e6a243b3625513d85239b3e49055a2f0318466e0b8a92b8fb8ca7ccdf55f",
- "sha256:ed6509d9af298b7995d69a440e2822288f2eca1681b8cce37673dbb10091e5fe",
- "sha256:f93ddcdd6342f94cea379c73cddb5724e0d6d0a1c91c9bdef364dc0368ba4fda"
+ "sha256:65ae0c8101309c45772196b21b74c46b2e5d11b6275c45d251b150d5da334555",
+ "sha256:c06425302f2cf668f1bba7a0a03f3c1d34d4ebeef2c72003da308b3947c7f749"
],
- "version": "==2018.3"
+ "version": "==2018.4"
},
"pyyaml": {
"hashes": [
},
"tqdm": {
"hashes": [
- "sha256:4f2eb1d14804caf7095500fe11da0e481a47af912e7b57c93f886ac3c40a49dd",
- "sha256:91ac47ec2ba6bb92b7ba37706f4dea37019ddd784b22fd279a4b12d93327191d"
+ "sha256:597e7526c85df881d51e094360181a84533aede1cb3f5a1cada8bbd4de557efd",
+ "sha256:fe3d218d5b61993d415aa2a9db6dd64c0e4cefb90164ebb197ef3b1d99f531dc"
],
- "version": "==4.20.0"
+ "version": "==4.23.0"
},
"twine": {
"hashes": [
that is pinned to the latest release on PyPI. If you'd rather run on
master, this is also an option.
+
+## Ignoring non-modified files
+
+*Black* remembers files it already formatted, unless the `--diff` flag is used or
+code is passed via standard input. This information is stored per-user. The exact
+location of the file depends on the black version and the system on which black
+is run. The file is non-portable. The standard location on common operating systems
+is:
+
+* Windows: `C:\\Users\<username>\AppData\Local\black\black\Cache\<version>\cache.pkl`
+* macOS: `/Users/<username>/Library/Caches/black/<version>/cache.pkl`
+* Linux: `/home/<username>/.cache/black/<version>/cache.pkl`
+
+
## Testimonials
**Dusty Phillips**, [writer](https://smile.amazon.com/s/ref=nb_sb_noss?url=search-alias%3Daps&field-keywords=dusty+phillips):
#!/usr/bin/env python3
import asyncio
+import pickle
from asyncio.base_events import BaseEventLoop
from concurrent.futures import Executor, ProcessPoolExecutor
from enum import Enum
Union,
)
+from appdirs import user_cache_dir
from attr import dataclass, Factory
import click
Index = int
LN = Union[Leaf, Node]
SplitFunc = Callable[["Line", bool], Iterator["Line"]]
+Timestamp = float
+FileSize = int
+CacheInfo = Tuple[Timestamp, FileSize]
+Cache = Dict[Path, CacheInfo]
out = partial(click.secho, bold=True, err=True)
err = partial(click.secho, fg="red", err=True)
DIFF = 2
+class Changed(Enum):
+ NO = 0
+ CACHED = 1
+ YES = 2
+
+
@click.command()
@click.option(
"-l",
write_back = WriteBack.YES
if len(sources) == 0:
ctx.exit(0)
+ return
+
elif len(sources) == 1:
- p = sources[0]
- report = Report(check=check, quiet=quiet)
- try:
- if not p.is_file() and str(p) == "-":
- changed = format_stdin_to_stdout(
- line_length=line_length, fast=fast, write_back=write_back
- )
- else:
- changed = format_file_in_place(
- p, line_length=line_length, fast=fast, write_back=write_back
- )
- report.done(p, changed)
- except Exception as exc:
- report.failed(p, str(exc))
- ctx.exit(report.return_code)
+ return_code = run_single_file_mode(
+ line_length, check, fast, quiet, write_back, sources[0]
+ )
else:
- loop = asyncio.get_event_loop()
- executor = ProcessPoolExecutor(max_workers=os.cpu_count())
- return_code = 1
- try:
- return_code = loop.run_until_complete(
- schedule_formatting(
- sources, line_length, write_back, fast, quiet, loop, executor
+ return_code = run_multi_file_mode(line_length, fast, quiet, write_back, sources)
+ ctx.exit(return_code)
+
+
+def run_single_file_mode(
+ line_length: int,
+ check: bool,
+ fast: bool,
+ quiet: bool,
+ write_back: WriteBack,
+ src: Path,
+) -> int:
+ report = Report(check=check, quiet=quiet)
+ try:
+ if not src.is_file() and str(src) == "-":
+ changed = format_stdin_to_stdout(
+ line_length=line_length, fast=fast, write_back=write_back
+ )
+ else:
+ changed = Changed.NO
+ cache: Cache = {}
+ if write_back != WriteBack.DIFF:
+ cache = read_cache()
+ src = src.resolve()
+ if src in cache and cache[src] == get_cache_info(src):
+ changed = Changed.CACHED
+ if changed is not Changed.CACHED:
+ changed = format_file_in_place(
+ src, line_length=line_length, fast=fast, write_back=write_back
)
+ if write_back != WriteBack.DIFF and changed is not Changed.NO:
+ write_cache(cache, [src])
+ report.done(src, changed)
+ except Exception as exc:
+ report.failed(src, str(exc))
+ return report.return_code
+
+
+def run_multi_file_mode(
+ line_length: int,
+ fast: bool,
+ quiet: bool,
+ write_back: WriteBack,
+ sources: List[Path],
+) -> int:
+ loop = asyncio.get_event_loop()
+ executor = ProcessPoolExecutor(max_workers=os.cpu_count())
+ return_code = 1
+ try:
+ return_code = loop.run_until_complete(
+ schedule_formatting(
+ sources, line_length, write_back, fast, quiet, loop, executor
)
- finally:
- shutdown(loop)
- ctx.exit(return_code)
+ )
+ finally:
+ shutdown(loop)
+ return return_code
async def schedule_formatting(
`line_length`, `write_back`, and `fast` options are passed to
:func:`format_file_in_place`.
"""
- lock = None
- if write_back == WriteBack.DIFF:
- # For diff output, we need locks to ensure we don't interleave output
- # from different processes.
- manager = Manager()
- lock = manager.Lock()
- tasks = {
- src: loop.run_in_executor(
- executor, format_file_in_place, src, line_length, fast, write_back, lock
- )
- for src in sources
- }
- _task_values = list(tasks.values())
- loop.add_signal_handler(signal.SIGINT, cancel, _task_values)
- loop.add_signal_handler(signal.SIGTERM, cancel, _task_values)
- await asyncio.wait(tasks.values())
- cancelled = []
report = Report(check=write_back is WriteBack.NO, quiet=quiet)
- for src, task in tasks.items():
- if not task.done():
- report.failed(src, "timed out, cancelling")
- task.cancel()
- cancelled.append(task)
- elif task.cancelled():
- cancelled.append(task)
- elif task.exception():
- report.failed(src, str(task.exception()))
- else:
- report.done(src, task.result())
+ cache: Cache = {}
+ if write_back != WriteBack.DIFF:
+ cache = read_cache()
+ sources, cached = filter_cached(cache, sources)
+ for src in cached:
+ report.done(src, Changed.CACHED)
+ cancelled = []
+ formatted = []
+ if sources:
+ lock = None
+ if write_back == WriteBack.DIFF:
+ # For diff output, we need locks to ensure we don't interleave output
+ # from different processes.
+ manager = Manager()
+ lock = manager.Lock()
+ tasks = {
+ src: loop.run_in_executor(
+ executor, format_file_in_place, src, line_length, fast, write_back, lock
+ )
+ for src in sources
+ }
+ _task_values = list(tasks.values())
+ loop.add_signal_handler(signal.SIGINT, cancel, _task_values)
+ loop.add_signal_handler(signal.SIGTERM, cancel, _task_values)
+ await asyncio.wait(_task_values)
+ for src, task in tasks.items():
+ if not task.done():
+ report.failed(src, "timed out, cancelling")
+ task.cancel()
+ cancelled.append(task)
+ elif task.cancelled():
+ cancelled.append(task)
+ elif task.exception():
+ report.failed(src, str(task.exception()))
+ else:
+ formatted.append(src)
+ report.done(src, task.result())
+
if cancelled:
await asyncio.gather(*cancelled, loop=loop, return_exceptions=True)
elif not quiet:
out("All done! ✨ 🍰 ✨")
if not quiet:
click.echo(str(report))
+
+ if write_back != WriteBack.DIFF and formatted:
+ write_cache(cache, formatted)
+
return report.return_code
fast: bool,
write_back: WriteBack = WriteBack.NO,
lock: Any = None, # multiprocessing.Manager().Lock() is some crazy proxy
-) -> bool:
+) -> Changed:
"""Format file under `src` path. Return True if changed.
If `write_back` is True, write reformatted code back to stdout.
`line_length` and `fast` options are passed to :func:`format_file_contents`.
"""
+
with tokenize.open(src) as src_buffer:
src_contents = src_buffer.read()
try:
src_contents, line_length=line_length, fast=fast
)
except NothingChanged:
- return False
+ return Changed.NO
if write_back == write_back.YES:
with open(src, "w", encoding=src_buffer.encoding) as f:
finally:
if lock:
lock.release()
- return True
+ return Changed.YES
def format_stdin_to_stdout(
line_length: int, fast: bool, write_back: WriteBack = WriteBack.NO
-) -> bool:
+) -> Changed:
"""Format file on stdin. Return True if changed.
If `write_back` is True, write reformatted code back to stdout.
dst = src
try:
dst = format_file_contents(src, line_length=line_length, fast=fast)
- return True
+ return Changed.YES
except NothingChanged:
- return False
+ return Changed.NO
finally:
if write_back == WriteBack.YES:
same_count: int = 0
failure_count: int = 0
- def done(self, src: Path, changed: bool) -> None:
+ def done(self, src: Path, changed: Changed) -> None:
"""Increment the counter for successful reformatting. Write out a message."""
- if changed:
+ if changed is Changed.YES:
reformatted = "would reformat" if self.check else "reformatted"
if not self.quiet:
out(f"{reformatted} {src}")
self.change_count += 1
else:
if not self.quiet:
- out(f"{src} already well formatted, good job.", bold=False)
+ if changed is Changed.NO:
+ msg = f"{src} already well formatted, good job."
+ else:
+ msg = f"{src} wasn't modified on disk since last run."
+ out(msg, bold=False)
self.same_count += 1
def failed(self, src: Path, message: str) -> None:
return regex.sub(replacement, regex.sub(replacement, original))
+CACHE_DIR = Path(user_cache_dir("black", version=__version__))
+CACHE_FILE = CACHE_DIR / "cache.pickle"
+
+
+def read_cache() -> Cache:
+ """Read the cache if it exists and is well formed.
+
+ If it is not well formed, the call to write_cache later should resolve the issue.
+ """
+ if not CACHE_FILE.exists():
+ return {}
+
+ with CACHE_FILE.open("rb") as fobj:
+ try:
+ cache: Cache = pickle.load(fobj)
+ except pickle.UnpicklingError:
+ return {}
+
+ return cache
+
+
+def get_cache_info(path: Path) -> CacheInfo:
+ """Return the information used to check if a file is already formatted or not."""
+ stat = path.stat()
+ return stat.st_mtime, stat.st_size
+
+
+def filter_cached(
+ cache: Cache, sources: Iterable[Path]
+) -> Tuple[List[Path], List[Path]]:
+ """Split a list of paths into two.
+
+ The first list contains paths of files that modified on disk or are not in the
+ cache. The other list contains paths to non-modified files.
+ """
+ todo, done = [], []
+ for src in sources:
+ src = src.resolve()
+ if cache.get(src) != get_cache_info(src):
+ todo.append(src)
+ else:
+ done.append(src)
+ return todo, done
+
+
+def write_cache(cache: Cache, sources: List[Path]) -> None:
+ """Update the cache file."""
+ try:
+ if not CACHE_DIR.exists():
+ CACHE_DIR.mkdir(parents=True)
+ new_cache = {**cache, **{src.resolve(): get_cache_info(src) for src in sources}}
+ with CACHE_FILE.open("wb") as fobj:
+ pickle.dump(new_cache, fobj, protocol=pickle.HIGHEST_PROTOCOL)
+ except OSError:
+ pass
+
+
if __name__ == "__main__":
main()
package_data={"blib2to3": ["*.txt"]},
python_requires=">=3.6",
zip_safe=False,
- install_requires=["click", "attrs>=17.4.0"],
+ install_requires=["click", "attrs>=17.4.0", "appdirs"],
test_suite="tests.test_black",
classifiers=[
"Development Status :: 3 - Alpha",
#!/usr/bin/env python3
+import asyncio
+from contextlib import contextmanager
from functools import partial
from io import StringIO
import os
from pathlib import Path
import sys
-from typing import Any, List, Tuple
+from tempfile import TemporaryDirectory
+from typing import Any, List, Tuple, Iterator
import unittest
from unittest.mock import patch
from click import unstyle
+from click.testing import CliRunner
import black
return "".join(_input).strip() + "\n", "".join(_output).strip() + "\n"
+@contextmanager
+def cache_dir(exists: bool = True) -> Iterator[Path]:
+ with TemporaryDirectory() as workspace:
+ cache_dir = Path(workspace)
+ if not exists:
+ cache_dir = cache_dir / "new"
+ cache_file = cache_dir / "cache.pkl"
+ with patch("black.CACHE_DIR", cache_dir), patch("black.CACHE_FILE", cache_file):
+ yield cache_dir
+
+
+@contextmanager
+def event_loop(close: bool) -> Iterator[None]:
+ policy = asyncio.get_event_loop_policy()
+ old_loop = policy.get_event_loop()
+ loop = policy.new_event_loop()
+ asyncio.set_event_loop(loop)
+ try:
+ yield
+
+ finally:
+ policy.set_event_loop(old_loop)
+ if close:
+ loop.close()
+
+
class BlackTestCase(unittest.TestCase):
maxDiff = None
self.assertFormatEqual(expected, actual)
black.assert_equivalent(source, actual)
black.assert_stable(source, actual, line_length=ll)
- self.assertFalse(ff(THIS_FILE))
+ self.assertIs(ff(THIS_FILE), black.Changed.NO)
@patch("black.dump_to_file", dump_to_stderr)
def test_black(self) -> None:
self.assertFormatEqual(expected, actual)
black.assert_equivalent(source, actual)
black.assert_stable(source, actual, line_length=ll)
- self.assertFalse(ff(THIS_DIR / ".." / "black.py"))
+ self.assertIs(ff(THIS_DIR / ".." / "black.py"), black.Changed.NO)
def test_piping(self) -> None:
source, expected = read_data("../black")
self.assertFormatEqual(expected, actual)
black.assert_equivalent(source, actual)
black.assert_stable(source, actual, line_length=ll)
- self.assertFalse(ff(THIS_DIR / ".." / "setup.py"))
+ self.assertIs(ff(THIS_DIR / ".." / "setup.py"), black.Changed.NO)
@patch("black.dump_to_file", dump_to_stderr)
def test_function(self) -> None:
err_lines.append(msg)
with patch("black.out", out), patch("black.err", err):
- report.done(Path("f1"), changed=False)
+ report.done(Path("f1"), black.Changed.NO)
self.assertEqual(len(out_lines), 1)
self.assertEqual(len(err_lines), 0)
self.assertEqual(out_lines[-1], "f1 already well formatted, good job.")
self.assertEqual(unstyle(str(report)), "1 file left unchanged.")
self.assertEqual(report.return_code, 0)
- report.done(Path("f2"), changed=True)
+ report.done(Path("f2"), black.Changed.YES)
self.assertEqual(len(out_lines), 2)
self.assertEqual(len(err_lines), 0)
self.assertEqual(out_lines[-1], "reformatted f2")
self.assertEqual(
unstyle(str(report)), "1 file reformatted, 1 file left unchanged."
)
+ report.done(Path("f3"), black.Changed.CACHED)
+ self.assertEqual(len(out_lines), 3)
+ self.assertEqual(len(err_lines), 0)
+ self.assertEqual(
+ out_lines[-1], "f3 wasn't modified on disk since last run."
+ )
+ self.assertEqual(
+ unstyle(str(report)), "1 file reformatted, 2 files left unchanged."
+ )
self.assertEqual(report.return_code, 0)
report.check = True
self.assertEqual(report.return_code, 1)
report.check = False
report.failed(Path("e1"), "boom")
- self.assertEqual(len(out_lines), 2)
+ self.assertEqual(len(out_lines), 3)
self.assertEqual(len(err_lines), 1)
self.assertEqual(err_lines[-1], "error: cannot format e1: boom")
self.assertEqual(
unstyle(str(report)),
- "1 file reformatted, 1 file left unchanged, "
+ "1 file reformatted, 2 files left unchanged, "
"1 file failed to reformat.",
)
self.assertEqual(report.return_code, 123)
- report.done(Path("f3"), changed=True)
- self.assertEqual(len(out_lines), 3)
+ report.done(Path("f3"), black.Changed.YES)
+ self.assertEqual(len(out_lines), 4)
self.assertEqual(len(err_lines), 1)
self.assertEqual(out_lines[-1], "reformatted f3")
self.assertEqual(
unstyle(str(report)),
- "2 files reformatted, 1 file left unchanged, "
+ "2 files reformatted, 2 files left unchanged, "
"1 file failed to reformat.",
)
self.assertEqual(report.return_code, 123)
report.failed(Path("e2"), "boom")
- self.assertEqual(len(out_lines), 3)
+ self.assertEqual(len(out_lines), 4)
self.assertEqual(len(err_lines), 2)
self.assertEqual(err_lines[-1], "error: cannot format e2: boom")
self.assertEqual(
unstyle(str(report)),
- "2 files reformatted, 1 file left unchanged, "
+ "2 files reformatted, 2 files left unchanged, "
"2 files failed to reformat.",
)
self.assertEqual(report.return_code, 123)
- report.done(Path("f4"), changed=False)
- self.assertEqual(len(out_lines), 4)
+ report.done(Path("f4"), black.Changed.NO)
+ self.assertEqual(len(out_lines), 5)
self.assertEqual(len(err_lines), 2)
self.assertEqual(out_lines[-1], "f4 already well formatted, good job.")
self.assertEqual(
unstyle(str(report)),
- "2 files reformatted, 2 files left unchanged, "
+ "2 files reformatted, 3 files left unchanged, "
"2 files failed to reformat.",
)
self.assertEqual(report.return_code, 123)
report.check = True
self.assertEqual(
unstyle(str(report)),
- "2 files would be reformatted, 2 files would be left unchanged, "
+ "2 files would be reformatted, 3 files would be left unchanged, "
"2 files would fail to reformat.",
)
self.assertTrue("Actual tree:" in out_str)
self.assertEqual("".join(err_lines), "")
+ def test_cache_broken_file(self) -> None:
+ with cache_dir() as workspace:
+ with black.CACHE_FILE.open("w") as fobj:
+ fobj.write("this is not a pickle")
+ self.assertEqual(black.read_cache(), {})
+ src = (workspace / "test.py").resolve()
+ with src.open("w") as fobj:
+ fobj.write("print('hello')")
+ result = CliRunner().invoke(black.main, [str(src)])
+ self.assertEqual(result.exit_code, 0)
+ cache = black.read_cache()
+ self.assertIn(src, cache)
+
+ def test_cache_single_file_already_cached(self) -> None:
+ with cache_dir() as workspace:
+ src = (workspace / "test.py").resolve()
+ with src.open("w") as fobj:
+ fobj.write("print('hello')")
+ black.write_cache({}, [src])
+ result = CliRunner().invoke(black.main, [str(src)])
+ self.assertEqual(result.exit_code, 0)
+ with src.open("r") as fobj:
+ self.assertEqual(fobj.read(), "print('hello')")
+
+ @event_loop(close=False)
+ def test_cache_multiple_files(self) -> None:
+ with cache_dir() as workspace:
+ one = (workspace / "one.py").resolve()
+ with one.open("w") as fobj:
+ fobj.write("print('hello')")
+ two = (workspace / "two.py").resolve()
+ with two.open("w") as fobj:
+ fobj.write("print('hello')")
+ black.write_cache({}, [one])
+ result = CliRunner().invoke(black.main, [str(workspace)])
+ self.assertEqual(result.exit_code, 0)
+ with one.open("r") as fobj:
+ self.assertEqual(fobj.read(), "print('hello')")
+ with two.open("r") as fobj:
+ self.assertEqual(fobj.read(), 'print("hello")\n')
+ cache = black.read_cache()
+ self.assertIn(one, cache)
+ self.assertIn(two, cache)
+
+ def test_no_cache_when_writeback_diff(self) -> None:
+ with cache_dir() as workspace:
+ src = (workspace / "test.py").resolve()
+ with src.open("w") as fobj:
+ fobj.write("print('hello')")
+ result = CliRunner().invoke(black.main, [str(src), "--diff"])
+ self.assertEqual(result.exit_code, 0)
+ self.assertFalse(black.CACHE_FILE.exists())
+
+ def test_no_cache_when_stdin(self) -> None:
+ with cache_dir():
+ result = CliRunner().invoke(black.main, ["-"], input="print('hello')")
+ self.assertEqual(result.exit_code, 0)
+ self.assertFalse(black.CACHE_FILE.exists())
+
+ def test_read_cache_no_cachefile(self) -> None:
+ with cache_dir():
+ self.assertEqual(black.read_cache(), {})
+
+ def test_write_cache_read_cache(self) -> None:
+ with cache_dir() as workspace:
+ src = (workspace / "test.py").resolve()
+ src.touch()
+ black.write_cache({}, [src])
+ cache = black.read_cache()
+ self.assertIn(src, cache)
+ self.assertEqual(cache[src], black.get_cache_info(src))
+
+ def test_filter_cached(self) -> None:
+ with TemporaryDirectory() as workspace:
+ path = Path(workspace)
+ uncached = (path / "uncached").resolve()
+ cached = (path / "cached").resolve()
+ cached_but_changed = (path / "changed").resolve()
+ uncached.touch()
+ cached.touch()
+ cached_but_changed.touch()
+ cache = {cached: black.get_cache_info(cached), cached_but_changed: (0.0, 0)}
+ todo, done = black.filter_cached(
+ cache, [uncached, cached, cached_but_changed]
+ )
+ self.assertEqual(todo, [uncached, cached_but_changed])
+ self.assertEqual(done, [cached])
+
+ def test_write_cache_creates_directory_if_needed(self) -> None:
+ with cache_dir(exists=False) as workspace:
+ self.assertFalse(workspace.exists())
+ black.write_cache({}, [])
+ self.assertTrue(workspace.exists())
+
+ @event_loop(close=False)
+ def test_failed_formatting_does_not_get_cached(self) -> None:
+ with cache_dir() as workspace:
+ failing = (workspace / "failing.py").resolve()
+ with failing.open("w") as fobj:
+ fobj.write("not actually python")
+ clean = (workspace / "clean.py").resolve()
+ with clean.open("w") as fobj:
+ fobj.write('print("hello")\n')
+ result = CliRunner().invoke(black.main, [str(workspace)])
+ self.assertEqual(result.exit_code, 123)
+ cache = black.read_cache()
+ self.assertNotIn(failing, cache)
+ self.assertIn(clean, cache)
+
+ def test_write_cache_write_fail(self):
+ with cache_dir(), patch.object(Path, "open") as mock:
+ mock.side_effect = OSError
+ black.write_cache({}, [])
+
if __name__ == "__main__":
unittest.main()