- On Python 3.11 and newer, use the standard library's `tomllib` instead of `tomli`
(#2903)
+- `black-primer`, the deprecated internal devtool, has been removed; it now lives in a
+ [separate repository](https://github.com/cooperlees/black-primer) (#2924)
### Parser
modification causes before submitting a PR. Think about if the change seems disruptive
enough to cause frustration to projects that are already "black formatted".
-## black-primer
-
-`black-primer` is an obsolete tool (now replaced with `diff-shades`) that was used to
-gauge the impact of changes in _Black_ on open-source code. It is no longer used
-internally and will be removed from the _Black_ repository in the future.
-
## diff-shades
diff-shades is a tool that runs _Black_ across a list of Git cloneable OSS projects
# The following is because of `patch_click()`. Remove when
# we drop Python 3.6 support.
warn_unused_ignores=False
-
-[mypy-black_primer.*]
-# Until we're not supporting 3.6 primer needs this
-disallow_any_generics=False
-
-[mypy-tests.test_primer]
-# Until we're not supporting 3.6 primer needs this
-disallow_any_generics=False
"black/__main__.py",
]
discovered = []
- # black-primer and blackd have no good reason to be compiled.
+ # There's no good reason for blackd to be compiled.
discovered.extend(find_python_files(src / "black"))
discovered.extend(find_python_files(src / "blib2to3"))
mypyc_targets = [
package_data={
"blib2to3": ["*.txt"],
"black": ["py.typed"],
- "black_primer": ["primer.json"],
},
python_requires=">=3.6.2",
zip_safe=False,
"console_scripts": [
"black=black:patched_main",
"blackd=blackd:patched_main [d]",
- "black-primer=black_primer.cli:main",
]
},
)
+++ /dev/null
-# coding=utf8
-
-import asyncio
-import json
-import logging
-import sys
-from datetime import datetime
-from pathlib import Path
-from shutil import rmtree, which
-from tempfile import gettempdir
-from typing import Any, List, Optional, Union
-
-import click
-
-from black_primer import lib
-
-# If our environment has uvloop installed lets use it
-try:
- import uvloop
-
- uvloop.install()
-except ImportError:
- pass
-
-
-DEFAULT_CONFIG = Path(__file__).parent / "primer.json"
-_timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
-DEFAULT_WORKDIR = Path(gettempdir()) / f"primer.{_timestamp}"
-LOG = logging.getLogger(__name__)
-
-
-def _handle_debug(
- ctx: Optional[click.core.Context],
- param: Optional[Union[click.core.Option, click.core.Parameter]],
- debug: Union[bool, int, str],
-) -> Union[bool, int, str]:
- """Turn on debugging if asked otherwise INFO default"""
- log_level = logging.DEBUG if debug else logging.INFO
- logging.basicConfig(
- format="[%(asctime)s] %(levelname)s: %(message)s (%(filename)s:%(lineno)d)",
- level=log_level,
- )
- return debug
-
-
-def load_projects(config_path: Path) -> List[str]:
- with open(config_path) as config:
- return sorted(json.load(config)["projects"].keys())
-
-
-# Unfortunately does import time file IO - but appears to be the only
-# way to get `black-primer --help` to show projects list
-DEFAULT_PROJECTS = load_projects(DEFAULT_CONFIG)
-
-
-def _projects_callback(
- ctx: click.core.Context,
- param: Optional[Union[click.core.Option, click.core.Parameter]],
- projects: str,
-) -> List[str]:
- requested_projects = set(projects.split(","))
- available_projects = set(
- DEFAULT_PROJECTS
- if str(DEFAULT_CONFIG) == ctx.params["config"]
- else load_projects(ctx.params["config"])
- )
-
- unavailable = requested_projects - available_projects
- if unavailable:
- LOG.error(f"Projects not found: {unavailable}. Available: {available_projects}")
-
- return sorted(requested_projects & available_projects)
-
-
-async def async_main(
- config: str,
- debug: bool,
- keep: bool,
- long_checkouts: bool,
- no_diff: bool,
- projects: List[str],
- rebase: bool,
- workdir: str,
- workers: int,
-) -> int:
- work_path = Path(workdir)
- if not work_path.exists():
- LOG.debug(f"Creating {work_path}")
- work_path.mkdir()
-
- if not which("black"):
- LOG.error("Can not find 'black' executable in PATH. No point in running")
- return -1
-
- try:
- ret_val = await lib.process_queue(
- config,
- work_path,
- workers,
- projects,
- keep,
- long_checkouts,
- rebase,
- no_diff,
- )
- return int(ret_val)
-
- finally:
- if not keep and work_path.exists():
- LOG.debug(f"Removing {work_path}")
- rmtree(work_path, onerror=lib.handle_PermissionError)
-
-
-@click.command(context_settings={"help_option_names": ["-h", "--help"]})
-@click.option(
- "-c",
- "--config",
- default=str(DEFAULT_CONFIG),
- type=click.Path(exists=True),
- show_default=True,
- help="JSON config file path",
- # Eager - because config path is used by other callback options
- is_eager=True,
-)
-@click.option(
- "--debug",
- is_flag=True,
- callback=_handle_debug,
- show_default=True,
- help="Turn on debug logging",
-)
-@click.option(
- "-k",
- "--keep",
- is_flag=True,
- show_default=True,
- help="Keep workdir + repos post run",
-)
-@click.option(
- "-L",
- "--long-checkouts",
- is_flag=True,
- show_default=True,
- help="Pull big projects to test",
-)
-@click.option(
- "--no-diff",
- is_flag=True,
- show_default=True,
- help="Disable showing source file changes in black output",
-)
-@click.option(
- "--projects",
- default=",".join(DEFAULT_PROJECTS),
- callback=_projects_callback,
- show_default=True,
- help="Comma separated list of projects to run",
-)
-@click.option(
- "-R",
- "--rebase",
- is_flag=True,
- show_default=True,
- help="Rebase project if already checked out",
-)
-@click.option(
- "-w",
- "--workdir",
- default=str(DEFAULT_WORKDIR),
- type=click.Path(exists=False),
- show_default=True,
- help="Directory path for repo checkouts",
-)
-@click.option(
- "-W",
- "--workers",
- default=2,
- type=int,
- show_default=True,
- help="Number of parallel worker coroutines",
-)
-@click.pass_context
-def main(ctx: click.core.Context, **kwargs: Any) -> None:
- """primer - prime projects for blackening... 🏴"""
- LOG.debug(f"Starting {sys.argv[0]}")
- # TODO: Change to asyncio.run when Black >= 3.7 only
- loop = asyncio.get_event_loop()
- try:
- ctx.exit(loop.run_until_complete(async_main(**kwargs)))
- finally:
- loop.close()
-
-
-if __name__ == "__main__": # pragma: nocover
- main()
+++ /dev/null
-import asyncio
-import errno
-import json
-import logging
-import os
-import stat
-import sys
-from functools import partial
-from pathlib import Path
-from platform import system
-from shutil import rmtree, which
-from subprocess import CalledProcessError
-from sys import version_info
-from tempfile import TemporaryDirectory
-from typing import (
- Any,
- Callable,
- Dict,
- List,
- NamedTuple,
- Optional,
- Sequence,
- Tuple,
- Union,
-)
-from urllib.parse import urlparse
-
-import click
-
-
-TEN_MINUTES_SECONDS = 600
-WINDOWS = system() == "Windows"
-BLACK_BINARY = "black.exe" if WINDOWS else "black"
-GIT_BINARY = "git.exe" if WINDOWS else "git"
-LOG = logging.getLogger(__name__)
-
-
-# Windows needs a ProactorEventLoop if you want to exec subprocesses
-# Starting with 3.8 this is the default - can remove when Black >= 3.8
-# mypy only respects sys.platform if directly in the evaluation
-# https://mypy.readthedocs.io/en/latest/common_issues.html#python-version-and-system-platform-checks # noqa: B950
-if sys.platform == "win32":
- asyncio.set_event_loop(asyncio.ProactorEventLoop())
-
-
-class Results(NamedTuple):
- stats: Dict[str, int] = {}
- failed_projects: Dict[str, CalledProcessError] = {}
-
-
-async def _gen_check_output(
- cmd: Sequence[str],
- timeout: float = TEN_MINUTES_SECONDS,
- env: Optional[Dict[str, str]] = None,
- cwd: Optional[Path] = None,
- stdin: Optional[bytes] = None,
-) -> Tuple[bytes, bytes]:
- process = await asyncio.create_subprocess_exec(
- *cmd,
- stdin=asyncio.subprocess.PIPE,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.STDOUT,
- env=env,
- cwd=cwd,
- )
- try:
- (stdout, stderr) = await asyncio.wait_for(process.communicate(stdin), timeout)
- except asyncio.TimeoutError:
- process.kill()
- await process.wait()
- raise
-
- # A non-optional timeout was supplied to asyncio.wait_for, guaranteeing
- # a timeout or completed process. A terminated Python process will have a
- # non-empty returncode value.
- assert process.returncode is not None
-
- if process.returncode != 0:
- cmd_str = " ".join(cmd)
- raise CalledProcessError(
- process.returncode, cmd_str, output=stdout, stderr=stderr
- )
-
- return (stdout, stderr)
-
-
-def analyze_results(project_count: int, results: Results) -> int:
- failed_pct = round(((results.stats["failed"] / project_count) * 100), 2)
- success_pct = round(((results.stats["success"] / project_count) * 100), 2)
-
- if results.failed_projects:
- click.secho("\nFailed projects:\n", bold=True)
-
- for project_name, project_cpe in results.failed_projects.items():
- print(f"## {project_name}:")
- print(f" - Returned {project_cpe.returncode}")
- if project_cpe.stderr:
- print(f" - stderr:\n{project_cpe.stderr.decode('utf8')}")
- if project_cpe.stdout:
- print(f" - stdout:\n{project_cpe.stdout.decode('utf8')}")
- print("")
-
- click.secho("-- primer results 📊 --\n", bold=True)
- click.secho(
- f"{results.stats['success']} / {project_count} succeeded ({success_pct}%) ✅",
- bold=True,
- fg="green",
- )
- click.secho(
- f"{results.stats['failed']} / {project_count} FAILED ({failed_pct}%) 💩",
- bold=bool(results.stats["failed"]),
- fg="red",
- )
- s = "" if results.stats["disabled"] == 1 else "s"
- click.echo(f" - {results.stats['disabled']} project{s} disabled by config")
- s = "" if results.stats["wrong_py_ver"] == 1 else "s"
- click.echo(
- f" - {results.stats['wrong_py_ver']} project{s} skipped due to Python version"
- )
- click.echo(
- f" - {results.stats['skipped_long_checkout']} skipped due to long checkout"
- )
-
- if results.failed_projects:
- failed = ", ".join(results.failed_projects.keys())
- click.secho(f"\nFailed projects: {failed}\n", bold=True)
-
- return results.stats["failed"]
-
-
-def _flatten_cli_args(cli_args: List[Union[Sequence[str], str]]) -> List[str]:
- """Allow a user to put long arguments into a list of strs
- to make the JSON human readable"""
- flat_args = []
- for arg in cli_args:
- if isinstance(arg, str):
- flat_args.append(arg)
- continue
-
- args_as_str = "".join(arg)
- flat_args.append(args_as_str)
-
- return flat_args
-
-
-async def black_run(
- project_name: str,
- repo_path: Optional[Path],
- project_config: Dict[str, Any],
- results: Results,
- no_diff: bool = False,
-) -> None:
- """Run Black and record failures"""
- if not repo_path:
- results.stats["failed"] += 1
- results.failed_projects[project_name] = CalledProcessError(
- 69, [], f"{project_name} has no repo_path: {repo_path}".encode(), b""
- )
- return
-
- stdin_test = project_name.upper() == "STDIN"
- cmd = [str(which(BLACK_BINARY))]
- if "cli_arguments" in project_config and project_config["cli_arguments"]:
- cmd.extend(_flatten_cli_args(project_config["cli_arguments"]))
- cmd.append("--check")
- if not no_diff:
- cmd.append("--diff")
-
- # Workout if we should read in a python file or search from cwd
- stdin = None
- if stdin_test:
- cmd.append("-")
- stdin = repo_path.read_bytes()
- elif "base_path" in project_config:
- cmd.append(project_config["base_path"])
- else:
- cmd.append(".")
-
- timeout = (
- project_config["timeout_seconds"]
- if "timeout_seconds" in project_config
- else TEN_MINUTES_SECONDS
- )
- with TemporaryDirectory() as tmp_path:
- # Prevent reading top-level user configs by manipulating environment variables
- env = {
- **os.environ,
- "XDG_CONFIG_HOME": tmp_path, # Unix-like
- "USERPROFILE": tmp_path, # Windows (changes `Path.home()` output)
- }
-
- cwd_path = repo_path.parent if stdin_test else repo_path
- try:
- LOG.debug(f"Running black for {project_name}: {' '.join(cmd)}")
- _stdout, _stderr = await _gen_check_output(
- cmd, cwd=cwd_path, env=env, stdin=stdin, timeout=timeout
- )
- except asyncio.TimeoutError:
- results.stats["failed"] += 1
- LOG.error(f"Running black for {repo_path} timed out ({cmd})")
- except CalledProcessError as cpe:
- # TODO: Tune for smarter for higher signal
- # If any other return value than 1 we raise - can disable project in config
- if cpe.returncode == 1:
- if not project_config["expect_formatting_changes"]:
- results.stats["failed"] += 1
- results.failed_projects[repo_path.name] = cpe
- else:
- results.stats["success"] += 1
- return
- elif cpe.returncode > 1:
- results.stats["failed"] += 1
- results.failed_projects[repo_path.name] = cpe
- return
-
- LOG.error(f"Unknown error with {repo_path}")
- raise
-
- # If we get here and expect formatting changes something is up
- if project_config["expect_formatting_changes"]:
- results.stats["failed"] += 1
- results.failed_projects[repo_path.name] = CalledProcessError(
- 0, cmd, b"Expected formatting changes but didn't get any!", b""
- )
- return
-
- results.stats["success"] += 1
-
-
-async def git_checkout_or_rebase(
- work_path: Path,
- project_config: Dict[str, Any],
- rebase: bool = False,
- *,
- depth: int = 1,
-) -> Optional[Path]:
- """git Clone project or rebase"""
- git_bin = str(which(GIT_BINARY))
- if not git_bin:
- LOG.error("No git binary found")
- return None
-
- repo_url_parts = urlparse(project_config["git_clone_url"])
- path_parts = repo_url_parts.path[1:].split("/", maxsplit=1)
-
- repo_path: Path = work_path / path_parts[1].replace(".git", "")
- cmd = [git_bin, "clone", "--depth", str(depth), project_config["git_clone_url"]]
- cwd = work_path
- if repo_path.exists() and rebase:
- cmd = [git_bin, "pull", "--rebase"]
- cwd = repo_path
- elif repo_path.exists():
- return repo_path
-
- try:
- _stdout, _stderr = await _gen_check_output(cmd, cwd=cwd)
- except (asyncio.TimeoutError, CalledProcessError) as e:
- LOG.error(f"Unable to git clone / pull {project_config['git_clone_url']}: {e}")
- return None
-
- return repo_path
-
-
-def handle_PermissionError(
- func: Callable[..., None], path: Path, exc: Tuple[Any, Any, Any]
-) -> None:
- """
- Handle PermissionError during shutil.rmtree.
-
- This checks if the erroring function is either 'os.rmdir' or 'os.unlink', and that
- the error was EACCES (i.e. Permission denied). If true, the path is set writable,
- readable, and executable by everyone. Finally, it tries the error causing delete
- operation again.
-
- If the check is false, then the original error will be reraised as this function
- can't handle it.
- """
- excvalue = exc[1]
- LOG.debug(f"Handling {excvalue} from {func.__name__}... ")
- if func in (os.rmdir, os.unlink) and excvalue.errno == errno.EACCES:
- LOG.debug(f"Setting {path} writable, readable, and executable by everyone... ")
- os.chmod(path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) # chmod 0777
- func(path) # Try the error causing delete operation again
- else:
- raise
-
-
-async def load_projects_queue(
- config_path: Path,
- projects_to_run: List[str],
-) -> Tuple[Dict[str, Any], asyncio.Queue]:
- """Load project config and fill queue with all the project names"""
- with config_path.open("r") as cfp:
- config = json.load(cfp)
-
- # TODO: Offer more options here
- # e.g. Run on X random packages etc.
- queue: asyncio.Queue = asyncio.Queue(maxsize=len(projects_to_run))
- for project in projects_to_run:
- await queue.put(project)
-
- return config, queue
-
-
-async def project_runner(
- idx: int,
- config: Dict[str, Any],
- queue: asyncio.Queue,
- work_path: Path,
- results: Results,
- long_checkouts: bool = False,
- rebase: bool = False,
- keep: bool = False,
- no_diff: bool = False,
-) -> None:
- """Check out project and run Black on it + record result"""
- loop = asyncio.get_event_loop()
- py_version = f"{version_info[0]}.{version_info[1]}"
- while True:
- try:
- project_name = queue.get_nowait()
- except asyncio.QueueEmpty:
- LOG.debug(f"project_runner {idx} exiting")
- return
- LOG.debug(f"worker {idx} working on {project_name}")
-
- project_config = config["projects"][project_name]
-
- # Check if disabled by config
- if "disabled" in project_config and project_config["disabled"]:
- results.stats["disabled"] += 1
- LOG.info(f"Skipping {project_name} as it's disabled via config")
- continue
-
- # Check if we should run on this version of Python
- if (
- "all" not in project_config["py_versions"]
- and py_version not in project_config["py_versions"]
- ):
- results.stats["wrong_py_ver"] += 1
- LOG.debug(f"Skipping {project_name} as it's not enabled for {py_version}")
- continue
-
- # Check if we're doing big projects / long checkouts
- if not long_checkouts and project_config["long_checkout"]:
- results.stats["skipped_long_checkout"] += 1
- LOG.debug(f"Skipping {project_name} as it's configured as a long checkout")
- continue
-
- repo_path: Optional[Path] = Path(__file__)
- stdin_project = project_name.upper() == "STDIN"
- if not stdin_project:
- repo_path = await git_checkout_or_rebase(work_path, project_config, rebase)
- if not repo_path:
- continue
- await black_run(project_name, repo_path, project_config, results, no_diff)
-
- if not keep and not stdin_project:
- LOG.debug(f"Removing {repo_path}")
- rmtree_partial = partial(
- rmtree, path=repo_path, onerror=handle_PermissionError
- )
- await loop.run_in_executor(None, rmtree_partial)
-
- LOG.info(f"Finished {project_name}")
-
-
-async def process_queue(
- config_file: str,
- work_path: Path,
- workers: int,
- projects_to_run: List[str],
- keep: bool = False,
- long_checkouts: bool = False,
- rebase: bool = False,
- no_diff: bool = False,
-) -> int:
- """
- Process the queue with X workers and evaluate results
- - Success is guaged via the config "expect_formatting_changes"
-
- Integer return equals the number of failed projects
- """
- results = Results()
- results.stats["disabled"] = 0
- results.stats["failed"] = 0
- results.stats["skipped_long_checkout"] = 0
- results.stats["success"] = 0
- results.stats["wrong_py_ver"] = 0
-
- config, queue = await load_projects_queue(Path(config_file), projects_to_run)
- project_count = queue.qsize()
- s = "" if project_count == 1 else "s"
- LOG.info(f"{project_count} project{s} to run Black over")
- if project_count < 1:
- return -1
-
- s = "" if workers == 1 else "s"
- LOG.debug(f"Using {workers} parallel worker{s} to run Black")
- # Wait until we finish running all the projects before analyzing
- await asyncio.gather(
- *[
- project_runner(
- i,
- config,
- queue,
- work_path,
- results,
- long_checkouts,
- rebase,
- keep,
- no_diff,
- )
- for i in range(workers)
- ]
- )
-
- LOG.info("Analyzing results")
- return analyze_results(project_count, results)
-
-
-if __name__ == "__main__": # pragma: nocover
- raise NotImplementedError("lib is a library, funnily enough.")
+++ /dev/null
-{
- "configuration_format_version": 20210815,
- "projects": {
- "STDIN": {
- "cli_arguments": ["--experimental-string-processing"],
- "expect_formatting_changes": false,
- "git_clone_url": "",
- "long_checkout": false,
- "py_versions": ["all"]
- },
- "aioexabgp": {
- "cli_arguments": ["--experimental-string-processing"],
- "expect_formatting_changes": false,
- "git_clone_url": "https://github.com/cooperlees/aioexabgp.git",
- "long_checkout": false,
- "py_versions": ["all"]
- },
- "attrs": {
- "cli_arguments": ["--experimental-string-processing"],
- "expect_formatting_changes": true,
- "git_clone_url": "https://github.com/python-attrs/attrs.git",
- "long_checkout": false,
- "py_versions": ["all"]
- },
- "bandersnatch": {
- "cli_arguments": ["--experimental-string-processing"],
- "expect_formatting_changes": true,
- "git_clone_url": "https://github.com/pypa/bandersnatch.git",
- "long_checkout": false,
- "py_versions": ["all"]
- },
- "channels": {
- "cli_arguments": ["--experimental-string-processing"],
- "expect_formatting_changes": true,
- "git_clone_url": "https://github.com/django/channels.git",
- "long_checkout": false,
- "py_versions": ["all"]
- },
- "cpython": {
- "disabled": true,
- "disabled_reason": "To big / slow for GitHub Actions but handy to keep config to use manually or in some other CI in the future",
- "base_path": "Lib",
- "cli_arguments": [
- "--experimental-string-processing",
- "--extend-exclude",
- [
- "Lib/lib2to3/tests/data/different_encoding.py",
- "|Lib/lib2to3/tests/data/false_encoding.py",
- "|Lib/lib2to3/tests/data/py2_test_grammar.py",
- "|Lib/test/bad_coding.py",
- "|Lib/test/bad_coding2.py",
- "|Lib/test/badsyntax_3131.py",
- "|Lib/test/badsyntax_pep3120.py",
- "|Lib/test/test_base64.py",
- "|Lib/test/test_exceptions.py",
- "|Lib/test/test_grammar.py",
- "|Lib/test/test_named_expressions.py",
- "|Lib/test/test_patma.py",
- "|Lib/test/test_tokenize.py",
- "|Lib/test/test_xml_etree.py",
- "|Lib/traceback.py"
- ]
- ],
- "expect_formatting_changes": true,
- "git_clone_url": "https://github.com/python/cpython.git",
- "long_checkout": false,
- "py_versions": ["3.9", "3.10"],
- "timeout_seconds": 900
- },
- "django": {
- "cli_arguments": [
- "--experimental-string-processing",
- "--skip-string-normalization",
- "--extend-exclude",
- "/((docs|scripts)/|django/forms/models.py|tests/gis_tests/test_spatialrefsys.py|tests/test_runner_apps/tagged/tests_syntax_error.py)"
- ],
- "expect_formatting_changes": true,
- "git_clone_url": "https://github.com/django/django.git",
- "long_checkout": false,
- "py_versions": ["3.8", "3.9", "3.10"]
- },
- "flake8-bugbear": {
- "cli_arguments": ["--experimental-string-processing"],
- "expect_formatting_changes": true,
- "git_clone_url": "https://github.com/PyCQA/flake8-bugbear.git",
- "long_checkout": false,
- "py_versions": ["all"]
- },
- "hypothesis": {
- "cli_arguments": ["--experimental-string-processing"],
- "expect_formatting_changes": true,
- "git_clone_url": "https://github.com/HypothesisWorks/hypothesis.git",
- "long_checkout": false,
- "py_versions": ["3.8", "3.9", "3.10"]
- },
- "pandas": {
- "cli_arguments": ["--experimental-string-processing"],
- "expect_formatting_changes": true,
- "git_clone_url": "https://github.com/pandas-dev/pandas.git",
- "long_checkout": false,
- "py_versions": ["all"]
- },
- "pillow": {
- "cli_arguments": ["--experimental-string-processing"],
- "expect_formatting_changes": true,
- "git_clone_url": "https://github.com/python-pillow/Pillow.git",
- "long_checkout": false,
- "py_versions": ["all"]
- },
- "poetry": {
- "cli_arguments": ["--experimental-string-processing"],
- "expect_formatting_changes": false,
- "git_clone_url": "https://github.com/python-poetry/poetry.git",
- "long_checkout": false,
- "py_versions": ["all"]
- },
- "pyanalyze": {
- "cli_arguments": ["--experimental-string-processing"],
- "expect_formatting_changes": false,
- "git_clone_url": "https://github.com/quora/pyanalyze.git",
- "long_checkout": false,
- "py_versions": ["all"]
- },
- "pyramid": {
- "cli_arguments": ["--experimental-string-processing"],
- "expect_formatting_changes": true,
- "git_clone_url": "https://github.com/Pylons/pyramid.git",
- "long_checkout": false,
- "py_versions": ["all"]
- },
- "ptr": {
- "cli_arguments": ["--experimental-string-processing"],
- "expect_formatting_changes": false,
- "git_clone_url": "https://github.com/facebookincubator/ptr.git",
- "long_checkout": false,
- "py_versions": ["all"]
- },
- "pytest": {
- "cli_arguments": ["--experimental-string-processing"],
- "expect_formatting_changes": true,
- "git_clone_url": "https://github.com/pytest-dev/pytest.git",
- "long_checkout": false,
- "py_versions": ["all"]
- },
- "scikit-lego": {
- "cli_arguments": ["--experimental-string-processing"],
- "expect_formatting_changes": true,
- "git_clone_url": "https://github.com/koaning/scikit-lego",
- "long_checkout": false,
- "py_versions": ["all"]
- },
- "tox": {
- "cli_arguments": ["--experimental-string-processing"],
- "expect_formatting_changes": true,
- "git_clone_url": "https://github.com/tox-dev/tox.git",
- "long_checkout": false,
- "py_versions": ["all"]
- },
- "typeshed": {
- "cli_arguments": ["--experimental-string-processing"],
- "expect_formatting_changes": true,
- "git_clone_url": "https://github.com/python/typeshed.git",
- "long_checkout": false,
- "py_versions": ["all"]
- },
- "virtualenv": {
- "cli_arguments": ["--experimental-string-processing"],
- "expect_formatting_changes": true,
- "git_clone_url": "https://github.com/pypa/virtualenv.git",
- "long_checkout": false,
- "py_versions": ["all"]
- },
- "warehouse": {
- "cli_arguments": ["--experimental-string-processing"],
- "expect_formatting_changes": true,
- "git_clone_url": "https://github.com/pypa/warehouse.git",
- "long_checkout": false,
- "py_versions": ["all"]
- }
- }
-}
"src/black/strings.py",
"src/black/trans.py",
"src/blackd/__init__.py",
- "src/black_primer/cli.py",
- "src/black_primer/lib.py",
"src/blib2to3/pygram.py",
"src/blib2to3/pytree.py",
"src/blib2to3/pgen2/conv.py",
"tests/test_black.py",
"tests/test_blackd.py",
"tests/test_format.py",
- "tests/test_primer.py",
"tests/optional.py",
"tests/util.py",
"tests/conftest.py",
+++ /dev/null
-#!/usr/bin/env python3
-
-import asyncio
-import sys
-import unittest
-from contextlib import contextmanager
-from copy import deepcopy
-from io import StringIO
-from os import getpid
-from pathlib import Path
-from platform import system
-from pytest import LogCaptureFixture
-from subprocess import CalledProcessError
-from tempfile import TemporaryDirectory, gettempdir
-from typing import Any, Callable, Iterator, List, Tuple, TypeVar
-from unittest.mock import Mock, patch
-
-from click.testing import CliRunner
-
-from black_primer import cli, lib
-
-
-EXPECTED_ANALYSIS_OUTPUT = """\
-
-Failed projects:
-
-## black:
- - Returned 69
- - stdout:
-Black didn't work
-
--- primer results 📊 --
-
-68 / 69 succeeded (98.55%) ✅
-1 / 69 FAILED (1.45%) 💩
- - 0 projects disabled by config
- - 0 projects skipped due to Python version
- - 0 skipped due to long checkout
-
-Failed projects: black
-
-"""
-FAKE_PROJECT_CONFIG = {
- "cli_arguments": ["--unittest"],
- "expect_formatting_changes": False,
- "git_clone_url": "https://github.com/psf/black.git",
-}
-
-
-@contextmanager
-def capture_stdout(
- command: Callable[..., Any], *args: Any, **kwargs: Any
-) -> Iterator[str]:
- old_stdout, sys.stdout = sys.stdout, StringIO()
- try:
- command(*args, **kwargs)
- sys.stdout.seek(0)
- yield sys.stdout.read()
- finally:
- sys.stdout = old_stdout
-
-
-@contextmanager
-def event_loop() -> Iterator[None]:
- policy = asyncio.get_event_loop_policy()
- loop = policy.new_event_loop()
- asyncio.set_event_loop(loop)
- if sys.platform == "win32":
- asyncio.set_event_loop(asyncio.ProactorEventLoop())
- try:
- yield
- finally:
- loop.close()
-
-
-async def raise_subprocess_error_1(*args: Any, **kwargs: Any) -> None:
- raise CalledProcessError(1, ["unittest", "error"], b"", b"")
-
-
-async def raise_subprocess_error_123(*args: Any, **kwargs: Any) -> None:
- raise CalledProcessError(123, ["unittest", "error"], b"", b"")
-
-
-async def return_false(*args: Any, **kwargs: Any) -> bool:
- return False
-
-
-async def return_subproccess_output(*args: Any, **kwargs: Any) -> Tuple[bytes, bytes]:
- return (b"stdout", b"stderr")
-
-
-async def return_zero(*args: Any, **kwargs: Any) -> int:
- return 0
-
-
-if sys.version_info >= (3, 9):
- T = TypeVar("T")
- Q = asyncio.Queue[T]
-else:
- T = Any
- Q = asyncio.Queue
-
-
-def collect(queue: Q) -> List[T]:
- ret = []
- while True:
- try:
- item = queue.get_nowait()
- ret.append(item)
- except asyncio.QueueEmpty:
- return ret
-
-
-class PrimerLibTests(unittest.TestCase):
- def test_analyze_results(self) -> None:
- fake_results = lib.Results(
- {
- "disabled": 0,
- "failed": 1,
- "skipped_long_checkout": 0,
- "success": 68,
- "wrong_py_ver": 0,
- },
- {"black": CalledProcessError(69, ["black"], b"Black didn't work", b"")},
- )
- with capture_stdout(lib.analyze_results, 69, fake_results) as analyze_stdout:
- self.assertEqual(EXPECTED_ANALYSIS_OUTPUT, analyze_stdout)
-
- @event_loop()
- def test_black_run(self) -> None:
- """Pretend to run Black to ensure we cater for all scenarios"""
- loop = asyncio.get_event_loop()
- project_name = "unittest"
- repo_path = Path(gettempdir())
- project_config = deepcopy(FAKE_PROJECT_CONFIG)
- results = lib.Results({"failed": 0, "success": 0}, {})
-
- # Test a successful Black run
- with patch("black_primer.lib._gen_check_output", return_subproccess_output):
- loop.run_until_complete(
- lib.black_run(project_name, repo_path, project_config, results)
- )
- self.assertEqual(1, results.stats["success"])
- self.assertFalse(results.failed_projects)
-
- # Test a fail based on expecting formatting changes but not getting any
- project_config["expect_formatting_changes"] = True
- results = lib.Results({"failed": 0, "success": 0}, {})
- with patch("black_primer.lib._gen_check_output", return_subproccess_output):
- loop.run_until_complete(
- lib.black_run(project_name, repo_path, project_config, results)
- )
- self.assertEqual(1, results.stats["failed"])
- self.assertTrue(results.failed_projects)
-
- # Test a fail based on returning 1 and not expecting formatting changes
- project_config["expect_formatting_changes"] = False
- results = lib.Results({"failed": 0, "success": 0}, {})
- with patch("black_primer.lib._gen_check_output", raise_subprocess_error_1):
- loop.run_until_complete(
- lib.black_run(project_name, repo_path, project_config, results)
- )
- self.assertEqual(1, results.stats["failed"])
- self.assertTrue(results.failed_projects)
-
- # Test a formatting error based on returning 123
- with patch("black_primer.lib._gen_check_output", raise_subprocess_error_123):
- loop.run_until_complete(
- lib.black_run(project_name, repo_path, project_config, results)
- )
- self.assertEqual(2, results.stats["failed"])
-
- def test_flatten_cli_args(self) -> None:
- fake_long_args = ["--arg", ["really/", "|long", "|regex", "|splitup"], "--done"]
- expected = ["--arg", "really/|long|regex|splitup", "--done"]
- self.assertEqual(expected, lib._flatten_cli_args(fake_long_args))
-
- @event_loop()
- def test_gen_check_output(self) -> None:
- loop = asyncio.get_event_loop()
- stdout, stderr = loop.run_until_complete(
- lib._gen_check_output([lib.BLACK_BINARY, "--help"])
- )
- self.assertIn("The uncompromising code formatter", stdout.decode("utf8"))
- self.assertEqual(None, stderr)
-
- # TODO: Add a test to see failure works on Windows
- if lib.WINDOWS:
- return
-
- false_bin = "/usr/bin/false" if system() == "Darwin" else "/bin/false"
- with self.assertRaises(CalledProcessError):
- loop.run_until_complete(lib._gen_check_output([false_bin]))
-
- with self.assertRaises(asyncio.TimeoutError):
- loop.run_until_complete(
- lib._gen_check_output(["/bin/sleep", "2"], timeout=0.1)
- )
-
- @event_loop()
- def test_git_checkout_or_rebase(self) -> None:
- loop = asyncio.get_event_loop()
- project_config = deepcopy(FAKE_PROJECT_CONFIG)
- work_path = Path(gettempdir())
-
- expected_repo_path = work_path / "black"
- with patch("black_primer.lib._gen_check_output", return_subproccess_output):
- returned_repo_path = loop.run_until_complete(
- lib.git_checkout_or_rebase(work_path, project_config)
- )
- self.assertEqual(expected_repo_path, returned_repo_path)
-
- @patch("sys.stdout", new_callable=StringIO)
- @event_loop()
- def test_process_queue(self, mock_stdout: Mock) -> None:
- """Test the process queue on primer itself
- - If you have non black conforming formatting in primer itself this can fail"""
- loop = asyncio.get_event_loop()
- config_path = Path(lib.__file__).parent / "primer.json"
- with patch("black_primer.lib.git_checkout_or_rebase", return_false):
- with TemporaryDirectory() as td:
- return_val = loop.run_until_complete(
- lib.process_queue(
- str(config_path), Path(td), 2, ["django", "pyramid"]
- )
- )
- self.assertEqual(0, return_val)
-
- @event_loop()
- def test_load_projects_queue(self) -> None:
- """Test the process queue on primer itself
- - If you have non black conforming formatting in primer itself this can fail"""
- loop = asyncio.get_event_loop()
- config_path = Path(lib.__file__).parent / "primer.json"
-
- config, projects_queue = loop.run_until_complete(
- lib.load_projects_queue(config_path, ["django", "pyramid"])
- )
- projects = collect(projects_queue)
- self.assertEqual(projects, ["django", "pyramid"])
-
-
-class PrimerCLITests(unittest.TestCase):
- @event_loop()
- def test_async_main(self) -> None:
- loop = asyncio.get_event_loop()
- work_dir = Path(gettempdir()) / f"primer_ut_{getpid()}"
- args = {
- "config": "/config",
- "debug": False,
- "keep": False,
- "long_checkouts": False,
- "rebase": False,
- "workdir": str(work_dir),
- "workers": 69,
- "no_diff": False,
- "projects": "",
- }
- with patch("black_primer.cli.lib.process_queue", return_zero):
- return_val = loop.run_until_complete(cli.async_main(**args)) # type: ignore
- self.assertEqual(0, return_val)
-
- def test_handle_debug(self) -> None:
- self.assertTrue(cli._handle_debug(None, None, True))
-
- def test_help_output(self) -> None:
- runner = CliRunner()
- result = runner.invoke(cli.main, ["--help"])
- self.assertEqual(result.exit_code, 0)
-
-
-def test_projects(caplog: LogCaptureFixture) -> None:
- with event_loop():
- runner = CliRunner()
- result = runner.invoke(cli.main, ["--projects=STDIN,asdf"])
- assert result.exit_code == 0
- assert "1 / 1 succeeded" in result.output
- assert "Projects not found: {'asdf'}" in caplog.text
-
- caplog.clear()
-
- with event_loop():
- runner = CliRunner()
- result = runner.invoke(cli.main, ["--projects=fdsa,STDIN"])
- assert result.exit_code == 0
- assert "1 / 1 succeeded" in result.output
- assert "Projects not found: {'fdsa'}" in caplog.text
-
-
-if __name__ == "__main__":
- unittest.main()