X-Git-Url: https://git.madduck.net/etc/vim.git/blobdiff_plain/b50a52708c564a9ded05b579e39eadbb928050a3..911470a610e47d9da5ea938b0887c3df62819b85:/src/black_primer/lib.py?ds=inline diff --git a/src/black_primer/lib.py b/src/black_primer/lib.py index 87028d7..7494ae6 100644 --- a/src/black_primer/lib.py +++ b/src/black_primer/lib.py @@ -1,24 +1,48 @@ -#!/usr/bin/env python3 - -# Module '__future__' has no attribute 'annotations' -from __future__ import annotations # type: ignore - import asyncio +import errno import json import logging +import os +import stat +import sys +from functools import partial from pathlib import Path +from platform import system from shutil import rmtree, which from subprocess import CalledProcessError from sys import version_info -from typing import Any, Dict, NamedTuple, Optional, Sequence, Tuple +from tempfile import TemporaryDirectory +from typing import ( + Any, + Callable, + Dict, + List, + NamedTuple, + Optional, + Sequence, + Tuple, + Union, +) from urllib.parse import urlparse import click +TEN_MINUTES_SECONDS = 600 +WINDOWS = system() == "Windows" +BLACK_BINARY = "black.exe" if WINDOWS else "black" +GIT_BINARY = "git.exe" if WINDOWS else "git" LOG = logging.getLogger(__name__) +# Windows needs a ProactorEventLoop if you want to exec subprocesses +# Starting with 3.8 this is the default - can remove when Black >= 3.8 +# mypy only respects sys.platform if directly in the evaluation +# https://mypy.readthedocs.io/en/latest/common_issues.html#python-version-and-system-platform-checks # noqa: B950 +if sys.platform == "win32": + asyncio.set_event_loop(asyncio.ProactorEventLoop()) + + class Results(NamedTuple): stats: Dict[str, int] = {} failed_projects: Dict[str, CalledProcessError] = {} @@ -26,24 +50,31 @@ class Results(NamedTuple): async def _gen_check_output( cmd: Sequence[str], - timeout: float = 30, + timeout: float = TEN_MINUTES_SECONDS, env: Optional[Dict[str, str]] = None, cwd: Optional[Path] = None, + stdin: Optional[bytes] = None, ) -> Tuple[bytes, bytes]: process = await asyncio.create_subprocess_exec( *cmd, + stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, env=env, cwd=cwd, ) try: - (stdout, stderr) = await asyncio.wait_for(process.communicate(), timeout) + (stdout, stderr) = await asyncio.wait_for(process.communicate(stdin), timeout) except asyncio.TimeoutError: process.kill() await process.wait() raise + # A non-optional timeout was supplied to asyncio.wait_for, guaranteeing + # a timeout or completed process. A terminated Python process will have a + # non-empty returncode value. + assert process.returncode is not None + if process.returncode != 0: cmd_str = " ".join(cmd) raise CalledProcessError( @@ -53,11 +84,11 @@ async def _gen_check_output( return (stdout, stderr) -async def analyze_results(project_count: int, results: Results) -> int: +def analyze_results(project_count: int, results: Results) -> int: failed_pct = round(((results.stats["failed"] / project_count) * 100), 2) success_pct = round(((results.stats["success"] / project_count) * 100), 2) - click.secho(f"-- primer results 📊 --\n", bold=True) + click.secho("-- primer results 📊 --\n", bold=True) click.secho( f"{results.stats['success']} / {project_count} succeeded ({success_pct}%) ✅", bold=True, @@ -68,16 +99,18 @@ async def analyze_results(project_count: int, results: Results) -> int: bold=bool(results.stats["failed"]), fg="red", ) - click.echo(f" - {results.stats['disabled']} projects Disabled by config") + s = "" if results.stats["disabled"] == 1 else "s" + click.echo(f" - {results.stats['disabled']} project{s} disabled by config") + s = "" if results.stats["wrong_py_ver"] == 1 else "s" click.echo( - f" - {results.stats['wrong_py_ver']} projects skipped due to Python Version" + f" - {results.stats['wrong_py_ver']} project{s} skipped due to Python version" ) click.echo( f" - {results.stats['skipped_long_checkout']} skipped due to long checkout" ) if results.failed_projects: - click.secho(f"\nFailed Projects:\n", bold=True) + click.secho("\nFailed projects:\n", bold=True) for project_name, project_cpe in results.failed_projects.items(): print(f"## {project_name}:") @@ -91,26 +124,101 @@ async def analyze_results(project_count: int, results: Results) -> int: return results.stats["failed"] +def _flatten_cli_args(cli_args: List[Union[Sequence[str], str]]) -> List[str]: + """Allow a user to put long arguments into a list of strs + to make the JSON human readable""" + flat_args = [] + for arg in cli_args: + if isinstance(arg, str): + flat_args.append(arg) + continue + + args_as_str = "".join(arg) + flat_args.append(args_as_str) + + return flat_args + + async def black_run( - repo_path: Path, project_config: Dict[str, Any], results: Results + project_name: str, + repo_path: Optional[Path], + project_config: Dict[str, Any], + results: Results, + no_diff: bool = False, ) -> None: - """Run black and record failures""" - cmd = [str(which("black"))] - if project_config["cli_arguments"]: - cmd.extend(*project_config["cli_arguments"]) - cmd.extend(["--check", "--diff", "."]) - - try: - _stdout, _stderr = await _gen_check_output(cmd, cwd=repo_path) - except asyncio.TimeoutError: + """Run Black and record failures""" + if not repo_path: results.stats["failed"] += 1 - LOG.error(f"Running black for {repo_path} timed out ({cmd})") - except CalledProcessError as cpe: - # TODO: This might need to be tuned and made smarter for higher signal - if not project_config["expect_formatting_changes"] and cpe.returncode == 1: + results.failed_projects[project_name] = CalledProcessError( + 69, [], f"{project_name} has no repo_path: {repo_path}".encode(), b"" + ) + return + + stdin_test = project_name.upper() == "STDIN" + cmd = [str(which(BLACK_BINARY))] + if "cli_arguments" in project_config and project_config["cli_arguments"]: + cmd.extend(_flatten_cli_args(project_config["cli_arguments"])) + cmd.append("--check") + if not no_diff: + cmd.append("--diff") + + # Workout if we should read in a python file or search from cwd + stdin = None + if stdin_test: + cmd.append("-") + stdin = repo_path.read_bytes() + elif "base_path" in project_config: + cmd.append(project_config["base_path"]) + else: + cmd.append(".") + + timeout = ( + project_config["timeout_seconds"] + if "timeout_seconds" in project_config + else TEN_MINUTES_SECONDS + ) + with TemporaryDirectory() as tmp_path: + # Prevent reading top-level user configs by manipulating environment variables + env = { + **os.environ, + "XDG_CONFIG_HOME": tmp_path, # Unix-like + "USERPROFILE": tmp_path, # Windows (changes `Path.home()` output) + } + + cwd_path = repo_path.parent if stdin_test else repo_path + try: + LOG.debug(f"Running black for {project_name}: {' '.join(cmd)}") + _stdout, _stderr = await _gen_check_output( + cmd, cwd=cwd_path, env=env, stdin=stdin, timeout=timeout + ) + except asyncio.TimeoutError: results.stats["failed"] += 1 - results.failed_projects[repo_path.name] = cpe - return + LOG.error(f"Running black for {repo_path} timed out ({cmd})") + except CalledProcessError as cpe: + # TODO: Tune for smarter for higher signal + # If any other return value than 1 we raise - can disable project in config + if cpe.returncode == 1: + if not project_config["expect_formatting_changes"]: + results.stats["failed"] += 1 + results.failed_projects[repo_path.name] = cpe + else: + results.stats["success"] += 1 + return + elif cpe.returncode > 1: + results.stats["failed"] += 1 + results.failed_projects[repo_path.name] = cpe + return + + LOG.error(f"Unknown error with {repo_path}") + raise + + # If we get here and expect formatting changes something is up + if project_config["expect_formatting_changes"]: + results.stats["failed"] += 1 + results.failed_projects[repo_path.name] = CalledProcessError( + 0, cmd, b"Expected formatting changes but didn't get any!", b"" + ) + return results.stats["success"] += 1 @@ -123,9 +231,9 @@ async def git_checkout_or_rebase( depth: int = 1, ) -> Optional[Path]: """git Clone project or rebase""" - git_bin = str(which("git")) + git_bin = str(which(GIT_BINARY)) if not git_bin: - LOG.error(f"No git binary found") + LOG.error("No git binary found") return None repo_url_parts = urlparse(project_config["git_clone_url"]) @@ -149,9 +257,33 @@ async def git_checkout_or_rebase( return repo_path +def handle_PermissionError( + func: Callable, path: Path, exc: Tuple[Any, Any, Any] +) -> None: + """ + Handle PermissionError during shutil.rmtree. + + This checks if the erroring function is either 'os.rmdir' or 'os.unlink', and that + the error was EACCES (i.e. Permission denied). If true, the path is set writable, + readable, and executable by everyone. Finally, it tries the error causing delete + operation again. + + If the check is false, then the original error will be reraised as this function + can't handle it. + """ + excvalue = exc[1] + LOG.debug(f"Handling {excvalue} from {func.__name__}... ") + if func in (os.rmdir, os.unlink) and excvalue.errno == errno.EACCES: + LOG.debug(f"Setting {path} writable, readable, and executable by everyone... ") + os.chmod(path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) # chmod 0777 + func(path) # Try the error causing delete operation again + else: + raise + + async def load_projects_queue( config_path: Path, -) -> Tuple[Dict[str, Any], asyncio.Queue[str]]: +) -> Tuple[Dict[str, Any], asyncio.Queue]: """Load project config and fill queue with all the project names""" with config_path.open("r") as cfp: config = json.load(cfp) @@ -159,7 +291,7 @@ async def load_projects_queue( # TODO: Offer more options here # e.g. Run on X random packages or specific sub list etc. project_names = sorted(config["projects"].keys()) - queue: asyncio.Queue[str] = asyncio.Queue(maxsize=len(project_names)) + queue: asyncio.Queue = asyncio.Queue(maxsize=len(project_names)) for project in project_names: await queue.put(project) @@ -169,14 +301,15 @@ async def load_projects_queue( async def project_runner( idx: int, config: Dict[str, Any], - queue: asyncio.Queue[str], + queue: asyncio.Queue, work_path: Path, results: Results, long_checkouts: bool = False, rebase: bool = False, keep: bool = False, + no_diff: bool = False, ) -> None: - """Checkout project and run black on it + record result""" + """Check out project and run Black on it + record result""" loop = asyncio.get_event_loop() py_version = f"{version_info[0]}.{version_info[1]}" while True: @@ -185,6 +318,7 @@ async def project_runner( except asyncio.QueueEmpty: LOG.debug(f"project_runner {idx} exiting") return + LOG.debug(f"worker {idx} working on {project_name}") project_config = config["projects"][project_name] @@ -209,14 +343,22 @@ async def project_runner( LOG.debug(f"Skipping {project_name} as it's configured as a long checkout") continue - repo_path = await git_checkout_or_rebase(work_path, project_config, rebase) - if not repo_path: - continue - await black_run(repo_path, project_config, results) + repo_path: Optional[Path] = Path(__file__) + stdin_project = project_name.upper() == "STDIN" + if not stdin_project: + repo_path = await git_checkout_or_rebase(work_path, project_config, rebase) + if not repo_path: + continue + await black_run(project_name, repo_path, project_config, results, no_diff) - if not keep: + if not keep and not stdin_project: LOG.debug(f"Removing {repo_path}") - await loop.run_in_executor(None, rmtree, repo_path) + rmtree_partial = partial( + rmtree, path=repo_path, onerror=handle_PermissionError + ) + await loop.run_in_executor(None, rmtree_partial) + + LOG.info(f"Finished {project_name}") async def process_queue( @@ -226,6 +368,7 @@ async def process_queue( keep: bool = False, long_checkouts: bool = False, rebase: bool = False, + no_diff: bool = False, ) -> int: """ Process the queue with X workers and evaluate results @@ -242,20 +385,34 @@ async def process_queue( config, queue = await load_projects_queue(Path(config_file)) project_count = queue.qsize() - LOG.info(f"{project_count} projects to run black over") - if not project_count: + s = "" if project_count == 1 else "s" + LOG.info(f"{project_count} project{s} to run Black over") + if project_count < 1: return -1 - LOG.debug(f"Using {workers} parallel workers to run black") + s = "" if workers == 1 else "s" + LOG.debug(f"Using {workers} parallel worker{s} to run Black") # Wait until we finish running all the projects before analyzing await asyncio.gather( *[ project_runner( - i, config, queue, work_path, results, long_checkouts, rebase, keep + i, + config, + queue, + work_path, + results, + long_checkouts, + rebase, + keep, + no_diff, ) for i in range(workers) ] ) LOG.info("Analyzing results") - return await analyze_results(project_count, results) + return analyze_results(project_count, results) + + +if __name__ == "__main__": # pragma: nocover + raise NotImplementedError("lib is a library, funnily enough.")