git.madduck.net Git - etc/vim.git/blobdiff - src/black_primer/lib.py
from subprocess import CalledProcessError
from sys import version_info
from tempfile import TemporaryDirectory
-from typing import Any, Callable, Dict, NamedTuple, Optional, Sequence, Tuple
+from typing import (
+ Any,
+ Callable,
+ Dict,
+ List,
+ NamedTuple,
+ Optional,
+ Sequence,
+ Tuple,
+ Union,
+)
from urllib.parse import urlparse
import click
+TEN_MINUTES_SECONDS = 600
WINDOWS = system() == "Windows"
BLACK_BINARY = "black.exe" if WINDOWS else "black"
GIT_BINARY = "git.exe" if WINDOWS else "git"
async def _gen_check_output(
cmd: Sequence[str],
+ timeout: float = TEN_MINUTES_SECONDS,
env: Optional[Dict[str, str]] = None,
cwd: Optional[Path] = None,
+ stdin: Optional[bytes] = None,
) -> Tuple[bytes, bytes]:
process = await asyncio.create_subprocess_exec(
*cmd,
+ stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
env=env,
cwd=cwd,
)
try:
- (stdout, stderr) = await asyncio.wait_for(process.communicate(), timeout)
+ (stdout, stderr) = await asyncio.wait_for(process.communicate(stdin), timeout)
except asyncio.TimeoutError:
process.kill()
await process.wait()
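As a point of reference, here is a minimal usage sketch of the extended helper above. It assumes it is awaited from inside a running asyncio coroutine and that GIT_BINARY resolves on PATH; the concrete arguments are illustrative only:

    # Illustrative call: bytes can be fed through the new stdin parameter and
    # the run is capped by the new timeout parameter (default TEN_MINUTES_SECONDS).
    stdout, _ = await _gen_check_output(
        [GIT_BINARY, "--version"],
        timeout=30,
        stdin=None,
    )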
failed_pct = round(((results.stats["failed"] / project_count) * 100), 2)
success_pct = round(((results.stats["success"] / project_count) * 100), 2)
+ if results.failed_projects:
+ click.secho("\nFailed projects:\n", bold=True)
+
+ for project_name, project_cpe in results.failed_projects.items():
+ print(f"## {project_name}:")
+ print(f" - Returned {project_cpe.returncode}")
+ if project_cpe.stderr:
+ print(f" - stderr:\n{project_cpe.stderr.decode('utf8')}")
+ if project_cpe.stdout:
+ print(f" - stdout:\n{project_cpe.stdout.decode('utf8')}")
+ print("")
+
click.secho("-- primer results 📊 --\n", bold=True)
click.secho(
f"{results.stats['success']} / {project_count} succeeded ({success_pct}%) ✅",
click.secho("-- primer results 📊 --\n", bold=True)
click.secho(
f"{results.stats['success']} / {project_count} succeeded ({success_pct}%) ✅",
)
if results.failed_projects:
- click.secho("\nFailed projects:\n", bold=True)
-
- for project_name, project_cpe in results.failed_projects.items():
- print(f"## {project_name}:")
- print(f" - Returned {project_cpe.returncode}")
- if project_cpe.stderr:
- print(f" - stderr:\n{project_cpe.stderr.decode('utf8')}")
- if project_cpe.stdout:
- print(f" - stdout:\n{project_cpe.stdout.decode('utf8')}")
- print("")
+ failed = ", ".join(results.failed_projects.keys())
+ click.secho(f"\nFailed projects: {failed}\n", bold=True)
return results.stats["failed"]
+def _flatten_cli_args(cli_args: List[Union[Sequence[str], str]]) -> List[str]:
+ """Allow a user to put long arguments into a list of strs
+ to make the JSON human readable"""
+ flat_args = []
+ for arg in cli_args:
+ if isinstance(arg, str):
+ flat_args.append(arg)
+ continue
+
+ args_as_str = "".join(arg)
+ flat_args.append(args_as_str)
+
+ return flat_args
+
+
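A quick illustration of the helper above; the input literal is hypothetical rather than taken from a real primer config. Nested sequences are concatenated into a single argument string, while plain strings pass through unchanged:

    # Hypothetical cli_arguments value: the nested list becomes one token.
    _flatten_cli_args(["--line-length", "120", ["--extend-", "exclude=", "docs/"]])
    # -> ["--line-length", "120", "--extend-exclude=docs/"]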
- repo_path: Path, project_config: Dict[str, Any], results: Results
+ project_name: str,
+ repo_path: Optional[Path],
+ project_config: Dict[str, Any],
+ results: Results,
+ no_diff: bool = False,
) -> None:
"""Run Black and record failures"""
+ if not repo_path:
+ results.stats["failed"] += 1
+ results.failed_projects[project_name] = CalledProcessError(
+ 69, [], f"{project_name} has no repo_path: {repo_path}".encode(), b""
+ )
+ return
+
+ stdin_test = project_name.upper() == "STDIN"
cmd = [str(which(BLACK_BINARY))]
if "cli_arguments" in project_config and project_config["cli_arguments"]:
- cmd.extend(*project_config["cli_arguments"])
- cmd.extend(["--check", "--diff", "."])
+ cmd.extend(_flatten_cli_args(project_config["cli_arguments"]))
+ cmd.append("--check")
+ if not no_diff:
+ cmd.append("--diff")
+
+ # Work out if we should read in a Python file or search from cwd
+ stdin = None
+ if stdin_test:
+ cmd.append("-")
+ stdin = repo_path.read_bytes()
+ elif "base_path" in project_config:
+ cmd.append(project_config["base_path"])
+ else:
+ cmd.append(".")
+ timeout = (
+ project_config["timeout_seconds"]
+ if "timeout_seconds" in project_config
+ else TEN_MINUTES_SECONDS
+ )
with TemporaryDirectory() as tmp_path:
- # Prevent reading top-level user configs by manipulating envionment variables
+ # Prevent reading top-level user configs by manipulating environment variables
env = {
**os.environ,
"XDG_CONFIG_HOME": tmp_path, # Unix-like
"USERPROFILE": tmp_path, # Windows (changes `Path.home()` output)
}
+ cwd_path = repo_path.parent if stdin_test else repo_path
- _stdout, _stderr = await _gen_check_output(cmd, cwd=repo_path, env=env)
+ LOG.debug(f"Running black for {project_name}: {' '.join(cmd)}")
+ _stdout, _stderr = await _gen_check_output(
+ cmd, cwd=cwd_path, env=env, stdin=stdin, timeout=timeout
+ )
except asyncio.TimeoutError:
results.stats["failed"] += 1
LOG.error(f"Running black for {repo_path} timed out ({cmd})")
def handle_PermissionError(
- func: Callable, path: Path, exc: Tuple[Any, Any, Any]
+ func: Callable[..., None], path: Path, exc: Tuple[Any, Any, Any]
) -> None:
"""
Handle PermissionError during shutil.rmtree.
async def load_projects_queue(
config_path: Path,
+ projects_to_run: List[str],
) -> Tuple[Dict[str, Any], asyncio.Queue]:
"""Load project config and fill queue with all the project names"""
with config_path.open("r") as cfp:
config = json.load(cfp)
# TODO: Offer more options here
- # e.g. Run on X random packages or specific sub list etc.
- project_names = sorted(config["projects"].keys())
- queue: asyncio.Queue = asyncio.Queue(maxsize=len(project_names))
- for project in project_names:
+ # e.g. Run on X random packages etc.
+ queue: asyncio.Queue = asyncio.Queue(maxsize=len(projects_to_run))
+ for project in projects_to_run:
await queue.put(project)
return config, queue
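A hedged example of the new call shape, assuming a config file laid out as {"projects": {...}} as read above; the file name and project names are placeholders:

    # Hypothetical call: the queue is sized and filled from projects_to_run
    # rather than from every key under config["projects"].
    config, queue = await load_projects_queue(
        Path("primer.json"), ["project_a", "project_b"]
    )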
long_checkouts: bool = False,
rebase: bool = False,
keep: bool = False,
) -> None:
"""Check out project and run Black on it + record result"""
loop = asyncio.get_event_loop()
LOG.debug(f"Skipping {project_name} as it's configured as a long checkout")
continue
LOG.debug(f"Skipping {project_name} as it's configured as a long checkout")
continue
- repo_path = await git_checkout_or_rebase(work_path, project_config, rebase)
- if not repo_path:
- continue
- await black_run(repo_path, project_config, results)
+ repo_path: Optional[Path] = Path(__file__)
+ stdin_project = project_name.upper() == "STDIN"
+ if not stdin_project:
+ repo_path = await git_checkout_or_rebase(work_path, project_config, rebase)
+ if not repo_path:
+ continue
+ await black_run(project_name, repo_path, project_config, results, no_diff)
+ if not keep and not stdin_project:
LOG.debug(f"Removing {repo_path}")
rmtree_partial = partial(
rmtree, path=repo_path, onerror=handle_PermissionError
LOG.debug(f"Removing {repo_path}")
rmtree_partial = partial(
rmtree, path=repo_path, onerror=handle_PermissionError
config_file: str,
work_path: Path,
workers: int,
+ projects_to_run: List[str],
keep: bool = False,
long_checkouts: bool = False,
rebase: bool = False,
) -> int:
"""
Process the queue with X workers and evaluate results
results.stats["success"] = 0
results.stats["wrong_py_ver"] = 0
results.stats["success"] = 0
results.stats["wrong_py_ver"] = 0
- config, queue = await load_projects_queue(Path(config_file))
+ config, queue = await load_projects_queue(Path(config_file), projects_to_run)
project_count = queue.qsize()
s = "" if project_count == 1 else "s"
LOG.info(f"{project_count} project{s} to run Black over")
await asyncio.gather(
*[
project_runner(
- i, config, queue, work_path, results, long_checkouts, rebase, keep
+ i,
+ config,
+ queue,
+ work_path,
+ results,
+ long_checkouts,
+ rebase,
+ keep,
+ no_diff,
)
for i in range(workers)
]
)
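Finally, a sketch of how process_queue might be driven with the new parameters; no_diff is assumed to be accepted as a keyword (it is forwarded to project_runner above), and the paths, worker count, and project names are illustrative:

    # Hypothetical driver; in practice this is wired up by the primer CLI.
    return_code = asyncio.run(
        process_queue(
            config_file="primer.json",
            work_path=Path("/tmp/primer-work"),
            workers=4,
            projects_to_run=["project_a", "project_b"],
            no_diff=True,
        )
    )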