"""Helper script for psf/black's diff-shades Github Actions integration. diff-shades is a tool for analyzing what happens when you run Black on OSS code capturing it for comparisons or other usage. It's used here to help measure the impact of a change *before* landing it (in particular posting a comment on completion for PRs). This script exists as a more maintainable alternative to using inline Javascript in the workflow YAML files. The revision configuration and resolving, caching, and PR comment logic is contained here. For more information, please see the developer docs: https://black.readthedocs.io/en/latest/contributing/gauging_changes.html#diff-shades """ import json import os import platform import pprint import subprocess import sys import zipfile from io import BytesIO from pathlib import Path from typing import Any, Dict, Optional, Tuple import click import urllib3 from packaging.version import Version if sys.version_info >= (3, 8): from typing import Final, Literal else: from typing_extensions import Final, Literal COMMENT_BODY_FILE: Final = ".pr-comment-body.md" DIFF_STEP_NAME: Final = "Generate HTML diff report" DOCS_URL: Final = ( "https://black.readthedocs.io/en/latest/" "contributing/gauging_changes.html#diff-shades" ) USER_AGENT: Final = f"psf/black diff-shades workflow via urllib3/{urllib3.__version__}" SHA_LENGTH: Final = 10 GH_API_TOKEN: Final = os.getenv("GITHUB_TOKEN") REPO: Final = os.getenv("GITHUB_REPOSITORY", default="psf/black") http = urllib3.PoolManager() def set_output(name: str, value: str) -> None: if len(value) < 200: print(f"[INFO]: setting '{name}' to '{value}'") else: print(f"[INFO]: setting '{name}' to [{len(value)} chars]") print(f"::set-output name={name}::{value}") def http_get( url: str, is_json: bool = True, headers: Optional[Dict[str, str]] = None, **kwargs: Any, ) -> Any: headers = headers or {} headers["User-Agent"] = USER_AGENT if "github" in url: if GH_API_TOKEN: headers["Authorization"] = f"token {GH_API_TOKEN}" headers["Accept"] = "application/vnd.github.v3+json" r = http.request("GET", url, headers=headers, **kwargs) if is_json: data = json.loads(r.data.decode("utf-8")) else: data = r.data print(f"[INFO]: issued GET request for {r.geturl()}") if not (200 <= r.status < 300): pprint.pprint(dict(r.info())) pprint.pprint(data) raise RuntimeError(f"unexpected status code: {r.status}") return data def get_branch_or_tag_revision(sha: str = "main") -> str: data = http_get( f"https://api.github.com/repos/{REPO}/commits", fields={"per_page": "1", "sha": sha}, ) assert isinstance(data[0]["sha"], str) return data[0]["sha"] def get_pr_revision(pr: int) -> str: data = http_get(f"https://api.github.com/repos/{REPO}/pulls/{pr}") assert isinstance(data["head"]["sha"], str) return data["head"]["sha"] def get_pypi_version() -> Version: data = http_get("https://pypi.org/pypi/black/json") versions = [Version(v) for v in data["releases"]] sorted_versions = sorted(versions, reverse=True) return sorted_versions[0] def resolve_custom_ref(ref: str) -> Tuple[str, str]: if ref == ".pypi": # Special value to get latest PyPI version. version = str(get_pypi_version()) return version, f"git checkout {version}" if ref.startswith(".") and ref[1:].isnumeric(): # Special format to get a PR. number = int(ref[1:]) revision = get_pr_revision(number) return ( f"pr-{number}-{revision[:SHA_LENGTH]}", f"gh pr checkout {number} && git merge origin/main", ) # Alright, it's probably a branch, tag, or a commit SHA, let's find out! 
    revision = get_branch_or_tag_revision(ref)
    # We're cutting the revision short as we might be operating on a short commit SHA.
    if revision == ref or revision[: len(ref)] == ref:
        # It's *probably* a commit as the resolved SHA isn't different from the ref.
        return revision[:SHA_LENGTH], f"git checkout {revision}"

    # It's *probably* a pre-existing branch or tag, yay!
    return f"{ref}-{revision[:SHA_LENGTH]}", f"git checkout {revision}"


@click.group()
def main() -> None:
    pass


@main.command("config", help="Acquire run configuration and metadata.")
@click.argument(
    "event", type=click.Choice(["push", "pull_request", "workflow_dispatch"])
)
@click.argument("custom_baseline", required=False)
@click.argument("custom_target", required=False)
@click.option("--baseline-args", default="")
def config(
    event: Literal["push", "pull_request", "workflow_dispatch"],
    custom_baseline: Optional[str],
    custom_target: Optional[str],
    baseline_args: str,
) -> None:
    import diff_shades

    if event == "push":
        # Push on main, let's use PyPI Black as the baseline.
        baseline_name = str(get_pypi_version())
        baseline_cmd = f"git checkout {baseline_name}"
        target_rev = os.getenv("GITHUB_SHA")
        assert target_rev is not None
        target_name = "main-" + target_rev[:SHA_LENGTH]
        target_cmd = f"git checkout {target_rev}"

    elif event == "pull_request":
        # PR, let's use main as the baseline.
        baseline_rev = get_branch_or_tag_revision()
        baseline_name = "main-" + baseline_rev[:SHA_LENGTH]
        baseline_cmd = f"git checkout {baseline_rev}"
        pr_ref = os.getenv("GITHUB_REF")
        assert pr_ref is not None
        # For pull_request events GITHUB_REF looks like "refs/pull/<number>/merge",
        # so strip the leading "refs/pull/" and the trailing "/merge".
        pr_num = int(pr_ref[10:-6])
        pr_rev = get_pr_revision(pr_num)
        target_name = f"pr-{pr_num}-{pr_rev[:SHA_LENGTH]}"
        target_cmd = f"gh pr checkout {pr_num} && git merge origin/main"

        # These are only needed for the PR comment.
        set_output("baseline-sha", baseline_rev)
        set_output("target-sha", pr_rev)
    else:
        assert custom_baseline is not None and custom_target is not None
        baseline_name, baseline_cmd = resolve_custom_ref(custom_baseline)
        target_name, target_cmd = resolve_custom_ref(custom_target)
        if baseline_name == target_name:
            # Alright, we're using the same revisions but we're (hopefully) using
            # different command line arguments, let's support that too.
            baseline_name += "-1"
            target_name += "-2"

    set_output("baseline-analysis", baseline_name + ".json")
    set_output("baseline-setup-cmd", baseline_cmd)
    set_output("target-analysis", target_name + ".json")
    set_output("target-setup-cmd", target_cmd)

    key = f"{platform.system()}-{platform.python_version()}-{diff_shades.__version__}"
    key += f"-{baseline_name}-{baseline_args.encode('utf-8').hex()}"
    set_output("baseline-cache-key", key)


@main.command("comment-body", help="Generate the body for a summary PR comment.")
@click.argument("baseline", type=click.Path(exists=True, path_type=Path))
@click.argument("target", type=click.Path(exists=True, path_type=Path))
@click.argument("baseline-sha")
@click.argument("target-sha")
def comment_body(
    baseline: Path, target: Path, baseline_sha: str, target_sha: str
) -> None:
    # fmt: off
    cmd = [
        sys.executable, "-m", "diff_shades", "--no-color",
        "compare", str(baseline), str(target), "--quiet", "--check"
    ]
    # fmt: on
    proc = subprocess.run(cmd, stdout=subprocess.PIPE, encoding="utf-8")
    if not proc.returncode:
        body = (
            f"**diff-shades** reports zero changes comparing this PR ({target_sha}) to"
            f" main ({baseline_sha}).\n\n---\n\n"
        )
    else:
        body = (
            f"**diff-shades** results comparing this PR ({target_sha}) to main"
            f" ({baseline_sha}). The full diff is [available in the logs]"
            f'($job-diff-url) under the "{DIFF_STEP_NAME}" step.'
        )
        body += "\n```text\n" + proc.stdout.strip() + "\n```\n"
    body += (
        f"[**What is this?**]({DOCS_URL}) | [Workflow run]($workflow-run-url) |"
        " [diff-shades documentation](https://github.com/ichard26/diff-shades#readme)"
    )
    print(f"[INFO]: writing half-completed comment body to {COMMENT_BODY_FILE}")
    with open(COMMENT_BODY_FILE, "w", encoding="utf-8") as f:
        f.write(body)


@main.command("comment-details", help="Get PR comment resources from a workflow run.")
@click.argument("run-id")
def comment_details(run_id: str) -> None:
    data = http_get(f"https://api.github.com/repos/{REPO}/actions/runs/{run_id}")
    if data["event"] != "pull_request":
        set_output("needs-comment", "false")
        return

    set_output("needs-comment", "true")
    pulls = data["pull_requests"]
    assert len(pulls) == 1
    pr_number = pulls[0]["number"]
    set_output("pr-number", str(pr_number))

    jobs_data = http_get(data["jobs_url"])
    assert len(jobs_data["jobs"]) == 1, "multiple jobs not supported nor tested"
    job = jobs_data["jobs"][0]
    steps = {s["name"]: s["number"] for s in job["steps"]}
    diff_step = steps[DIFF_STEP_NAME]
    diff_url = job["html_url"] + f"#step:{diff_step}:1"

    artifacts_data = http_get(data["artifacts_url"])["artifacts"]
    artifacts = {a["name"]: a["archive_download_url"] for a in artifacts_data}
    body_url = artifacts[COMMENT_BODY_FILE]
    body_zip = BytesIO(http_get(body_url, is_json=False))
    with zipfile.ZipFile(body_zip) as zfile:
        with zfile.open(COMMENT_BODY_FILE) as rf:
            body = rf.read().decode("utf-8")
    # It's more convenient to fill in these fields after the first workflow is done
    # since this command can access the workflows API (doing it in the main workflow
    # while it's still in progress seems impossible).
    body = body.replace("$workflow-run-url", data["html_url"])
    body = body.replace("$job-diff-url", diff_url)

    # https://github.community/t/set-output-truncates-multiline-strings/16852/3
    escaped = body.replace("%", "%25").replace("\n", "%0A").replace("\r", "%0D")
    set_output("comment-body", escaped)


if __name__ == "__main__":
    main()