Skip to content

Commit

Permalink
Cache repomd.xml and use the cache when overwriting the repo
Browse files Browse the repository at this point in the history
since syncing the packages may take some time, and repomd.xml may have changed upstream in the meantime
  • Loading branch information
Aiqin-Aiven committed Jul 7, 2022
1 parent 98a53be commit 6ecc9bb
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 12 deletions.
20 changes: 14 additions & 6 deletions rpm_s3_mirror/mirror.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from collections import namedtuple, defaultdict
from urllib.parse import urlparse

from rpm_s3_mirror.repository import RPMRepository, safe_parse_xml
from rpm_s3_mirror.repository import RPMRepository, safe_parse_xml, Metadata
from rpm_s3_mirror.s3 import S3, S3DirectoryNotFound
from rpm_s3_mirror.statsd import StatsClient
from rpm_s3_mirror.util import (
Expand Down Expand Up @@ -40,6 +40,7 @@ def __init__(self, config):
self.session = get_requests_session()
self.log = logging.getLogger(type(self).__name__)
self.stats = StatsClient()
# S3 bucket
self.s3 = S3(
aws_secret_access_key=self.config.aws_secret_access_key,
aws_access_key_id=self.config.aws_access_key_id,
Expand All @@ -49,6 +50,7 @@ def __init__(self, config):
max_workers=self.config.max_workers,
scratch_dir=self.config.scratch_dir,
)
# upstream repos
self.repositories = [RPMRepository(base_url=url) for url in config.upstream_repositories]

def sync(self):
Expand All @@ -69,12 +71,17 @@ def sync(self):
self.stats.gauge(metric="s3_mirror_sync_seconds_total", value=time.monotonic() - start)
return sync_success

def _sync_repository(self, upstream_repository):
def _sync_repository(self, upstream_repository: RPMRepository):
mirror_start = time.monotonic()
update_time = now()
upstream_metadata = upstream_repository.parse_metadata()

mirror_repository = RPMRepository(base_url=self._build_s3_url(upstream_repository.base_url))
upstream_metadata: Metadata = upstream_repository.parse_metadata()
# Cache the repomd.xml when 1st reading from upstream repo.
base_url = upstream_repository.base_url
url = f"{base_url}repodata/repomd.xml"
cache_xml_path = download_file(temp_dir="/var/tmp", url=url, session=self.session)
self.log.info("Cache repomd.xml to local path: %s", cache_xml_path)
# Our S3 mirror. Hostname based on S3 bucket region & name. Path based on upstream path.
mirror_repository: RPMRepository = RPMRepository(base_url=self._build_s3_url(upstream_repository.base_url))
bootstrap = not mirror_repository.exists()
if bootstrap:
self.log.info("Bootstrapping repository: %s", upstream_repository.base_url)
Expand Down Expand Up @@ -125,7 +132,8 @@ def _sync_repository(self, upstream_repository):
self.s3.put_manifest(location=manifest_path, manifest=manifest)

# Finally, overwrite the repomd.xml file to make our changes live
self.s3.overwrite_repomd(base_url=upstream_repository.base_url)
# Copy from the cached file in /var/tmp, because sync_packages may take a bit of time, and repomd.xml may change in upstream
self.s3.overwrite_repomd(repomd_xml_local_path=cache_xml_path, base_url=upstream_repository.base_url)
self.log.info("Updated mirror with %s packages", len(new_packages))
self.stats.gauge(
metric="s3_mirror_sync_seconds",
Expand Down
4 changes: 3 additions & 1 deletion rpm_s3_mirror/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ def __iter__(self) -> Iterator[Package]:


class RPMRepository:
"""Upstream repository. This MAY NOT be a S3 bucket."""

def __init__(self, base_url: str):
if not base_url.startswith("https://"):
raise ValueError("Only https upstream repositories can be synced from")
Expand Down Expand Up @@ -238,7 +240,7 @@ def _rewrite_primary(self, temp_dir, primary: RepodataSection):
location=f"repodata/{basename(local_path)}",
)

def _rewrite_repomd(self, repomd_xml, snapshot: SnapshotPrimary):
def _rewrite_repomd(self, repomd_xml: Element, snapshot: SnapshotPrimary):
for element in repomd_xml.findall("repo:*", namespaces=namespaces):
# We only support *.xml.gz files currently
if element.attrib.get("type", None) not in {"primary", "filelists", "other", "modules", "updateinfo"}:
Expand Down
8 changes: 4 additions & 4 deletions rpm_s3_mirror/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,13 @@ def sync_packages(
)
self._sync_objects(temp_dir=temp_dir, repo_objects=upstream_repodata.values(), skip_existing=skip_existing)

def overwrite_repomd(self, base_url):
def overwrite_repomd(self, repomd_xml_local_path, base_url):
with TemporaryDirectory(prefix=self.scratch_dir) as temp_dir:
url = f"{base_url}repodata/repomd.xml"
repomd_xml = download_file(temp_dir=temp_dir, url=url, session=self.session)
path = urlparse(url).path
remote_path = urlparse(url).path
self.log.info("Overwriting repomd.xml")
self.put_object(repomd_xml, path, cache_age=0)
self.put_object(repomd_xml_local_path, remote_path, cache_age=0)
os.remove("/var/tmp/repomd.xml")

def archive_repomd(self, base_url, location):
self.log.debug("Archiving repomd.xml to %s", location)
Expand Down
3 changes: 2 additions & 1 deletion rpm_s3_mirror/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def get_requests_session() -> Session:


def download_file(temp_dir: str, url: str, session: Session = None) -> str:
"""Download a file and return the local path."""
session = session or get_requests_session()
try:
return _download_file(session, temp_dir, url)
Expand All @@ -60,7 +61,7 @@ def _escape_s3_url(url: str) -> str:
)


def _download_file(session, temp_dir, url):
def _download_file(session, temp_dir, url) -> str:
with session.get(url, stream=True) as request:
request.raise_for_status()
out_path = join(temp_dir, os.path.basename(url))
Expand Down

0 comments on commit 6ecc9bb

Please sign in to comment.