From 7bb34890d1e18378d48ecbd592f00d67ec91ca9b Mon Sep 17 00:00:00 2001 From: Ivan Pozdeev Date: Sat, 15 Nov 2025 21:13:25 +0300 Subject: [PATCH] Clarify version for initial relesaes early and retain download file information --- plugins/python-build/scripts/add_cpython.py | 76 ++++++++++++++------- 1 file changed, 53 insertions(+), 23 deletions(-) diff --git a/plugins/python-build/scripts/add_cpython.py b/plugins/python-build/scripts/add_cpython.py index bf5c0252..fe381e57 100644 --- a/plugins/python-build/scripts/add_cpython.py +++ b/plugins/python-build/scripts/add_cpython.py @@ -21,7 +21,6 @@ import urllib.parse import more_itertools import packaging.version -import requests import requests_html import sortedcontainers import tqdm @@ -32,7 +31,7 @@ REPO = "https://www.python.org/ftp/python/" CUTOFF_VERSION=packaging.version.Version('3.9') EXCLUDED_VERSIONS= { - packaging.version.Version("3.9.3") #recalled + packaging.version.Version("3.9.3") #recalled upstream } here = pathlib.Path(__file__).resolve() @@ -63,7 +62,7 @@ def get_existing_scripts(name="CPython", pattern=r'^\d+\.\d+(?:(t?)(-\w+)|(.\d+( logger.error("Unable to parse existing version %s: %s", entry_name, e) -def _get_download_entries(url, pattern, session=None): +def _get_download_entries(url, pattern, session=None) -> typing.Generator[typing.Tuple[str,str],None,None]: if session is None: session = requests_html.HTMLSession() response = session.get(url) @@ -145,27 +144,27 @@ def adapt_script(version: packaging.version.Version, def add_version(version: packaging.version.Version, url: str, existing_versions: typing.MutableMapping[packaging.version.Version,typing.Any], + available_downloads_repo: typing.Mapping[packaging.version.Version,typing.Mapping[str,str]], session: requests_html.BaseSession = None): previous_version = pick_previous_version(version, existing_versions) is_prerelease_upgrade = previous_version.major==version.major\ and previous_version.minor==version.minor\ and previous_version.micro==version.micro - if is_prerelease_upgrade: - logger.info(f"Checking for a (pre)release for {version} newer than {previous_version}") - else: - logger.info(f"Adding {version} based on {previous_version}") - available_downloads = get_available_source_downloads(url, session) - latest_available_download_version = max(available_downloads.keys()) - if is_prerelease_upgrade: - if latest_available_download_version == previous_version: - logger.info("No newer download found") - return False - else: - logger.info(f"Adding {version} replacing {previous_version}") + logger.info(f"Adding {version} based on {previous_version}") - adapt_script(latest_available_download_version, - available_downloads[latest_available_download_version], + latest_available_download = available_downloads_repo.get(version, None) + if latest_available_download is None: + latest_available_download_version, latest_available_download = \ + get_latest_available_download(url, session) + # The only case when site dir is not equal to a download's version + # is for prereleases -- which we should have already clarified + # in clarify_available_versions_for_initial_releases + assert latest_available_download_version == version + del latest_available_download_version + + adapt_script(version, + latest_available_download, previous_version, is_prerelease_upgrade, session) @@ -177,6 +176,12 @@ def add_version(version: packaging.version.Version, return True +def get_latest_available_download(url, session): + available_downloads = get_available_source_downloads(url, session) + latest_available_download_version = max(available_downloads.keys()) + return (latest_available_download_version,available_downloads[latest_available_download_version]) + + def cleanup_prerelease_upgrade( is_prerelease_upgrade: bool, previous_version: packaging.version.Version, @@ -205,19 +210,44 @@ def handle_t_thunks(version, previous_version, is_prerelease_upgrade): def main(): args = parse_args() logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO) + cached_session=requests_html.HTMLSession() + available_downloads_repo={} existing_versions = dict(get_existing_scripts()) - available_versions = dict(get_available_versions(session=cached_session)) - # version triple to triple-ified spec to raw spec - versions_to_add = set(available_versions.keys()) - set(existing_versions.keys()) - versions_to_add = sorted({v for v in versions_to_add if v>=CUTOFF_VERSION and v not in EXCLUDED_VERSIONS}) - logger.info("Checking for new versions") + available_versions = {v:url for v,url in get_available_versions(session=cached_session) + if v>=CUTOFF_VERSION and v not in EXCLUDED_VERSIONS} + + clarify_available_versions_for_initial_releases( + available_versions, + existing_versions, + available_downloads_repo, + cached_session) + + + versions_to_add = sorted(set(available_versions.keys()) - set(existing_versions.keys())) + + logger.debug("Existing_versions:\n"+pprint.pformat(existing_versions)) logger.debug("Available_versions:\n"+pprint.pformat(available_versions)) logger.info("Versions to add:\n"+pprint.pformat(versions_to_add)) for version_to_add in versions_to_add: - add_version(version_to_add, available_versions[version_to_add], existing_versions, session=cached_session) + add_version(version_to_add, available_versions[version_to_add], existing_versions, available_downloads_repo, session=cached_session) + + +def clarify_available_versions_for_initial_releases(available_versions, existing_versions, available_downloads_repo, + session): + for v in [v for v in available_versions if v.micro == 0 and v not in existing_versions]: + logger.debug(f"Clarifying available version for {v}") + latest_available_download_version, latest_available_download = \ + get_latest_available_download(available_versions[v], session) + + available_downloads_repo[latest_available_download_version] = latest_available_download + + if latest_available_download_version != v: + logger.info(f"Adjusting available version {v} to {latest_available_download_version}") + available_versions[latest_available_download_version] = available_versions[v] + del available_versions[v] def parse_args():