X-Git-Url: https://git.madduck.net/etc/vim.git/blobdiff_plain/3f68af9813542a5e277b7a51b316644241c372d7..dc71922c768e543c9c3bbb1db5ea6d7fa801a814:/.vim/bundle/black/src/black_primer/lib.py

diff --git a/.vim/bundle/black/src/black_primer/lib.py b/.vim/bundle/black/src/black_primer/lib.py
new file mode 100644
index 0000000..7494ae6
--- /dev/null
+++ b/.vim/bundle/black/src/black_primer/lib.py
@@ -0,0 +1,418 @@
+import asyncio
+import errno
+import json
+import logging
+import os
+import stat
+import sys
+from functools import partial
+from pathlib import Path
+from platform import system
+from shutil import rmtree, which
+from subprocess import CalledProcessError
+from sys import version_info
+from tempfile import TemporaryDirectory
+from typing import (
+    Any,
+    Callable,
+    Dict,
+    List,
+    NamedTuple,
+    Optional,
+    Sequence,
+    Tuple,
+    Union,
+)
+from urllib.parse import urlparse
+
+import click
+
+
+TEN_MINUTES_SECONDS = 600
+WINDOWS = system() == "Windows"
+BLACK_BINARY = "black.exe" if WINDOWS else "black"
+GIT_BINARY = "git.exe" if WINDOWS else "git"
+LOG = logging.getLogger(__name__)
+
+
+# Windows needs a ProactorEventLoop if you want to exec subprocesses
+# Starting with 3.8 this is the default - can remove when Black >= 3.8
+# mypy only respects sys.platform if directly in the evaluation
+# https://mypy.readthedocs.io/en/latest/common_issues.html#python-version-and-system-platform-checks  # noqa: B950
+if sys.platform == "win32":
+    asyncio.set_event_loop(asyncio.ProactorEventLoop())
+
+
+class Results(NamedTuple):
+    stats: Dict[str, int] = {}
+    failed_projects: Dict[str, CalledProcessError] = {}
+
+
+async def _gen_check_output(
+    cmd: Sequence[str],
+    timeout: float = TEN_MINUTES_SECONDS,
+    env: Optional[Dict[str, str]] = None,
+    cwd: Optional[Path] = None,
+    stdin: Optional[bytes] = None,
+) -> Tuple[bytes, bytes]:
+    process = await asyncio.create_subprocess_exec(
+        *cmd,
+        stdin=asyncio.subprocess.PIPE,
+        stdout=asyncio.subprocess.PIPE,
+        stderr=asyncio.subprocess.STDOUT,
+        env=env,
+        cwd=cwd,
+    )
+    try:
+        (stdout, stderr) = await asyncio.wait_for(process.communicate(stdin), timeout)
+    except asyncio.TimeoutError:
+        process.kill()
+        await process.wait()
+        raise
+
+    # A non-optional timeout was supplied to asyncio.wait_for, guaranteeing
+    # a timeout or completed process.  A terminated Python process will have a
+    # non-empty returncode value.
+    assert process.returncode is not None
+
+    if process.returncode != 0:
+        cmd_str = " ".join(cmd)
+        raise CalledProcessError(
+            process.returncode, cmd_str, output=stdout, stderr=stderr
+        )
+
+    return (stdout, stderr)
+
+
+def analyze_results(project_count: int, results: Results) -> int:
+    failed_pct = round(((results.stats["failed"] / project_count) * 100), 2)
+    success_pct = round(((results.stats["success"] / project_count) * 100), 2)
+
+    click.secho("-- primer results 📊 --\n", bold=True)
+    click.secho(
+        f"{results.stats['success']} / {project_count} succeeded ({success_pct}%) ✅",
+        bold=True,
+        fg="green",
+    )
+    click.secho(
+        f"{results.stats['failed']} / {project_count} FAILED ({failed_pct}%) 💩",
+        bold=bool(results.stats["failed"]),
+        fg="red",
+    )
+    s = "" if results.stats["disabled"] == 1 else "s"
+    click.echo(f" - {results.stats['disabled']} project{s} disabled by config")
+    s = "" if results.stats["wrong_py_ver"] == 1 else "s"
+    click.echo(
+        f" - {results.stats['wrong_py_ver']} project{s} skipped due to Python version"
+    )
+    click.echo(
+        f" - {results.stats['skipped_long_checkout']} skipped due to long checkout"
+    )
+
+    if results.failed_projects:
+        click.secho("\nFailed projects:\n", bold=True)
+
+    for project_name, project_cpe in results.failed_projects.items():
+        print(f"## {project_name}:")
+        print(f" - Returned {project_cpe.returncode}")
+        if project_cpe.stderr:
+            print(f" - stderr:\n{project_cpe.stderr.decode('utf8')}")
+        if project_cpe.stdout:
+            print(f" - stdout:\n{project_cpe.stdout.decode('utf8')}")
+        print("")
+
+    return results.stats["failed"]
+
+
+def _flatten_cli_args(cli_args: List[Union[Sequence[str], str]]) -> List[str]:
+    """Allow a user to put long arguments into a list of strs
+    to make the JSON human readable"""
+    flat_args = []
+    for arg in cli_args:
+        if isinstance(arg, str):
+            flat_args.append(arg)
+            continue
+
+        args_as_str = "".join(arg)
+        flat_args.append(args_as_str)
+
+    return flat_args
+
+
+async def black_run(
+    project_name: str,
+    repo_path: Optional[Path],
+    project_config: Dict[str, Any],
+    results: Results,
+    no_diff: bool = False,
+) -> None:
+    """Run Black and record failures"""
+    if not repo_path:
+        results.stats["failed"] += 1
+        results.failed_projects[project_name] = CalledProcessError(
+            69, [], f"{project_name} has no repo_path: {repo_path}".encode(), b""
+        )
+        return
+
+    stdin_test = project_name.upper() == "STDIN"
+    cmd = [str(which(BLACK_BINARY))]
+    if "cli_arguments" in project_config and project_config["cli_arguments"]:
+        cmd.extend(_flatten_cli_args(project_config["cli_arguments"]))
+    cmd.append("--check")
+    if not no_diff:
+        cmd.append("--diff")
+
+    # Work out whether we should read in a Python file or search from cwd
+    stdin = None
+    if stdin_test:
+        cmd.append("-")
+        stdin = repo_path.read_bytes()
+    elif "base_path" in project_config:
+        cmd.append(project_config["base_path"])
+    else:
+        cmd.append(".")
+
+    timeout = (
+        project_config["timeout_seconds"]
+        if "timeout_seconds" in project_config
+        else TEN_MINUTES_SECONDS
+    )
+    with TemporaryDirectory() as tmp_path:
+        # Prevent reading top-level user configs by manipulating environment variables
+        env = {
+            **os.environ,
+            "XDG_CONFIG_HOME": tmp_path,  # Unix-like
+            "USERPROFILE": tmp_path,  # Windows (changes `Path.home()` output)
+        }
+
+        cwd_path = repo_path.parent if stdin_test else repo_path
+        try:
+            LOG.debug(f"Running black for {project_name}: {' '.join(cmd)}")
+            _stdout, _stderr = await _gen_check_output(
+                cmd, cwd=cwd_path, env=env, stdin=stdin, timeout=timeout
+            )
+        except asyncio.TimeoutError:
+            results.stats["failed"] += 1
+            LOG.error(f"Running black for {repo_path} timed out ({cmd})")
+        except CalledProcessError as cpe:
+            # TODO: Tune for smarter, higher signal
+            # If any return value other than 1, we raise - a project can be disabled in config
+            if cpe.returncode == 1:
+                if not project_config["expect_formatting_changes"]:
+                    results.stats["failed"] += 1
+                    results.failed_projects[repo_path.name] = cpe
+                else:
+                    results.stats["success"] += 1
+                return
+            elif cpe.returncode > 1:
+                results.stats["failed"] += 1
+                results.failed_projects[repo_path.name] = cpe
+                return
+
+            LOG.error(f"Unknown error with {repo_path}")
+            raise
+
+        # If we get here and expect formatting changes, something is up
+        if project_config["expect_formatting_changes"]:
+            results.stats["failed"] += 1
+            results.failed_projects[repo_path.name] = CalledProcessError(
+                0, cmd, b"Expected formatting changes but didn't get any!", b""
+            )
+            return
+
+        results.stats["success"] += 1
+
+
+async def git_checkout_or_rebase(
+    work_path: Path,
+    project_config: Dict[str, Any],
+    rebase: bool = False,
+    *,
+    depth: int = 1,
+) -> Optional[Path]:
+    """Git clone the project or rebase it"""
+    git_bin = str(which(GIT_BINARY))
+    if not git_bin:
+        LOG.error("No git binary found")
+        return None
+
+    repo_url_parts = urlparse(project_config["git_clone_url"])
+    path_parts = repo_url_parts.path[1:].split("/", maxsplit=1)
+
+    repo_path: Path = work_path / path_parts[1].replace(".git", "")
+    cmd = [git_bin, "clone", "--depth", str(depth), project_config["git_clone_url"]]
+    cwd = work_path
+    if repo_path.exists() and rebase:
+        cmd = [git_bin, "pull", "--rebase"]
+        cwd = repo_path
+    elif repo_path.exists():
+        return repo_path
+
+    try:
+        _stdout, _stderr = await _gen_check_output(cmd, cwd=cwd)
+    except (asyncio.TimeoutError, CalledProcessError) as e:
+        LOG.error(f"Unable to git clone / pull {project_config['git_clone_url']}: {e}")
+        return None
+
+    return repo_path
+
+
+def handle_PermissionError(
+    func: Callable, path: Path, exc: Tuple[Any, Any, Any]
+) -> None:
+    """
+    Handle PermissionError during shutil.rmtree.
+
+    This checks if the erroring function is either 'os.rmdir' or 'os.unlink', and that
+    the error was EACCES (i.e. Permission denied). If true, the path is set writable,
+    readable, and executable by everyone. Finally, it retries the error-causing delete
+    operation.
+
+    If the check is false, then the original error will be reraised as this function
+    can't handle it.
+    """
+    excvalue = exc[1]
+    LOG.debug(f"Handling {excvalue} from {func.__name__}... ")
+    if func in (os.rmdir, os.unlink) and excvalue.errno == errno.EACCES:
+        LOG.debug(f"Setting {path} writable, readable, and executable by everyone... ")
+        os.chmod(path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)  # chmod 0777
+        func(path)  # Retry the error-causing delete operation
+    else:
+        raise
+
+
+async def load_projects_queue(
+    config_path: Path,
+) -> Tuple[Dict[str, Any], asyncio.Queue]:
+    """Load project config and fill queue with all the project names"""
+    with config_path.open("r") as cfp:
+        config = json.load(cfp)
+
+    # TODO: Offer more options here
+    # e.g. run on X random packages, a specific sub-list, etc.
+    project_names = sorted(config["projects"].keys())
+    queue: asyncio.Queue = asyncio.Queue(maxsize=len(project_names))
+    for project in project_names:
+        await queue.put(project)
+
+    return config, queue
+
+
+async def project_runner(
+    idx: int,
+    config: Dict[str, Any],
+    queue: asyncio.Queue,
+    work_path: Path,
+    results: Results,
+    long_checkouts: bool = False,
+    rebase: bool = False,
+    keep: bool = False,
+    no_diff: bool = False,
+) -> None:
+    """Check out project and run Black on it + record result"""
+    loop = asyncio.get_event_loop()
+    py_version = f"{version_info[0]}.{version_info[1]}"
+    while True:
+        try:
+            project_name = queue.get_nowait()
+        except asyncio.QueueEmpty:
+            LOG.debug(f"project_runner {idx} exiting")
+            return
+        LOG.debug(f"worker {idx} working on {project_name}")
+
+        project_config = config["projects"][project_name]
+
+        # Check if disabled by config
+        if "disabled" in project_config and project_config["disabled"]:
+            results.stats["disabled"] += 1
+            LOG.info(f"Skipping {project_name} as it's disabled via config")
+            continue
+
+        # Check if we should run on this version of Python
+        if (
+            "all" not in project_config["py_versions"]
+            and py_version not in project_config["py_versions"]
+        ):
+            results.stats["wrong_py_ver"] += 1
+            LOG.debug(f"Skipping {project_name} as it's not enabled for {py_version}")
+            continue
+
+        # Check if we're doing big projects / long checkouts
+        if not long_checkouts and project_config["long_checkout"]:
+            results.stats["skipped_long_checkout"] += 1
+            LOG.debug(f"Skipping {project_name} as it's configured as a long checkout")
+            continue
+
+        repo_path: Optional[Path] = Path(__file__)
+        stdin_project = project_name.upper() == "STDIN"
+        if not stdin_project:
+            repo_path = await git_checkout_or_rebase(work_path, project_config, rebase)
+            if not repo_path:
+                continue
+        await black_run(project_name, repo_path, project_config, results, no_diff)
+
+        if not keep and not stdin_project:
+            LOG.debug(f"Removing {repo_path}")
+            rmtree_partial = partial(
+                rmtree, path=repo_path, onerror=handle_PermissionError
+            )
+            await loop.run_in_executor(None, rmtree_partial)
+
+        LOG.info(f"Finished {project_name}")
+
+
+async def process_queue(
+    config_file: str,
+    work_path: Path,
+    workers: int,
+    keep: bool = False,
+    long_checkouts: bool = False,
+    rebase: bool = False,
+    no_diff: bool = False,
+) -> int:
+    """
+    Process the queue with X workers and evaluate results
+    - Success is gauged via the config "expect_formatting_changes"
+
+    Integer return equals the number of failed projects
+    """
+    results = Results()
+    results.stats["disabled"] = 0
+    results.stats["failed"] = 0
+    results.stats["skipped_long_checkout"] = 0
+    results.stats["success"] = 0
+    results.stats["wrong_py_ver"] = 0
+
+    config, queue = await load_projects_queue(Path(config_file))
+    project_count = queue.qsize()
+    s = "" if project_count == 1 else "s"
+    LOG.info(f"{project_count} project{s} to run Black over")
+    if project_count < 1:
+        return -1
+
+    s = "" if workers == 1 else "s"
+    LOG.debug(f"Using {workers} parallel worker{s} to run Black")
+    # Wait until we finish running all the projects before analyzing
+    await asyncio.gather(
+        *[
+            project_runner(
+                i,
+                config,
+                queue,
+                work_path,
+                results,
+                long_checkouts,
+                rebase,
+                keep,
+                no_diff,
+            )
+            for i in range(workers)
+        ]
+    )
+
+    LOG.info("Analyzing results")
+    return analyze_results(project_count, results)
+
+
+if __name__ == "__main__":  # pragma: nocover
+    raise NotImplementedError("lib is a library, funnily enough.")
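
Note: the module above deliberately has no entry point of its own (see the NotImplementedError guard); in upstream Black it is driven by a separate click-based wrapper, black_primer/cli.py, which is not part of this diff. The following is a minimal, hypothetical sketch of driving process_queue directly; the config path, work directory, and worker count are illustrative placeholders, not values taken from this commit, and it assumes the package is importable as black_primer.

    import asyncio
    import sys
    from pathlib import Path
    from tempfile import gettempdir

    from black_primer.lib import process_queue

    # Placeholder values: point these at a real primer JSON config and scratch dir.
    config_file = "primer.json"
    work_path = Path(gettempdir()) / "primer"
    work_path.mkdir(exist_ok=True)

    # process_queue returns the number of failed projects, usable as an exit code.
    loop = asyncio.get_event_loop()
    ret_val = loop.run_until_complete(
        process_queue(config_file, work_path, workers=2, no_diff=True)
    )
    sys.exit(ret_val)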