-#!/usr/bin/env python3
-
-# Module '__future__' has no attribute 'annotations'
-from __future__ import annotations # type: ignore
-
import asyncio
+import errno
import json
import logging
+import os
+import stat
+import sys
+from functools import partial
from pathlib import Path
+from platform import system
from shutil import rmtree, which
from subprocess import CalledProcessError
from sys import version_info
-from typing import Any, Dict, NamedTuple, Optional, Sequence, Tuple
+from tempfile import TemporaryDirectory
+from typing import (
+ Any,
+ Callable,
+ Dict,
+ List,
+ NamedTuple,
+ Optional,
+ Sequence,
+ Tuple,
+ Union,
+)
from urllib.parse import urlparse
import click
# Default timeout, in seconds (10 minutes), for the commands run from this module.
TEN_MINUTES_SECONDS = 600
# True when platform.system() reports Windows; selects the ".exe" binary names.
WINDOWS = system() == "Windows"
BLACK_BINARY = "black.exe" if WINDOWS else "black"
GIT_BINARY = "git.exe" if WINDOWS else "git"
LOG = logging.getLogger(__name__)
# Windows needs a ProactorEventLoop in order to exec subprocesses.
# Starting with Python 3.8 this is the default, so this block can be removed
# once 3.8 is the minimum supported Python version.
# mypy only respects sys.platform if it is used directly in the condition:
# https://mypy.readthedocs.io/en/latest/common_issues.html#python-version-and-system-platform-checks # noqa: B950
if sys.platform == "win32":
    asyncio.set_event_loop(asyncio.ProactorEventLoop())
+
+
class Results(NamedTuple):
    """Aggregated outcome of a run: counters plus per-project failures."""

    # NOTE(review): both defaults are single dict objects created when the
    # class is defined, so every Results() built without arguments shares
    # (and mutates) the same two dicts.

    # Counter name -> count. Keys read in this file include "failed",
    # "success", "disabled", "wrong_py_ver" and "skipped_long_checkout".
    stats: Dict[str, int] = {}
    # Name a project is reported under -> the error recorded for it.
    failed_projects: Dict[str, CalledProcessError] = {}
async def _gen_check_output(
cmd: Sequence[str],
- timeout: float = 30,
+ timeout: float = TEN_MINUTES_SECONDS,
env: Optional[Dict[str, str]] = None,
cwd: Optional[Path] = None,
+ stdin: Optional[bytes] = None,
) -> Tuple[bytes, bytes]:
process = await asyncio.create_subprocess_exec(
*cmd,
+ stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
env=env,
cwd=cwd,
)
try:
- (stdout, stderr) = await asyncio.wait_for(process.communicate(), timeout)
+ (stdout, stderr) = await asyncio.wait_for(process.communicate(stdin), timeout)
except asyncio.TimeoutError:
process.kill()
await process.wait()
raise
+ # A non-optional timeout was supplied to asyncio.wait_for, guaranteeing
+ # a timeout or completed process. A terminated Python process will have a
+ # non-empty returncode value.
+ assert process.returncode is not None
+
if process.returncode != 0:
cmd_str = " ".join(cmd)
raise CalledProcessError(
return (stdout, stderr)
-async def analyze_results(project_count: int, results: Results) -> int:
+def analyze_results(project_count: int, results: Results) -> int:
failed_pct = round(((results.stats["failed"] / project_count) * 100), 2)
success_pct = round(((results.stats["success"] / project_count) * 100), 2)
bold=bool(results.stats["failed"]),
fg="red",
)
- click.echo(f" - {results.stats['disabled']} projects Disabled by config")
+ s = "" if results.stats["disabled"] == 1 else "s"
+ click.echo(f" - {results.stats['disabled']} project{s} disabled by config")
+ s = "" if results.stats["wrong_py_ver"] == 1 else "s"
click.echo(
- f" - {results.stats['wrong_py_ver']} projects skipped due to Python Version"
+ f" - {results.stats['wrong_py_ver']} project{s} skipped due to Python version"
)
click.echo(
f" - {results.stats['skipped_long_checkout']} skipped due to long checkout"
)
if results.failed_projects:
- click.secho("\nFailed Projects:\n", bold=True)
+ click.secho("\nFailed projects:\n", bold=True)
for project_name, project_cpe in results.failed_projects.items():
print(f"## {project_name}:")
return results.stats["failed"]
+def _flatten_cli_args(cli_args: List[Union[Sequence[str], str]]) -> List[str]:
+ """Allow a user to put long arguments into a list of strs
+ to make the JSON human readable"""
+ flat_args = []
+ for arg in cli_args:
+ if isinstance(arg, str):
+ flat_args.append(arg)
+ continue
+
+ args_as_str = "".join(arg)
+ flat_args.append(args_as_str)
+
+ return flat_args
+
+
async def black_run(
    project_name: str,
    repo_path: Optional[Path],
    project_config: Dict[str, Any],
    results: Results,
    no_diff: bool = False,
) -> None:
    """Run Black on one project and record the outcome in ``results``.

    Exactly one counter in ``results.stats`` ("failed" or "success") is
    incremented per call, unless the call raises.

    Outcomes:
      * No ``repo_path``: counted as failed, with a synthetic
        ``CalledProcessError`` (return code 69) stored under ``project_name``.
      * Timeout: counted as failed and logged; nothing is stored in
        ``results.failed_projects``.
      * Exit code 1: failed unless the project sets
        ``expect_formatting_changes``, in which case it is a success.
      * Exit code greater than 1: failed.
      * Any other non-zero exit code: logged and re-raised.
      * Exit code 0: success, unless ``expect_formatting_changes`` is set,
        in which case it is counted as failed.

    Args:
        project_name: Name of the project. The name "STDIN" (any case) means
            ``repo_path`` is a file whose bytes are piped to ``black -``.
        repo_path: Directory to run Black in, or the file to pipe in for the
            STDIN project. A falsy value is recorded as a failure.
        project_config: Project settings. ``expect_formatting_changes`` is
            required once Black has run; ``cli_arguments``, ``base_path`` and
            ``timeout_seconds`` are optional.
        results: Accumulator that is mutated in place.
        no_diff: When true, ``--diff`` is not passed to Black.

    Raises:
        CalledProcessError: Black exited with a non-zero code that is
            neither 1 nor greater than 1.
    """
    if not repo_path:
        results.stats["failed"] += 1
        results.failed_projects[project_name] = CalledProcessError(
            69, [], f"{project_name} has no repo_path: {repo_path}".encode(), b""
        )
        return

    stdin_test = project_name.upper() == "STDIN"
    cmd = [str(which(BLACK_BINARY))]
    if project_config.get("cli_arguments"):
        cmd.extend(_flatten_cli_args(project_config["cli_arguments"]))
    cmd.append("--check")
    if not no_diff:
        cmd.append("--diff")

    # Work out whether to pipe a Python file in via stdin or search from cwd
    stdin = None
    if stdin_test:
        cmd.append("-")
        stdin = repo_path.read_bytes()
    elif "base_path" in project_config:
        cmd.append(project_config["base_path"])
    else:
        cmd.append(".")

    timeout = project_config.get("timeout_seconds", TEN_MINUTES_SECONDS)
    with TemporaryDirectory() as tmp_path:
        # Prevent reading top-level user configs by manipulating environment variables
        env = {
            **os.environ,
            "XDG_CONFIG_HOME": tmp_path,  # Unix-like
            "USERPROFILE": tmp_path,  # Windows (changes `Path.home()` output)
        }

        cwd_path = repo_path.parent if stdin_test else repo_path
        try:
            LOG.debug(f"Running black for {project_name}: {' '.join(cmd)}")
            _stdout, _stderr = await _gen_check_output(
                cmd, cwd=cwd_path, env=env, stdin=stdin, timeout=timeout
            )
        except asyncio.TimeoutError:
            results.stats["failed"] += 1
            LOG.error(f"Running black for {repo_path} timed out ({cmd})")
            # Stop here: falling through would also count this project as a
            # success (or as a second, misleading failure).
            return
        except CalledProcessError as cpe:
            # TODO: Tune to be smarter for higher signal
            # Any return value other than 1 or >1 is re-raised - the project
            # can be disabled in the config
            if cpe.returncode == 1:
                if not project_config["expect_formatting_changes"]:
                    results.stats["failed"] += 1
                    results.failed_projects[repo_path.name] = cpe
                else:
                    results.stats["success"] += 1
                return
            elif cpe.returncode > 1:
                results.stats["failed"] += 1
                results.failed_projects[repo_path.name] = cpe
                return

            LOG.error(f"Unknown error with {repo_path}")
            raise

    # If we get here and expect formatting changes something is up
    if project_config["expect_formatting_changes"]:
        results.stats["failed"] += 1
        results.failed_projects[repo_path.name] = CalledProcessError(
            0, cmd, b"Expected formatting changes but didn't get any!", b""
        )
        return

    results.stats["success"] += 1
depth: int = 1,
) -> Optional[Path]:
"""git Clone project or rebase"""
- git_bin = str(which("git"))
+ git_bin = str(which(GIT_BINARY))
if not git_bin:
LOG.error("No git binary found")
return None
return repo_path
def handle_PermissionError(
    func: Callable, path: Path, exc: Tuple[Any, Any, Any]
) -> None:
    """
    Handle PermissionError during shutil.rmtree.

    Intended to be passed as the ``onerror`` callback of ``shutil.rmtree``.

    This checks if the erroring function is either 'os.rmdir' or 'os.unlink', and that
    the error was EACCES (i.e. Permission denied). If true, the path is set writable,
    readable, and executable by everyone. Finally, it tries the error causing delete
    operation again.

    If the check is false, then the original error will be reraised as this function
    can't handle it.

    Args:
        func: The function that raised the error.
        path: The path that was passed to ``func``.
        exc: Exception info as ``(type, value, traceback)``.

    Raises:
        The exception in ``exc[1]`` when it is not an EACCES error from a
        delete operation, or whatever the retried ``func(path)`` raises.
    """
    excvalue = exc[1]
    LOG.debug(f"Handling {excvalue} from {func.__name__}... ")
    is_delete_op = func in (os.rmdir, os.unlink)
    # getattr: an exception value without an ``errno`` is simply "not EACCES".
    if is_delete_op and getattr(excvalue, "errno", None) == errno.EACCES:
        LOG.debug(f"Setting {path} writable, readable, and executable by everyone... ")
        os.chmod(path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)  # chmod 0777
        func(path)  # Try the error causing delete operation again
        return

    # Re-raise the exception we were handed rather than relying on a bare
    # ``raise``, which only works while an exception is being handled.
    if isinstance(excvalue, BaseException):
        raise excvalue
    raise
+
+
async def load_projects_queue(
config_path: Path,
-) -> Tuple[Dict[str, Any], asyncio.Queue[str]]:
+) -> Tuple[Dict[str, Any], asyncio.Queue]:
"""Load project config and fill queue with all the project names"""
with config_path.open("r") as cfp:
config = json.load(cfp)
# TODO: Offer more options here
# e.g. Run on X random packages or specific sub list etc.
project_names = sorted(config["projects"].keys())
- queue: asyncio.Queue[str] = asyncio.Queue(maxsize=len(project_names))
+ queue: asyncio.Queue = asyncio.Queue(maxsize=len(project_names))
for project in project_names:
await queue.put(project)
async def project_runner(
idx: int,
config: Dict[str, Any],
- queue: asyncio.Queue[str],
+ queue: asyncio.Queue,
work_path: Path,
results: Results,
long_checkouts: bool = False,
rebase: bool = False,
keep: bool = False,
+ no_diff: bool = False,
) -> None:
- """Checkout project and run black on it + record result"""
+ """Check out project and run Black on it + record result"""
loop = asyncio.get_event_loop()
py_version = f"{version_info[0]}.{version_info[1]}"
while True:
except asyncio.QueueEmpty:
LOG.debug(f"project_runner {idx} exiting")
return
+ LOG.debug(f"worker {idx} working on {project_name}")
project_config = config["projects"][project_name]
LOG.debug(f"Skipping {project_name} as it's configured as a long checkout")
continue
- repo_path = await git_checkout_or_rebase(work_path, project_config, rebase)
- if not repo_path:
- continue
- await black_run(repo_path, project_config, results)
+ repo_path: Optional[Path] = Path(__file__)
+ stdin_project = project_name.upper() == "STDIN"
+ if not stdin_project:
+ repo_path = await git_checkout_or_rebase(work_path, project_config, rebase)
+ if not repo_path:
+ continue
+ await black_run(project_name, repo_path, project_config, results, no_diff)
- if not keep:
+ if not keep and not stdin_project:
LOG.debug(f"Removing {repo_path}")
- await loop.run_in_executor(None, rmtree, repo_path)
+ rmtree_partial = partial(
+ rmtree, path=repo_path, onerror=handle_PermissionError
+ )
+ await loop.run_in_executor(None, rmtree_partial)
+
+ LOG.info(f"Finished {project_name}")
async def process_queue(
keep: bool = False,
long_checkouts: bool = False,
rebase: bool = False,
+ no_diff: bool = False,
) -> int:
"""
Process the queue with X workers and evaluate results
config, queue = await load_projects_queue(Path(config_file))
project_count = queue.qsize()
- LOG.info(f"{project_count} projects to run black over")
- if not project_count:
+ s = "" if project_count == 1 else "s"
+ LOG.info(f"{project_count} project{s} to run Black over")
+ if project_count < 1:
return -1
- LOG.debug(f"Using {workers} parallel workers to run black")
+ s = "" if workers == 1 else "s"
+ LOG.debug(f"Using {workers} parallel worker{s} to run Black")
# Wait until we finish running all the projects before analyzing
await asyncio.gather(
*[
project_runner(
- i, config, queue, work_path, results, long_checkouts, rebase, keep
+ i,
+ config,
+ queue,
+ work_path,
+ results,
+ long_checkouts,
+ rebase,
+ keep,
+ no_diff,
)
for i in range(workers)
]
)
LOG.info("Analyzing results")
- return await analyze_results(project_count, results)
+ return analyze_results(project_count, results)
+
+
# This module is meant to be imported; running it as a script fails on purpose.
if __name__ == "__main__":  # pragma: nocover
    raise NotImplementedError("lib is a library, funnily enough.")