from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache, partial
from pathlib import Path
-from typing import ( # type: ignore # typing can't see Literal
- Generator,
- List,
- Literal,
- NamedTuple,
- Optional,
- Tuple,
- Union,
- cast,
-)
+from typing import Generator, List, NamedTuple, Optional, Tuple, Union, cast
from urllib.request import urlopen, urlretrieve
# Root URL of the PyPI instance used by this script.
PYPI_INSTANCE = "https://pypi.org/pypi"
# Minified JSON dump ranking PyPI projects over a 30-day window. The URL is
# fixed: it is no longer a template with a ``{days}`` placeholder.
PYPI_TOP_PACKAGES = (
    "https://hugovk.github.io/top-pypi-packages/top-pypi-packages-30-days.min.json"
)
# Path named "__black" inside the system temporary directory.
# NOTE(review): presumably where the black checkout is kept -- confirm at the use site.
INTERNAL_BLACK_REPO = f"{tempfile.gettempdir()}/__black"

# An opened archive: either a tar file or a zip file.
ArchiveKind = Union[tarfile.TarFile, zipfile.ZipFile]

# Rebind subprocess.run so that every call made through it passes check=True,
# i.e. a non-zero exit status raises instead of being silently ignored.
# This replaces the attribute on the subprocess module itself.
subprocess.run = partial(subprocess.run, check=True)  # type: ignore
# https://github.com/python/mypy/issues/1484
return cast(str, source["url"])
def get_top_packages() -> List[str]:
    """Return the project names listed in the top-PyPI-packages dump.

    Fetches the JSON document at ``PYPI_TOP_PACKAGES`` and collects the
    ``"project"`` value of every entry under its ``"rows"`` key, keeping the
    order in which the entries appear in the document.

    Returns:
        The project names, one per row of the downloaded document.
    """
    with urlopen(PYPI_TOP_PACKAGES) as page:
        result = json.load(page)

    return [package["project"] for package in result["rows"]]
def get_package_source(package: str, version: Optional[str]) -> str:
if package == "cpython":
if version is None:
- version = "master"
+ version = "main"
return f"https://github.com/python/cpython/archive/{version}.zip"
elif package == "pypy":
if version is None:
def download_and_extract_top_packages(
    directory: Path,
    workers: int = 8,
    limit: slice = DEFAULT_SLICE,
) -> Generator[Path, None, None]:
    """Fetch the selected top packages concurrently and yield the results.

    Args:
        directory: Passed through to ``get_package`` as its ``directory``
            argument.
        workers: Maximum number of threads in the pool.
        limit: Slice applied to the list returned by ``get_top_packages`` to
            choose which packages are processed.

    Yields:
        Each non-``None`` result of ``get_package``, in the same order as the
        sliced package list.
    """
    with ThreadPoolExecutor(max_workers=workers) as executor:
        # Always request the latest release: the version is pinned to None.
        bound_downloader = partial(get_package, version=None, directory=directory)
        for package in executor.map(bound_downloader, get_top_packages()[limit]):
            # get_package may return None; such entries are dropped.
            if package is not None:
                yield package
def git_create_repository(repo: Path) -> None:
    """Initialise a git repository in *repo* and make the first commit.

    Runs ``git init`` in *repo*, points the still-unborn HEAD at ``main``,
    and then calls ``git_add_and_commit`` with the message
    ``"Initial commit"``.

    Args:
        repo: Directory in which the git commands are run.
    """
    subprocess.run(["git", "init"], cwd=repo)
    # `git init` names the first branch after the user's init.defaultBranch
    # setting (historically "master"). Other code in this file switches to
    # "main", so pin the branch name before the first commit is created.
    subprocess.run(["git", "symbolic-ref", "HEAD", "refs/heads/main"], cwd=repo)
    git_add_and_commit(msg="Initial commit", repo=repo)
def git_add_and_commit(msg: str, repo: Path) -> None:
black_version=black_version,
input_directory=options.input,
)
- git_switch_branch("master", repo=repo)
+ git_switch_branch("main", repo=repo)
- git_switch_branch("master", repo=options.black_repo)
+ git_switch_branch("main", repo=options.black_repo)
def main() -> None:
type=Path,
help="Output directory to download and put result artifacts.",
)
- parser.add_argument("versions", nargs="*", default=("master",), help="")
+ parser.add_argument("versions", nargs="*", default=("main",), help="")
options = parser.parse_args()
repos = init_repos(options)