import re
-import sys
from typing import TYPE_CHECKING, Any, Callable, TypeVar
from unittest.mock import patch
from tests.util import DETERMINISTIC_HEADER, read_data
-LESS_THAN_311 = sys.version_info < (3, 11)
+try:
+ from aiohttp import web
+ from aiohttp.test_utils import AioHTTPTestCase
-if LESS_THAN_311: # noqa: C901
- try:
- from aiohttp import web
- from aiohttp.test_utils import AioHTTPTestCase
-
- import blackd
- except ImportError as e:
- raise RuntimeError("Please install Black with the 'd' extra") from e
-
- if TYPE_CHECKING:
- F = TypeVar("F", bound=Callable[..., Any])
-
- unittest_run_loop: Callable[[F], F] = lambda x: x
- else:
- try:
- from aiohttp.test_utils import unittest_run_loop
- except ImportError:
- # unittest_run_loop is unnecessary and a no-op since aiohttp 3.8, and
- # aiohttp 4 removed it. To maintain compatibility we can make our own
- # no-op decorator.
- def unittest_run_loop(func, *args, **kwargs):
- return func
-
- @pytest.mark.blackd
- class BlackDTestCase(AioHTTPTestCase): # type: ignore[misc]
- def test_blackd_main(self) -> None:
- with patch("blackd.web.run_app"):
- result = CliRunner().invoke(blackd.main, [])
- if result.exception is not None:
- raise result.exception
- self.assertEqual(result.exit_code, 0)
-
- async def get_application(self) -> web.Application:
- return blackd.make_app()
-
- @unittest_run_loop
- async def test_blackd_request_needs_formatting(self) -> None:
- response = await self.client.post("/", data=b"print('hello world')")
- self.assertEqual(response.status, 200)
- self.assertEqual(response.charset, "utf8")
- self.assertEqual(await response.read(), b'print("hello world")\n')
-
- @unittest_run_loop
- async def test_blackd_request_no_change(self) -> None:
- response = await self.client.post("/", data=b'print("hello world")\n')
- self.assertEqual(response.status, 204)
- self.assertEqual(await response.read(), b"")
-
- @unittest_run_loop
- async def test_blackd_request_syntax_error(self) -> None:
- response = await self.client.post("/", data=b"what even ( is")
- self.assertEqual(response.status, 400)
- content = await response.text()
- self.assertTrue(
- content.startswith("Cannot parse"),
- msg=f"Expected error to start with 'Cannot parse', got {repr(content)}",
- )
-
- @unittest_run_loop
- async def test_blackd_unsupported_version(self) -> None:
- response = await self.client.post(
- "/", data=b"what", headers={blackd.PROTOCOL_VERSION_HEADER: "2"}
- )
- self.assertEqual(response.status, 501)
-
- @unittest_run_loop
- async def test_blackd_supported_version(self) -> None:
- response = await self.client.post(
- "/", data=b"what", headers={blackd.PROTOCOL_VERSION_HEADER: "1"}
- )
- self.assertEqual(response.status, 200)
-
- @unittest_run_loop
- async def test_blackd_invalid_python_variant(self) -> None:
- async def check(header_value: str, expected_status: int = 400) -> None:
- response = await self.client.post(
- "/",
- data=b"what",
- headers={blackd.PYTHON_VARIANT_HEADER: header_value},
- )
- self.assertEqual(response.status, expected_status)
-
- await check("lol")
- await check("ruby3.5")
- await check("pyi3.6")
- await check("py1.5")
- await check("2")
- await check("2.7")
- await check("py2.7")
- await check("2.8")
- await check("py2.8")
- await check("3.0")
- await check("pypy3.0")
- await check("jython3.4")
-
- @unittest_run_loop
- async def test_blackd_pyi(self) -> None:
- source, expected = read_data("miscellaneous", "stub.pyi")
- response = await self.client.post(
- "/", data=source, headers={blackd.PYTHON_VARIANT_HEADER: "pyi"}
- )
- self.assertEqual(response.status, 200)
- self.assertEqual(await response.text(), expected)
-
- @unittest_run_loop
- async def test_blackd_diff(self) -> None:
- diff_header = re.compile(
- r"(In|Out)\t\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d\.\d\d\d\d\d\d \+\d\d\d\d"
- )
+ import blackd
+except ImportError as e:
+ raise RuntimeError("Please install Black with the 'd' extra") from e
- source, _ = read_data("miscellaneous", "blackd_diff")
- expected, _ = read_data("miscellaneous", "blackd_diff.diff")
+if TYPE_CHECKING:
+ F = TypeVar("F", bound=Callable[..., Any])
+ unittest_run_loop: Callable[[F], F] = lambda x: x
+else:
+ try:
+ from aiohttp.test_utils import unittest_run_loop
+ except ImportError:
+ # unittest_run_loop is unnecessary and a no-op since aiohttp 3.8, and
+ # aiohttp 4 removed it. To maintain compatibility we can make our own
+ # no-op decorator.
+ def unittest_run_loop(func, *args, **kwargs):
+ return func
+
+
+@pytest.mark.blackd
+class BlackDTestCase(AioHTTPTestCase): # type: ignore[misc]
+ def test_blackd_main(self) -> None:
+ with patch("blackd.web.run_app"):
+ result = CliRunner().invoke(blackd.main, [])
+ if result.exception is not None:
+ raise result.exception
+ self.assertEqual(result.exit_code, 0)
+
+ async def get_application(self) -> web.Application:
+ return blackd.make_app()
+
+ @unittest_run_loop
+ async def test_blackd_request_needs_formatting(self) -> None:
+ response = await self.client.post("/", data=b"print('hello world')")
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response.charset, "utf8")
+ self.assertEqual(await response.read(), b'print("hello world")\n')
+
+ @unittest_run_loop
+ async def test_blackd_request_no_change(self) -> None:
+ response = await self.client.post("/", data=b'print("hello world")\n')
+ self.assertEqual(response.status, 204)
+ self.assertEqual(await response.read(), b"")
+
+ @unittest_run_loop
+ async def test_blackd_request_syntax_error(self) -> None:
+ response = await self.client.post("/", data=b"what even ( is")
+ self.assertEqual(response.status, 400)
+ content = await response.text()
+ self.assertTrue(
+ content.startswith("Cannot parse"),
+ msg=f"Expected error to start with 'Cannot parse', got {repr(content)}",
+ )
+
+ @unittest_run_loop
+ async def test_blackd_unsupported_version(self) -> None:
+ response = await self.client.post(
+ "/", data=b"what", headers={blackd.PROTOCOL_VERSION_HEADER: "2"}
+ )
+ self.assertEqual(response.status, 501)
+
+ @unittest_run_loop
+ async def test_blackd_supported_version(self) -> None:
+ response = await self.client.post(
+ "/", data=b"what", headers={blackd.PROTOCOL_VERSION_HEADER: "1"}
+ )
+ self.assertEqual(response.status, 200)
+
+ @unittest_run_loop
+ async def test_blackd_invalid_python_variant(self) -> None:
+ async def check(header_value: str, expected_status: int = 400) -> None:
response = await self.client.post(
- "/", data=source, headers={blackd.DIFF_HEADER: "true"}
- )
- self.assertEqual(response.status, 200)
-
- actual = await response.text()
- actual = diff_header.sub(DETERMINISTIC_HEADER, actual)
- self.assertEqual(actual, expected)
-
- @unittest_run_loop
- async def test_blackd_python_variant(self) -> None:
- code = (
- "def f(\n"
- " and_has_a_bunch_of,\n"
- " very_long_arguments_too,\n"
- " and_lots_of_them_as_well_lol,\n"
- " **and_very_long_keyword_arguments\n"
- "):\n"
- " pass\n"
+ "/",
+ data=b"what",
+ headers={blackd.PYTHON_VARIANT_HEADER: header_value},
)
-
- async def check(header_value: str, expected_status: int) -> None:
- response = await self.client.post(
- "/", data=code, headers={blackd.PYTHON_VARIANT_HEADER: header_value}
- )
- self.assertEqual(
- response.status, expected_status, msg=await response.text()
- )
-
- await check("3.6", 200)
- await check("py3.6", 200)
- await check("3.6,3.7", 200)
- await check("3.6,py3.7", 200)
- await check("py36,py37", 200)
- await check("36", 200)
- await check("3.6.4", 200)
- await check("3.4", 204)
- await check("py3.4", 204)
- await check("py34,py36", 204)
- await check("34", 204)
-
- @unittest_run_loop
- async def test_blackd_line_length(self) -> None:
+ self.assertEqual(response.status, expected_status)
+
+ await check("lol")
+ await check("ruby3.5")
+ await check("pyi3.6")
+ await check("py1.5")
+ await check("2")
+ await check("2.7")
+ await check("py2.7")
+ await check("2.8")
+ await check("py2.8")
+ await check("3.0")
+ await check("pypy3.0")
+ await check("jython3.4")
+
+ @unittest_run_loop
+ async def test_blackd_pyi(self) -> None:
+ source, expected = read_data("cases", "stub.py")
+ response = await self.client.post(
+ "/", data=source, headers={blackd.PYTHON_VARIANT_HEADER: "pyi"}
+ )
+ self.assertEqual(response.status, 200)
+ self.assertEqual(await response.text(), expected)
+
+ @unittest_run_loop
+ async def test_blackd_diff(self) -> None:
+ diff_header = re.compile(
+ r"(In|Out)\t\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d\.\d\d\d\d\d\d\+\d\d:\d\d"
+ )
+
+ source, _ = read_data("miscellaneous", "blackd_diff")
+ expected, _ = read_data("miscellaneous", "blackd_diff.diff")
+
+ response = await self.client.post(
+ "/", data=source, headers={blackd.DIFF_HEADER: "true"}
+ )
+ self.assertEqual(response.status, 200)
+
+ actual = await response.text()
+ actual = diff_header.sub(DETERMINISTIC_HEADER, actual)
+ self.assertEqual(actual, expected)
+
+ @unittest_run_loop
+ async def test_blackd_python_variant(self) -> None:
+ code = (
+ "def f(\n"
+ " and_has_a_bunch_of,\n"
+ " very_long_arguments_too,\n"
+ " and_lots_of_them_as_well_lol,\n"
+ " **and_very_long_keyword_arguments\n"
+ "):\n"
+ " pass\n"
+ )
+
+ async def check(header_value: str, expected_status: int) -> None:
response = await self.client.post(
- "/", data=b'print("hello")\n', headers={blackd.LINE_LENGTH_HEADER: "7"}
+ "/", data=code, headers={blackd.PYTHON_VARIANT_HEADER: header_value}
)
- self.assertEqual(response.status, 200)
-
- @unittest_run_loop
- async def test_blackd_invalid_line_length(self) -> None:
- response = await self.client.post(
- "/",
- data=b'print("hello")\n',
- headers={blackd.LINE_LENGTH_HEADER: "NaN"},
+ self.assertEqual(
+ response.status, expected_status, msg=await response.text()
)
- self.assertEqual(response.status, 400)
- @unittest_run_loop
- async def test_blackd_preview(self) -> None:
- response = await self.client.post(
- "/", data=b'print("hello")\n', headers={blackd.PREVIEW: "true"}
- )
+ await check("3.6", 200)
+ await check("py3.6", 200)
+ await check("3.6,3.7", 200)
+ await check("3.6,py3.7", 200)
+ await check("py36,py37", 200)
+ await check("36", 200)
+ await check("3.6.4", 200)
+ await check("3.4", 204)
+ await check("py3.4", 204)
+ await check("py34,py36", 204)
+ await check("34", 204)
+
+ @unittest_run_loop
+ async def test_blackd_line_length(self) -> None:
+ response = await self.client.post(
+ "/", data=b'print("hello")\n', headers={blackd.LINE_LENGTH_HEADER: "7"}
+ )
+ self.assertEqual(response.status, 200)
+
+ @unittest_run_loop
+ async def test_blackd_invalid_line_length(self) -> None:
+ response = await self.client.post(
+ "/",
+ data=b'print("hello")\n',
+ headers={blackd.LINE_LENGTH_HEADER: "NaN"},
+ )
+ self.assertEqual(response.status, 400)
+
+ @unittest_run_loop
+ async def test_blackd_skip_first_source_line(self) -> None:
+ invalid_first_line = b"Header will be skipped\r\ni = [1,2,3]\nj = [1,2,3]\n"
+ expected_result = b"Header will be skipped\r\ni = [1, 2, 3]\nj = [1, 2, 3]\n"
+ response = await self.client.post("/", data=invalid_first_line)
+ self.assertEqual(response.status, 400)
+ response = await self.client.post(
+ "/",
+ data=invalid_first_line,
+ headers={blackd.SKIP_SOURCE_FIRST_LINE: "true"},
+ )
+ self.assertEqual(response.status, 200)
+ self.assertEqual(await response.read(), expected_result)
+
+ @unittest_run_loop
+ async def test_blackd_preview(self) -> None:
+ response = await self.client.post(
+ "/", data=b'print("hello")\n', headers={blackd.PREVIEW: "true"}
+ )
+ self.assertEqual(response.status, 204)
+
+ @unittest_run_loop
+ async def test_blackd_response_black_version_header(self) -> None:
+ response = await self.client.post("/")
+ self.assertIsNotNone(response.headers.get(blackd.BLACK_VERSION_HEADER))
+
+ @unittest_run_loop
+ async def test_cors_preflight(self) -> None:
+ response = await self.client.options(
+ "/",
+ headers={
+ "Access-Control-Request-Method": "POST",
+ "Origin": "*",
+ "Access-Control-Request-Headers": "Content-Type",
+ },
+ )
+ self.assertEqual(response.status, 200)
+ self.assertIsNotNone(response.headers.get("Access-Control-Allow-Origin"))
+ self.assertIsNotNone(response.headers.get("Access-Control-Allow-Headers"))
+ self.assertIsNotNone(response.headers.get("Access-Control-Allow-Methods"))
+
+ @unittest_run_loop
+ async def test_cors_headers_present(self) -> None:
+ response = await self.client.post("/", headers={"Origin": "*"})
+ self.assertIsNotNone(response.headers.get("Access-Control-Allow-Origin"))
+ self.assertIsNotNone(response.headers.get("Access-Control-Expose-Headers"))
+
+ @unittest_run_loop
+ async def test_preserves_line_endings(self) -> None:
+ for data in (b"c\r\nc\r\n", b"l\nl\n"):
+ # test preserved newlines when reformatted
+ response = await self.client.post("/", data=data + b" ")
+ self.assertEqual(await response.text(), data.decode())
+ # test 204 when no change
+ response = await self.client.post("/", data=data)
self.assertEqual(response.status, 204)
- @unittest_run_loop
- async def test_blackd_response_black_version_header(self) -> None:
- response = await self.client.post("/")
- self.assertIsNotNone(response.headers.get(blackd.BLACK_VERSION_HEADER))
-
- @unittest_run_loop
- async def test_cors_preflight(self) -> None:
- response = await self.client.options(
- "/",
- headers={
- "Access-Control-Request-Method": "POST",
- "Origin": "*",
- "Access-Control-Request-Headers": "Content-Type",
- },
- )
+ @unittest_run_loop
+ async def test_normalizes_line_endings(self) -> None:
+ for data, expected in ((b"c\r\nc\n", "c\r\nc\r\n"), (b"l\nl\r\n", "l\nl\n")):
+ response = await self.client.post("/", data=data)
+ self.assertEqual(await response.text(), expected)
self.assertEqual(response.status, 200)
- self.assertIsNotNone(response.headers.get("Access-Control-Allow-Origin"))
- self.assertIsNotNone(response.headers.get("Access-Control-Allow-Headers"))
- self.assertIsNotNone(response.headers.get("Access-Control-Allow-Methods"))
-
- @unittest_run_loop
- async def test_cors_headers_present(self) -> None:
- response = await self.client.post("/", headers={"Origin": "*"})
- self.assertIsNotNone(response.headers.get("Access-Control-Allow-Origin"))
- self.assertIsNotNone(response.headers.get("Access-Control-Expose-Headers"))
+
+ @unittest_run_loop
+ async def test_single_character(self) -> None:
+ response = await self.client.post("/", data="1")
+ self.assertEqual(await response.text(), "1\n")
+ self.assertEqual(response.status, 200)