import asyncio
import logging
from concurrent.futures import ThreadPoolExecutor
-from contextlib import contextmanager, redirect_stderr
+from contextlib import contextmanager
from functools import partial, wraps
from io import BytesIO, TextIOWrapper
import os
return wrapper
+@contextmanager
+def skip_if_exception(e: str) -> Iterator[None]:
+    """Turn an exception whose class is named *e* into a test skip.
+
+    The exception is matched by class name (a string), so the exception
+    type itself does not have to be imported by this module.  Usable both
+    as a ``with`` block and as a decorator.
+
+    Raises:
+        unittest.SkipTest: when the wrapped code raises an exception whose
+            class name equals *e*.
+        Exception: any other exception raised by the wrapped code is
+            re-raised unchanged so the test still fails.
+    """
+    try:
+        yield
+    except Exception as exc:
+        if exc.__class__.__name__ != e:
+            # Not the tolerated failure: let it propagate instead of
+            # silently turning a broken test into a passing one.
+            raise
+        # ``unittest.skip()`` merely builds a decorator and has no effect
+        # when called here; raising SkipTest is what records a skip.
+        raise unittest.SkipTest(
+            f"Encountered expected exception {exc}, skipping"
+        ) from exc
+
+
class BlackRunner(CliRunner):
"""Modify CliRunner so that stderr is not merged with stdout.
black.assert_equivalent(source, actual)
black.assert_stable(source, actual, black.FileMode())
+    @patch("black.dump_to_file", dump_to_stderr)
+    def test_function_trailing_comma(self) -> None:
+        # Format the "function_trailing_comma" fixture, compare it with the
+        # expected text, then run black's equivalence and stability checks.
+        src, want = read_data("function_trailing_comma")
+        got = fs(src)
+        self.assertFormatEqual(want, got)
+        black.assert_equivalent(src, got)
+        mode = black.FileMode()
+        black.assert_stable(src, got, mode)
+
@patch("black.dump_to_file", dump_to_stderr)
def test_expression(self) -> None:
source, expected = read_data("expression")
black.assert_equivalent(source, actual)
black.assert_stable(source, actual, black.FileMode())
+    @patch("black.dump_to_file", dump_to_stderr)
+    def test_pep_572(self) -> None:
+        # Format the "pep_572" fixture and check output and stability.
+        src, want = read_data("pep_572")
+        got = fs(src)
+        self.assertFormatEqual(want, got)
+        black.assert_stable(src, got, black.FileMode())
+        # The equivalence check only runs on interpreters 3.8 and newer.
+        if sys.version_info < (3, 8):
+            return
+        black.assert_equivalent(src, got)
+
+    def test_pep_572_version_detection(self) -> None:
+        # Parse the "pep_572" fixture and check that both the feature set
+        # and the detected target versions reflect assignment expressions.
+        src, _ = read_data("pep_572")
+        tree = black.lib2to3_parse(src)
+        used = black.get_features_used(tree)
+        self.assertIn(black.Feature.ASSIGNMENT_EXPRESSIONS, used)
+        targets = black.detect_target_versions(tree)
+        self.assertIn(black.TargetVersion.PY38, targets)
+
def test_expression_ff(self) -> None:
source, expected = read_data("expression")
tmp_file = Path(black.dump_to_file(source))
black.assert_equivalent(source, actual)
black.assert_stable(source, actual, black.FileMode())
+    @patch("black.dump_to_file", dump_to_stderr)
+    def test_pep_570(self) -> None:
+        # Format the "pep_570" fixture and check output and stability.
+        src, want = read_data("pep_570")
+        got = fs(src)
+        self.assertFormatEqual(want, got)
+        black.assert_stable(src, got, black.FileMode())
+        # The equivalence check only runs on interpreters 3.8 and newer.
+        if sys.version_info < (3, 8):
+            return
+        black.assert_equivalent(src, got)
+
+    def test_detect_pos_only_arguments(self) -> None:
+        # Parse the "pep_570" fixture and check that both the feature set
+        # and the detected target versions reflect positional-only args.
+        src, _ = read_data("pep_570")
+        tree = black.lib2to3_parse(src)
+        used = black.get_features_used(tree)
+        self.assertIn(black.Feature.POS_ONLY_ARGUMENTS, used)
+        targets = black.detect_target_versions(tree)
+        self.assertIn(black.TargetVersion.PY38, targets)
+
@patch("black.dump_to_file", dump_to_stderr)
def test_string_quotes(self) -> None:
source, expected = read_data("string_quotes")
black.assert_equivalent(source, actual)
black.assert_stable(source, actual, black.FileMode())
+    @patch("black.dump_to_file", dump_to_stderr)
+    def test_comments7(self) -> None:
+        # Format the "comments7" fixture, compare it with the expected
+        # text, then run black's equivalence and stability checks.
+        src, want = read_data("comments7")
+        got = fs(src)
+        self.assertFormatEqual(want, got)
+        black.assert_equivalent(src, got)
+        mode = black.FileMode()
+        black.assert_stable(src, got, mode)
+
+    @patch("black.dump_to_file", dump_to_stderr)
+    def test_comment_after_escaped_newline(self) -> None:
+        # Format the "comment_after_escaped_newline" fixture, compare it
+        # with the expected text, then run black's equivalence and
+        # stability checks.
+        src, want = read_data("comment_after_escaped_newline")
+        got = fs(src)
+        self.assertFormatEqual(want, got)
+        black.assert_equivalent(src, got)
+        mode = black.FileMode()
+        black.assert_stable(src, got, mode)
+
@patch("black.dump_to_file", dump_to_stderr)
def test_cantfit(self) -> None:
source, expected = read_data("cantfit")
black.assert_equivalent(source, actual)
black.assert_stable(source, actual, black.FileMode())
+    @patch("black.dump_to_file", dump_to_stderr)
+    def test_remove_parens(self) -> None:
+        # Format the "remove_parens" fixture, compare it with the expected
+        # text, then run black's equivalence and stability checks.
+        src, want = read_data("remove_parens")
+        got = fs(src)
+        self.assertFormatEqual(want, got)
+        black.assert_equivalent(src, got)
+        mode = black.FileMode()
+        black.assert_stable(src, got, mode)
+
@patch("black.dump_to_file", dump_to_stderr)
def test_string_prefixes(self) -> None:
source, expected = read_data("string_prefixes")
source, expected = read_data("python2")
actual = fs(source)
self.assertFormatEqual(expected, actual)
- # black.assert_equivalent(source, actual)
+ black.assert_equivalent(source, actual)
black.assert_stable(source, actual, black.FileMode())
@patch("black.dump_to_file", dump_to_stderr)
mode = black.FileMode(target_versions={TargetVersion.PY27})
actual = fs(source, mode=mode)
self.assertFormatEqual(expected, actual)
+ black.assert_equivalent(source, actual)
black.assert_stable(source, actual, mode)
@patch("black.dump_to_file", dump_to_stderr)
source, expected = read_data("python2_unicode_literals")
actual = fs(source)
self.assertFormatEqual(expected, actual)
+ black.assert_equivalent(source, actual)
black.assert_stable(source, actual, black.FileMode())
@patch("black.dump_to_file", dump_to_stderr)
self.assertFormatEqual(expected, actual)
black.assert_stable(source, actual, mode)
+    @patch("black.dump_to_file", dump_to_stderr)
+    def test_async_as_identifier(self) -> None:
+        # Format the "async_as_identifier" fixture, then invoke black on the
+        # same file with two different --target-version values.
+        data_file = (THIS_DIR / "data" / "async_as_identifier.py").resolve()
+        src, want = read_data("async_as_identifier")
+        got = fs(src)
+        self.assertFormatEqual(want, got)
+        # The equivalence check only runs on interpreters older than 3.7.
+        if sys.version_info < (3, 7):
+            black.assert_equivalent(src, got)
+        black.assert_stable(src, got, black.FileMode())
+        path_arg = str(data_file)
+        # ensure black can parse this when the target is 3.6
+        self.invokeBlack([path_arg, "--target-version", "py36"])
+        # but not on 3.7, because async/await is no longer an identifier
+        self.invokeBlack([path_arg, "--target-version", "py37"], exit_code=123)
+
@patch("black.dump_to_file", dump_to_stderr)
def test_python37(self) -> None:
+ source_path = (THIS_DIR / "data" / "python37.py").resolve()
source, expected = read_data("python37")
actual = fs(source)
self.assertFormatEqual(expected, actual)
if major > 3 or (major == 3 and minor >= 7):
black.assert_equivalent(source, actual)
black.assert_stable(source, actual, black.FileMode())
+ # ensure black can parse this when the target is 3.7
+ self.invokeBlack([str(source_path), "--target-version", "py37"])
+ # but not on 3.6, because we use async as a reserved keyword
+ self.invokeBlack([str(source_path), "--target-version", "py36"], exit_code=123)
@patch("black.dump_to_file", dump_to_stderr)
def test_fmtonoff(self) -> None:
black.assert_equivalent(source, actual)
black.assert_stable(source, actual, black.FileMode())
+    @patch("black.dump_to_file", dump_to_stderr)
+    def test_tuple_assign(self) -> None:
+        # Format the "tupleassign" fixture, compare it with the expected
+        # text, then run black's equivalence and stability checks.
+        src, want = read_data("tupleassign")
+        got = fs(src)
+        self.assertFormatEqual(want, got)
+        black.assert_equivalent(src, got)
+        mode = black.FileMode()
+        black.assert_stable(src, got, mode)
+
def test_tab_comment_indentation(self) -> None:
contents_tab = "if 1:\n\tif 2:\n\t\tpass\n\t# comment\n\tpass\n"
contents_spc = "if 1:\n if 2:\n pass\n # comment\n pass\n"
):
ff(THIS_FILE)
+ # TODO: remove these decorators once the below is released
+ # https://github.com/aio-libs/aiohttp/pull/3727
+ @skip_if_exception("ClientOSError")
@unittest.skipUnless(has_blackd_deps, "blackd's dependencies are not installed")
@async_test
async def test_blackd_request_needs_formatting(self) -> None:
self.assertEqual(response.charset, "utf8")
self.assertEqual(await response.read(), b'print("hello world")\n')
+ @skip_if_exception("ClientOSError")
@unittest.skipUnless(has_blackd_deps, "blackd's dependencies are not installed")
@async_test
async def test_blackd_request_no_change(self) -> None:
self.assertEqual(response.status, 204)
self.assertEqual(await response.read(), b"")
+ @skip_if_exception("ClientOSError")
@unittest.skipUnless(has_blackd_deps, "blackd's dependencies are not installed")
@async_test
async def test_blackd_request_syntax_error(self) -> None:
msg=f"Expected error to start with 'Cannot parse', got {repr(content)}",
)
+ @skip_if_exception("ClientOSError")
@unittest.skipUnless(has_blackd_deps, "blackd's dependencies are not installed")
@async_test
async def test_blackd_unsupported_version(self) -> None:
)
self.assertEqual(response.status, 501)
+ @skip_if_exception("ClientOSError")
@unittest.skipUnless(has_blackd_deps, "blackd's dependencies are not installed")
@async_test
async def test_blackd_supported_version(self) -> None:
)
self.assertEqual(response.status, 200)
+ @skip_if_exception("ClientOSError")
@unittest.skipUnless(has_blackd_deps, "blackd's dependencies are not installed")
@async_test
async def test_blackd_invalid_python_variant(self) -> None:
await check("pypy3.0")
await check("jython3.4")
+ @skip_if_exception("ClientOSError")
@unittest.skipUnless(has_blackd_deps, "blackd's dependencies are not installed")
@async_test
async def test_blackd_pyi(self) -> None:
self.assertEqual(response.status, 200)
self.assertEqual(await response.text(), expected)
+ @skip_if_exception("ClientOSError")
@unittest.skipUnless(has_blackd_deps, "blackd's dependencies are not installed")
@async_test
async def test_blackd_python_variant(self) -> None:
await check("3.4", 204)
await check("py3.4", 204)
- @unittest.skipUnless(has_blackd_deps, "blackd's dependencies are not installed")
- @async_test
- async def test_blackd_fast(self) -> None:
- with open(os.devnull, "w") as dn, redirect_stderr(dn):
- app = blackd.make_app()
- async with TestClient(TestServer(app)) as client:
- response = await client.post("/", data=b"ur'hello'")
- self.assertEqual(response.status, 500)
- self.assertIn("failed to parse source file", await response.text())
- response = await client.post(
- "/", data=b"ur'hello'", headers={blackd.FAST_OR_SAFE_HEADER: "fast"}
- )
- self.assertEqual(response.status, 200)
-
+ @skip_if_exception("ClientOSError")
@unittest.skipUnless(has_blackd_deps, "blackd's dependencies are not installed")
@async_test
async def test_blackd_line_length(self) -> None:
)
self.assertEqual(response.status, 200)
+ @skip_if_exception("ClientOSError")
@unittest.skipUnless(has_blackd_deps, "blackd's dependencies are not installed")
@async_test
async def test_blackd_invalid_line_length(self) -> None: