#!/usr/bin/env python3
-# Module '__future__' has no attribute 'annotations'
-from __future__ import annotations # type: ignore
-
import asyncio
import json
import logging
+import sys
from pathlib import Path
+from platform import system
from shutil import rmtree, which
from subprocess import CalledProcessError
from sys import version_info
import click
+WINDOWS = system() == "Windows"
+BLACK_BINARY = "black.exe" if WINDOWS else "black"
+GIT_BIANRY = "git.exe" if WINDOWS else "git"
LOG = logging.getLogger(__name__)
+# Windows needs a ProactorEventLoop if you want to exec subprocesses
+# Starting with Python 3.8 this is the default - can remove once black requires Python >= 3.8
+# mypy only respects sys.platform if directly in the evaluation
+# # https://mypy.readthedocs.io/en/latest/common_issues.html#python-version-and-system-platform-checks # noqa: B950
+if sys.platform == "win32":
+ asyncio.set_event_loop(asyncio.ProactorEventLoop())
+
+
class Results(NamedTuple):
stats: Dict[str, int] = {}
failed_projects: Dict[str, CalledProcessError] = {}
return (stdout, stderr)
-async def analyze_results(project_count: int, results: Results) -> int:
+def analyze_results(project_count: int, results: Results) -> int:
failed_pct = round(((results.stats["failed"] / project_count) * 100), 2)
success_pct = round(((results.stats["success"] / project_count) * 100), 2)
repo_path: Path, project_config: Dict[str, Any], results: Results
) -> None:
"""Run black and record failures"""
- cmd = [str(which("black"))]
- if project_config["cli_arguments"]:
+ cmd = [str(which(BLACK_BINARY))]
+ if "cli_arguments" in project_config and project_config["cli_arguments"]:
cmd.extend(*project_config["cli_arguments"])
cmd.extend(["--check", "--diff", "."])
results.stats["failed"] += 1
LOG.error(f"Running black for {repo_path} timed out ({cmd})")
except CalledProcessError as cpe:
- # TODO: This might need to be tuned and made smarter for higher signal
- if not project_config["expect_formatting_changes"] and cpe.returncode == 1:
- results.stats["failed"] += 1
- results.failed_projects[repo_path.name] = cpe
+    # TODO: Tune this to be smarter, for higher signal
+    # If the return value is anything other than 1 we raise - the project can be disabled in config
+ if cpe.returncode == 1:
+ if not project_config["expect_formatting_changes"]:
+ results.stats["failed"] += 1
+ results.failed_projects[repo_path.name] = cpe
+ else:
+ results.stats["success"] += 1
return
+ LOG.error(f"Unkown error with {repo_path}")
+ raise
+
+ # If we get here and expect formatting changes something is up
+ if project_config["expect_formatting_changes"]:
+ results.stats["failed"] += 1
+ results.failed_projects[repo_path.name] = CalledProcessError(
+ 0, cmd, b"Expected formatting changes but didn't get any!", b""
+ )
+ return
+
results.stats["success"] += 1
depth: int = 1,
) -> Optional[Path]:
"""git Clone project or rebase"""
- git_bin = str(which("git"))
+ git_bin = str(which(GIT_BIANRY))
if not git_bin:
LOG.error("No git binary found")
return None
async def load_projects_queue(
config_path: Path,
-) -> Tuple[Dict[str, Any], asyncio.Queue[str]]:
+) -> Tuple[Dict[str, Any], asyncio.Queue]:
"""Load project config and fill queue with all the project names"""
with config_path.open("r") as cfp:
config = json.load(cfp)
# TODO: Offer more options here
# e.g. Run on X random packages or specific sub list etc.
project_names = sorted(config["projects"].keys())
- queue: asyncio.Queue[str] = asyncio.Queue(maxsize=len(project_names))
+ queue: asyncio.Queue = asyncio.Queue(maxsize=len(project_names))
for project in project_names:
await queue.put(project)
async def project_runner(
idx: int,
config: Dict[str, Any],
- queue: asyncio.Queue[str],
+ queue: asyncio.Queue,
work_path: Path,
results: Results,
long_checkouts: bool = False,
config, queue = await load_projects_queue(Path(config_file))
project_count = queue.qsize()
LOG.info(f"{project_count} projects to run black over")
- if not project_count:
+ if project_count < 1:
return -1
LOG.debug(f"Using {workers} parallel workers to run black")
)
LOG.info("Analyzing results")
- return await analyze_results(project_count, results)
+ return analyze_results(project_count, results)
+
+
+if __name__ == "__main__": # pragma: nocover
+ raise NotImplementedError("lib is a library, funnily enough.")
--- /dev/null
+#!/usr/bin/env python3
+
+import asyncio
+import sys
+import unittest
+from contextlib import contextmanager
+from copy import deepcopy
+from io import StringIO
+from os import getpid
+from pathlib import Path
+from platform import system
+from subprocess import CalledProcessError
+from tempfile import TemporaryDirectory, gettempdir
+from typing import Any, Callable, Generator, Iterator, Tuple
+from unittest.mock import Mock, patch
+
+from click.testing import CliRunner
+
+from black_primer import cli, lib
+
+
+EXPECTED_ANALYSIS_OUTPUT = """\
+-- primer results 📊 --
+
+68 / 69 succeeded (98.55%) ✅
+1 / 69 FAILED (1.45%) 💩
+ - 0 projects Disabled by config
+ - 0 projects skipped due to Python Version
+ - 0 skipped due to long checkout
+
+Failed Projects:
+
+## black:
+ - Returned 69
+ - stdout:
+black didn't work
+
+"""
+FAKE_PROJECT_CONFIG = {
+ "cli_arguments": ["--unittest"],
+ "expect_formatting_changes": False,
+ "git_clone_url": "https://github.com/psf/black.git",
+}
+
+
+@contextmanager
+def capture_stdout(command: Callable, *args: Any, **kwargs: Any) -> Generator:
+ old_stdout, sys.stdout = sys.stdout, StringIO()
+ try:
+ command(*args, **kwargs)
+ sys.stdout.seek(0)
+ yield sys.stdout.read()
+ finally:
+ sys.stdout = old_stdout
+
+
+@contextmanager
+def event_loop() -> Iterator[None]:
+ policy = asyncio.get_event_loop_policy()
+ loop = policy.new_event_loop()
+ asyncio.set_event_loop(loop)
+ if sys.platform == "win32":
+ asyncio.set_event_loop(asyncio.ProactorEventLoop())
+ try:
+ yield
+ finally:
+ loop.close()
+
+
+async def raise_subprocess_error(*args: Any, **kwargs: Any) -> None:
+ raise CalledProcessError(1, ["unittest", "error"], b"", b"")
+
+
+async def return_false(*args: Any, **kwargs: Any) -> bool:
+ return False
+
+
+async def return_subproccess_output(*args: Any, **kwargs: Any) -> Tuple[bytes, bytes]:
+ return (b"stdout", b"stderr")
+
+
+async def return_zero(*args: Any, **kwargs: Any) -> int:
+ return 0
+
+
+class PrimerLibTests(unittest.TestCase):
+ def test_analyze_results(self) -> None:
+ fake_results = lib.Results(
+ {
+ "disabled": 0,
+ "failed": 1,
+ "skipped_long_checkout": 0,
+ "success": 68,
+ "wrong_py_ver": 0,
+ },
+ {"black": CalledProcessError(69, ["black"], b"black didn't work", b"")},
+ )
+ with capture_stdout(lib.analyze_results, 69, fake_results) as analyze_stdout:
+ self.assertEqual(EXPECTED_ANALYSIS_OUTPUT, analyze_stdout)
+
+ @event_loop()
+ def test_black_run(self) -> None:
+ """Pretend run black to ensure we cater for all scenarios"""
+ loop = asyncio.get_event_loop()
+ repo_path = Path(gettempdir())
+ project_config = deepcopy(FAKE_PROJECT_CONFIG)
+ results = lib.Results({"failed": 0, "success": 0}, {})
+
+ # Test a successful black run
+ with patch("black_primer.lib._gen_check_output", return_subproccess_output):
+ loop.run_until_complete(lib.black_run(repo_path, project_config, results))
+ self.assertEqual(1, results.stats["success"])
+ self.assertFalse(results.failed_projects)
+
+ # Test a fail based on expecting formatting changes but not getting any
+ project_config["expect_formatting_changes"] = True
+ results = lib.Results({"failed": 0, "success": 0}, {})
+ with patch("black_primer.lib._gen_check_output", return_subproccess_output):
+ loop.run_until_complete(lib.black_run(repo_path, project_config, results))
+ self.assertEqual(1, results.stats["failed"])
+ self.assertTrue(results.failed_projects)
+
+ # Test a fail based on returning 1 and not expecting formatting changes
+ project_config["expect_formatting_changes"] = False
+ results = lib.Results({"failed": 0, "success": 0}, {})
+ with patch("black_primer.lib._gen_check_output", raise_subprocess_error):
+ loop.run_until_complete(lib.black_run(repo_path, project_config, results))
+ self.assertEqual(1, results.stats["failed"])
+ self.assertTrue(results.failed_projects)
+
+ @event_loop()
+ def test_gen_check_output(self) -> None:
+ loop = asyncio.get_event_loop()
+ stdout, stderr = loop.run_until_complete(
+ lib._gen_check_output([lib.BLACK_BINARY, "--help"])
+ )
+ self.assertTrue("The uncompromising code formatter" in stdout.decode("utf8"))
+ self.assertEqual(None, stderr)
+
+ # TODO: Add a test to see failure works on Windows
+ if lib.WINDOWS:
+ return
+
+ false_bin = "/usr/bin/false" if system() == "Darwin" else "/bin/false"
+ with self.assertRaises(CalledProcessError):
+ loop.run_until_complete(lib._gen_check_output([false_bin]))
+
+ with self.assertRaises(asyncio.TimeoutError):
+ loop.run_until_complete(
+ lib._gen_check_output(["/bin/sleep", "2"], timeout=0.1)
+ )
+
+ @event_loop()
+ def test_git_checkout_or_rebase(self) -> None:
+ loop = asyncio.get_event_loop()
+ project_config = deepcopy(FAKE_PROJECT_CONFIG)
+ work_path = Path(gettempdir())
+
+ expected_repo_path = work_path / "black"
+ with patch("black_primer.lib._gen_check_output", return_subproccess_output):
+ returned_repo_path = loop.run_until_complete(
+ lib.git_checkout_or_rebase(work_path, project_config)
+ )
+ self.assertEqual(expected_repo_path, returned_repo_path)
+
+ @patch("sys.stdout", new_callable=StringIO)
+ @event_loop()
+ def test_process_queue(self, mock_stdout: Mock) -> None:
+ loop = asyncio.get_event_loop()
+ config_path = Path(lib.__file__).parent / "primer.json"
+ with patch("black_primer.lib.git_checkout_or_rebase", return_false):
+ with TemporaryDirectory() as td:
+ return_val = loop.run_until_complete(
+ lib.process_queue(str(config_path), td, 2)
+ )
+ self.assertEqual(0, return_val)
+
+
+class PrimerCLITests(unittest.TestCase):
+ @event_loop()
+ def test_async_main(self) -> None:
+ loop = asyncio.get_event_loop()
+ work_dir = Path(gettempdir()) / f"primer_ut_{getpid()}"
+ args = {
+ "config": "/config",
+ "debug": False,
+ "keep": False,
+ "long_checkouts": False,
+ "rebase": False,
+ "workdir": str(work_dir),
+ "workers": 69,
+ }
+ with patch("black_primer.cli.lib.process_queue", return_zero):
+ return_val = loop.run_until_complete(cli.async_main(**args))
+ self.assertEqual(0, return_val)
+
+ def test_handle_debug(self) -> None:
+ self.assertTrue(cli._handle_debug(None, None, True))
+
+ def test_help_output(self) -> None:
+ runner = CliRunner()
+ result = runner.invoke(cli.main, ["--help"])
+ self.assertEqual(result.exit_code, 0)
+
+
+if __name__ == "__main__":
+ unittest.main()