]>
git.madduck.net Git - etc/vim.git/blobdiff - src/black_primer/lib.py
madduck's git repository
Every one of the projects in this repository is available at the canonical
URL git://git.madduck.net/madduck/pub/<projectpath> — see
each project's metadata for the exact URL.
All patches and comments are welcome. Please squash your changes to logical
commits before using git-format-patch and git-send-email to
patches@ git. madduck. net .
If you'd read over the Git project's submission guidelines and adhered to them,
I'd be especially grateful.
SSH access, as well as push access can be individually
arranged .
If you use my repositories frequently, consider adding the following
snippet to ~/.gitconfig and using the third clone URL listed for each
project:
[url "git://git.madduck.net/madduck/"]
insteadOf = madduck:
-# Module '__future__' has no attribute 'annotations'
-from __future__ import annotations # type: ignore
-
import asyncio
import json
import logging
import asyncio
import json
import logging
+from platform import system
from shutil import rmtree, which
from subprocess import CalledProcessError
from sys import version_info
from shutil import rmtree, which
from subprocess import CalledProcessError
from sys import version_info
+WINDOWS = system() == "Windows"
+BLACK_BINARY = "black.exe" if WINDOWS else "black"
+GIT_BIANRY = "git.exe" if WINDOWS else "git"
LOG = logging.getLogger(__name__)
LOG = logging.getLogger(__name__)
+# Windows needs a ProactorEventLoop if you want to exec subprocesses
+# Startng 3.8 this is the default - Can remove when black >= 3.8
+# mypy only respects sys.platform if directly in the evaluation
+# # https://mypy.readthedocs.io/en/latest/common_issues.html#python-version-and-system-platform-checks # noqa: B950
+if sys.platform == "win32":
+ asyncio.set_event_loop(asyncio.ProactorEventLoop())
+
+
class Results(NamedTuple):
stats: Dict[str, int] = {}
failed_projects: Dict[str, CalledProcessError] = {}
class Results(NamedTuple):
stats: Dict[str, int] = {}
failed_projects: Dict[str, CalledProcessError] = {}
-async def analyze_results(project_count: int, results: Results) -> int:
+def analyze_results(project_count: int, results: Results) -> int:
failed_pct = round(((results.stats["failed"] / project_count) * 100), 2)
success_pct = round(((results.stats["success"] / project_count) * 100), 2)
failed_pct = round(((results.stats["failed"] / project_count) * 100), 2)
success_pct = round(((results.stats["success"] / project_count) * 100), 2)
- click.secho(f "-- primer results 📊 --\n", bold=True)
+ click.secho("-- primer results 📊 --\n", bold=True)
click.secho(
f"{results.stats['success']} / {project_count} succeeded ({success_pct}%) ✅",
bold=True,
click.secho(
f"{results.stats['success']} / {project_count} succeeded ({success_pct}%) ✅",
bold=True,
)
if results.failed_projects:
)
if results.failed_projects:
- click.secho(f "\nFailed Projects:\n", bold=True)
+ click.secho("\nFailed Projects:\n", bold=True)
for project_name, project_cpe in results.failed_projects.items():
print(f"## {project_name}:")
for project_name, project_cpe in results.failed_projects.items():
print(f"## {project_name}:")
repo_path: Path, project_config: Dict[str, Any], results: Results
) -> None:
"""Run black and record failures"""
repo_path: Path, project_config: Dict[str, Any], results: Results
) -> None:
"""Run black and record failures"""
- cmd = [str(which("black" ))]
- if project_config["cli_arguments"]:
+ cmd = [str(which(BLACK_BINARY ))]
+ if "cli_arguments" in project_config and project_config["cli_arguments"]:
cmd.extend(*project_config["cli_arguments"])
cmd.extend(["--check", "--diff", "."])
cmd.extend(*project_config["cli_arguments"])
cmd.extend(["--check", "--diff", "."])
results.stats["failed"] += 1
LOG.error(f"Running black for {repo_path} timed out ({cmd})")
except CalledProcessError as cpe:
results.stats["failed"] += 1
LOG.error(f"Running black for {repo_path} timed out ({cmd})")
except CalledProcessError as cpe:
- # TODO: This might need to be tuned and made smarter for higher signal
- if not project_config["expect_formatting_changes"] and cpe.returncode == 1:
- results.stats["failed"] += 1
- results.failed_projects[repo_path.name] = cpe
+ # TODO: Tune for smarter for higher signal
+ # If any other reutrn value than 1 we raise - can disable project in config
+ if cpe.returncode == 1:
+ if not project_config["expect_formatting_changes"]:
+ results.stats["failed"] += 1
+ results.failed_projects[repo_path.name] = cpe
+ else:
+ results.stats["success"] += 1
+ LOG.error(f"Unkown error with {repo_path}")
+ raise
+
+ # If we get here and expect formatting changes something is up
+ if project_config["expect_formatting_changes"]:
+ results.stats["failed"] += 1
+ results.failed_projects[repo_path.name] = CalledProcessError(
+ 0, cmd, b"Expected formatting changes but didn't get any!", b""
+ )
+ return
+
results.stats["success"] += 1
results.stats["success"] += 1
depth: int = 1,
) -> Optional[Path]:
"""git Clone project or rebase"""
depth: int = 1,
) -> Optional[Path]:
"""git Clone project or rebase"""
- git_bin = str(which("git" ))
+ git_bin = str(which(GIT_BIANRY ))
- LOG.error(f "No git binary found")
+ LOG.error("No git binary found")
return None
repo_url_parts = urlparse(project_config["git_clone_url"])
return None
repo_url_parts = urlparse(project_config["git_clone_url"])
async def load_projects_queue(
config_path: Path,
async def load_projects_queue(
config_path: Path,
-) -> Tuple[Dict[str, Any], asyncio.Queue[str] ]:
+) -> Tuple[Dict[str, Any], asyncio.Queue]:
"""Load project config and fill queue with all the project names"""
with config_path.open("r") as cfp:
config = json.load(cfp)
"""Load project config and fill queue with all the project names"""
with config_path.open("r") as cfp:
config = json.load(cfp)
# TODO: Offer more options here
# e.g. Run on X random packages or specific sub list etc.
project_names = sorted(config["projects"].keys())
# TODO: Offer more options here
# e.g. Run on X random packages or specific sub list etc.
project_names = sorted(config["projects"].keys())
- queue: asyncio.Queue[str] = asyncio.Queue(maxsize=len(project_names))
+ queue: asyncio.Queue = asyncio.Queue(maxsize=len(project_names))
for project in project_names:
await queue.put(project)
for project in project_names:
await queue.put(project)
async def project_runner(
idx: int,
config: Dict[str, Any],
async def project_runner(
idx: int,
config: Dict[str, Any],
- queue: asyncio.Queue[str] ,
work_path: Path,
results: Results,
long_checkouts: bool = False,
work_path: Path,
results: Results,
long_checkouts: bool = False,
config, queue = await load_projects_queue(Path(config_file))
project_count = queue.qsize()
LOG.info(f"{project_count} projects to run black over")
config, queue = await load_projects_queue(Path(config_file))
project_count = queue.qsize()
LOG.info(f"{project_count} projects to run black over")
return -1
LOG.debug(f"Using {workers} parallel workers to run black")
return -1
LOG.debug(f"Using {workers} parallel workers to run black")
)
LOG.info("Analyzing results")
)
LOG.info("Analyzing results")
- return await analyze_results(project_count, results)
+ return analyze_results(project_count, results)
+
+
+if __name__ == "__main__": # pragma: nocover
+ raise NotImplementedError("lib is a library, funnily enough.")