+from click.testing import CliRunner
+
+from tests.util import DETERMINISTIC_HEADER, read_data
+
+# True when the running interpreter is older than Python 3.11; this flag
+# gates the `if` block that follows.
+LESS_THAN_311 = sys.version_info < (3, 11)
+
+if LESS_THAN_311: # noqa: C901
+ try:
+ # aiohttp (web + test utilities) and blackd itself are needed below.
+ from aiohttp import web
+ from aiohttp.test_utils import AioHTTPTestCase
+
+ import blackd
+ except ImportError as e:
+ # Re-raise with an actionable hint, keeping the original error chained.
+ raise RuntimeError("Please install Black with the 'd' extra") from e
+
+ if TYPE_CHECKING:
+ # Branch seen by static type checkers only: declare the decorator as a
+ # typed identity function (Callable[[F], F]).
+ F = TypeVar("F", bound=Callable[..., Any])
+
+ unittest_run_loop: Callable[[F], F] = lambda x: x
+ else:
+ try:
+ # Prefer the decorator shipped by the installed aiohttp, if it has one.
+ from aiohttp.test_utils import unittest_run_loop
+ except ImportError:
+ # unittest_run_loop is unnecessary and a no-op since aiohttp 3.8, and
+ # aiohttp 4 removed it. To maintain compatibility we can make our own
+ # no-op decorator.
+ def unittest_run_loop(func, *args, **kwargs):
+ # Return the decorated function unchanged; extra arguments are ignored.
+ return func
+
+ @pytest.mark.blackd
+ class BlackDTestCase(AioHTTPTestCase): # type: ignore[misc]
+ def test_blackd_main(self) -> None:
+ with patch("blackd.web.run_app"):
+ result = CliRunner().invoke(blackd.main, [])
+ if result.exception is not None:
+ raise result.exception
+ self.assertEqual(result.exit_code, 0)
+
+ async def get_application(self) -> web.Application:
+ return blackd.make_app()
+
+ @unittest_run_loop
+ async def test_blackd_request_needs_formatting(self) -> None:
+ response = await self.client.post("/", data=b"print('hello world')")
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response.charset, "utf8")
+ self.assertEqual(await response.read(), b'print("hello world")\n')
+
+ @unittest_run_loop
+ async def test_blackd_request_no_change(self) -> None:
+ response = await self.client.post("/", data=b'print("hello world")\n')
+ self.assertEqual(response.status, 204)
+ self.assertEqual(await response.read(), b"")
+
+ @unittest_run_loop
+ async def test_blackd_request_syntax_error(self) -> None:
+ response = await self.client.post("/", data=b"what even ( is")
+ self.assertEqual(response.status, 400)
+ content = await response.text()
+ self.assertTrue(
+ content.startswith("Cannot parse"),
+ msg=f"Expected error to start with 'Cannot parse', got {repr(content)}",
+ )
+
+ @unittest_run_loop
+ async def test_blackd_unsupported_version(self) -> None:
+ response = await self.client.post(
+ "/", data=b"what", headers={blackd.PROTOCOL_VERSION_HEADER: "2"}
+ )
+ self.assertEqual(response.status, 501)
+
+ @unittest_run_loop
+ async def test_blackd_supported_version(self) -> None:
+ response = await self.client.post(
+ "/", data=b"what", headers={blackd.PROTOCOL_VERSION_HEADER: "1"}
+ )
+ self.assertEqual(response.status, 200)
+
+ @unittest_run_loop
+ async def test_blackd_invalid_python_variant(self) -> None:
+ async def check(header_value: str, expected_status: int = 400) -> None:
+ response = await self.client.post(
+ "/",
+ data=b"what",
+ headers={blackd.PYTHON_VARIANT_HEADER: header_value},
+ )
+ self.assertEqual(response.status, expected_status)
+
+ await check("lol")
+ await check("ruby3.5")
+ await check("pyi3.6")
+ await check("py1.5")
+ await check("2")
+ await check("2.7")
+ await check("py2.7")
+ await check("2.8")
+ await check("py2.8")
+ await check("3.0")
+ await check("pypy3.0")
+ await check("jython3.4")
+
+ @unittest_run_loop
+ async def test_blackd_pyi(self) -> None:
+ source, expected = read_data("miscellaneous", "stub.pyi")
+ response = await self.client.post(
+ "/", data=source, headers={blackd.PYTHON_VARIANT_HEADER: "pyi"}
+ )
+ self.assertEqual(response.status, 200)
+ self.assertEqual(await response.text(), expected)
+
+ @unittest_run_loop
+ async def test_blackd_diff(self) -> None:
+ diff_header = re.compile(
+ r"(In|Out)\t\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d\.\d\d\d\d\d\d \+\d\d\d\d"
+ )
+
+ source, _ = read_data("miscellaneous", "blackd_diff")
+ expected, _ = read_data("miscellaneous", "blackd_diff.diff")