#!/usr/bin/env python3
import asyncio
+import logging
from concurrent.futures import ThreadPoolExecutor
-from contextlib import contextmanager, redirect_stderr
+from contextlib import contextmanager
from functools import partial, wraps
from io import BytesIO, TextIOWrapper
import os
from click.testing import CliRunner
import black
-from black import Feature
+from black import Feature, TargetVersion
try:
import blackd
else:
has_blackd_deps = True
-
ff = partial(black.format_file_in_place, mode=black.FileMode(), fast=True)
fs = partial(black.format_str, mode=black.FileMode())
THIS_FILE = Path(__file__)
return wrapper
+@contextmanager
+def skip_if_exception(e: str) -> Iterator[None]:
+    """Report the wrapped test as skipped when an expected exception occurs.
+
+    The body runs normally. If it raises an exception whose class name equals
+    `e`, `unittest.SkipTest` is raised in its place so the runner records a
+    skip. Every other exception propagates unchanged.
+
+    Raises:
+        unittest.SkipTest: when the body raised an exception named `e`.
+    """
+    try:
+        yield
+    except Exception as exc:
+        if exc.__class__.__name__ != e:
+            # Not the exception we were told to tolerate: let the test fail.
+            raise
+        # Calling `unittest.skip()` only builds a decorator and has no effect
+        # here; raising SkipTest is what marks the running test as skipped.
+        raise unittest.SkipTest(
+            f"Encountered expected exception {exc}, skipping"
+        ) from exc
+
+
class BlackRunner(CliRunner):
"""Modify CliRunner so that stderr is not merged with stdout.
black.assert_equivalent(source, actual)
black.assert_stable(source, actual, black.FileMode())
+    @patch("black.dump_to_file", dump_to_stderr)
+    def test_function_trailing_comma(self) -> None:
+        # Format the "function_trailing_comma" fixture and check the result
+        # against the expected text returned by read_data.
+        unformatted, wanted = read_data("function_trailing_comma")
+        formatted = fs(unformatted)
+        self.assertFormatEqual(wanted, formatted)
+        # Run black's equivalence and stability assertions on the output.
+        black.assert_equivalent(unformatted, formatted)
+        black.assert_stable(unformatted, formatted, black.FileMode())
+
@patch("black.dump_to_file", dump_to_stderr)
def test_expression(self) -> None:
source, expected = read_data("expression")
black.assert_equivalent(source, actual)
black.assert_stable(source, actual, black.FileMode())
+    @patch("black.dump_to_file", dump_to_stderr)
+    def test_pep_572(self) -> None:
+        # Format the "pep_572" fixture and check the result against the
+        # expected text returned by read_data.
+        unformatted, wanted = read_data("pep_572")
+        formatted = fs(unformatted)
+        self.assertFormatEqual(wanted, formatted)
+        black.assert_stable(unformatted, formatted, black.FileMode())
+        # The equivalence assertion only runs on Python 3.8 or newer.
+        on_38_or_newer = sys.version_info >= (3, 8)
+        if on_38_or_newer:
+            black.assert_equivalent(unformatted, formatted)
+
+    def test_pep_572_version_detection(self) -> None:
+        # Parse the "pep_572" fixture; only the input half of the pair is used.
+        unformatted, _ = read_data("pep_572")
+        tree = black.lib2to3_parse(unformatted)
+        # ASSIGNMENT_EXPRESSIONS must be among the features reported as used.
+        used = black.get_features_used(tree)
+        self.assertIn(black.Feature.ASSIGNMENT_EXPRESSIONS, used)
+        # PY38 must be among the detected target versions.
+        detected = black.detect_target_versions(tree)
+        self.assertIn(black.TargetVersion.PY38, detected)
+
def test_expression_ff(self) -> None:
source, expected = read_data("expression")
tmp_file = Path(black.dump_to_file(source))
black.assert_equivalent(source, actual)
black.assert_stable(source, actual, black.FileMode())
+    @patch("black.dump_to_file", dump_to_stderr)
+    def test_pep_570(self) -> None:
+        # Format the "pep_570" fixture and check the result against the
+        # expected text returned by read_data.
+        unformatted, wanted = read_data("pep_570")
+        formatted = fs(unformatted)
+        self.assertFormatEqual(wanted, formatted)
+        black.assert_stable(unformatted, formatted, black.FileMode())
+        # The equivalence assertion only runs on Python 3.8 or newer.
+        on_38_or_newer = sys.version_info >= (3, 8)
+        if on_38_or_newer:
+            black.assert_equivalent(unformatted, formatted)
+
+    def test_detect_pos_only_arguments(self) -> None:
+        # Parse the "pep_570" fixture; only the input half of the pair is used.
+        unformatted, _ = read_data("pep_570")
+        tree = black.lib2to3_parse(unformatted)
+        # POS_ONLY_ARGUMENTS must be among the features reported as used.
+        used = black.get_features_used(tree)
+        self.assertIn(black.Feature.POS_ONLY_ARGUMENTS, used)
+        # PY38 must be among the detected target versions.
+        detected = black.detect_target_versions(tree)
+        self.assertIn(black.TargetVersion.PY38, detected)
+
@patch("black.dump_to_file", dump_to_stderr)
def test_string_quotes(self) -> None:
source, expected = read_data("string_quotes")
black.assert_equivalent(source, actual)
black.assert_stable(source, actual, black.FileMode())
+    @patch("black.dump_to_file", dump_to_stderr)
+    def test_comments7(self) -> None:
+        # Format the "comments7" fixture and check the result against the
+        # expected text returned by read_data.
+        unformatted, wanted = read_data("comments7")
+        formatted = fs(unformatted)
+        self.assertFormatEqual(wanted, formatted)
+        # Run black's equivalence and stability assertions on the output.
+        black.assert_equivalent(unformatted, formatted)
+        black.assert_stable(unformatted, formatted, black.FileMode())
+
+    @patch("black.dump_to_file", dump_to_stderr)
+    def test_comment_after_escaped_newline(self) -> None:
+        # Format the "comment_after_escaped_newline" fixture and check the
+        # result against the expected text returned by read_data.
+        unformatted, wanted = read_data("comment_after_escaped_newline")
+        formatted = fs(unformatted)
+        self.assertFormatEqual(wanted, formatted)
+        # Run black's equivalence and stability assertions on the output.
+        black.assert_equivalent(unformatted, formatted)
+        black.assert_stable(unformatted, formatted, black.FileMode())
+
@patch("black.dump_to_file", dump_to_stderr)
def test_cantfit(self) -> None:
source, expected = read_data("cantfit")
black.assert_equivalent(source, actual)
black.assert_stable(source, actual, black.FileMode())
+    @patch("black.dump_to_file", dump_to_stderr)
+    def test_remove_parens(self) -> None:
+        # Format the "remove_parens" fixture and check the result against the
+        # expected text returned by read_data.
+        unformatted, wanted = read_data("remove_parens")
+        formatted = fs(unformatted)
+        self.assertFormatEqual(wanted, formatted)
+        # Run black's equivalence and stability assertions on the output.
+        black.assert_equivalent(unformatted, formatted)
+        black.assert_stable(unformatted, formatted, black.FileMode())
+
@patch("black.dump_to_file", dump_to_stderr)
def test_string_prefixes(self) -> None:
source, expected = read_data("string_prefixes")
source, expected = read_data("python2")
actual = fs(source)
self.assertFormatEqual(expected, actual)
- # black.assert_equivalent(source, actual)
+ black.assert_equivalent(source, actual)
black.assert_stable(source, actual, black.FileMode())
+    @patch("black.dump_to_file", dump_to_stderr)
+    def test_python2_print_function(self) -> None:
+        # Format the "python2_print_function" fixture with PY27 as the only
+        # target version, then check it against the expected text.
+        unformatted, wanted = read_data("python2_print_function")
+        py27_mode = black.FileMode(target_versions={TargetVersion.PY27})
+        formatted = fs(unformatted, mode=py27_mode)
+        self.assertFormatEqual(wanted, formatted)
+        # Stability is asserted with the same PY27 mode used for formatting.
+        black.assert_equivalent(unformatted, formatted)
+        black.assert_stable(unformatted, formatted, py27_mode)
+
@patch("black.dump_to_file", dump_to_stderr)
def test_python2_unicode_literals(self) -> None:
source, expected = read_data("python2_unicode_literals")
actual = fs(source)
self.assertFormatEqual(expected, actual)
+ black.assert_equivalent(source, actual)
black.assert_stable(source, actual, black.FileMode())
@patch("black.dump_to_file", dump_to_stderr)
self.assertFormatEqual(expected, actual)
black.assert_stable(source, actual, mode)
+    @patch("black.dump_to_file", dump_to_stderr)
+    def test_async_as_identifier(self) -> None:
+        # Resolved path of the fixture, passed to the command-line runs below.
+        fixture = (THIS_DIR / "data" / "async_as_identifier.py").resolve()
+        unformatted, wanted = read_data("async_as_identifier")
+        formatted = fs(unformatted)
+        self.assertFormatEqual(wanted, formatted)
+        # The equivalence assertion only runs on interpreters older than 3.7.
+        if sys.version_info[:2] < (3, 7):
+            black.assert_equivalent(unformatted, formatted)
+        black.assert_stable(unformatted, formatted, black.FileMode())
+        fixture_arg = str(fixture)
+        # ensure black can parse this when the target is 3.6
+        self.invokeBlack([fixture_arg, "--target-version", "py36"])
+        # but not on 3.7, because async/await is no longer an identifier
+        self.invokeBlack([fixture_arg, "--target-version", "py37"], exit_code=123)
+
@patch("black.dump_to_file", dump_to_stderr)
def test_python37(self) -> None:
+ source_path = (THIS_DIR / "data" / "python37.py").resolve()
source, expected = read_data("python37")
actual = fs(source)
self.assertFormatEqual(expected, actual)
if major > 3 or (major == 3 and minor >= 7):
black.assert_equivalent(source, actual)
black.assert_stable(source, actual, black.FileMode())
+ # ensure black can parse this when the target is 3.7
+ self.invokeBlack([str(source_path), "--target-version", "py37"])
+ # but not on 3.6, because we use async as a reserved keyword
+ self.invokeBlack([str(source_path), "--target-version", "py36"], exit_code=123)
@patch("black.dump_to_file", dump_to_stderr)
def test_fmtonoff(self) -> None:
black.assert_equivalent(source, actual)
black.assert_stable(source, actual, black.FileMode())
- def test_comment_indentation(self) -> None:
+    @patch("black.dump_to_file", dump_to_stderr)
+    def test_tuple_assign(self) -> None:
+        # Format the "tupleassign" fixture and check the result against the
+        # expected text returned by read_data.
+        unformatted, wanted = read_data("tupleassign")
+        formatted = fs(unformatted)
+        self.assertFormatEqual(wanted, formatted)
+        # Run black's equivalence and stability assertions on the output.
+        black.assert_equivalent(unformatted, formatted)
+        black.assert_stable(unformatted, formatted, black.FileMode())
+
+ def test_tab_comment_indentation(self) -> None:
contents_tab = "if 1:\n\tif 2:\n\t\tpass\n\t# comment\n\tpass\n"
contents_spc = "if 1:\n if 2:\n pass\n # comment\n pass\n"
-
- self.assertFormatEqual(fs(contents_spc), contents_spc)
- self.assertFormatEqual(fs(contents_tab), contents_spc)
+ self.assertFormatEqual(contents_spc, fs(contents_spc))
+ self.assertFormatEqual(contents_spc, fs(contents_tab))
contents_tab = "if 1:\n\tif 2:\n\t\tpass\n\t\t# comment\n\tpass\n"
contents_spc = "if 1:\n if 2:\n pass\n # comment\n pass\n"
+ self.assertFormatEqual(contents_spc, fs(contents_spc))
+ self.assertFormatEqual(contents_spc, fs(contents_tab))
+
+ # mixed tabs and spaces (valid Python 2 code)
+ contents_tab = "if 1:\n if 2:\n\t\tpass\n\t# comment\n pass\n"
+ contents_spc = "if 1:\n if 2:\n pass\n # comment\n pass\n"
+ self.assertFormatEqual(contents_spc, fs(contents_spc))
+ self.assertFormatEqual(contents_spc, fs(contents_tab))
- self.assertFormatEqual(fs(contents_tab), contents_spc)
- self.assertFormatEqual(fs(contents_spc), contents_spc)
+ contents_tab = "if 1:\n if 2:\n\t\tpass\n\t\t# comment\n pass\n"
+ contents_spc = "if 1:\n if 2:\n pass\n # comment\n pass\n"
+ self.assertFormatEqual(contents_spc, fs(contents_spc))
+ self.assertFormatEqual(contents_spc, fs(contents_tab))
def test_report_verbose(self) -> None:
report = black.Report(verbose=True)
"2 files would fail to reformat.",
)
+    def test_lib2to3_parse(self) -> None:
+        # Checks which inputs black.lib2to3_parse accepts and which it rejects
+        # with black.InvalidInput, for several sets of target versions.
+        with self.assertRaises(black.InvalidInput):
+            black.lib2to3_parse("invalid syntax")
+
+        # Must parse without targets and with each target set below.
+        straddling = "x + y"
+        black.lib2to3_parse(straddling)
+        for accepted in (
+            {TargetVersion.PY27},
+            {TargetVersion.PY36},
+            {TargetVersion.PY27, TargetVersion.PY36},
+        ):
+            black.lib2to3_parse(straddling, accepted)
+
+        # Must parse without targets and for PY27 alone; any target set that
+        # contains PY36 must be rejected.
+        py2_only = "print x"
+        black.lib2to3_parse(py2_only)
+        black.lib2to3_parse(py2_only, {TargetVersion.PY27})
+        for rejected in (
+            {TargetVersion.PY36},
+            {TargetVersion.PY27, TargetVersion.PY36},
+        ):
+            with self.assertRaises(black.InvalidInput):
+                black.lib2to3_parse(py2_only, rejected)
+
+        # Must be rejected for PY27 alone; must parse without targets and for
+        # any target set that contains PY36.
+        py3_only = "exec(x, end=y)"
+        black.lib2to3_parse(py3_only)
+        with self.assertRaises(black.InvalidInput):
+            black.lib2to3_parse(py3_only, {TargetVersion.PY27})
+        for accepted in (
+            {TargetVersion.PY36},
+            {TargetVersion.PY27, TargetVersion.PY36},
+        ):
+            black.lib2to3_parse(py3_only, accepted)
+
def test_get_features_used(self) -> None:
node = black.lib2to3_parse("def f(*, arg): ...\n")
self.assertEqual(black.get_features_used(node), set())
node = black.lib2to3_parse("def f(*, arg,): ...\n")
- self.assertEqual(black.get_features_used(node), {Feature.TRAILING_COMMA})
+ self.assertEqual(black.get_features_used(node), {Feature.TRAILING_COMMA_IN_DEF})
+ node = black.lib2to3_parse("f(*arg,)\n")
+ self.assertEqual(
+ black.get_features_used(node), {Feature.TRAILING_COMMA_IN_CALL}
+ )
node = black.lib2to3_parse("def f(*, arg): f'string'\n")
self.assertEqual(black.get_features_used(node), {Feature.F_STRINGS})
node = black.lib2to3_parse("123_456\n")
self.assertEqual(black.get_features_used(node), set())
source, expected = read_data("function")
node = black.lib2to3_parse(source)
- self.assertEqual(
- black.get_features_used(node), {Feature.TRAILING_COMMA, Feature.F_STRINGS}
- )
+ expected_features = {
+ Feature.TRAILING_COMMA_IN_CALL,
+ Feature.TRAILING_COMMA_IN_DEF,
+ Feature.F_STRINGS,
+ }
+ self.assertEqual(black.get_features_used(node), expected_features)
node = black.lib2to3_parse(expected)
- self.assertEqual(
- black.get_features_used(node), {Feature.TRAILING_COMMA, Feature.F_STRINGS}
- )
+ self.assertEqual(black.get_features_used(node), expected_features)
source, expected = read_data("expression")
node = black.lib2to3_parse(source)
self.assertEqual(black.get_features_used(node), set())
source, expected = read_data("force_py36")
result = CliRunner().invoke(
black.main,
- ["-", "-q", "--target-version=cpy36"],
+ ["-", "-q", "--target-version=py36"],
input=BytesIO(source.encode("utf8")),
)
self.assertEqual(result.exit_code, 0)
except RuntimeError as re:
self.fail(f"`patch_click()` failed, exception still raised: {re}")
+    def test_root_logger_not_used_directly(self) -> None:
+        # Replaces the root logger's logging methods with a callable that
+        # fails this test, then formats this file via `ff`.
+        def reject(*args: Any, **kwargs: Any) -> None:
+            self.fail("Record created with root logger")
+
+        method_names = ("debug", "info", "warning", "error", "critical", "log")
+        replacements = {name: reject for name in method_names}
+        with patch.multiple(logging.root, **replacements):
+            ff(THIS_FILE)
+
+ # TODO: remove these decorators once the below is released
+ # https://github.com/aio-libs/aiohttp/commit/bfb99eb92bbc4a7462bc51dda3e480feed43882d
+ @skip_if_exception("ClientOSError")
@unittest.skipUnless(has_blackd_deps, "blackd's dependencies are not installed")
@async_test
async def test_blackd_request_needs_formatting(self) -> None:
self.assertEqual(response.charset, "utf8")
self.assertEqual(await response.read(), b'print("hello world")\n')
+ @skip_if_exception("ClientOSError")
@unittest.skipUnless(has_blackd_deps, "blackd's dependencies are not installed")
@async_test
async def test_blackd_request_no_change(self) -> None:
self.assertEqual(response.status, 204)
self.assertEqual(await response.read(), b"")
+ @skip_if_exception("ClientOSError")
@unittest.skipUnless(has_blackd_deps, "blackd's dependencies are not installed")
@async_test
async def test_blackd_request_syntax_error(self) -> None:
msg=f"Expected error to start with 'Cannot parse', got {repr(content)}",
)
+ @skip_if_exception("ClientOSError")
@unittest.skipUnless(has_blackd_deps, "blackd's dependencies are not installed")
@async_test
async def test_blackd_unsupported_version(self) -> None:
)
self.assertEqual(response.status, 501)
+ @skip_if_exception("ClientOSError")
@unittest.skipUnless(has_blackd_deps, "blackd's dependencies are not installed")
@async_test
async def test_blackd_supported_version(self) -> None:
)
self.assertEqual(response.status, 200)
+ @skip_if_exception("ClientOSError")
@unittest.skipUnless(has_blackd_deps, "blackd's dependencies are not installed")
@async_test
async def test_blackd_invalid_python_variant(self) -> None:
await check("lol")
await check("ruby3.5")
await check("pyi3.6")
- await check("cpy1.5")
+ await check("py1.5")
await check("2.8")
- await check("cpy2.8")
+ await check("py2.8")
await check("3.0")
await check("pypy3.0")
await check("jython3.4")
+ @skip_if_exception("ClientOSError")
@unittest.skipUnless(has_blackd_deps, "blackd's dependencies are not installed")
@async_test
async def test_blackd_pyi(self) -> None:
self.assertEqual(response.status, 200)
self.assertEqual(await response.text(), expected)
+ @skip_if_exception("ClientOSError")
@unittest.skipUnless(has_blackd_deps, "blackd's dependencies are not installed")
@async_test
async def test_blackd_python_variant(self) -> None:
self.assertEqual(response.status, expected_status)
await check("3.6", 200)
- await check("cpy3.6", 200)
- await check("3.5,3.7", 200)
- await check("3.5,cpy3.7", 200)
+ await check("py3.6", 200)
+ await check("3.6,3.7", 200)
+ await check("3.6,py3.7", 200)
await check("2", 204)
await check("2.7", 204)
- await check("cpy2.7", 204)
- await check("pypy2.7", 204)
+ await check("py2.7", 204)
await check("3.4", 204)
- await check("cpy3.4", 204)
- await check("pypy3.4", 204)
-
- @unittest.skipUnless(has_blackd_deps, "blackd's dependencies are not installed")
- @async_test
- async def test_blackd_fast(self) -> None:
- with open(os.devnull, "w") as dn, redirect_stderr(dn):
- app = blackd.make_app()
- async with TestClient(TestServer(app)) as client:
- response = await client.post("/", data=b"ur'hello'")
- self.assertEqual(response.status, 500)
- self.assertIn("failed to parse source file", await response.text())
- response = await client.post(
- "/", data=b"ur'hello'", headers={blackd.FAST_OR_SAFE_HEADER: "fast"}
- )
- self.assertEqual(response.status, 200)
+ await check("py3.4", 204)
+ @skip_if_exception("ClientOSError")
@unittest.skipUnless(has_blackd_deps, "blackd's dependencies are not installed")
@async_test
async def test_blackd_line_length(self) -> None:
)
self.assertEqual(response.status, 200)
+ @skip_if_exception("ClientOSError")
@unittest.skipUnless(has_blackd_deps, "blackd's dependencies are not installed")
@async_test
async def test_blackd_invalid_line_length(self) -> None: