import asyncio
from concurrent.futures import Executor, ProcessPoolExecutor
+from datetime import datetime
from functools import partial
import logging
from multiprocessing import freeze_support
import black
import click
+from _black_version import version as __version__
+
# This is used internally by tests to shut down the server prematurely
_stop_signal = asyncio.Event()
-VERSION_HEADER = "X-Protocol-Version"
+# Request headers
+PROTOCOL_VERSION_HEADER = "X-Protocol-Version"
LINE_LENGTH_HEADER = "X-Line-Length"
PYTHON_VARIANT_HEADER = "X-Python-Variant"
SKIP_STRING_NORMALIZATION_HEADER = "X-Skip-String-Normalization"
FAST_OR_SAFE_HEADER = "X-Fast-Or-Safe"
+DIFF_HEADER = "X-Diff"
BLACK_HEADERS = [
- VERSION_HEADER,
+ PROTOCOL_VERSION_HEADER,
LINE_LENGTH_HEADER,
PYTHON_VARIANT_HEADER,
SKIP_STRING_NORMALIZATION_HEADER,
FAST_OR_SAFE_HEADER,
+ DIFF_HEADER,
]
+# Response headers
+BLACK_VERSION_HEADER = "X-Black-Version"
+
class InvalidVariantHeader(Exception):
pass
async def handle(request: web.Request, executor: Executor) -> web.Response:
+ headers = {BLACK_VERSION_HEADER: __version__}
try:
- if request.headers.get(VERSION_HEADER, "1") != "1":
+ if request.headers.get(PROTOCOL_VERSION_HEADER, "1") != "1":
return web.Response(
status=501, text="This server only supports protocol version 1"
)
req_bytes = await request.content.read()
charset = request.charset if request.charset is not None else "utf8"
req_str = req_bytes.decode(charset)
+ then = datetime.utcnow()
+
loop = asyncio.get_event_loop()
formatted_str = await loop.run_in_executor(
executor, partial(black.format_file_contents, req_str, fast=fast, mode=mode)
)
+
+ # Only output the diff in the HTTP response
+ only_diff = bool(request.headers.get(DIFF_HEADER, False))
+ if only_diff:
+ now = datetime.utcnow()
+ src_name = f"In\t{then} +0000"
+ dst_name = f"Out\t{now} +0000"
+ loop = asyncio.get_event_loop()
+ formatted_str = await loop.run_in_executor(
+ executor,
+ partial(black.diff, req_str, formatted_str, src_name, dst_name),
+ )
+
return web.Response(
- content_type=request.content_type, charset=charset, text=formatted_str
+ content_type=request.content_type,
+ charset=charset,
+ headers=headers,
+ text=formatted_str,
)
except black.NothingChanged:
- return web.Response(status=204)
+ return web.Response(status=204, headers=headers)
except black.InvalidInput as e:
- return web.Response(status=400, text=str(e))
+ return web.Response(status=400, headers=headers, text=str(e))
except Exception as e:
logging.exception("Exception during handling a request")
- return web.Response(status=500, text=str(e))
+ return web.Response(status=500, headers=headers, text=str(e))
def parse_python_variant_header(value: str) -> Tuple[bool, Set[black.TargetVersion]]:
else:
versions = set()
for version in value.split(","):
- tag = "cpy"
- if version.startswith("cpy"):
- version = version[len("cpy") :]
- elif version.startswith("pypy"):
- tag = "pypy"
- version = version[len("pypy") :]
- major_str, *rest = version.split(".")
+ if version.startswith("py"):
+ version = version[len("py") :]
+ if "." in version:
+ major_str, *rest = version.split(".")
+ else:
+ major_str = version[0]
+ rest = [version[1:]] if len(version) > 1 else []
try:
major = int(major_str)
if major not in (2, 3):
else:
# Default to lowest supported minor version.
minor = 7 if major == 2 else 3
- version_str = f"{tag.upper()}{major}{minor}"
- # If PyPY is the same as CPython in some version, use
- # the corresponding CPython version.
- if tag == "pypy" and not hasattr(black.TargetVersion, version_str):
- version_str = f"CPY{major}{minor}"
+ version_str = f"PY{major}{minor}"
if major == 3 and not hasattr(black.TargetVersion, version_str):
raise InvalidVariantHeader(f"3.{minor} is not supported")
versions.add(black.TargetVersion[version_str])
except (KeyError, ValueError):
- raise InvalidVariantHeader("expected e.g. '3.7', 'pypy3.5'")
+ raise InvalidVariantHeader("expected e.g. '3.7', 'py3.5'")
return False, versions