X-Git-Url: https://git.madduck.net/etc/vim.git/blobdiff_plain/898915d5569f503c278ef53cb6f10e003034943c..5d5bf6e0878539baeef797b87636235b8c02be3f:/tests/test_black.py?ds=inline diff --git a/tests/test_black.py b/tests/test_black.py index 5f2e6f5..537ca80 100644 --- a/tests/test_black.py +++ b/tests/test_black.py @@ -9,7 +9,6 @@ import os import re import sys import types -import unittest from concurrent.futures import ThreadPoolExecutor from contextlib import contextmanager, redirect_stderr from dataclasses import replace @@ -41,7 +40,7 @@ import black import black.files from black import Feature, TargetVersion from black import re_compile_maybe_verbose as compile_pattern -from black.cache import get_cache_dir, get_cache_file +from black.cache import FileData, get_cache_dir, get_cache_file from black.debug import DebugVisitor from black.output import color_diff, diff from black.report import Report @@ -104,6 +103,7 @@ class FakeContext(click.Context): def __init__(self) -> None: self.default_map: Dict[str, Any] = {} + self.params: Dict[str, Any] = {} # Dummy root, since most of the tests don't care about it self.obj: Dict[str, Any] = {"root": PROJECT_ROOT} @@ -148,8 +148,7 @@ class BlackTestCase(BlackBaseTestCase): tmp_file = Path(black.dump_to_file()) try: self.assertFalse(ff(tmp_file, write_back=black.WriteBack.YES)) - with open(tmp_file, encoding="utf8") as f: - actual = f.read() + actual = tmp_file.read_text(encoding="utf-8") finally: os.unlink(tmp_file) self.assertFormatEqual(expected, actual) @@ -177,7 +176,7 @@ class BlackTestCase(BlackBaseTestCase): ff(tmp_file, mode=mode, write_back=black.WriteBack.YES) ) with open(tmp_file, "rb") as f: - actual = f.read().decode("utf8") + actual = f.read().decode("utf-8") finally: os.unlink(tmp_file) self.assertFormatEqual(expected, actual) @@ -188,7 +187,9 @@ class BlackTestCase(BlackBaseTestCase): ) def test_piping(self) -> None: - source, expected = read_data_from_file(PROJECT_ROOT / "src/black/__init__.py") + _, source, expected = read_data_from_file( + PROJECT_ROOT / "src/black/__init__.py" + ) result = BlackRunner().invoke( black.main, [ @@ -197,7 +198,7 @@ class BlackTestCase(BlackBaseTestCase): f"--line-length={black.DEFAULT_LINE_LENGTH}", f"--config={EMPTY_CONFIG}", ], - input=BytesIO(source.encode("utf8")), + input=BytesIO(source.encode("utf-8")), ) self.assertEqual(result.exit_code, 0) self.assertFormatEqual(expected, result.output) @@ -210,8 +211,8 @@ class BlackTestCase(BlackBaseTestCase): r"(STDIN|STDOUT)\t\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d\.\d\d\d\d\d\d" r"\+\d\d:\d\d" ) - source, _ = read_data("simple_cases", "expression.py") - expected, _ = read_data("simple_cases", "expression.diff") + source, _ = read_data("cases", "expression.py") + expected, _ = read_data("cases", "expression.diff") args = [ "-", "--fast", @@ -220,7 +221,7 @@ class BlackTestCase(BlackBaseTestCase): f"--config={EMPTY_CONFIG}", ] result = BlackRunner().invoke( - black.main, args, input=BytesIO(source.encode("utf8")) + black.main, args, input=BytesIO(source.encode("utf-8")) ) self.assertEqual(result.exit_code, 0) actual = diff_header.sub(DETERMINISTIC_HEADER, result.output) @@ -228,7 +229,7 @@ class BlackTestCase(BlackBaseTestCase): self.assertEqual(expected, actual) def test_piping_diff_with_color(self) -> None: - source, _ = read_data("simple_cases", "expression.py") + source, _ = read_data("cases", "expression.py") args = [ "-", "--fast", @@ -238,7 +239,7 @@ class BlackTestCase(BlackBaseTestCase): f"--config={EMPTY_CONFIG}", ] result = BlackRunner().invoke( - black.main, args, input=BytesIO(source.encode("utf8")) + black.main, args, input=BytesIO(source.encode("utf-8")) ) actual = result.output # Again, the contents are checked in a different test, so only look for colors. @@ -264,7 +265,7 @@ class BlackTestCase(BlackBaseTestCase): black.assert_stable(source, actual, black.FileMode()) def test_pep_572_version_detection(self) -> None: - source, _ = read_data("py_38", "pep_572") + source, _ = read_data("cases", "pep_572") root = black.lib2to3_parse(source) features = black.get_features_used(root) self.assertIn(black.Feature.ASSIGNMENT_EXPRESSIONS, features) @@ -273,7 +274,7 @@ class BlackTestCase(BlackBaseTestCase): def test_pep_695_version_detection(self) -> None: for file in ("type_aliases", "type_params"): - source, _ = read_data("py_312", file) + source, _ = read_data("cases", file) root = black.lib2to3_parse(source) features = black.get_features_used(root) self.assertIn(black.Feature.TYPE_PARAMS, features) @@ -281,12 +282,11 @@ class BlackTestCase(BlackBaseTestCase): self.assertIn(black.TargetVersion.PY312, versions) def test_expression_ff(self) -> None: - source, expected = read_data("simple_cases", "expression.py") + source, expected = read_data("cases", "expression.py") tmp_file = Path(black.dump_to_file(source)) try: self.assertTrue(ff(tmp_file, write_back=black.WriteBack.YES)) - with open(tmp_file, encoding="utf8") as f: - actual = f.read() + actual = tmp_file.read_text(encoding="utf-8") finally: os.unlink(tmp_file) self.assertFormatEqual(expected, actual) @@ -295,8 +295,8 @@ class BlackTestCase(BlackBaseTestCase): black.assert_stable(source, actual, DEFAULT_MODE) def test_expression_diff(self) -> None: - source, _ = read_data("simple_cases", "expression.py") - expected, _ = read_data("simple_cases", "expression.diff") + source, _ = read_data("cases", "expression.py") + expected, _ = read_data("cases", "expression.diff") tmp_file = Path(black.dump_to_file(source)) diff_header = re.compile( rf"{re.escape(str(tmp_file))}\t\d\d\d\d-\d\d-\d\d " @@ -321,8 +321,8 @@ class BlackTestCase(BlackBaseTestCase): self.assertEqual(expected, actual, msg) def test_expression_diff_with_color(self) -> None: - source, _ = read_data("simple_cases", "expression.py") - expected, _ = read_data("simple_cases", "expression.diff") + source, _ = read_data("cases", "expression.py") + expected, _ = read_data("cases", "expression.diff") tmp_file = Path(black.dump_to_file(source)) try: result = BlackRunner().invoke( @@ -341,7 +341,7 @@ class BlackTestCase(BlackBaseTestCase): self.assertIn("\033[0m", actual) def test_detect_pos_only_arguments(self) -> None: - source, _ = read_data("py_38", "pep_570") + source, _ = read_data("cases", "pep_570") root = black.lib2to3_parse(source) features = black.get_features_used(root) self.assertIn(black.Feature.POS_ONLY_ARGUMENTS, features) @@ -389,8 +389,7 @@ class BlackTestCase(BlackBaseTestCase): black.main, [str(tmp_file), "-x", f"--config={EMPTY_CONFIG}"] ) self.assertEqual(result.exit_code, 0) - with open(tmp_file, encoding="utf8") as f: - actual = f.read() + actual = tmp_file.read_text(encoding="utf-8") self.assertFormatEqual(source, actual) def test_skip_source_first_line_when_mixing_newlines(self) -> None: @@ -404,7 +403,7 @@ class BlackTestCase(BlackBaseTestCase): self.assertEqual(test_file.read_bytes(), expected) def test_skip_magic_trailing_comma(self) -> None: - source, _ = read_data("simple_cases", "expression") + source, _ = read_data("cases", "expression") expected, _ = read_data( "miscellaneous", "expression_skip_magic_trailing_comma.diff" ) @@ -436,7 +435,7 @@ class BlackTestCase(BlackBaseTestCase): @patch("black.dump_to_file", dump_to_stderr) def test_async_as_identifier(self) -> None: source_path = get_case_path("miscellaneous", "async_as_identifier") - source, expected = read_data_from_file(source_path) + _, source, expected = read_data_from_file(source_path) actual = fs(source) self.assertFormatEqual(expected, actual) major, minor = sys.version_info[:2] @@ -450,8 +449,8 @@ class BlackTestCase(BlackBaseTestCase): @patch("black.dump_to_file", dump_to_stderr) def test_python37(self) -> None: - source_path = get_case_path("py_37", "python37") - source, expected = read_data_from_file(source_path) + source_path = get_case_path("cases", "python37") + _, source, expected = read_data_from_file(source_path) actual = fs(source) self.assertFormatEqual(expected, actual) major, minor = sys.version_info[:2] @@ -494,9 +493,7 @@ class BlackTestCase(BlackBaseTestCase): project_root = Path(THIS_DIR / "data" / "nested_gitignore_tests") working_directory = project_root / "root" target_abspath = working_directory / "child" - target_contents = ( - src.relative_to(working_directory) for src in target_abspath.iterdir() - ) + target_contents = list(target_abspath.iterdir()) def mock_n_calls(responses: List[bool]) -> Callable[[], bool]: def _mocked_calls() -> bool: @@ -509,11 +506,11 @@ class BlackTestCase(BlackBaseTestCase): with patch("pathlib.Path.iterdir", return_value=target_contents), patch( "pathlib.Path.cwd", return_value=working_directory ), patch("pathlib.Path.is_dir", side_effect=mock_n_calls([True])): - ctx = FakeContext() - ctx.obj["root"] = project_root + # Note that the root folder (project_root) isn't the folder + # named "root" (aka working_directory) report = MagicMock(verbose=True) black.get_sources( - ctx=ctx, + root=project_root, src=("./child",), quiet=False, verbose=True, @@ -529,7 +526,7 @@ class BlackTestCase(BlackBaseTestCase): for _, mock_args, _ in report.path_ignored.mock_calls ), "A symbolic link was reported." report.path_ignored.assert_called_once_with( - Path("child", "b.py"), "matches a .gitignore file content" + Path("root", "child", "b.py"), "matches a .gitignore file content" ) def test_report_verbose(self) -> None: @@ -889,7 +886,7 @@ class BlackTestCase(BlackBaseTestCase): self.assertEqual(black.get_features_used(node), {Feature.NUMERIC_UNDERSCORES}) node = black.lib2to3_parse("123456\n") self.assertEqual(black.get_features_used(node), set()) - source, expected = read_data("simple_cases", "function") + source, expected = read_data("cases", "function") node = black.lib2to3_parse(source) expected_features = { Feature.TRAILING_COMMA_IN_CALL, @@ -899,7 +896,7 @@ class BlackTestCase(BlackBaseTestCase): self.assertEqual(black.get_features_used(node), expected_features) node = black.lib2to3_parse(expected) self.assertEqual(black.get_features_used(node), expected_features) - source, expected = read_data("simple_cases", "expression") + source, expected = read_data("cases", "expression") node = black.lib2to3_parse(source) self.assertEqual(black.get_features_used(node), set()) node = black.lib2to3_parse(expected) @@ -1051,9 +1048,10 @@ class BlackTestCase(BlackBaseTestCase): self.assertEqual(len(n.children), 1) self.assertEqual(n.children[0].type, black.token.ENDMARKER) + @patch("tests.conftest.PRINT_FULL_TREE", True) + @patch("tests.conftest.PRINT_TREE_DIFF", False) @pytest.mark.incompatible_with_mypyc - @unittest.skipIf(os.environ.get("SKIP_AST_PRINT"), "user set SKIP_AST_PRINT") - def test_assertFormatEqual(self) -> None: + def test_assertFormatEqual_print_full_tree(self) -> None: out_lines = [] err_lines = [] @@ -1072,6 +1070,29 @@ class BlackTestCase(BlackBaseTestCase): self.assertIn("Actual tree:", out_str) self.assertEqual("".join(err_lines), "") + @patch("tests.conftest.PRINT_FULL_TREE", False) + @patch("tests.conftest.PRINT_TREE_DIFF", True) + @pytest.mark.incompatible_with_mypyc + def test_assertFormatEqual_print_tree_diff(self) -> None: + out_lines = [] + err_lines = [] + + def out(msg: str, **kwargs: Any) -> None: + out_lines.append(msg) + + def err(msg: str, **kwargs: Any) -> None: + err_lines.append(msg) + + with patch("black.output._out", out), patch("black.output._err", err): + with self.assertRaises(AssertionError): + self.assertFormatEqual("j = [1, 2, 3]\n", "j = [1, 2, 3,]\n") + + out_str = "".join(out_lines) + self.assertIn("Tree Diff:", out_str) + self.assertIn("+ COMMA", out_str) + self.assertIn("+ ','", out_str) + self.assertEqual("".join(err_lines), "") + @event_loop() @patch("concurrent.futures.ProcessPoolExecutor", MagicMock(side_effect=OSError)) def test_works_in_mono_process_only_environment(self) -> None: @@ -1080,7 +1101,7 @@ class BlackTestCase(BlackBaseTestCase): (workspace / "one.py").resolve(), (workspace / "two.py").resolve(), ]: - f.write_text('print("hello")\n') + f.write_text('print("hello")\n', encoding="utf-8") self.invokeBlack([str(workspace)]) @event_loop() @@ -1090,7 +1111,7 @@ class BlackTestCase(BlackBaseTestCase): src1 = get_case_path("miscellaneous", "string_quotes") self.invokeBlack([str(src1), "--diff", "--check"], exit_code=1) # Files which will not be reformatted. - src2 = get_case_path("simple_cases", "composition") + src2 = get_case_path("cases", "composition") self.invokeBlack([str(src2), "--diff", "--check"]) # Multi file command. self.invokeBlack([str(src1), str(src2), "--diff", "--check"], exit_code=1) @@ -1117,16 +1138,14 @@ class BlackTestCase(BlackBaseTestCase): contents, expected = read_data("miscellaneous", "force_pyi") with cache_dir() as workspace: path = (workspace / "file.py").resolve() - with open(path, "w") as fh: - fh.write(contents) + path.write_text(contents, encoding="utf-8") self.invokeBlack([str(path), "--pyi"]) - with open(path, "r") as fh: - actual = fh.read() + actual = path.read_text(encoding="utf-8") # verify cache with --pyi is separate - pyi_cache = black.read_cache(pyi_mode) - self.assertIn(str(path), pyi_cache) - normal_cache = black.read_cache(DEFAULT_MODE) - self.assertNotIn(str(path), normal_cache) + pyi_cache = black.Cache.read(pyi_mode) + assert not pyi_cache.is_changed(path) + normal_cache = black.Cache.read(DEFAULT_MODE) + assert normal_cache.is_changed(path) self.assertFormatEqual(expected, actual) black.assert_equivalent(contents, actual) black.assert_stable(contents, actual, pyi_mode) @@ -1142,24 +1161,22 @@ class BlackTestCase(BlackBaseTestCase): (workspace / "file2.py").resolve(), ] for path in paths: - with open(path, "w") as fh: - fh.write(contents) + path.write_text(contents, encoding="utf-8") self.invokeBlack([str(p) for p in paths] + ["--pyi"]) for path in paths: - with open(path, "r") as fh: - actual = fh.read() + actual = path.read_text(encoding="utf-8") self.assertEqual(actual, expected) # verify cache with --pyi is separate - pyi_cache = black.read_cache(pyi_mode) - normal_cache = black.read_cache(reg_mode) + pyi_cache = black.Cache.read(pyi_mode) + normal_cache = black.Cache.read(reg_mode) for path in paths: - self.assertIn(str(path), pyi_cache) - self.assertNotIn(str(path), normal_cache) + assert not pyi_cache.is_changed(path) + assert normal_cache.is_changed(path) def test_pipe_force_pyi(self) -> None: source, expected = read_data("miscellaneous", "force_pyi") result = CliRunner().invoke( - black.main, ["-", "-q", "--pyi"], input=BytesIO(source.encode("utf8")) + black.main, ["-", "-q", "--pyi"], input=BytesIO(source.encode("utf-8")) ) self.assertEqual(result.exit_code, 0) actual = result.output @@ -1171,16 +1188,14 @@ class BlackTestCase(BlackBaseTestCase): source, expected = read_data("miscellaneous", "force_py36") with cache_dir() as workspace: path = (workspace / "file.py").resolve() - with open(path, "w") as fh: - fh.write(source) + path.write_text(source, encoding="utf-8") self.invokeBlack([str(path), *PY36_ARGS]) - with open(path, "r") as fh: - actual = fh.read() + actual = path.read_text(encoding="utf-8") # verify cache with --target-version is separate - py36_cache = black.read_cache(py36_mode) - self.assertIn(str(path), py36_cache) - normal_cache = black.read_cache(reg_mode) - self.assertNotIn(str(path), normal_cache) + py36_cache = black.Cache.read(py36_mode) + assert not py36_cache.is_changed(path) + normal_cache = black.Cache.read(reg_mode) + assert normal_cache.is_changed(path) self.assertEqual(actual, expected) @event_loop() @@ -1194,26 +1209,24 @@ class BlackTestCase(BlackBaseTestCase): (workspace / "file2.py").resolve(), ] for path in paths: - with open(path, "w") as fh: - fh.write(source) + path.write_text(source, encoding="utf-8") self.invokeBlack([str(p) for p in paths] + PY36_ARGS) for path in paths: - with open(path, "r") as fh: - actual = fh.read() + actual = path.read_text(encoding="utf-8") self.assertEqual(actual, expected) # verify cache with --target-version is separate - pyi_cache = black.read_cache(py36_mode) - normal_cache = black.read_cache(reg_mode) + pyi_cache = black.Cache.read(py36_mode) + normal_cache = black.Cache.read(reg_mode) for path in paths: - self.assertIn(str(path), pyi_cache) - self.assertNotIn(str(path), normal_cache) + assert not pyi_cache.is_changed(path) + assert normal_cache.is_changed(path) def test_pipe_force_py36(self) -> None: source, expected = read_data("miscellaneous", "force_py36") result = CliRunner().invoke( black.main, ["-", "-q", "--target-version=py36"], - input=BytesIO(source.encode("utf8")), + input=BytesIO(source.encode("utf-8")), ) self.assertEqual(result.exit_code, 0) actual = result.output @@ -1319,7 +1332,7 @@ class BlackTestCase(BlackBaseTestCase): report = MagicMock() # Even with an existing file, since we are forcing stdin, black # should output to stdout and not modify the file inplace - p = THIS_DIR / "data" / "simple_cases" / "collections.py" + p = THIS_DIR / "data" / "cases" / "collections.py" # Make sure is_file actually returns True self.assertTrue(p.is_file()) path = Path(f"__BLACK_STDIN_FILENAME__{p}") @@ -1442,11 +1455,11 @@ class BlackTestCase(BlackBaseTestCase): contents = nl.join(["def f( ):", " pass"]) runner = BlackRunner() result = runner.invoke( - black.main, ["-", "--fast"], input=BytesIO(contents.encode("utf8")) + black.main, ["-", "--fast"], input=BytesIO(contents.encode("utf-8")) ) self.assertEqual(result.exit_code, 0) output = result.stdout_bytes - self.assertIn(nl.encode("utf8"), output) + self.assertIn(nl.encode("utf-8"), output) if nl == "\n": self.assertNotIn(b"\r\n", output) @@ -1465,30 +1478,6 @@ class BlackTestCase(BlackBaseTestCase): with self.assertRaises(AssertionError): black.assert_equivalent("{}", "None") - def test_shhh_click(self) -> None: - try: - from click import _unicodefun # type: ignore - except ImportError: - self.skipTest("Incompatible Click version") - - if not hasattr(_unicodefun, "_verify_python_env"): - self.skipTest("Incompatible Click version") - - # First, let's see if Click is crashing with a preferred ASCII charset. - with patch("locale.getpreferredencoding") as gpe: - gpe.return_value = "ASCII" - with self.assertRaises(RuntimeError): - _unicodefun._verify_python_env() - # Now, let's silence Click... - black.patch_click() - # ...and confirm it's silent. - with patch("locale.getpreferredencoding") as gpe: - gpe.return_value = "ASCII" - try: - _unicodefun._verify_python_env() - except RuntimeError as re: - self.fail(f"`patch_click()` failed, exception still raised: {re}") - def test_root_logger_not_used_directly(self) -> None: def fail(*args: Any, **kwargs: Any) -> None: self.fail("Record created with root logger") @@ -1620,6 +1609,39 @@ class BlackTestCase(BlackBaseTestCase): self.assertEqual(config["exclude"], r"\.pyi?$") self.assertEqual(config["include"], r"\.py?$") + def test_read_pyproject_toml_from_stdin(self) -> None: + with TemporaryDirectory() as workspace: + root = Path(workspace) + + src_dir = root / "src" + src_dir.mkdir() + + src_pyproject = src_dir / "pyproject.toml" + src_pyproject.touch() + + test_toml_content = (THIS_DIR / "test.toml").read_text(encoding="utf-8") + src_pyproject.write_text(test_toml_content, encoding="utf-8") + + src_python = src_dir / "foo.py" + src_python.touch() + + fake_ctx = FakeContext() + fake_ctx.params["src"] = ("-",) + fake_ctx.params["stdin_filename"] = str(src_python) + + with change_directory(root): + black.read_pyproject_toml(fake_ctx, FakeParameter(), None) + + config = fake_ctx.default_map + self.assertEqual(config["verbose"], "1") + self.assertEqual(config["check"], "no") + self.assertEqual(config["diff"], "y") + self.assertEqual(config["color"], "True") + self.assertEqual(config["line_length"], "79") + self.assertEqual(config["target_version"], ["py36", "py37", "py38"]) + self.assertEqual(config["exclude"], r"\.pyi?$") + self.assertEqual(config["include"], r"\.py?$") + @pytest.mark.incompatible_with_mypyc def test_find_project_root(self) -> None: with TemporaryDirectory() as workspace: @@ -1941,32 +1963,33 @@ class TestCaching: # If BLACK_CACHE_DIR is not set, use user_cache_dir monkeypatch.delenv("BLACK_CACHE_DIR", raising=False) with patch_user_cache_dir: - assert get_cache_dir() == workspace1 + assert get_cache_dir().parent == workspace1 # If it is set, use the path provided in the env var. monkeypatch.setenv("BLACK_CACHE_DIR", str(workspace2)) - assert get_cache_dir() == workspace2 + assert get_cache_dir().parent == workspace2 def test_cache_broken_file(self) -> None: mode = DEFAULT_MODE with cache_dir() as workspace: cache_file = get_cache_file(mode) - cache_file.write_text("this is not a pickle") - assert black.read_cache(mode) == {} + cache_file.write_text("this is not a pickle", encoding="utf-8") + assert black.Cache.read(mode).file_data == {} src = (workspace / "test.py").resolve() - src.write_text("print('hello')") + src.write_text("print('hello')", encoding="utf-8") invokeBlack([str(src)]) - cache = black.read_cache(mode) - assert str(src) in cache + cache = black.Cache.read(mode) + assert not cache.is_changed(src) def test_cache_single_file_already_cached(self) -> None: mode = DEFAULT_MODE with cache_dir() as workspace: src = (workspace / "test.py").resolve() - src.write_text("print('hello')") - black.write_cache({}, [src], mode) + src.write_text("print('hello')", encoding="utf-8") + cache = black.Cache.read(mode) + cache.write([src]) invokeBlack([str(src)]) - assert src.read_text() == "print('hello')" + assert src.read_text(encoding="utf-8") == "print('hello')" @event_loop() def test_cache_multiple_files(self) -> None: @@ -1975,30 +1998,27 @@ class TestCaching: "concurrent.futures.ProcessPoolExecutor", new=ThreadPoolExecutor ): one = (workspace / "one.py").resolve() - with one.open("w") as fobj: - fobj.write("print('hello')") + one.write_text("print('hello')", encoding="utf-8") two = (workspace / "two.py").resolve() - with two.open("w") as fobj: - fobj.write("print('hello')") - black.write_cache({}, [one], mode) + two.write_text("print('hello')", encoding="utf-8") + cache = black.Cache.read(mode) + cache.write([one]) invokeBlack([str(workspace)]) - with one.open("r") as fobj: - assert fobj.read() == "print('hello')" - with two.open("r") as fobj: - assert fobj.read() == 'print("hello")\n' - cache = black.read_cache(mode) - assert str(one) in cache - assert str(two) in cache + assert one.read_text(encoding="utf-8") == "print('hello')" + assert two.read_text(encoding="utf-8") == 'print("hello")\n' + cache = black.Cache.read(mode) + assert not cache.is_changed(one) + assert not cache.is_changed(two) + @pytest.mark.incompatible_with_mypyc @pytest.mark.parametrize("color", [False, True], ids=["no-color", "with-color"]) def test_no_cache_when_writeback_diff(self, color: bool) -> None: mode = DEFAULT_MODE with cache_dir() as workspace: src = (workspace / "test.py").resolve() - with src.open("w") as fobj: - fobj.write("print('hello')") - with patch("black.read_cache") as read_cache, patch( - "black.write_cache" + src.write_text("print('hello')", encoding="utf-8") + with patch.object(black.Cache, "read") as read_cache, patch.object( + black.Cache, "write" ) as write_cache: cmd = [str(src), "--diff"] if color: @@ -2006,8 +2026,8 @@ class TestCaching: invokeBlack(cmd) cache_file = get_cache_file(mode) assert cache_file.exists() is False + read_cache.assert_called_once() write_cache.assert_not_called() - read_cache.assert_not_called() @pytest.mark.parametrize("color", [False, True], ids=["no-color", "with-color"]) @event_loop() @@ -2015,8 +2035,7 @@ class TestCaching: with cache_dir() as workspace: for tag in range(0, 4): src = (workspace / f"test{tag}.py").resolve() - with src.open("w") as fobj: - fobj.write("print('hello')") + src.write_text("print('hello')", encoding="utf-8") with patch( "black.concurrency.Manager", wraps=multiprocessing.Manager ) as mgr: @@ -2041,18 +2060,19 @@ class TestCaching: def test_read_cache_no_cachefile(self) -> None: mode = DEFAULT_MODE with cache_dir(): - assert black.read_cache(mode) == {} + assert black.Cache.read(mode).file_data == {} def test_write_cache_read_cache(self) -> None: mode = DEFAULT_MODE with cache_dir() as workspace: src = (workspace / "test.py").resolve() src.touch() - black.write_cache({}, [src], mode) - cache = black.read_cache(mode) - assert str(src) in cache - assert cache[str(src)] == black.get_cache_info(src) + write_cache = black.Cache.read(mode) + write_cache.write([src]) + read_cache = black.Cache.read(mode) + assert not read_cache.is_changed(src) + @pytest.mark.incompatible_with_mypyc def test_filter_cached(self) -> None: with TemporaryDirectory() as workspace: path = Path(workspace) @@ -2062,21 +2082,67 @@ class TestCaching: uncached.touch() cached.touch() cached_but_changed.touch() - cache = { - str(cached): black.get_cache_info(cached), - str(cached_but_changed): (0.0, 0), - } - todo, done = black.cache.filter_cached( - cache, {uncached, cached, cached_but_changed} - ) + cache = black.Cache.read(DEFAULT_MODE) + + orig_func = black.Cache.get_file_data + + def wrapped_func(path: Path) -> FileData: + if path == cached: + return orig_func(path) + if path == cached_but_changed: + return FileData(0.0, 0, "") + raise AssertionError + + with patch.object(black.Cache, "get_file_data", side_effect=wrapped_func): + cache.write([cached, cached_but_changed]) + todo, done = cache.filtered_cached({uncached, cached, cached_but_changed}) assert todo == {uncached, cached_but_changed} assert done == {cached} + def test_filter_cached_hash(self) -> None: + with TemporaryDirectory() as workspace: + path = Path(workspace) + src = (path / "test.py").resolve() + src.write_text("print('hello')", encoding="utf-8") + st = src.stat() + cache = black.Cache.read(DEFAULT_MODE) + cache.write([src]) + cached_file_data = cache.file_data[str(src)] + + todo, done = cache.filtered_cached([src]) + assert todo == set() + assert done == {src} + assert cached_file_data.st_mtime == st.st_mtime + + # Modify st_mtime + cached_file_data = cache.file_data[str(src)] = FileData( + cached_file_data.st_mtime - 1, + cached_file_data.st_size, + cached_file_data.hash, + ) + todo, done = cache.filtered_cached([src]) + assert todo == set() + assert done == {src} + assert cached_file_data.st_mtime < st.st_mtime + assert cached_file_data.st_size == st.st_size + assert cached_file_data.hash == black.Cache.hash_digest(src) + + # Modify contents + src.write_text("print('hello world')", encoding="utf-8") + new_st = src.stat() + todo, done = cache.filtered_cached([src]) + assert todo == {src} + assert done == set() + assert cached_file_data.st_mtime < new_st.st_mtime + assert cached_file_data.st_size != new_st.st_size + assert cached_file_data.hash != black.Cache.hash_digest(src) + def test_write_cache_creates_directory_if_needed(self) -> None: mode = DEFAULT_MODE with cache_dir(exists=False) as workspace: assert not workspace.exists() - black.write_cache({}, [], mode) + cache = black.Cache.read(mode) + cache.write([]) assert workspace.exists() @event_loop() @@ -2086,21 +2152,21 @@ class TestCaching: "concurrent.futures.ProcessPoolExecutor", new=ThreadPoolExecutor ): failing = (workspace / "failing.py").resolve() - with failing.open("w") as fobj: - fobj.write("not actually python") + failing.write_text("not actually python", encoding="utf-8") clean = (workspace / "clean.py").resolve() - with clean.open("w") as fobj: - fobj.write('print("hello")\n') + clean.write_text('print("hello")\n', encoding="utf-8") invokeBlack([str(workspace)], exit_code=123) - cache = black.read_cache(mode) - assert str(failing) not in cache - assert str(clean) in cache + cache = black.Cache.read(mode) + assert cache.is_changed(failing) + assert not cache.is_changed(clean) def test_write_cache_write_fail(self) -> None: mode = DEFAULT_MODE - with cache_dir(), patch.object(Path, "open") as mock: - mock.side_effect = OSError - black.write_cache({}, [], mode) + with cache_dir(): + cache = black.Cache.read(mode) + with patch.object(Path, "open") as mock: + mock.side_effect = OSError + cache.write([]) def test_read_cache_line_lengths(self) -> None: mode = DEFAULT_MODE @@ -2108,18 +2174,19 @@ class TestCaching: with cache_dir() as workspace: path = (workspace / "file.py").resolve() path.touch() - black.write_cache({}, [path], mode) - one = black.read_cache(mode) - assert str(path) in one - two = black.read_cache(short_mode) - assert str(path) not in two + cache = black.Cache.read(mode) + cache.write([path]) + one = black.Cache.read(mode) + assert not one.is_changed(path) + two = black.Cache.read(short_mode) + assert two.is_changed(path) def assert_collected_sources( src: Sequence[Union[str, Path]], expected: Sequence[Union[str, Path]], *, - ctx: Optional[FakeContext] = None, + root: Optional[Path] = None, exclude: Optional[str] = None, include: Optional[str] = None, extend_exclude: Optional[str] = None, @@ -2135,7 +2202,7 @@ def assert_collected_sources( ) gs_force_exclude = None if force_exclude is None else compile_pattern(force_exclude) collected = black.get_sources( - ctx=ctx or FakeContext(), + root=root or THIS_DIR, src=gs_src, quiet=False, verbose=False, @@ -2171,9 +2238,7 @@ class TestFileCollection: base / "b/.definitely_exclude/a.pyi", ] src = [base / "b/"] - ctx = FakeContext() - ctx.obj["root"] = base - assert_collected_sources(src, expected, ctx=ctx, extend_exclude=r"/exclude/") + assert_collected_sources(src, expected, root=base, extend_exclude=r"/exclude/") def test_gitignore_used_on_multiple_sources(self) -> None: root = Path(DATA_DIR / "gitignore_used_on_multiple_sources") @@ -2181,10 +2246,8 @@ class TestFileCollection: root / "dir1" / "b.py", root / "dir2" / "b.py", ] - ctx = FakeContext() - ctx.obj["root"] = root src = [root / "dir1", root / "dir2"] - assert_collected_sources(src, expected, ctx=ctx) + assert_collected_sources(src, expected, root=root) @patch("black.find_project_root", lambda *args: (THIS_DIR.resolve(), None)) def test_exclude_for_issue_1572(self) -> None: @@ -2290,9 +2353,7 @@ class TestFileCollection: # If gitignore with */* is in root root = Path(DATA_DIR / "ignore_subfolders_gitignore_tests" / "subdir") expected = [root / "b.py"] - ctx = FakeContext() - ctx.obj["root"] = root - assert_collected_sources([root], expected, ctx=ctx) + assert_collected_sources([root], expected, root=root) # If .gitignore with */* is nested root = Path(DATA_DIR / "ignore_subfolders_gitignore_tests") @@ -2300,17 +2361,13 @@ class TestFileCollection: root / "a.py", root / "subdir" / "b.py", ] - ctx = FakeContext() - ctx.obj["root"] = root - assert_collected_sources([root], expected, ctx=ctx) + assert_collected_sources([root], expected, root=root) # If command is executed from outer dir root = Path(DATA_DIR / "ignore_subfolders_gitignore_tests") target = root / "subdir" expected = [target / "b.py"] - ctx = FakeContext() - ctx.obj["root"] = root - assert_collected_sources([target], expected, ctx=ctx) + assert_collected_sources([target], expected, root=root) def test_empty_include(self) -> None: path = DATA_DIR / "include_exclude_tests" @@ -2343,38 +2400,48 @@ class TestFileCollection: ) @pytest.mark.incompatible_with_mypyc - def test_symlink_out_of_root_directory(self) -> None: + def test_symlinks(self) -> None: path = MagicMock() root = THIS_DIR.resolve() - child = MagicMock() include = re.compile(black.DEFAULT_INCLUDES) exclude = re.compile(black.DEFAULT_EXCLUDES) report = black.Report() gitignore = PathSpec.from_lines("gitwildmatch", []) - # `child` should behave like a symlink which resolved path is clearly - # outside of the `root` directory. - path.iterdir.return_value = [child] - child.resolve.return_value = Path("/a/b/c") - child.as_posix.return_value = "/a/b/c" - try: - list( - black.gen_python_files( - path.iterdir(), - root, - include, - exclude, - None, - None, - report, - {path: gitignore}, - verbose=False, - quiet=False, - ) + + regular = MagicMock() + outside_root_symlink = MagicMock() + ignored_symlink = MagicMock() + + path.iterdir.return_value = [regular, outside_root_symlink, ignored_symlink] + + regular.absolute.return_value = root / "regular.py" + regular.resolve.return_value = root / "regular.py" + regular.is_dir.return_value = False + + outside_root_symlink.absolute.return_value = root / "symlink.py" + outside_root_symlink.resolve.return_value = Path("/nowhere") + + ignored_symlink.absolute.return_value = root / ".mypy_cache" / "symlink.py" + + files = list( + black.gen_python_files( + path.iterdir(), + root, + include, + exclude, + None, + None, + report, + {path: gitignore}, + verbose=False, + quiet=False, ) - except ValueError as ve: - pytest.fail(f"`get_python_files_in_dir()` failed: {ve}") + ) + assert files == [regular] + path.iterdir.assert_called_once() - child.resolve.assert_called_once() + outside_root_symlink.resolve.assert_called_once() + ignored_symlink.resolve.assert_not_called() @patch("black.find_project_root", lambda *args: (THIS_DIR.resolve(), None)) def test_get_sources_with_stdin(self) -> None: @@ -2440,6 +2507,41 @@ class TestFileCollection: ) +class TestDeFactoAPI: + """Test that certain symbols that are commonly used externally keep working. + + We don't (yet) formally expose an API (see issue #779), but we should endeavor to + keep certain functions that external users commonly rely on working. + + """ + + def test_format_str(self) -> None: + # format_str and Mode should keep working + assert ( + black.format_str("print('hello')", mode=black.Mode()) == 'print("hello")\n' + ) + + # you can pass line length + assert ( + black.format_str("print('hello')", mode=black.Mode(line_length=42)) + == 'print("hello")\n' + ) + + # invalid input raises InvalidInput + with pytest.raises(black.InvalidInput): + black.format_str("syntax error", mode=black.Mode()) + + def test_format_file_contents(self) -> None: + # You probably should be using format_str() instead, but let's keep + # this one around since people do use it + assert ( + black.format_file_contents("x=1", fast=True, mode=black.Mode()) == "x = 1\n" + ) + + with pytest.raises(black.NothingChanged): + black.format_file_contents("x = 1\n", fast=True, mode=black.Mode()) + + try: with open(black.__file__, "r", encoding="utf-8") as _bf: black_source_lines = _bf.readlines()