]> git.madduck.net Git - etc/vim.git/blobdiff - tests/test_black.py

madduck's git repository

Every one of the projects in this repository is available at the canonical URL git://git.madduck.net/madduck/pub/<projectpath> — see each project's metadata for the exact URL.

All patches and comments are welcome. Please squash your changes to logical commits before using git-format-patch and git-send-email to patches@git.madduck.net. If you'd read over the Git project's submission guidelines and adhered to them, I'd be especially grateful.

SSH access, as well as push access can be individually arranged.

If you use my repositories frequently, consider adding the following snippet to ~/.gitconfig and using the third clone URL listed for each project:

[url "git://git.madduck.net/madduck/"]
  insteadOf = madduck:

Upgrade mypy (#3864)
[etc/vim.git] / tests / test_black.py
index 5f2e6f5b14c55056ed14e5535a6d9d646112696a..4fb6aef9bca126dff9718c151a8b7398867b2eb3 100644 (file)
@@ -41,7 +41,7 @@ import black
 import black.files
 from black import Feature, TargetVersion
 from black import re_compile_maybe_verbose as compile_pattern
 import black.files
 from black import Feature, TargetVersion
 from black import re_compile_maybe_verbose as compile_pattern
-from black.cache import get_cache_dir, get_cache_file
+from black.cache import FileData, get_cache_dir, get_cache_file
 from black.debug import DebugVisitor
 from black.output import color_diff, diff
 from black.report import Report
 from black.debug import DebugVisitor
 from black.output import color_diff, diff
 from black.report import Report
@@ -104,6 +104,7 @@ class FakeContext(click.Context):
 
     def __init__(self) -> None:
         self.default_map: Dict[str, Any] = {}
 
     def __init__(self) -> None:
         self.default_map: Dict[str, Any] = {}
+        self.params: Dict[str, Any] = {}
         # Dummy root, since most of the tests don't care about it
         self.obj: Dict[str, Any] = {"root": PROJECT_ROOT}
 
         # Dummy root, since most of the tests don't care about it
         self.obj: Dict[str, Any] = {"root": PROJECT_ROOT}
 
@@ -148,8 +149,7 @@ class BlackTestCase(BlackBaseTestCase):
         tmp_file = Path(black.dump_to_file())
         try:
             self.assertFalse(ff(tmp_file, write_back=black.WriteBack.YES))
         tmp_file = Path(black.dump_to_file())
         try:
             self.assertFalse(ff(tmp_file, write_back=black.WriteBack.YES))
-            with open(tmp_file, encoding="utf8") as f:
-                actual = f.read()
+            actual = tmp_file.read_text(encoding="utf-8")
         finally:
             os.unlink(tmp_file)
         self.assertFormatEqual(expected, actual)
         finally:
             os.unlink(tmp_file)
         self.assertFormatEqual(expected, actual)
@@ -177,7 +177,7 @@ class BlackTestCase(BlackBaseTestCase):
                     ff(tmp_file, mode=mode, write_back=black.WriteBack.YES)
                 )
                 with open(tmp_file, "rb") as f:
                     ff(tmp_file, mode=mode, write_back=black.WriteBack.YES)
                 )
                 with open(tmp_file, "rb") as f:
-                    actual = f.read().decode("utf8")
+                    actual = f.read().decode("utf-8")
             finally:
                 os.unlink(tmp_file)
             self.assertFormatEqual(expected, actual)
             finally:
                 os.unlink(tmp_file)
             self.assertFormatEqual(expected, actual)
@@ -197,7 +197,7 @@ class BlackTestCase(BlackBaseTestCase):
                 f"--line-length={black.DEFAULT_LINE_LENGTH}",
                 f"--config={EMPTY_CONFIG}",
             ],
                 f"--line-length={black.DEFAULT_LINE_LENGTH}",
                 f"--config={EMPTY_CONFIG}",
             ],
-            input=BytesIO(source.encode("utf8")),
+            input=BytesIO(source.encode("utf-8")),
         )
         self.assertEqual(result.exit_code, 0)
         self.assertFormatEqual(expected, result.output)
         )
         self.assertEqual(result.exit_code, 0)
         self.assertFormatEqual(expected, result.output)
@@ -220,7 +220,7 @@ class BlackTestCase(BlackBaseTestCase):
             f"--config={EMPTY_CONFIG}",
         ]
         result = BlackRunner().invoke(
             f"--config={EMPTY_CONFIG}",
         ]
         result = BlackRunner().invoke(
-            black.main, args, input=BytesIO(source.encode("utf8"))
+            black.main, args, input=BytesIO(source.encode("utf-8"))
         )
         self.assertEqual(result.exit_code, 0)
         actual = diff_header.sub(DETERMINISTIC_HEADER, result.output)
         )
         self.assertEqual(result.exit_code, 0)
         actual = diff_header.sub(DETERMINISTIC_HEADER, result.output)
@@ -238,7 +238,7 @@ class BlackTestCase(BlackBaseTestCase):
             f"--config={EMPTY_CONFIG}",
         ]
         result = BlackRunner().invoke(
             f"--config={EMPTY_CONFIG}",
         ]
         result = BlackRunner().invoke(
-            black.main, args, input=BytesIO(source.encode("utf8"))
+            black.main, args, input=BytesIO(source.encode("utf-8"))
         )
         actual = result.output
         # Again, the contents are checked in a different test, so only look for colors.
         )
         actual = result.output
         # Again, the contents are checked in a different test, so only look for colors.
@@ -285,8 +285,7 @@ class BlackTestCase(BlackBaseTestCase):
         tmp_file = Path(black.dump_to_file(source))
         try:
             self.assertTrue(ff(tmp_file, write_back=black.WriteBack.YES))
         tmp_file = Path(black.dump_to_file(source))
         try:
             self.assertTrue(ff(tmp_file, write_back=black.WriteBack.YES))
-            with open(tmp_file, encoding="utf8") as f:
-                actual = f.read()
+            actual = tmp_file.read_text(encoding="utf-8")
         finally:
             os.unlink(tmp_file)
         self.assertFormatEqual(expected, actual)
         finally:
             os.unlink(tmp_file)
         self.assertFormatEqual(expected, actual)
@@ -389,8 +388,7 @@ class BlackTestCase(BlackBaseTestCase):
             black.main, [str(tmp_file), "-x", f"--config={EMPTY_CONFIG}"]
         )
         self.assertEqual(result.exit_code, 0)
             black.main, [str(tmp_file), "-x", f"--config={EMPTY_CONFIG}"]
         )
         self.assertEqual(result.exit_code, 0)
-        with open(tmp_file, encoding="utf8") as f:
-            actual = f.read()
+        actual = tmp_file.read_text(encoding="utf-8")
         self.assertFormatEqual(source, actual)
 
     def test_skip_source_first_line_when_mixing_newlines(self) -> None:
         self.assertFormatEqual(source, actual)
 
     def test_skip_source_first_line_when_mixing_newlines(self) -> None:
@@ -494,9 +492,7 @@ class BlackTestCase(BlackBaseTestCase):
         project_root = Path(THIS_DIR / "data" / "nested_gitignore_tests")
         working_directory = project_root / "root"
         target_abspath = working_directory / "child"
         project_root = Path(THIS_DIR / "data" / "nested_gitignore_tests")
         working_directory = project_root / "root"
         target_abspath = working_directory / "child"
-        target_contents = (
-            src.relative_to(working_directory) for src in target_abspath.iterdir()
-        )
+        target_contents = list(target_abspath.iterdir())
 
         def mock_n_calls(responses: List[bool]) -> Callable[[], bool]:
             def _mocked_calls() -> bool:
 
         def mock_n_calls(responses: List[bool]) -> Callable[[], bool]:
             def _mocked_calls() -> bool:
@@ -509,11 +505,11 @@ class BlackTestCase(BlackBaseTestCase):
         with patch("pathlib.Path.iterdir", return_value=target_contents), patch(
             "pathlib.Path.cwd", return_value=working_directory
         ), patch("pathlib.Path.is_dir", side_effect=mock_n_calls([True])):
         with patch("pathlib.Path.iterdir", return_value=target_contents), patch(
             "pathlib.Path.cwd", return_value=working_directory
         ), patch("pathlib.Path.is_dir", side_effect=mock_n_calls([True])):
-            ctx = FakeContext()
-            ctx.obj["root"] = project_root
+            # Note that the root folder (project_root) isn't the folder
+            # named "root" (aka working_directory)
             report = MagicMock(verbose=True)
             black.get_sources(
             report = MagicMock(verbose=True)
             black.get_sources(
-                ctx=ctx,
+                root=project_root,
                 src=("./child",),
                 quiet=False,
                 verbose=True,
                 src=("./child",),
                 quiet=False,
                 verbose=True,
@@ -529,7 +525,7 @@ class BlackTestCase(BlackBaseTestCase):
             for _, mock_args, _ in report.path_ignored.mock_calls
         ), "A symbolic link was reported."
         report.path_ignored.assert_called_once_with(
             for _, mock_args, _ in report.path_ignored.mock_calls
         ), "A symbolic link was reported."
         report.path_ignored.assert_called_once_with(
-            Path("child", "b.py"), "matches a .gitignore file content"
+            Path("root", "child", "b.py"), "matches a .gitignore file content"
         )
 
     def test_report_verbose(self) -> None:
         )
 
     def test_report_verbose(self) -> None:
@@ -1080,7 +1076,7 @@ class BlackTestCase(BlackBaseTestCase):
                 (workspace / "one.py").resolve(),
                 (workspace / "two.py").resolve(),
             ]:
                 (workspace / "one.py").resolve(),
                 (workspace / "two.py").resolve(),
             ]:
-                f.write_text('print("hello")\n')
+                f.write_text('print("hello")\n', encoding="utf-8")
             self.invokeBlack([str(workspace)])
 
     @event_loop()
             self.invokeBlack([str(workspace)])
 
     @event_loop()
@@ -1117,16 +1113,14 @@ class BlackTestCase(BlackBaseTestCase):
         contents, expected = read_data("miscellaneous", "force_pyi")
         with cache_dir() as workspace:
             path = (workspace / "file.py").resolve()
         contents, expected = read_data("miscellaneous", "force_pyi")
         with cache_dir() as workspace:
             path = (workspace / "file.py").resolve()
-            with open(path, "w") as fh:
-                fh.write(contents)
+            path.write_text(contents, encoding="utf-8")
             self.invokeBlack([str(path), "--pyi"])
             self.invokeBlack([str(path), "--pyi"])
-            with open(path, "r") as fh:
-                actual = fh.read()
+            actual = path.read_text(encoding="utf-8")
             # verify cache with --pyi is separate
             # verify cache with --pyi is separate
-            pyi_cache = black.read_cache(pyi_mode)
-            self.assertIn(str(path), pyi_cache)
-            normal_cache = black.read_cache(DEFAULT_MODE)
-            self.assertNotIn(str(path), normal_cache)
+            pyi_cache = black.Cache.read(pyi_mode)
+            assert not pyi_cache.is_changed(path)
+            normal_cache = black.Cache.read(DEFAULT_MODE)
+            assert normal_cache.is_changed(path)
         self.assertFormatEqual(expected, actual)
         black.assert_equivalent(contents, actual)
         black.assert_stable(contents, actual, pyi_mode)
         self.assertFormatEqual(expected, actual)
         black.assert_equivalent(contents, actual)
         black.assert_stable(contents, actual, pyi_mode)
@@ -1142,24 +1136,22 @@ class BlackTestCase(BlackBaseTestCase):
                 (workspace / "file2.py").resolve(),
             ]
             for path in paths:
                 (workspace / "file2.py").resolve(),
             ]
             for path in paths:
-                with open(path, "w") as fh:
-                    fh.write(contents)
+                path.write_text(contents, encoding="utf-8")
             self.invokeBlack([str(p) for p in paths] + ["--pyi"])
             for path in paths:
             self.invokeBlack([str(p) for p in paths] + ["--pyi"])
             for path in paths:
-                with open(path, "r") as fh:
-                    actual = fh.read()
+                actual = path.read_text(encoding="utf-8")
                 self.assertEqual(actual, expected)
             # verify cache with --pyi is separate
                 self.assertEqual(actual, expected)
             # verify cache with --pyi is separate
-            pyi_cache = black.read_cache(pyi_mode)
-            normal_cache = black.read_cache(reg_mode)
+            pyi_cache = black.Cache.read(pyi_mode)
+            normal_cache = black.Cache.read(reg_mode)
             for path in paths:
             for path in paths:
-                self.assertIn(str(path), pyi_cache)
-                self.assertNotIn(str(path), normal_cache)
+                assert not pyi_cache.is_changed(path)
+                assert normal_cache.is_changed(path)
 
     def test_pipe_force_pyi(self) -> None:
         source, expected = read_data("miscellaneous", "force_pyi")
         result = CliRunner().invoke(
 
     def test_pipe_force_pyi(self) -> None:
         source, expected = read_data("miscellaneous", "force_pyi")
         result = CliRunner().invoke(
-            black.main, ["-", "-q", "--pyi"], input=BytesIO(source.encode("utf8"))
+            black.main, ["-", "-q", "--pyi"], input=BytesIO(source.encode("utf-8"))
         )
         self.assertEqual(result.exit_code, 0)
         actual = result.output
         )
         self.assertEqual(result.exit_code, 0)
         actual = result.output
@@ -1171,16 +1163,14 @@ class BlackTestCase(BlackBaseTestCase):
         source, expected = read_data("miscellaneous", "force_py36")
         with cache_dir() as workspace:
             path = (workspace / "file.py").resolve()
         source, expected = read_data("miscellaneous", "force_py36")
         with cache_dir() as workspace:
             path = (workspace / "file.py").resolve()
-            with open(path, "w") as fh:
-                fh.write(source)
+            path.write_text(source, encoding="utf-8")
             self.invokeBlack([str(path), *PY36_ARGS])
             self.invokeBlack([str(path), *PY36_ARGS])
-            with open(path, "r") as fh:
-                actual = fh.read()
+            actual = path.read_text(encoding="utf-8")
             # verify cache with --target-version is separate
             # verify cache with --target-version is separate
-            py36_cache = black.read_cache(py36_mode)
-            self.assertIn(str(path), py36_cache)
-            normal_cache = black.read_cache(reg_mode)
-            self.assertNotIn(str(path), normal_cache)
+            py36_cache = black.Cache.read(py36_mode)
+            assert not py36_cache.is_changed(path)
+            normal_cache = black.Cache.read(reg_mode)
+            assert normal_cache.is_changed(path)
         self.assertEqual(actual, expected)
 
     @event_loop()
         self.assertEqual(actual, expected)
 
     @event_loop()
@@ -1194,26 +1184,24 @@ class BlackTestCase(BlackBaseTestCase):
                 (workspace / "file2.py").resolve(),
             ]
             for path in paths:
                 (workspace / "file2.py").resolve(),
             ]
             for path in paths:
-                with open(path, "w") as fh:
-                    fh.write(source)
+                path.write_text(source, encoding="utf-8")
             self.invokeBlack([str(p) for p in paths] + PY36_ARGS)
             for path in paths:
             self.invokeBlack([str(p) for p in paths] + PY36_ARGS)
             for path in paths:
-                with open(path, "r") as fh:
-                    actual = fh.read()
+                actual = path.read_text(encoding="utf-8")
                 self.assertEqual(actual, expected)
             # verify cache with --target-version is separate
                 self.assertEqual(actual, expected)
             # verify cache with --target-version is separate
-            pyi_cache = black.read_cache(py36_mode)
-            normal_cache = black.read_cache(reg_mode)
+            pyi_cache = black.Cache.read(py36_mode)
+            normal_cache = black.Cache.read(reg_mode)
             for path in paths:
             for path in paths:
-                self.assertIn(str(path), pyi_cache)
-                self.assertNotIn(str(path), normal_cache)
+                assert not pyi_cache.is_changed(path)
+                assert normal_cache.is_changed(path)
 
     def test_pipe_force_py36(self) -> None:
         source, expected = read_data("miscellaneous", "force_py36")
         result = CliRunner().invoke(
             black.main,
             ["-", "-q", "--target-version=py36"],
 
     def test_pipe_force_py36(self) -> None:
         source, expected = read_data("miscellaneous", "force_py36")
         result = CliRunner().invoke(
             black.main,
             ["-", "-q", "--target-version=py36"],
-            input=BytesIO(source.encode("utf8")),
+            input=BytesIO(source.encode("utf-8")),
         )
         self.assertEqual(result.exit_code, 0)
         actual = result.output
         )
         self.assertEqual(result.exit_code, 0)
         actual = result.output
@@ -1442,11 +1430,11 @@ class BlackTestCase(BlackBaseTestCase):
             contents = nl.join(["def f(  ):", "    pass"])
             runner = BlackRunner()
             result = runner.invoke(
             contents = nl.join(["def f(  ):", "    pass"])
             runner = BlackRunner()
             result = runner.invoke(
-                black.main, ["-", "--fast"], input=BytesIO(contents.encode("utf8"))
+                black.main, ["-", "--fast"], input=BytesIO(contents.encode("utf-8"))
             )
             self.assertEqual(result.exit_code, 0)
             output = result.stdout_bytes
             )
             self.assertEqual(result.exit_code, 0)
             output = result.stdout_bytes
-            self.assertIn(nl.encode("utf8"), output)
+            self.assertIn(nl.encode("utf-8"), output)
             if nl == "\n":
                 self.assertNotIn(b"\r\n", output)
 
             if nl == "\n":
                 self.assertNotIn(b"\r\n", output)
 
@@ -1465,30 +1453,6 @@ class BlackTestCase(BlackBaseTestCase):
         with self.assertRaises(AssertionError):
             black.assert_equivalent("{}", "None")
 
         with self.assertRaises(AssertionError):
             black.assert_equivalent("{}", "None")
 
-    def test_shhh_click(self) -> None:
-        try:
-            from click import _unicodefun  # type: ignore
-        except ImportError:
-            self.skipTest("Incompatible Click version")
-
-        if not hasattr(_unicodefun, "_verify_python_env"):
-            self.skipTest("Incompatible Click version")
-
-        # First, let's see if Click is crashing with a preferred ASCII charset.
-        with patch("locale.getpreferredencoding") as gpe:
-            gpe.return_value = "ASCII"
-            with self.assertRaises(RuntimeError):
-                _unicodefun._verify_python_env()
-        # Now, let's silence Click...
-        black.patch_click()
-        # ...and confirm it's silent.
-        with patch("locale.getpreferredencoding") as gpe:
-            gpe.return_value = "ASCII"
-            try:
-                _unicodefun._verify_python_env()
-            except RuntimeError as re:
-                self.fail(f"`patch_click()` failed, exception still raised: {re}")
-
     def test_root_logger_not_used_directly(self) -> None:
         def fail(*args: Any, **kwargs: Any) -> None:
             self.fail("Record created with root logger")
     def test_root_logger_not_used_directly(self) -> None:
         def fail(*args: Any, **kwargs: Any) -> None:
             self.fail("Record created with root logger")
@@ -1620,6 +1584,39 @@ class BlackTestCase(BlackBaseTestCase):
         self.assertEqual(config["exclude"], r"\.pyi?$")
         self.assertEqual(config["include"], r"\.py?$")
 
         self.assertEqual(config["exclude"], r"\.pyi?$")
         self.assertEqual(config["include"], r"\.py?$")
 
+    def test_read_pyproject_toml_from_stdin(self) -> None:
+        with TemporaryDirectory() as workspace:
+            root = Path(workspace)
+
+            src_dir = root / "src"
+            src_dir.mkdir()
+
+            src_pyproject = src_dir / "pyproject.toml"
+            src_pyproject.touch()
+
+            test_toml_content = (THIS_DIR / "test.toml").read_text(encoding="utf-8")
+            src_pyproject.write_text(test_toml_content, encoding="utf-8")
+
+            src_python = src_dir / "foo.py"
+            src_python.touch()
+
+            fake_ctx = FakeContext()
+            fake_ctx.params["src"] = ("-",)
+            fake_ctx.params["stdin_filename"] = str(src_python)
+
+            with change_directory(root):
+                black.read_pyproject_toml(fake_ctx, FakeParameter(), None)
+
+            config = fake_ctx.default_map
+            self.assertEqual(config["verbose"], "1")
+            self.assertEqual(config["check"], "no")
+            self.assertEqual(config["diff"], "y")
+            self.assertEqual(config["color"], "True")
+            self.assertEqual(config["line_length"], "79")
+            self.assertEqual(config["target_version"], ["py36", "py37", "py38"])
+            self.assertEqual(config["exclude"], r"\.pyi?$")
+            self.assertEqual(config["include"], r"\.py?$")
+
     @pytest.mark.incompatible_with_mypyc
     def test_find_project_root(self) -> None:
         with TemporaryDirectory() as workspace:
     @pytest.mark.incompatible_with_mypyc
     def test_find_project_root(self) -> None:
         with TemporaryDirectory() as workspace:
@@ -1951,22 +1948,23 @@ class TestCaching:
         mode = DEFAULT_MODE
         with cache_dir() as workspace:
             cache_file = get_cache_file(mode)
         mode = DEFAULT_MODE
         with cache_dir() as workspace:
             cache_file = get_cache_file(mode)
-            cache_file.write_text("this is not a pickle")
-            assert black.read_cache(mode) == {}
+            cache_file.write_text("this is not a pickle", encoding="utf-8")
+            assert black.Cache.read(mode).file_data == {}
             src = (workspace / "test.py").resolve()
             src = (workspace / "test.py").resolve()
-            src.write_text("print('hello')")
+            src.write_text("print('hello')", encoding="utf-8")
             invokeBlack([str(src)])
             invokeBlack([str(src)])
-            cache = black.read_cache(mode)
-            assert str(src) in cache
+            cache = black.Cache.read(mode)
+            assert not cache.is_changed(src)
 
     def test_cache_single_file_already_cached(self) -> None:
         mode = DEFAULT_MODE
         with cache_dir() as workspace:
             src = (workspace / "test.py").resolve()
 
     def test_cache_single_file_already_cached(self) -> None:
         mode = DEFAULT_MODE
         with cache_dir() as workspace:
             src = (workspace / "test.py").resolve()
-            src.write_text("print('hello')")
-            black.write_cache({}, [src], mode)
+            src.write_text("print('hello')", encoding="utf-8")
+            cache = black.Cache.read(mode)
+            cache.write([src])
             invokeBlack([str(src)])
             invokeBlack([str(src)])
-            assert src.read_text() == "print('hello')"
+            assert src.read_text(encoding="utf-8") == "print('hello')"
 
     @event_loop()
     def test_cache_multiple_files(self) -> None:
 
     @event_loop()
     def test_cache_multiple_files(self) -> None:
@@ -1975,30 +1973,26 @@ class TestCaching:
             "concurrent.futures.ProcessPoolExecutor", new=ThreadPoolExecutor
         ):
             one = (workspace / "one.py").resolve()
             "concurrent.futures.ProcessPoolExecutor", new=ThreadPoolExecutor
         ):
             one = (workspace / "one.py").resolve()
-            with one.open("w") as fobj:
-                fobj.write("print('hello')")
+            one.write_text("print('hello')", encoding="utf-8")
             two = (workspace / "two.py").resolve()
             two = (workspace / "two.py").resolve()
-            with two.open("w") as fobj:
-                fobj.write("print('hello')")
-            black.write_cache({}, [one], mode)
+            two.write_text("print('hello')", encoding="utf-8")
+            cache = black.Cache.read(mode)
+            cache.write([one])
             invokeBlack([str(workspace)])
             invokeBlack([str(workspace)])
-            with one.open("r") as fobj:
-                assert fobj.read() == "print('hello')"
-            with two.open("r") as fobj:
-                assert fobj.read() == 'print("hello")\n'
-            cache = black.read_cache(mode)
-            assert str(one) in cache
-            assert str(two) in cache
+            assert one.read_text(encoding="utf-8") == "print('hello')"
+            assert two.read_text(encoding="utf-8") == 'print("hello")\n'
+            cache = black.Cache.read(mode)
+            assert not cache.is_changed(one)
+            assert not cache.is_changed(two)
 
     @pytest.mark.parametrize("color", [False, True], ids=["no-color", "with-color"])
     def test_no_cache_when_writeback_diff(self, color: bool) -> None:
         mode = DEFAULT_MODE
         with cache_dir() as workspace:
             src = (workspace / "test.py").resolve()
 
     @pytest.mark.parametrize("color", [False, True], ids=["no-color", "with-color"])
     def test_no_cache_when_writeback_diff(self, color: bool) -> None:
         mode = DEFAULT_MODE
         with cache_dir() as workspace:
             src = (workspace / "test.py").resolve()
-            with src.open("w") as fobj:
-                fobj.write("print('hello')")
-            with patch("black.read_cache") as read_cache, patch(
-                "black.write_cache"
+            src.write_text("print('hello')", encoding="utf-8")
+            with patch.object(black.Cache, "read") as read_cache, patch.object(
+                black.Cache, "write"
             ) as write_cache:
                 cmd = [str(src), "--diff"]
                 if color:
             ) as write_cache:
                 cmd = [str(src), "--diff"]
                 if color:
@@ -2006,8 +2000,8 @@ class TestCaching:
                 invokeBlack(cmd)
                 cache_file = get_cache_file(mode)
                 assert cache_file.exists() is False
                 invokeBlack(cmd)
                 cache_file = get_cache_file(mode)
                 assert cache_file.exists() is False
+                read_cache.assert_called_once()
                 write_cache.assert_not_called()
                 write_cache.assert_not_called()
-                read_cache.assert_not_called()
 
     @pytest.mark.parametrize("color", [False, True], ids=["no-color", "with-color"])
     @event_loop()
 
     @pytest.mark.parametrize("color", [False, True], ids=["no-color", "with-color"])
     @event_loop()
@@ -2015,8 +2009,7 @@ class TestCaching:
         with cache_dir() as workspace:
             for tag in range(0, 4):
                 src = (workspace / f"test{tag}.py").resolve()
         with cache_dir() as workspace:
             for tag in range(0, 4):
                 src = (workspace / f"test{tag}.py").resolve()
-                with src.open("w") as fobj:
-                    fobj.write("print('hello')")
+                src.write_text("print('hello')", encoding="utf-8")
             with patch(
                 "black.concurrency.Manager", wraps=multiprocessing.Manager
             ) as mgr:
             with patch(
                 "black.concurrency.Manager", wraps=multiprocessing.Manager
             ) as mgr:
@@ -2041,17 +2034,17 @@ class TestCaching:
     def test_read_cache_no_cachefile(self) -> None:
         mode = DEFAULT_MODE
         with cache_dir():
     def test_read_cache_no_cachefile(self) -> None:
         mode = DEFAULT_MODE
         with cache_dir():
-            assert black.read_cache(mode) == {}
+            assert black.Cache.read(mode).file_data == {}
 
     def test_write_cache_read_cache(self) -> None:
         mode = DEFAULT_MODE
         with cache_dir() as workspace:
             src = (workspace / "test.py").resolve()
             src.touch()
 
     def test_write_cache_read_cache(self) -> None:
         mode = DEFAULT_MODE
         with cache_dir() as workspace:
             src = (workspace / "test.py").resolve()
             src.touch()
-            black.write_cache({}, [src], mode)
-            cache = black.read_cache(mode)
-            assert str(src) in cache
-            assert cache[str(src)] == black.get_cache_info(src)
+            write_cache = black.Cache.read(mode)
+            write_cache.write([src])
+            read_cache = black.Cache.read(mode)
+            assert not read_cache.is_changed(src)
 
     def test_filter_cached(self) -> None:
         with TemporaryDirectory() as workspace:
 
     def test_filter_cached(self) -> None:
         with TemporaryDirectory() as workspace:
@@ -2062,21 +2055,67 @@ class TestCaching:
             uncached.touch()
             cached.touch()
             cached_but_changed.touch()
             uncached.touch()
             cached.touch()
             cached_but_changed.touch()
-            cache = {
-                str(cached): black.get_cache_info(cached),
-                str(cached_but_changed): (0.0, 0),
-            }
-            todo, done = black.cache.filter_cached(
-                cache, {uncached, cached, cached_but_changed}
-            )
+            cache = black.Cache.read(DEFAULT_MODE)
+
+            orig_func = black.Cache.get_file_data
+
+            def wrapped_func(path: Path) -> FileData:
+                if path == cached:
+                    return orig_func(path)
+                if path == cached_but_changed:
+                    return FileData(0.0, 0, "")
+                raise AssertionError
+
+            with patch.object(black.Cache, "get_file_data", side_effect=wrapped_func):
+                cache.write([cached, cached_but_changed])
+            todo, done = cache.filtered_cached({uncached, cached, cached_but_changed})
             assert todo == {uncached, cached_but_changed}
             assert done == {cached}
 
             assert todo == {uncached, cached_but_changed}
             assert done == {cached}
 
+    def test_filter_cached_hash(self) -> None:
+        with TemporaryDirectory() as workspace:
+            path = Path(workspace)
+            src = (path / "test.py").resolve()
+            src.write_text("print('hello')", encoding="utf-8")
+            st = src.stat()
+            cache = black.Cache.read(DEFAULT_MODE)
+            cache.write([src])
+            cached_file_data = cache.file_data[str(src)]
+
+            todo, done = cache.filtered_cached([src])
+            assert todo == set()
+            assert done == {src}
+            assert cached_file_data.st_mtime == st.st_mtime
+
+            # Modify st_mtime
+            cached_file_data = cache.file_data[str(src)] = FileData(
+                cached_file_data.st_mtime - 1,
+                cached_file_data.st_size,
+                cached_file_data.hash,
+            )
+            todo, done = cache.filtered_cached([src])
+            assert todo == set()
+            assert done == {src}
+            assert cached_file_data.st_mtime < st.st_mtime
+            assert cached_file_data.st_size == st.st_size
+            assert cached_file_data.hash == black.Cache.hash_digest(src)
+
+            # Modify contents
+            src.write_text("print('hello world')", encoding="utf-8")
+            new_st = src.stat()
+            todo, done = cache.filtered_cached([src])
+            assert todo == {src}
+            assert done == set()
+            assert cached_file_data.st_mtime < new_st.st_mtime
+            assert cached_file_data.st_size != new_st.st_size
+            assert cached_file_data.hash != black.Cache.hash_digest(src)
+
     def test_write_cache_creates_directory_if_needed(self) -> None:
         mode = DEFAULT_MODE
         with cache_dir(exists=False) as workspace:
             assert not workspace.exists()
     def test_write_cache_creates_directory_if_needed(self) -> None:
         mode = DEFAULT_MODE
         with cache_dir(exists=False) as workspace:
             assert not workspace.exists()
-            black.write_cache({}, [], mode)
+            cache = black.Cache.read(mode)
+            cache.write([])
             assert workspace.exists()
 
     @event_loop()
             assert workspace.exists()
 
     @event_loop()
@@ -2086,21 +2125,21 @@ class TestCaching:
             "concurrent.futures.ProcessPoolExecutor", new=ThreadPoolExecutor
         ):
             failing = (workspace / "failing.py").resolve()
             "concurrent.futures.ProcessPoolExecutor", new=ThreadPoolExecutor
         ):
             failing = (workspace / "failing.py").resolve()
-            with failing.open("w") as fobj:
-                fobj.write("not actually python")
+            failing.write_text("not actually python", encoding="utf-8")
             clean = (workspace / "clean.py").resolve()
             clean = (workspace / "clean.py").resolve()
-            with clean.open("w") as fobj:
-                fobj.write('print("hello")\n')
+            clean.write_text('print("hello")\n', encoding="utf-8")
             invokeBlack([str(workspace)], exit_code=123)
             invokeBlack([str(workspace)], exit_code=123)
-            cache = black.read_cache(mode)
-            assert str(failing) not in cache
-            assert str(clean) in cache
+            cache = black.Cache.read(mode)
+            assert cache.is_changed(failing)
+            assert not cache.is_changed(clean)
 
     def test_write_cache_write_fail(self) -> None:
         mode = DEFAULT_MODE
 
     def test_write_cache_write_fail(self) -> None:
         mode = DEFAULT_MODE
-        with cache_dir(), patch.object(Path, "open") as mock:
-            mock.side_effect = OSError
-            black.write_cache({}, [], mode)
+        with cache_dir():
+            cache = black.Cache.read(mode)
+            with patch.object(Path, "open") as mock:
+                mock.side_effect = OSError
+                cache.write([])
 
     def test_read_cache_line_lengths(self) -> None:
         mode = DEFAULT_MODE
 
     def test_read_cache_line_lengths(self) -> None:
         mode = DEFAULT_MODE
@@ -2108,18 +2147,19 @@ class TestCaching:
         with cache_dir() as workspace:
             path = (workspace / "file.py").resolve()
             path.touch()
         with cache_dir() as workspace:
             path = (workspace / "file.py").resolve()
             path.touch()
-            black.write_cache({}, [path], mode)
-            one = black.read_cache(mode)
-            assert str(path) in one
-            two = black.read_cache(short_mode)
-            assert str(path) not in two
+            cache = black.Cache.read(mode)
+            cache.write([path])
+            one = black.Cache.read(mode)
+            assert not one.is_changed(path)
+            two = black.Cache.read(short_mode)
+            assert two.is_changed(path)
 
 
 def assert_collected_sources(
     src: Sequence[Union[str, Path]],
     expected: Sequence[Union[str, Path]],
     *,
 
 
 def assert_collected_sources(
     src: Sequence[Union[str, Path]],
     expected: Sequence[Union[str, Path]],
     *,
-    ctx: Optional[FakeContext] = None,
+    root: Optional[Path] = None,
     exclude: Optional[str] = None,
     include: Optional[str] = None,
     extend_exclude: Optional[str] = None,
     exclude: Optional[str] = None,
     include: Optional[str] = None,
     extend_exclude: Optional[str] = None,
@@ -2135,7 +2175,7 @@ def assert_collected_sources(
     )
     gs_force_exclude = None if force_exclude is None else compile_pattern(force_exclude)
     collected = black.get_sources(
     )
     gs_force_exclude = None if force_exclude is None else compile_pattern(force_exclude)
     collected = black.get_sources(
-        ctx=ctx or FakeContext(),
+        root=root or THIS_DIR,
         src=gs_src,
         quiet=False,
         verbose=False,
         src=gs_src,
         quiet=False,
         verbose=False,
@@ -2171,9 +2211,7 @@ class TestFileCollection:
             base / "b/.definitely_exclude/a.pyi",
         ]
         src = [base / "b/"]
             base / "b/.definitely_exclude/a.pyi",
         ]
         src = [base / "b/"]
-        ctx = FakeContext()
-        ctx.obj["root"] = base
-        assert_collected_sources(src, expected, ctx=ctx, extend_exclude=r"/exclude/")
+        assert_collected_sources(src, expected, root=base, extend_exclude=r"/exclude/")
 
     def test_gitignore_used_on_multiple_sources(self) -> None:
         root = Path(DATA_DIR / "gitignore_used_on_multiple_sources")
 
     def test_gitignore_used_on_multiple_sources(self) -> None:
         root = Path(DATA_DIR / "gitignore_used_on_multiple_sources")
@@ -2181,10 +2219,8 @@ class TestFileCollection:
             root / "dir1" / "b.py",
             root / "dir2" / "b.py",
         ]
             root / "dir1" / "b.py",
             root / "dir2" / "b.py",
         ]
-        ctx = FakeContext()
-        ctx.obj["root"] = root
         src = [root / "dir1", root / "dir2"]
         src = [root / "dir1", root / "dir2"]
-        assert_collected_sources(src, expected, ctx=ctx)
+        assert_collected_sources(src, expected, root=root)
 
     @patch("black.find_project_root", lambda *args: (THIS_DIR.resolve(), None))
     def test_exclude_for_issue_1572(self) -> None:
 
     @patch("black.find_project_root", lambda *args: (THIS_DIR.resolve(), None))
     def test_exclude_for_issue_1572(self) -> None:
@@ -2290,9 +2326,7 @@ class TestFileCollection:
         # If gitignore with */* is in root
         root = Path(DATA_DIR / "ignore_subfolders_gitignore_tests" / "subdir")
         expected = [root / "b.py"]
         # If gitignore with */* is in root
         root = Path(DATA_DIR / "ignore_subfolders_gitignore_tests" / "subdir")
         expected = [root / "b.py"]
-        ctx = FakeContext()
-        ctx.obj["root"] = root
-        assert_collected_sources([root], expected, ctx=ctx)
+        assert_collected_sources([root], expected, root=root)
 
         # If .gitignore with */* is nested
         root = Path(DATA_DIR / "ignore_subfolders_gitignore_tests")
 
         # If .gitignore with */* is nested
         root = Path(DATA_DIR / "ignore_subfolders_gitignore_tests")
@@ -2300,17 +2334,13 @@ class TestFileCollection:
             root / "a.py",
             root / "subdir" / "b.py",
         ]
             root / "a.py",
             root / "subdir" / "b.py",
         ]
-        ctx = FakeContext()
-        ctx.obj["root"] = root
-        assert_collected_sources([root], expected, ctx=ctx)
+        assert_collected_sources([root], expected, root=root)
 
         # If command is executed from outer dir
         root = Path(DATA_DIR / "ignore_subfolders_gitignore_tests")
         target = root / "subdir"
         expected = [target / "b.py"]
 
         # If command is executed from outer dir
         root = Path(DATA_DIR / "ignore_subfolders_gitignore_tests")
         target = root / "subdir"
         expected = [target / "b.py"]
-        ctx = FakeContext()
-        ctx.obj["root"] = root
-        assert_collected_sources([target], expected, ctx=ctx)
+        assert_collected_sources([target], expected, root=root)
 
     def test_empty_include(self) -> None:
         path = DATA_DIR / "include_exclude_tests"
 
     def test_empty_include(self) -> None:
         path = DATA_DIR / "include_exclude_tests"
@@ -2343,38 +2373,48 @@ class TestFileCollection:
         )
 
     @pytest.mark.incompatible_with_mypyc
         )
 
     @pytest.mark.incompatible_with_mypyc
-    def test_symlink_out_of_root_directory(self) -> None:
+    def test_symlinks(self) -> None:
         path = MagicMock()
         root = THIS_DIR.resolve()
         path = MagicMock()
         root = THIS_DIR.resolve()
-        child = MagicMock()
         include = re.compile(black.DEFAULT_INCLUDES)
         exclude = re.compile(black.DEFAULT_EXCLUDES)
         report = black.Report()
         gitignore = PathSpec.from_lines("gitwildmatch", [])
         include = re.compile(black.DEFAULT_INCLUDES)
         exclude = re.compile(black.DEFAULT_EXCLUDES)
         report = black.Report()
         gitignore = PathSpec.from_lines("gitwildmatch", [])
-        # `child` should behave like a symlink which resolved path is clearly
-        # outside of the `root` directory.
-        path.iterdir.return_value = [child]
-        child.resolve.return_value = Path("/a/b/c")
-        child.as_posix.return_value = "/a/b/c"
-        try:
-            list(
-                black.gen_python_files(
-                    path.iterdir(),
-                    root,
-                    include,
-                    exclude,
-                    None,
-                    None,
-                    report,
-                    {path: gitignore},
-                    verbose=False,
-                    quiet=False,
-                )
+
+        regular = MagicMock()
+        outside_root_symlink = MagicMock()
+        ignored_symlink = MagicMock()
+
+        path.iterdir.return_value = [regular, outside_root_symlink, ignored_symlink]
+
+        regular.absolute.return_value = root / "regular.py"
+        regular.resolve.return_value = root / "regular.py"
+        regular.is_dir.return_value = False
+
+        outside_root_symlink.absolute.return_value = root / "symlink.py"
+        outside_root_symlink.resolve.return_value = Path("/nowhere")
+
+        ignored_symlink.absolute.return_value = root / ".mypy_cache" / "symlink.py"
+
+        files = list(
+            black.gen_python_files(
+                path.iterdir(),
+                root,
+                include,
+                exclude,
+                None,
+                None,
+                report,
+                {path: gitignore},
+                verbose=False,
+                quiet=False,
             )
             )
-        except ValueError as ve:
-            pytest.fail(f"`get_python_files_in_dir()` failed: {ve}")
+        )
+        assert files == [regular]
+
         path.iterdir.assert_called_once()
         path.iterdir.assert_called_once()
-        child.resolve.assert_called_once()
+        outside_root_symlink.resolve.assert_called_once()
+        ignored_symlink.resolve.assert_not_called()
 
     @patch("black.find_project_root", lambda *args: (THIS_DIR.resolve(), None))
     def test_get_sources_with_stdin(self) -> None:
 
     @patch("black.find_project_root", lambda *args: (THIS_DIR.resolve(), None))
     def test_get_sources_with_stdin(self) -> None: