Cache repomd.xml and use the cache when overwriting the repo
Syncing packages can take a while, and the upstream repomd.xml may have changed in the meantime.
Aiqin-Aiven committed Jul 8, 2022
1 parent 98a53be commit 91f8f61
Showing 4 changed files with 73 additions and 62 deletions.
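
The change in brief: previously the mirror re-downloaded repomd.xml from upstream at the end of a sync, so if upstream published new metadata while packages were still being copied, the mirror could go live with a repomd.xml describing packages it never synced. This commit snapshots repomd.xml once at the start and publishes that cached copy at the end. A minimal sketch of the pattern, using only the standard library and hypothetical sync_packages/upload callbacks (not the project's API):

import shutil
import urllib.request
from tempfile import TemporaryDirectory

def sync_with_cached_repomd(base_url: str, sync_packages, upload) -> None:
    with TemporaryDirectory() as temp_dir:
        cache_path = f"{temp_dir}/repomd.xml"
        # Snapshot the upstream metadata once, up front.
        with urllib.request.urlopen(f"{base_url}repodata/repomd.xml") as resp:
            with open(cache_path, "wb") as out:
                shutil.copyfileobj(resp, out)
        sync_packages()  # slow; upstream may publish new metadata meanwhile
        # Publish the snapshot we synced against, not whatever upstream serves now.
        upload(cache_path, "repodata/repomd.xml")

This keeps the published metadata consistent with the set of packages that were actually mirrored.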
116 changes: 63 additions & 53 deletions rpm_s3_mirror/mirror.py
@@ -10,7 +10,7 @@
 from collections import namedtuple, defaultdict
 from urllib.parse import urlparse
 
-from rpm_s3_mirror.repository import RPMRepository, safe_parse_xml
+from rpm_s3_mirror.repository import RPMRepository, safe_parse_xml, Metadata
 from rpm_s3_mirror.s3 import S3, S3DirectoryNotFound
 from rpm_s3_mirror.statsd import StatsClient
 from rpm_s3_mirror.util import (
@@ -40,6 +40,7 @@ def __init__(self, config):
         self.session = get_requests_session()
         self.log = logging.getLogger(type(self).__name__)
         self.stats = StatsClient()
+        # S3 bucket
         self.s3 = S3(
             aws_secret_access_key=self.config.aws_secret_access_key,
             aws_access_key_id=self.config.aws_access_key_id,
@@ -49,6 +50,7 @@
             max_workers=self.config.max_workers,
             scratch_dir=self.config.scratch_dir,
         )
+        # upstream repos
         self.repositories = [RPMRepository(base_url=url) for url in config.upstream_repositories]
 
     def sync(self):
@@ -69,63 +71,71 @@ def sync(self):
         self.stats.gauge(metric="s3_mirror_sync_seconds_total", value=time.monotonic() - start)
         return sync_success
 
-    def _sync_repository(self, upstream_repository):
+    def _sync_repository(self, upstream_repository: RPMRepository):
         mirror_start = time.monotonic()
         update_time = now()
-        upstream_metadata = upstream_repository.parse_metadata()
-        # Our S3 mirror. Hostname based on S3 bucket region & name. Path based on upstream path.
-        mirror_repository = RPMRepository(base_url=self._build_s3_url(upstream_repository.base_url))
-        bootstrap = not mirror_repository.exists()
-        if bootstrap:
-            self.log.info("Bootstrapping repository: %s", upstream_repository.base_url)
-            new_packages = upstream_metadata.package_list
-        else:
-            self.log.info("Syncing repository: %s", upstream_repository.base_url)
-            # If the upstream repomd.xml file was updated after the last time we updated our
-            # mirror repomd.xml file then there is probably some work to do.
-            last_check_time = self.s3.repomd_update_time(base_url=mirror_repository.base_url)
-            if not upstream_repository.has_updates(since=last_check_time):
-                self.log.info("Skipping repository with no updates since: %s", last_check_time)
-                self.stats.gauge(
-                    metric="s3_mirror_sync_seconds",
-                    value=time.monotonic() - mirror_start,
-                    tags={"repo": upstream_repository.path},
-                )
-                return
-
-            # Extract our metadata and detect any new/updated packages.
-            mirror_metadata = mirror_repository.parse_metadata()
-            new_packages = set(upstream_metadata.package_list).difference(set(mirror_metadata.package_list))
-
-        # Sync our mirror with upstream.
-        self.s3.sync_packages(
-            base_url=upstream_metadata.base_url,
-            upstream_repodata=upstream_metadata.repodata,
-            upstream_packages=new_packages,
-            # If we are bootstrapping the s3 repo, it is worth checking if the package already exists, as if the
-            # process is interrupted halfway through we would have to do a lot of potentially useless work. Once
-            # we have completed bootstrapping and are just running a sync we don't benefit from checking as it
-            # slows things down for no good reason (we expect the packages to be there already and if not
-            # it is a bug of some kind).
-            skip_existing=bootstrap,
-        )
-
-        # If we are not bootstrapping, store a manifest that describes the changes synced in this run
-        if not bootstrap:
-            manifest_location = self._build_manifest_location(base_url=upstream_repository.base_url)
-            repomd_path = join(manifest_location, "repomd.xml")
-            self.s3.archive_repomd(base_url=upstream_repository.base_url, location=repomd_path)
-            manifest = Manifest(
-                update_time=update_time,
-                upstream_repository=upstream_repository.base_url,
-                previous_repomd=repomd_path,
-                synced_packages=[package.to_dict() for package in new_packages],
-            )
-            manifest_path = join(manifest_location, "manifest.json")
-            self.s3.put_manifest(location=manifest_path, manifest=manifest)
-
-        # Finally, overwrite the repomd.xml file to make our changes live
-        self.s3.overwrite_repomd(base_url=upstream_repository.base_url)
-        self.log.info("Updated mirror with %s packages", len(new_packages))
-        self.stats.gauge(
-            metric="s3_mirror_sync_seconds",
+        upstream_metadata: Metadata = upstream_repository.parse_metadata()
+        # Cache repomd.xml when first reading it from the upstream repo.
+        with TemporaryDirectory(prefix=self.config.scratch_dir) as temp_dir:
+            base_url = upstream_repository.base_url
+            url = f"{base_url}repodata/repomd.xml"
+            cache_xml_path = download_file(temp_dir=temp_dir, url=url, session=self.session)
+            self.log.info("Cache repomd.xml to local path: %s", cache_xml_path)
+            # Our S3 mirror. Hostname based on S3 bucket region & name. Path based on upstream path.
+            mirror_repository: RPMRepository = RPMRepository(base_url=self._build_s3_url(upstream_repository.base_url))
+            bootstrap = not mirror_repository.exists()
+            if bootstrap:
+                self.log.info("Bootstrapping repository: %s", upstream_repository.base_url)
+                new_packages = upstream_metadata.package_list
+            else:
+                self.log.info("Syncing repository: %s", upstream_repository.base_url)
+                # If the upstream repomd.xml file was updated after the last time we updated our
+                # mirror repomd.xml file then there is probably some work to do.
+                last_check_time = self.s3.repomd_update_time(base_url=mirror_repository.base_url)
+                if not upstream_repository.has_updates(since=last_check_time):
+                    self.log.info("Skipping repository with no updates since: %s", last_check_time)
+                    self.stats.gauge(
+                        metric="s3_mirror_sync_seconds",
+                        value=time.monotonic() - mirror_start,
+                        tags={"repo": upstream_repository.path},
+                    )
+                    return
+
+                # Extract our metadata and detect any new/updated packages.
+                mirror_metadata = mirror_repository.parse_metadata()
+                new_packages = set(upstream_metadata.package_list).difference(set(mirror_metadata.package_list))
+
+            # Sync our mirror with upstream.
+            self.s3.sync_packages(
+                base_url=upstream_metadata.base_url,
+                upstream_repodata=upstream_metadata.repodata,
+                upstream_packages=new_packages,
+                # If we are bootstrapping the s3 repo, it is worth checking if the package already exists, as if the
+                # process is interrupted halfway through we would have to do a lot of potentially useless work. Once
+                # we have completed bootstrapping and are just running a sync we don't benefit from checking as it
+                # slows things down for no good reason (we expect the packages to be there already and if not
+                # it is a bug of some kind).
+                skip_existing=bootstrap,
+            )
+
+            # If we are not bootstrapping, store a manifest that describes the changes synced in this run
+            if not bootstrap:
+                manifest_location = self._build_manifest_location(base_url=upstream_repository.base_url)
+                repomd_path = join(manifest_location, "repomd.xml")
+                self.s3.archive_repomd(base_url=upstream_repository.base_url, location=repomd_path)
+                manifest = Manifest(
+                    update_time=update_time,
+                    upstream_repository=upstream_repository.base_url,
+                    previous_repomd=repomd_path,
+                    synced_packages=[package.to_dict() for package in new_packages],
+                )
+                manifest_path = join(manifest_location, "manifest.json")
+                self.s3.put_manifest(location=manifest_path, manifest=manifest)
+
+            # Finally, overwrite the repomd.xml file to make our changes live.
+            # Copy from the cached file in /var/tmp, because sync_packages may take a while and
+            # repomd.xml may have changed upstream in the meantime.
+            self.s3.overwrite_repomd(repomd_xml_local_path=cache_xml_path, base_url=upstream_repository.base_url)
+            self.log.info("Updated mirror with %s packages", len(new_packages))
+            self.stats.gauge(
+                metric="s3_mirror_sync_seconds",
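
The comment above skip_existing=bootstrap compresses the key trade-off. Roughly, with hypothetical names (object_exists standing in for an S3 HEAD request; not the project's API):

def should_upload(key: str, bootstrap: bool, object_exists) -> bool:
    if bootstrap:
        # A previous bootstrap may have been interrupted partway through,
        # so skip packages that were already uploaded.
        return not object_exists(key)
    # Routine sync: new packages are never expected to exist already,
    # so probing S3 first is pure overhead.
    return True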
4 changes: 3 additions & 1 deletion rpm_s3_mirror/repository.py
@@ -143,6 +143,8 @@ def __iter__(self) -> Iterator[Package]:
 
 
 class RPMRepository:
+    """Upstream repository. This MAY NOT be an S3 bucket."""
+
     def __init__(self, base_url: str):
         if not base_url.startswith("https://"):
             raise ValueError("Only https upstream repositories can be synced from")
@@ -238,7 +240,7 @@ def _rewrite_primary(self, temp_dir, primary: RepodataSection):
             location=f"repodata/{basename(local_path)}",
         )
 
-    def _rewrite_repomd(self, repomd_xml, snapshot: SnapshotPrimary):
+    def _rewrite_repomd(self, repomd_xml: Element, snapshot: SnapshotPrimary):
         for element in repomd_xml.findall("repo:*", namespaces=namespaces):
             # We only support *.xml.gz files currently
             if element.attrib.get("type", None) not in {"primary", "filelists", "other", "modules", "updateinfo"}:
12 changes: 5 additions & 7 deletions rpm_s3_mirror/s3.py
@@ -78,13 +78,11 @@ def sync_packages(
             )
             self._sync_objects(temp_dir=temp_dir, repo_objects=upstream_repodata.values(), skip_existing=skip_existing)
 
-    def overwrite_repomd(self, base_url):
-        with TemporaryDirectory(prefix=self.scratch_dir) as temp_dir:
-            url = f"{base_url}repodata/repomd.xml"
-            repomd_xml = download_file(temp_dir=temp_dir, url=url, session=self.session)
-            path = urlparse(url).path
-            self.log.info("Overwriting repomd.xml")
-            self.put_object(repomd_xml, path, cache_age=0)
+    def overwrite_repomd(self, repomd_xml_local_path, base_url):
+        url = f"{base_url}repodata/repomd.xml"
+        remote_path = urlparse(url).path
+        self.log.info("Overwriting repomd.xml")
+        self.put_object(repomd_xml_local_path, remote_path, cache_age=0)
 
     def archive_repomd(self, base_url, location):
         self.log.debug("Archiving repomd.xml to %s", location)
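
With the new signature, the caller supplies the repomd.xml it cached earlier instead of letting overwrite_repomd re-download it. A usage sketch (assuming s3 is an S3 instance and cache_xml_path is the file cached in _sync_repository above; the base URL is illustrative):

def publish_cached_repomd(s3, cache_xml_path: str) -> None:
    s3.overwrite_repomd(
        repomd_xml_local_path=cache_xml_path,  # cached before the slow package sync
        base_url="https://mirror.example.com/centos/8/BaseOS/x86_64/os/",
    )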
3 changes: 2 additions & 1 deletion rpm_s3_mirror/util.py
@@ -35,6 +35,7 @@ def get_requests_session() -> Session:
 
 
 def download_file(temp_dir: str, url: str, session: Session = None) -> str:
+    """Download a file and return the local path."""
     session = session or get_requests_session()
     try:
         return _download_file(session, temp_dir, url)
@@ -60,7 +61,7 @@ def _escape_s3_url(url: str) -> str:
     )
 
 
-def _download_file(session, temp_dir, url):
+def _download_file(session, temp_dir, url) -> str:
     with session.get(url, stream=True) as request:
         request.raise_for_status()
         out_path = join(temp_dir, os.path.basename(url))
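
For reference, a usage sketch of download_file (illustrative URL; assumes the host is reachable):

from tempfile import TemporaryDirectory

from rpm_s3_mirror.util import download_file

with TemporaryDirectory() as temp_dir:
    local_path = download_file(temp_dir=temp_dir, url="https://example.com/repodata/repomd.xml")
    print(local_path)  # e.g. <temp_dir>/repomd.xml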
