From 91f8f6154c4cbf0373577fc15c74b0d31f9a2cfc Mon Sep 17 00:00:00 2001
From: Aiqin Zhang
Date: Thu, 7 Jul 2022 17:25:47 +1000
Subject: [PATCH] Cache repomd.xml and use the cached copy when overwriting the
 repo, since syncing packages may take some time and repomd.xml may have
 changed upstream in the meantime

---
 rpm_s3_mirror/mirror.py     | 116 ++++++++++++++++++++----------------
 rpm_s3_mirror/repository.py |   4 +-
 rpm_s3_mirror/s3.py         |  12 ++--
 rpm_s3_mirror/util.py       |   3 +-
 4 files changed, 73 insertions(+), 62 deletions(-)

diff --git a/rpm_s3_mirror/mirror.py b/rpm_s3_mirror/mirror.py
index b3ceb89..70a86c6 100644
--- a/rpm_s3_mirror/mirror.py
+++ b/rpm_s3_mirror/mirror.py
@@ -10,7 +10,7 @@
 from collections import namedtuple, defaultdict
 from urllib.parse import urlparse
 
-from rpm_s3_mirror.repository import RPMRepository, safe_parse_xml
+from rpm_s3_mirror.repository import RPMRepository, safe_parse_xml, Metadata
 from rpm_s3_mirror.s3 import S3, S3DirectoryNotFound
 from rpm_s3_mirror.statsd import StatsClient
 from rpm_s3_mirror.util import (
@@ -40,6 +40,7 @@ def __init__(self, config):
         self.session = get_requests_session()
         self.log = logging.getLogger(type(self).__name__)
         self.stats = StatsClient()
+        # S3 bucket
         self.s3 = S3(
             aws_secret_access_key=self.config.aws_secret_access_key,
             aws_access_key_id=self.config.aws_access_key_id,
@@ -49,6 +50,7 @@ def __init__(self, config):
             max_workers=self.config.max_workers,
             scratch_dir=self.config.scratch_dir,
         )
+        # upstream repos
         self.repositories = [RPMRepository(base_url=url) for url in config.upstream_repositories]
 
     def sync(self):
@@ -69,63 +71,71 @@ def sync(self):
             self.stats.gauge(metric="s3_mirror_sync_seconds_total", value=time.monotonic() - start)
         return sync_success
 
-    def _sync_repository(self, upstream_repository):
+    def _sync_repository(self, upstream_repository: RPMRepository):
         mirror_start = time.monotonic()
         update_time = now()
-        upstream_metadata = upstream_repository.parse_metadata()
+        upstream_metadata: Metadata = upstream_repository.parse_metadata()
+        # Cache repomd.xml when first reading it from the upstream repo.
+        with TemporaryDirectory(prefix=self.config.scratch_dir) as temp_dir:
+            base_url = upstream_repository.base_url
+            url = f"{base_url}repodata/repomd.xml"
+            cache_xml_path = download_file(temp_dir=temp_dir, url=url, session=self.session)
+            self.log.info("Cached repomd.xml to local path: %s", cache_xml_path)
+            # Our S3 mirror. Hostname based on S3 bucket region & name. Path based on upstream path.
+            mirror_repository: RPMRepository = RPMRepository(base_url=self._build_s3_url(upstream_repository.base_url))
+            bootstrap = not mirror_repository.exists()
+            if bootstrap:
+                self.log.info("Bootstrapping repository: %s", upstream_repository.base_url)
+                new_packages = upstream_metadata.package_list
+            else:
+                self.log.info("Syncing repository: %s", upstream_repository.base_url)
+                # If the upstream repomd.xml file was updated after the last time we updated our
+                # mirror repomd.xml file then there is probably some work to do.
+                last_check_time = self.s3.repomd_update_time(base_url=mirror_repository.base_url)
+                if not upstream_repository.has_updates(since=last_check_time):
+                    self.log.info("Skipping repository with no updates since: %s", last_check_time)
+                    self.stats.gauge(
+                        metric="s3_mirror_sync_seconds",
+                        value=time.monotonic() - mirror_start,
+                        tags={"repo": upstream_repository.path},
+                    )
+                    return
+
+                # Extract our metadata and detect any new/updated packages.
+                mirror_metadata = mirror_repository.parse_metadata()
+                new_packages = set(upstream_metadata.package_list).difference(set(mirror_metadata.package_list))
+
+            # Sync our mirror with upstream.
+            self.s3.sync_packages(
+                base_url=upstream_metadata.base_url,
+                upstream_repodata=upstream_metadata.repodata,
+                upstream_packages=new_packages,
+                # If we are bootstrapping the s3 repo, it is worth checking if the package already exists as if the
+                # process is interrupted halfway through we would have to do a lot of potentially useless work. Once
+                # we have completed bootstrapping and are just running a sync we don't benefit from checking as it
+                # slows things down for no good reason (we expect the packages to be there already and if not
+                # it is a bug of some kind).
+                skip_existing=bootstrap,
+            )
-        mirror_repository = RPMRepository(base_url=self._build_s3_url(upstream_repository.base_url))
-        bootstrap = not mirror_repository.exists()
-        if bootstrap:
-            self.log.info("Bootstrapping repository: %s", upstream_repository.base_url)
-            new_packages = upstream_metadata.package_list
-        else:
-            self.log.info("Syncing repository: %s", upstream_repository.base_url)
-            # If the upstream repomd.xml file was updated after the last time we updated our
-            # mirror repomd.xml file then there is probably some work to do.
-            last_check_time = self.s3.repomd_update_time(base_url=mirror_repository.base_url)
-            if not upstream_repository.has_updates(since=last_check_time):
-                self.log.info("Skipping repository with no updates since: %s", last_check_time)
-                self.stats.gauge(
-                    metric="s3_mirror_sync_seconds",
-                    value=time.monotonic() - mirror_start,
-                    tags={"repo": upstream_repository.path},
+            # If we are not bootstrapping, store a manifest that describes the changes synced in this run
+            if not bootstrap:
+                manifest_location = self._build_manifest_location(base_url=upstream_repository.base_url)
+                repomd_path = join(manifest_location, "repomd.xml")
+                self.s3.archive_repomd(base_url=upstream_repository.base_url, location=repomd_path)
+                manifest = Manifest(
+                    update_time=update_time,
+                    upstream_repository=upstream_repository.base_url,
+                    previous_repomd=repomd_path,
+                    synced_packages=[package.to_dict() for package in new_packages],
                 )
-                return
-
-            # Extract our metadata and detect any new/updated packages.
-            mirror_metadata = mirror_repository.parse_metadata()
-            new_packages = set(upstream_metadata.package_list).difference(set(mirror_metadata.package_list))
-
-        # Sync our mirror with upstream.
-        self.s3.sync_packages(
-            base_url=upstream_metadata.base_url,
-            upstream_repodata=upstream_metadata.repodata,
-            upstream_packages=new_packages,
-            # If we are bootstrapping the s3 repo, it is worth checking if the package already exists as if the
-            # process is interrupted halfway through we would have to do a lot of potentially useless work. Once
-            # we have completed bootstrapping and are just running a sync we don't benefit from checking as it
-            # slows things down for no good reason (we expect the packages to be there already and if not
-            # it is a bug of some kind).
-            skip_existing=bootstrap,
-        )
-
-        # If we are not bootstrapping, store a manifest that describes the changes synced in this run
-        if not bootstrap:
-            manifest_location = self._build_manifest_location(base_url=upstream_repository.base_url)
-            repomd_path = join(manifest_location, "repomd.xml")
-            self.s3.archive_repomd(base_url=upstream_repository.base_url, location=repomd_path)
-            manifest = Manifest(
-                update_time=update_time,
-                upstream_repository=upstream_repository.base_url,
-                previous_repomd=repomd_path,
-                synced_packages=[package.to_dict() for package in new_packages],
-            )
-            manifest_path = join(manifest_location, "manifest.json")
-            self.s3.put_manifest(location=manifest_path, manifest=manifest)
+                manifest_path = join(manifest_location, "manifest.json")
+                self.s3.put_manifest(location=manifest_path, manifest=manifest)
 
-        # Finally, overwrite the repomd.xml file to make our changes live
-        self.s3.overwrite_repomd(base_url=upstream_repository.base_url)
+            # Finally, overwrite the repomd.xml file to make our changes live.
+            # Copy from the cached file in /var/tmp, because sync_packages may take a while and repomd.xml
+            # may have changed upstream in the meantime.
+            self.s3.overwrite_repomd(repomd_xml_local_path=cache_xml_path, base_url=upstream_repository.base_url)
         self.log.info("Updated mirror with %s packages", len(new_packages))
         self.stats.gauge(
             metric="s3_mirror_sync_seconds",
diff --git a/rpm_s3_mirror/repository.py b/rpm_s3_mirror/repository.py
index f778ef3..b63469b 100644
--- a/rpm_s3_mirror/repository.py
+++ b/rpm_s3_mirror/repository.py
@@ -143,6 +143,8 @@ def __iter__(self) -> Iterator[Package]:
 
 
 class RPMRepository:
+    """Upstream repository. This is not necessarily an S3 bucket."""
+
     def __init__(self, base_url: str):
         if not base_url.startswith("https://"):
             raise ValueError("Only https upstream repositories can be synced from")
@@ -238,7 +240,7 @@ def _rewrite_primary(self, temp_dir, primary: RepodataSection):
             location=f"repodata/{basename(local_path)}",
         )
 
-    def _rewrite_repomd(self, repomd_xml, snapshot: SnapshotPrimary):
+    def _rewrite_repomd(self, repomd_xml: Element, snapshot: SnapshotPrimary):
         for element in repomd_xml.findall("repo:*", namespaces=namespaces):
             # We only support *.xml.gz files currently
             if element.attrib.get("type", None) not in {"primary", "filelists", "other", "modules", "updateinfo"}:
diff --git a/rpm_s3_mirror/s3.py b/rpm_s3_mirror/s3.py
index 22e8353..7eea595 100644
--- a/rpm_s3_mirror/s3.py
+++ b/rpm_s3_mirror/s3.py
@@ -78,13 +78,11 @@ def sync_packages(
         )
         self._sync_objects(temp_dir=temp_dir, repo_objects=upstream_repodata.values(), skip_existing=skip_existing)
 
-    def overwrite_repomd(self, base_url):
-        with TemporaryDirectory(prefix=self.scratch_dir) as temp_dir:
-            url = f"{base_url}repodata/repomd.xml"
-            repomd_xml = download_file(temp_dir=temp_dir, url=url, session=self.session)
-            path = urlparse(url).path
-            self.log.info("Overwriting repomd.xml")
-            self.put_object(repomd_xml, path, cache_age=0)
+    def overwrite_repomd(self, repomd_xml_local_path, base_url):
+        url = f"{base_url}repodata/repomd.xml"
+        remote_path = urlparse(url).path
+        self.log.info("Overwriting repomd.xml")
+        self.put_object(repomd_xml_local_path, remote_path, cache_age=0)
 
     def archive_repomd(self, base_url, location):
         self.log.debug("Archiving repomd.xml to %s", location)
diff --git a/rpm_s3_mirror/util.py b/rpm_s3_mirror/util.py
index fdbb254..e5eb963 100644
--- a/rpm_s3_mirror/util.py
+++ b/rpm_s3_mirror/util.py
@@ -35,6 +35,7 @@ def get_requests_session() -> Session:
 
 
 def download_file(temp_dir: str, url: str, session: Session = None) -> str:
+    """Download a file and return the local path."""
     session = session or get_requests_session()
     try:
         return _download_file(session, temp_dir, url)
@@ -60,7 +61,7 @@ def _escape_s3_url(url: str) -> str:
     )
 
 
-def _download_file(session, temp_dir, url):
+def _download_file(session, temp_dir, url) -> str:
     with session.get(url, stream=True) as request:
         request.raise_for_status()
         out_path = join(temp_dir, os.path.basename(url))
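For context, the ordering this change moves to can be summarised in a short standalone sketch: fetch repomd.xml once up front, hold the local copy for the duration of the (potentially slow) package sync, then publish that same cached copy. The helpers below (download_file, sync_packages, overwrite_repomd) are simplified, hypothetical stand-ins for the patched functions, with the S3 upload stubbed out; this is an illustration of the flow only, not the project's implementation.

# Illustrative sketch only: simplified stand-ins for the real download_file(),
# sync_packages() and overwrite_repomd() in rpm_s3_mirror.util / rpm_s3_mirror.s3.
import os
import shutil
from tempfile import TemporaryDirectory

import requests


def download_file(temp_dir: str, url: str) -> str:
    """Download url into temp_dir and return the local file path."""
    out_path = os.path.join(temp_dir, os.path.basename(url))
    with requests.get(url, stream=True, timeout=30) as response:
        response.raise_for_status()
        with open(out_path, "wb") as fp:
            shutil.copyfileobj(response.raw, fp)
    return out_path


def sync_packages(base_url: str) -> None:
    # Placeholder for the slow part: copying new/updated packages to the mirror.
    print(f"syncing packages from {base_url} ...")


def overwrite_repomd(repomd_xml_local_path: str, base_url: str) -> None:
    # Placeholder for uploading the local file to the mirror bucket.
    print(f"publishing {repomd_xml_local_path} for {base_url}")


def sync_repository(base_url: str) -> None:
    with TemporaryDirectory() as temp_dir:
        # 1. Cache repomd.xml before doing anything slow, so the metadata we
        #    publish later matches the package list we are about to sync.
        cached_repomd = download_file(temp_dir, f"{base_url}repodata/repomd.xml")

        # 2. Sync packages; upstream repomd.xml may change while this runs,
        #    which is exactly why we do not re-download it afterwards.
        sync_packages(base_url)

        # 3. Publish the cached copy, not a fresh download.
        overwrite_repomd(cached_repomd, base_url)

Publishing the cached copy rather than a fresh download keeps the published metadata consistent with the set of packages that were actually synced, even if upstream changed mid-run.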