import logging
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
-from functools import partial, wraps
+from functools import partial
from io import BytesIO, TextIOWrapper
import os
from pathlib import Path
import re
import sys
from tempfile import TemporaryDirectory
-from typing import (
- Any,
- BinaryIO,
- Callable,
- Coroutine,
- Generator,
- List,
- Tuple,
- Iterator,
- TypeVar,
-)
+from typing import Any, BinaryIO, Generator, List, Tuple, Iterator, TypeVar
import unittest
from unittest.mock import patch, MagicMock
try:
import blackd
- from aiohttp.test_utils import TestClient, TestServer
+ from aiohttp.test_utils import AioHTTPTestCase, unittest_run_loop
+ from aiohttp import web
except ImportError:
has_blackd_deps = False
else:
@contextmanager
def event_loop(close: bool) -> Iterator[None]:
+ """Install a newly created event loop as the current loop while the block runs.
+
+ The new loop is closed on exit only when `close` is true.
+ """
policy = asyncio.get_event_loop_policy()
- old_loop = policy.get_event_loop()
loop = policy.new_event_loop()
asyncio.set_event_loop(loop)
try:
yield
finally:
- policy.set_event_loop(old_loop)
if close:
loop.close()
-def async_test(f: Callable[..., Coroutine[Any, None, R]]) -> Callable[..., None]:
- @event_loop(close=True)
- @wraps(f)
- def wrapper(*args: Any, **kwargs: Any) -> None:
- asyncio.get_event_loop().run_until_complete(f(*args, **kwargs))
-
- return wrapper
+@contextmanager
+def skip_if_exception(e: str) -> Iterator[None]:
+    """Turn an exception whose class is named `e` into a test skip.
+
+    Any exception raised inside the managed block whose class name equals `e`
+    is converted into `unittest.SkipTest`; every other exception propagates
+    unchanged.
+
+    Raises:
+        unittest.SkipTest: when the block raised an exception named `e`.
+    """
+    try:
+        yield
+    except Exception as exc:
+        if exc.__class__.__name__ != e:
+            raise
+        # unittest.skip() merely builds a decorator and never raises, so calling
+        # it here would swallow the error and let the test pass. Raising
+        # SkipTest is what makes the runner report the test as skipped.
+        raise unittest.SkipTest(
+            f"Encountered expected exception {exc}, skipping"
+        ) from exc
class BlackRunner(CliRunner):
black.assert_equivalent(source, actual)
black.assert_stable(source, actual, black.FileMode())
+ @patch("black.dump_to_file", dump_to_stderr)
+ def test_function_trailing_comma(self) -> None:
+     """Check the `function_trailing_comma` data against its expected output.
+
+     The result of `fs` must match the expected text and then pass black's
+     equivalence and stability assertions.
+     """
+     src, want = read_data("function_trailing_comma")
+     got = fs(src)
+     self.assertFormatEqual(want, got)
+     black.assert_equivalent(src, got)
+     black.assert_stable(src, got, black.FileMode())
+
@patch("black.dump_to_file", dump_to_stderr)
def test_expression(self) -> None:
source, expected = read_data("expression")
black.assert_equivalent(source, actual)
black.assert_stable(source, actual, black.FileMode())
+ @patch("black.dump_to_file", dump_to_stderr)
+ def test_pep_572(self) -> None:
+     """Check the `pep_572` data against its expected output.
+
+     Stability is always asserted; equivalence is asserted only when the
+     running interpreter is Python 3.8 or newer.
+     """
+     src, want = read_data("pep_572")
+     got = fs(src)
+     self.assertFormatEqual(want, got)
+     black.assert_stable(src, got, black.FileMode())
+     if sys.version_info < (3, 8):
+         return
+     black.assert_equivalent(src, got)
+
+ def test_pep_572_version_detection(self) -> None:
+     """Check that the `pep_572` data is detected as using assignment expressions.
+
+     The parsed tree must report `Feature.ASSIGNMENT_EXPRESSIONS` and include
+     `TargetVersion.PY38` among its detected target versions.
+     """
+     src, _ = read_data("pep_572")
+     tree = black.lib2to3_parse(src)
+     used_features = black.get_features_used(tree)
+     self.assertIn(black.Feature.ASSIGNMENT_EXPRESSIONS, used_features)
+     target_versions = black.detect_target_versions(tree)
+     self.assertIn(black.TargetVersion.PY38, target_versions)
+
def test_expression_ff(self) -> None:
source, expected = read_data("expression")
tmp_file = Path(black.dump_to_file(source))
black.assert_equivalent(source, actual)
black.assert_stable(source, actual, black.FileMode())
+ @patch("black.dump_to_file", dump_to_stderr)
+ def test_pep_570(self) -> None:
+     """Check the `pep_570` data against its expected output.
+
+     Stability is always asserted; equivalence is asserted only when the
+     running interpreter is Python 3.8 or newer.
+     """
+     src, want = read_data("pep_570")
+     got = fs(src)
+     self.assertFormatEqual(want, got)
+     black.assert_stable(src, got, black.FileMode())
+     if sys.version_info < (3, 8):
+         return
+     black.assert_equivalent(src, got)
+
+ def test_detect_pos_only_arguments(self) -> None:
+     """Check that the `pep_570` data is detected as using positional-only arguments.
+
+     The parsed tree must report `Feature.POS_ONLY_ARGUMENTS` and include
+     `TargetVersion.PY38` among its detected target versions.
+     """
+     src, _ = read_data("pep_570")
+     tree = black.lib2to3_parse(src)
+     used_features = black.get_features_used(tree)
+     self.assertIn(black.Feature.POS_ONLY_ARGUMENTS, used_features)
+     target_versions = black.detect_target_versions(tree)
+     self.assertIn(black.TargetVersion.PY38, target_versions)
+
@patch("black.dump_to_file", dump_to_stderr)
def test_string_quotes(self) -> None:
source, expected = read_data("string_quotes")
black.assert_equivalent(source, actual)
black.assert_stable(source, actual, black.FileMode())
+ @patch("black.dump_to_file", dump_to_stderr)
+ def test_beginning_backslash(self) -> None:
+     """Check the `beginning_backslash` data against its expected output.
+
+     The result of `fs` must match the expected text and then pass black's
+     equivalence and stability assertions.
+     """
+     src, want = read_data("beginning_backslash")
+     got = fs(src)
+     self.assertFormatEqual(want, got)
+     black.assert_equivalent(src, got)
+     black.assert_stable(src, got, black.FileMode())
+
def test_tab_comment_indentation(self) -> None:
contents_tab = "if 1:\n\tif 2:\n\t\tpass\n\t# comment\n\tpass\n"
contents_spc = "if 1:\n if 2:\n pass\n # comment\n pass\n"
ff(THIS_FILE)
@unittest.skipUnless(has_blackd_deps, "blackd's dependencies are not installed")
- @async_test
+ def test_blackd_main(self) -> None:
+ """Invoke `blackd.main` through CliRunner with `blackd.web.run_app` patched; re-raise any captured exception and expect exit code 0."""
+ with patch("blackd.web.run_app"):
+ result = CliRunner().invoke(blackd.main, [])
+ if result.exception is not None:
+ raise result.exception
+ self.assertEqual(result.exit_code, 0)
+
+
+class BlackDTestCase(AioHTTPTestCase):
+ """Request-level tests for the blackd application, issued through `self.client`."""
+ # NOTE(review): AioHTTPTestCase and `web` are bound only when the guarded
+ # import near the top of the file succeeds — confirm this class definition
+ # does not raise NameError when blackd's dependencies are missing.
+ async def get_application(self) -> web.Application:
+ """Return the application built by `blackd.make_app()`."""
+ return blackd.make_app()
+
+ # TODO: remove these decorators once the below is released
+ # https://github.com/aio-libs/aiohttp/pull/3727
+ @skip_if_exception("ClientOSError")
+ @unittest.skipUnless(has_blackd_deps, "blackd's dependencies are not installed")
+ @unittest_run_loop
async def test_blackd_request_needs_formatting(self) -> None:
+ """POST single-quoted source to "/" and expect status 200, charset utf8 and the double-quoted source as the body."""
- app = blackd.make_app()
- async with TestClient(TestServer(app)) as client:
- response = await client.post("/", data=b"print('hello world')")
- self.assertEqual(response.status, 200)
- self.assertEqual(response.charset, "utf8")
- self.assertEqual(await response.read(), b'print("hello world")\n')
+ response = await self.client.post("/", data=b"print('hello world')")
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response.charset, "utf8")
+ self.assertEqual(await response.read(), b'print("hello world")\n')
+ @skip_if_exception("ClientOSError")
@unittest.skipUnless(has_blackd_deps, "blackd's dependencies are not installed")
- @async_test
+ @unittest_run_loop
async def test_blackd_request_no_change(self) -> None:
+ """POST double-quoted source to "/" and expect status 204 with an empty body."""
- app = blackd.make_app()
- async with TestClient(TestServer(app)) as client:
- response = await client.post("/", data=b'print("hello world")\n')
- self.assertEqual(response.status, 204)
- self.assertEqual(await response.read(), b"")
+ response = await self.client.post("/", data=b'print("hello world")\n')
+ self.assertEqual(response.status, 204)
+ self.assertEqual(await response.read(), b"")
+ @skip_if_exception("ClientOSError")
@unittest.skipUnless(has_blackd_deps, "blackd's dependencies are not installed")
- @async_test
+ @unittest_run_loop
async def test_blackd_request_syntax_error(self) -> None:
+ """POST the payload b"what even ( is" and expect status 400 with a body starting with 'Cannot parse'."""
- app = blackd.make_app()
- async with TestClient(TestServer(app)) as client:
- response = await client.post("/", data=b"what even ( is")
- self.assertEqual(response.status, 400)
- content = await response.text()
- self.assertTrue(
- content.startswith("Cannot parse"),
- msg=f"Expected error to start with 'Cannot parse', got {repr(content)}",
- )
+ response = await self.client.post("/", data=b"what even ( is")
+ self.assertEqual(response.status, 400)
+ content = await response.text()
+ self.assertTrue(
+ content.startswith("Cannot parse"),
+ msg=f"Expected error to start with 'Cannot parse', got {repr(content)}",
+ )
+ @skip_if_exception("ClientOSError")
@unittest.skipUnless(has_blackd_deps, "blackd's dependencies are not installed")
- @async_test
+ @unittest_run_loop
async def test_blackd_unsupported_version(self) -> None:
+ """POST with `blackd.VERSION_HEADER` set to "2" and expect status 501."""
- app = blackd.make_app()
- async with TestClient(TestServer(app)) as client:
- response = await client.post(
- "/", data=b"what", headers={blackd.VERSION_HEADER: "2"}
- )
- self.assertEqual(response.status, 501)
+ response = await self.client.post(
+ "/", data=b"what", headers={blackd.VERSION_HEADER: "2"}
+ )
+ self.assertEqual(response.status, 501)
+ @skip_if_exception("ClientOSError")
@unittest.skipUnless(has_blackd_deps, "blackd's dependencies are not installed")
- @async_test
+ @unittest_run_loop
async def test_blackd_supported_version(self) -> None:
+ """POST with `blackd.VERSION_HEADER` set to "1" and expect status 200."""
- app = blackd.make_app()
- async with TestClient(TestServer(app)) as client:
- response = await client.post(
- "/", data=b"what", headers={blackd.VERSION_HEADER: "1"}
- )
- self.assertEqual(response.status, 200)
+ response = await self.client.post(
+ "/", data=b"what", headers={blackd.VERSION_HEADER: "1"}
+ )
+ self.assertEqual(response.status, 200)
+ @skip_if_exception("ClientOSError")
@unittest.skipUnless(has_blackd_deps, "blackd's dependencies are not installed")
- @async_test
+ @unittest_run_loop
async def test_blackd_invalid_python_variant(self) -> None:
+ """POST with each listed `blackd.PYTHON_VARIANT_HEADER` value and expect status 400 every time."""
- app = blackd.make_app()
- async with TestClient(TestServer(app)) as client:
-
- async def check(header_value: str, expected_status: int = 400) -> None:
- response = await client.post(
- "/",
- data=b"what",
- headers={blackd.PYTHON_VARIANT_HEADER: header_value},
- )
- self.assertEqual(response.status, expected_status)
-
- await check("lol")
- await check("ruby3.5")
- await check("pyi3.6")
- await check("py1.5")
- await check("2.8")
- await check("py2.8")
- await check("3.0")
- await check("pypy3.0")
- await check("jython3.4")
-
+ async def check(header_value: str, expected_status: int = 400) -> None:
+ response = await self.client.post(
+ "/", data=b"what", headers={blackd.PYTHON_VARIANT_HEADER: header_value}
+ )
+ self.assertEqual(response.status, expected_status)
+
+ await check("lol")
+ await check("ruby3.5")
+ await check("pyi3.6")
+ await check("py1.5")
+ await check("2.8")
+ await check("py2.8")
+ await check("3.0")
+ await check("pypy3.0")
+ await check("jython3.4")
+
+ @skip_if_exception("ClientOSError")
@unittest.skipUnless(has_blackd_deps, "blackd's dependencies are not installed")
- @async_test
+ @unittest_run_loop
async def test_blackd_pyi(self) -> None:
+ """POST the `stub.pyi` data with `blackd.PYTHON_VARIANT_HEADER` set to "pyi"; expect status 200 and the expected text."""
- app = blackd.make_app()
- async with TestClient(TestServer(app)) as client:
- source, expected = read_data("stub.pyi")
- response = await client.post(
- "/", data=source, headers={blackd.PYTHON_VARIANT_HEADER: "pyi"}
- )
- self.assertEqual(response.status, 200)
- self.assertEqual(await response.text(), expected)
+ source, expected = read_data("stub.pyi")
+ response = await self.client.post(
+ "/", data=source, headers={blackd.PYTHON_VARIANT_HEADER: "pyi"}
+ )
+ self.assertEqual(response.status, 200)
+ self.assertEqual(await response.text(), expected)
+ @skip_if_exception("ClientOSError")
@unittest.skipUnless(has_blackd_deps, "blackd's dependencies are not installed")
- @async_test
+ @unittest_run_loop
async def test_blackd_python_variant(self) -> None:
+ """POST `code` under several `blackd.PYTHON_VARIANT_HEADER` values; expect 200 for the 3.6/3.7 values and 204 for the 2.x and 3.4 values."""
- app = blackd.make_app()
code = (
"def f(\n"
" and_has_a_bunch_of,\n"
"):\n"
" pass\n"
)
- async with TestClient(TestServer(app)) as client:
- async def check(header_value: str, expected_status: int) -> None:
- response = await client.post(
- "/", data=code, headers={blackd.PYTHON_VARIANT_HEADER: header_value}
- )
- self.assertEqual(response.status, expected_status)
+ async def check(header_value: str, expected_status: int) -> None:
+ response = await self.client.post(
+ "/", data=code, headers={blackd.PYTHON_VARIANT_HEADER: header_value}
+ )
+ self.assertEqual(response.status, expected_status)
- await check("3.6", 200)
- await check("py3.6", 200)
- await check("3.6,3.7", 200)
- await check("3.6,py3.7", 200)
+ await check("3.6", 200)
+ await check("py3.6", 200)
+ await check("3.6,3.7", 200)
+ await check("3.6,py3.7", 200)
- await check("2", 204)
- await check("2.7", 204)
- await check("py2.7", 204)
- await check("3.4", 204)
- await check("py3.4", 204)
+ await check("2", 204)
+ await check("2.7", 204)
+ await check("py2.7", 204)
+ await check("3.4", 204)
+ await check("py3.4", 204)
+ @skip_if_exception("ClientOSError")
@unittest.skipUnless(has_blackd_deps, "blackd's dependencies are not installed")
- @async_test
+ @unittest_run_loop
async def test_blackd_line_length(self) -> None:
+ """POST with `blackd.LINE_LENGTH_HEADER` set to "7" and expect status 200."""
- app = blackd.make_app()
- async with TestClient(TestServer(app)) as client:
- response = await client.post(
- "/", data=b'print("hello")\n', headers={blackd.LINE_LENGTH_HEADER: "7"}
- )
- self.assertEqual(response.status, 200)
+ response = await self.client.post(
+ "/", data=b'print("hello")\n', headers={blackd.LINE_LENGTH_HEADER: "7"}
+ )
+ self.assertEqual(response.status, 200)
+ @skip_if_exception("ClientOSError")
@unittest.skipUnless(has_blackd_deps, "blackd's dependencies are not installed")
- @async_test
+ @unittest_run_loop
async def test_blackd_invalid_line_length(self) -> None:
+ """POST with `blackd.LINE_LENGTH_HEADER` set to "NaN" and expect status 400."""
- app = blackd.make_app()
- async with TestClient(TestServer(app)) as client:
- response = await client.post(
- "/",
- data=b'print("hello")\n',
- headers={blackd.LINE_LENGTH_HEADER: "NaN"},
- )
- self.assertEqual(response.status, 400)
-
- @unittest.skipUnless(has_blackd_deps, "blackd's dependencies are not installed")
- def test_blackd_main(self) -> None:
- with patch("blackd.web.run_app"):
- result = CliRunner().invoke(blackd.main, [])
- if result.exception is not None:
- raise result.exception
- self.assertEqual(result.exit_code, 0)
+ response = await self.client.post(
+ "/", data=b'print("hello")\n', headers={blackd.LINE_LENGTH_HEADER: "NaN"}
+ )
+ self.assertEqual(response.status, 400)
if __name__ == "__main__":