Compare commits

...

9 Commits

Author SHA1 Message Date
Ivan Pozdeev
55b43a5566 save newly generated scripts into existing_scripts 2025-11-15 21:41:50 +03:00
Ivan Pozdeev
7bb34890d1 Clarify version for initial releases early and retain download file information 2025-11-15 21:13:25 +03:00
Ivan Pozdeev
949ff7f5da + progress bar 2025-11-15 21:12:26 +03:00
Ivan Pozdeev
2c16db9dfc Add in sorted order, report versions to add in info 2025-11-15 18:43:30 +03:00
Ivan Pozdeev
0542763abb -dead code 2025-11-15 18:42:53 +03:00
Ivan Pozdeev
00fd0d5595 add progress bar 2025-11-15 18:42:34 +03:00
Ivan Pozdeev
602bb7a8e6 reuse HTTP session 2025-11-15 18:41:57 +03:00
Ivan Pozdeev
f561ab17e2 Update existing_versions when deleting older prerelease 2025-11-15 18:40:28 +03:00
Ivan Pozdeev
047be557d4 fix message 2025-11-15 18:36:22 +03:00
2 changed files with 104 additions and 70 deletions

View File

@ -21,9 +21,9 @@ import urllib.parse
import more_itertools
import packaging.version
import requests
import requests_html
import sortedcontainers
import tqdm
logger = logging.getLogger(__name__)
@ -31,7 +31,7 @@ REPO = "https://www.python.org/ftp/python/"
CUTOFF_VERSION=packaging.version.Version('3.9')
EXCLUDED_VERSIONS= {
packaging.version.Version("3.9.3") #recalled
packaging.version.Version("3.9.3") #recalled upstream
}
here = pathlib.Path(__file__).resolve()
@ -62,7 +62,7 @@ def get_existing_scripts(name="CPython", pattern=r'^\d+\.\d+(?:(t?)(-\w+)|(.\d+(
logger.error("Unable to parse existing version %s: %s", entry_name, e)
def _get_download_entries(url, pattern, session=None):
def _get_download_entries(url, pattern, session=None) -> typing.Generator[typing.Tuple[str,str],None,None]:
if session is None:
session = requests_html.HTMLSession()
response = session.get(url)
@ -84,7 +84,7 @@ def get_available_versions(name="CPython", url=REPO, pattern=r'^\d+', session=No
logger.info("Fetching remote %(name)s versions",locals())
for name, url in _get_download_entries(url, pattern, session):
logger.debug('Available %(name)s version: %(link)s', locals())
logger.debug(f'Available version: {name}, {url}')
yield packaging.version.Version(name), url
@ -108,7 +108,7 @@ def pick_previous_version(version: packaging.version.Version,
def adapt_script(version: packaging.version.Version,
extensions_urls: typing.Dict[str,typing.Tuple[str,str]],
previous_version: packaging.version.Version,
is_prerelease_upgrade: bool) -> None:
session: requests_html.BaseSession = None) -> pathlib.Path:
previous_version_path = out_dir.joinpath(str(previous_version))
with previous_version_path.open("r", encoding='utf-8') as f:
script = f.readlines()
@ -125,7 +125,7 @@ def adapt_script(version: packaging.version.Version,
f'to available packages {extensions_urls}')
return
new_package_name, new_package_url = extensions_urls[matched_extension]
new_package_hash = Url.sha256_url(new_package_url)
new_package_hash = Url.sha256_url(new_package_url, session)
verify_py_suffix = str(version.major)+str(version.minor)
@ -140,10 +140,62 @@ def adapt_script(version: packaging.version.Version,
result_path.write_text(result.getvalue(), encoding='utf-8')
result.close()
return result_path
def add_version(version: packaging.version.Version,
                url: str,
                existing_versions: typing.MutableMapping[packaging.version.Version,typing.Any],
                available_downloads_repo: typing.Mapping[packaging.version.Version,typing.Mapping[str,str]],
                session: requests_html.BaseSession = None):
    """Generate an install script for *version* and register it.

    The script is derived from the closest previously existing version
    (via ``pick_previous_version`` / ``adapt_script``).  On success,
    ``existing_versions`` is updated in place with the new script name,
    a superseded prerelease of the same X.Y.Z (if any) is removed, and
    the "t"-variant thunks are refreshed.

    Parameters:
        version: the version to add.
        url: download-listing URL for this version's site directory.
        existing_versions: mutable mapping of already-generated versions
            to script names; mutated in place.
        available_downloads_repo: cache of pre-fetched download entries,
            keyed by version (filled by
            ``clarify_available_versions_for_initial_releases``).
        session: optional shared HTTP session to reuse connections.

    Returns:
        True (unconditionally, once the script has been written).
    """
    previous_version = pick_previous_version(version, existing_versions)
    # Same X.Y.Z triple means we are upgrading an existing prerelease
    # script in place rather than adding a brand-new version.
    is_prerelease_upgrade = previous_version.major==version.major\
            and previous_version.minor==version.minor\
            and previous_version.micro==version.micro
    logger.info(f"Adding {version} based on {previous_version}")
    # Prefer the download entry cached during the "clarify" pass;
    # fall back to fetching the listing now.
    latest_available_download = available_downloads_repo.get(version, None)
    if latest_available_download is None:
        latest_available_download_version, latest_available_download = \
                get_latest_available_download(url, session)
        # The only case when site dir is not equal to a download's version
        # is for prereleases -- which we should have already clarified
        # in clarify_available_versions_for_initial_releases
        assert latest_available_download_version == version
        del latest_available_download_version
    new_path = adapt_script(version,
                            latest_available_download,
                            previous_version,
                            session)
    existing_versions[version]=new_path.name
    cleanup_prerelease_upgrade(is_prerelease_upgrade, previous_version, existing_versions)
    handle_t_thunks(version, previous_version, is_prerelease_upgrade)
    return True
def get_latest_available_download(url, session):
    """Return ``(version, download_entry)`` for the newest source download.

    Fetches the download listing at *url* (reusing *session* for HTTP)
    and picks the entry with the highest version number.
    """
    downloads = get_available_source_downloads(url, session)
    newest = max(downloads)
    return newest, downloads[newest]
def cleanup_prerelease_upgrade(
        is_prerelease_upgrade: bool,
        previous_version: packaging.version.Version,
        existing_versions: typing.MutableMapping[packaging.version.Version,typing.Any])\
        -> None:
    """Remove the superseded prerelease script after an in-place upgrade.

    No-op unless *is_prerelease_upgrade* is set; otherwise deletes the
    old script file from ``out_dir`` and drops *previous_version* from
    *existing_versions* (mutated in place).
    """
    if not is_prerelease_upgrade:
        return
    previous_version_path = out_dir.joinpath(str(previous_version))
    logger.debug(f'Deleting {previous_version_path}')
    previous_version_path.unlink()
    del existing_versions[previous_version]
def handle_t_thunks(version, previous_version, is_prerelease_upgrade):
if (version.major, version.minor) >= (3, 13):
# an old thunk may have older version-specific code
# so it's safer to write a known version-independent template
@ -155,74 +207,48 @@ def adapt_script(version: packaging.version.Version,
logger.debug(f"Deleting {previous_thunk_path}")
previous_thunk_path.unlink()
def add_version(version: packaging.version.Version,
                url: str,
                existing_versions: typing.Collection[packaging.version.Version],
                session: requests_html.BaseSession = None):
    """Generate an install script for *version*, or refresh a prerelease.

    Looks up the newest source download at *url* and adapts the script of
    the closest existing version to it.  Returns None either way; when no
    download newer than the existing prerelease is found, nothing is
    written.

    NOTE(review): this appears to be the pre-change version of
    ``add_version`` shown by the diff view; the replacement takes an
    ``available_downloads_repo`` cache and returns the new script path's
    registration instead.
    """
    previous_version = pick_previous_version(version, existing_versions)
    # Same X.Y.Z triple: we already have a prerelease script for this
    # exact version and are only checking for something newer.
    is_prerelease_upgrade = previous_version.major==version.major\
            and previous_version.minor==version.minor\
            and previous_version.micro==version.micro
    if is_prerelease_upgrade:
        logger.info(f"Checking for a (pre)release for {version} newer than {previous_version}")
    else:
        logger.info(f"Adding {version} based on {previous_version}")
    available_downloads = get_available_source_downloads(url, session)
    latest_available_download_version = max(available_downloads.keys())
    if latest_available_download_version == previous_version:
        # Nothing newer than what we already ship -- leave as-is.
        logger.info("No newer download found")
        return
    adapt_script(latest_available_download_version,
                 available_downloads[latest_available_download_version],
                 previous_version,
                 is_prerelease_upgrade)
def main():
args = parse_args()
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO)
cached_session=None
cached_session=requests_html.HTMLSession()
available_downloads_repo={}
existing_versions = dict(get_existing_scripts())
available_versions = dict(get_available_versions(session=cached_session))
# version triple to triple-ified spec to raw spec
versions_to_add = set(available_versions.keys()) - set(existing_versions.keys())
versions_to_add = {v for v in versions_to_add if v>=CUTOFF_VERSION and v not in EXCLUDED_VERSIONS}
logger.info("Checking for new versions")
available_versions = {v:url for v,url in get_available_versions(session=cached_session)
if v>=CUTOFF_VERSION and v not in EXCLUDED_VERSIONS}
clarify_available_versions_for_initial_releases(
available_versions,
existing_versions,
available_downloads_repo,
cached_session)
versions_to_add = sorted(set(available_versions.keys()) - set(existing_versions.keys()))
logger.debug("Existing_versions:\n"+pprint.pformat(existing_versions))
logger.debug("Available_versions:\n"+pprint.pformat(available_versions))
logger.debug("Versions to add:\n"+pprint.pformat(versions_to_add))
logger.info("Versions to add:\n"+pprint.pformat(versions_to_add))
for version_to_add in versions_to_add:
add_version(version_to_add, available_versions[version_to_add], existing_versions)
# for s in available_versions:
# key = s.version
# vv = key.version_str.info()
#
# reason = None
# if key in existing_versions:
# reason = "already exists"
# elif key.version_str.info() <= (4, 3, 30):
# reason = "too old"
# elif len(key.version_str.info()) >= 4 and "-" not in key.version_str:
# reason = "ignoring hotfix releases"
#
# if reason:
# logger.debug("Ignoring version %(s)s (%(reason)s)", locals())
# continue
#
# to_add[key][s] = s
# logger.info("Writing %s scripts", len(to_add))
# for ver, d in to_add.items():
# specs = list(d.values())
# fpath = out_dir / ver.to_filename()
# script_str = make_script(specs)
# logger.info("Writing script for %s", ver)
# if args.dry_run:
# print(f"Would write spec to {fpath}:\n" + textwrap.indent(script_str, " "))
# else:
# with open(fpath, "w") as f:
# f.write(script_str)
add_version(version_to_add, available_versions[version_to_add], existing_versions, available_downloads_repo, session=cached_session)
def clarify_available_versions_for_initial_releases(available_versions, existing_versions, available_downloads_repo,
                                                    session):
    """Resolve the true version behind each not-yet-tracked X.Y.0 entry.

    A micro==0 site directory may currently hold only a prerelease, so the
    directory name need not match the newest download inside it.  For each
    such entry, fetch the newest actual download, cache it in
    *available_downloads_repo*, and re-key *available_versions* to the
    download's real version.  Both mappings are mutated in place.
    """
    # Iterate over a snapshot list because available_versions is
    # re-keyed (insert + delete) inside the loop body.
    for v in [v for v in available_versions if v.micro == 0 and v not in existing_versions]:
        logger.debug(f"Clarifying available version for {v}")
        latest_available_download_version, latest_available_download = \
                get_latest_available_download(available_versions[v], session)
        # Cache the fetched entry so add_version need not re-download the listing.
        available_downloads_repo[latest_available_download_version] = latest_available_download
        if latest_available_download_version != v:
            # e.g. the X.Y.0 directory currently serves only a prerelease:
            # track it under the prerelease's own version instead.
            logger.info(f"Adjusting available version {v} to {latest_available_download_version}")
            available_versions[latest_available_download_version] = available_versions[v]
            del available_versions[v]
def parse_args():
@ -238,6 +264,7 @@ def parse_args():
parsed = parser.parse_args()
return parsed
class Re:
@dataclasses.dataclass
class _interval:
@ -283,13 +310,19 @@ class Re:
return result
class Url:
logger = logging.getLogger("Url")
@staticmethod
def sha256_url(url):
def sha256_url(url, session=None):
if session is None:
session = requests_html.HTMLSession()
logger.info(f"Downloading and computing hash of {url}")
h=hashlib.sha256()
r=requests.get(url,stream=True)
for c in r.iter_content(None):
h.update(c)
r=session.get(url,stream=True)
total_bytes=int(r.headers.get('content-length',0)) or float('inf')
with tqdm.tqdm(total=total_bytes, unit='B', unit_scale=True, unit_divisor=1024) as t:
for c in r.iter_content(1024):
t.update(len(c))
h.update(c)
return h.hexdigest()

View File

@ -2,4 +2,5 @@ requests-html
packaging~=20.4
requests~=2.32.4
sortedcontainers~=2.4.0
sortedcontainers~=2.4.0
tqdm~=4.51.0