Support stripping metadata from updates. #25

Merged
merged 2 commits on Aug 16, 2022
Changes from all commits
1 change: 1 addition & 0 deletions rpm_s3_mirror.spec
@@ -14,6 +14,7 @@ Requires: python3-dateutil
Requires: python3-boto3
Requires: python3-lxml
Requires: systemd
Requires: zchunk

%undefine _missing_build_ids_terminate_build

4 changes: 3 additions & 1 deletion rpm_s3_mirror/config.py
@@ -16,6 +16,7 @@
DEFAULTS = {
"scratch_dir": "/var/tmp/",
"max_workers": 4,
"trim_updates_to_arches": [],
}


@@ -31,6 +32,7 @@ class Config:
scratch_dir = None
max_workers = None
upstream_repositories = None
trim_updates_to_arches = None
_config = DEFAULTS

def __init__(self):
@@ -63,7 +65,7 @@ def _populate_required(self):
raise ConfigError(f"Missing required environment variable: {key.upper()}")
else:
continue
elif key == "upstream_repositories":
elif key in ("upstream_repositories", "trim_updates_to_arches"):
value = value.split(",")
elif key == "max_workers":
value = int(value)
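
For reference, the new option is consumed the same way as upstream_repositories: a comma-separated string is split into a list. A minimal sketch of that parsing, where the environment-variable name is an assumption for illustration only (the real lookup follows whatever naming Config._populate_required already uses):

```python
import os

# Hypothetical illustration of the parsing added above: list-valued options
# arrive as comma-separated strings and are split into lists.
# The variable name below is assumed for this example only.
raw = os.environ.get("TRIM_UPDATES_TO_ARCHES", "x86_64,noarch")

# Mirrors the `value = value.split(",")` branch in Config._populate_required.
trim_updates_to_arches = raw.split(",")
print(trim_updates_to_arches)  # ['x86_64', 'noarch']
```
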
32 changes: 27 additions & 5 deletions rpm_s3_mirror/mirror.py
@@ -1,9 +1,10 @@
# Copyright (c) 2020 Aiven, Helsinki, Finland. https://aiven.io/

import logging
import os
import re
from contextlib import suppress
from os.path import join
from os.path import join, basename
from tempfile import TemporaryDirectory

import time
@@ -132,10 +133,31 @@ def _sync_repository(self, upstream_repository: RPMRepository):
manifest_path = join(manifest_location, "manifest.json")
self.s3.put_manifest(location=manifest_path, manifest=manifest)

# Finally, overwrite the repomd.xml file to make our changes live
# Copy from the cached file in /var/tmp, because sync_packages may take a bit of time, and repomd.xml may
# change in upstream.
self.s3.overwrite_repomd(repomd_xml_local_path=cache_xml_path, base_url=upstream_repository.base_url)
# The metadata can be quite large and cause dnf to use excessive memory processing
# the updates. As an optimization, support dropping architectures that we know
# we don't need.
if self.config.trim_updates_to_arches:
self.log.info("Trimming metadata to: %s", self.config.trim_updates_to_arches)
metadata_scratch = os.path.join(temp_dir, "metadata")
os.makedirs(metadata_scratch, exist_ok=True)
with open(cache_xml_path, "rb") as f:
repodata_files = upstream_repository.strip_metadata(
xml_bytes=f.read(),
target_arches=self.config.trim_updates_to_arches,
scratch_dir=metadata_scratch,
)
base_path = urlparse(upstream_repository.base_url).path[1:] # need to strip the leading slash
for file_path in repodata_files.upload_files:
dest_path = join(base_path, "repodata", basename(file_path))
self.log.info("Uploading: %s -> %s", file_path, dest_path)
self.s3.put_object(
local_path=file_path,
key=dest_path,
)
else:
# Overwrite the repomd.xml file from upstream to make our changes live.
self.log.info("Overwriting repomd.xml")
self.s3.overwrite_repomd(repomd_xml_local_path=cache_xml_path, base_url=upstream_repository.base_url)
self.log.info("Updated mirror with %s packages", len(new_packages))
self.stats.gauge(
metric="s3_mirror_sync_seconds",
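
A side note on the upload loop above: the destination key for each rewritten repodata file is the path component of the repository base URL with the leading slash removed, joined with repodata/ and the file's basename. A small sketch with a made-up base URL and file path:

```python
from os.path import basename, join
from urllib.parse import urlparse

# The base_url and file_path below are invented for illustration.
base_url = "https://mirror.example.com/pub/fedora/updates/36/x86_64/"

# Strip the leading "/" so the resulting S3 key is relative, as in the diff above.
base_path = urlparse(base_url).path[1:]  # "pub/fedora/updates/36/x86_64/"

file_path = "/var/tmp/metadata/abc123-updateinfo.xml.xz"
dest_path = join(base_path, "repodata", basename(file_path))
print(dest_path)  # pub/fedora/updates/36/x86_64/repodata/abc123-updateinfo.xml.xz
```
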
196 changes: 168 additions & 28 deletions rpm_s3_mirror/repository.py
@@ -1,9 +1,14 @@
# Copyright (c) 2020 Aiven, Helsinki, Finland. https://aiven.io/

import dataclasses
import lzma
import re
import subprocess
from abc import abstractmethod
from collections import namedtuple
from datetime import datetime
from typing import Iterator, Dict
from typing import Iterator, Dict, Optional, Tuple
from urllib.parse import urlparse
from xml.etree.ElementTree import ElementTree

from lxml.etree import fromstring, Element, tostring # pylint: disable=no-name-in-module
from lxml.etree import XMLParser # pylint: disable=no-name-in-module
@@ -126,20 +131,128 @@ def __iter__(self) -> Iterator[Package]:
"checksum",
],
)
SnapshotPrimary = namedtuple(
"SnapshotPrimary",
[
"open_checksum",
"checksum",
"checksum_type",
"size",
"open_size",
"local_path",
"location",
],
)

Snapshot = namedtuple("Snapshot", ["sync_files", "upload_files"])
RepoDataFiles = namedtuple("RepoDataFiles", ["sync_files", "upload_files"])


@dataclasses.dataclass
class SectionMetadata:
size: int
open_size: int
open_checksum: str
checksum: str
local_path: str
location: str
checksum_type: str = "sha256"
header_checksum: Optional[str] = None
header_size: Optional[int] = None


class UpdateInfoSection:
def __init__(self, path: str, scratch_dir):
self.path = path
self.scratch_dir = scratch_dir

@classmethod
def from_path(cls, path: str, scratch_dir):
if path.endswith(".zck"):
return ZCKUpdateInfoSection(path, scratch_dir)
elif path.endswith(".xz"):
return XZUpdateInfoSection(path, scratch_dir)
else:
raise ValueError("Only xz and zck files supported")

@abstractmethod
def _read(self) -> bytes:
pass

@abstractmethod
def _compress(self, root, open_size, open_checksum):
pass

def strip_to_arches(self, arches):
xml_bytes = self._read()
root = safe_parse_xml(xml_bytes)
self._strip(root, arches)
open_size = len(xml_bytes)
open_checksum = sha256(xml_bytes)
return self._compress(root, open_size, open_checksum)

def _strip(self, root, arches):
for update_element in root:
for collection in update_element.find("pkglist"):
for package in collection.getchildren():
arch = package.get("arch")
if arch is not None and arch not in arches:
collection.remove(package)


class XZUpdateInfoSection(UpdateInfoSection):
def _read(self) -> bytes:
with lzma.open(self.path, mode="rb") as f:
return f.read()

def _compress(self, root, open_size, open_checksum):
compressed_xml = lzma.compress(tostring(root, encoding="utf-8"))
compressed_sha256 = sha256(compressed_xml)
compressed_size = len(compressed_xml)

local_path = os.path.join(self.scratch_dir, f"{compressed_sha256}-updateinfo.xml.xz")
with open(local_path, "wb+") as out:
out.write(compressed_xml)
return SectionMetadata(
open_checksum=open_checksum,
checksum=compressed_sha256,
checksum_type="sha256",
size=compressed_size,
open_size=open_size,
local_path=local_path,
location=f"repodata/{basename(local_path)}",
)


class ZCKUpdateInfoSection(UpdateInfoSection):
def _read(self):
return subprocess.check_output(["unzck", self.path, "--stdout"])

def _compress(self, root, open_size, open_checksum):
stripped_path = os.path.join(self.scratch_dir, "stripped.xml")
ElementTree(root).write(stripped_path)

# Now compress and take compressed checksum,size.
compressed_stripped_path = os.path.join(self.scratch_dir, "stripped.xml.zck")
subprocess.check_call(["zck", stripped_path, "-o", compressed_stripped_path])
sha256_compressed_out = subprocess.check_output(["sha256sum", compressed_stripped_path], text=True)
checksum = sha256_compressed_out.split()[0]
size = os.path.getsize(compressed_stripped_path)

# We also need some ZChunk specific metadata.
header_out = subprocess.check_output(["zck_read_header", compressed_stripped_path], text=True)
header_checksum, header_size = self._parse_zck_read_header(output=header_out)
final_path = os.path.join(self.scratch_dir, f"{checksum}-updateinfo.xml.zck")

# Rename it in the same format as the other sections.
os.rename(compressed_stripped_path, final_path)

return SectionMetadata(
size=size,
open_size=open_size,
header_size=header_size,
header_checksum=header_checksum,
open_checksum=open_checksum,
checksum=checksum,
local_path=final_path,
location=f"repodata/{os.path.basename(final_path)}",
)

def _parse_zck_read_header(self, output):
checksum_match = re.search("Header checksum: (?P<checksum>.*$)", output, flags=re.MULTILINE)
if not checksum_match:
raise ValueError(f"Failed to locate checksum in output: {output}")
size_match = re.search("Header size:(?P<size>.*$)", output, flags=re.MULTILINE)
if not size_match:
raise ValueError(f"Failed to locate size in output: {output}")
return checksum_match.groupdict()["checksum"], int(size_match.groupdict()["size"])


class RPMRepository:
@@ -176,17 +289,45 @@ def exists(self):
return True
return False

def get_repodata(self):
response = self._req(self.session.get, "repodata/repomd.xml")
repodata = self.parse_repomd(xml=safe_parse_xml(response.content))
def get_repodata(self, xml_bytes=None):
if xml_bytes is None:
xml_bytes = self._req(self.session.get, "repodata/repomd.xml").content
repodata = self.parse_repomd(xml=safe_parse_xml(xml_bytes))
return repodata

def strip_metadata(
self,
xml_bytes: bytes,
target_arches: Tuple[str],
scratch_dir: str,
):
sync_files, upload_files = [], []
Review comment from @nicois (Contributor), Aug 16, 2022:
nit: sync_files is never modified, which makes mypy sad.

Reply from the PR author (Collaborator):
I might see about adding mypy lint step and adding proper typing.

repomd_xml = safe_parse_xml(xml_bytes)
repodata = self.parse_repomd(xml=repomd_xml)
for key, section in repodata.items():
if key.startswith("updateinfo"):
with self._req(self.session.get, path=section.location, stream=True) as request:
local_path = download_repodata_section(section, request, destination_dir=scratch_dir)
update_section = UpdateInfoSection.from_path(path=local_path, scratch_dir=scratch_dir)
rewritten_section = update_section.strip_to_arches(arches=target_arches)
self._rewrite_repomd(repomd_xml=repomd_xml, snapshot=rewritten_section, section_name=key)
upload_files.append(rewritten_section.local_path)
repomd_xml_path = join(scratch_dir, "repomd.xml")
with open(repomd_xml_path, "wb+") as out:
out.write(tostring(repomd_xml, encoding="utf-8"))
upload_files.append(repomd_xml_path)

return RepoDataFiles(
sync_files=sync_files,
upload_files=upload_files,
)

def create_snapshot(self, scratch_dir):
response = self._req(self.session.get, "repodata/repomd.xml")
repomd_xml = safe_parse_xml(response.content)
repodata = self.parse_repomd(xml=repomd_xml)
snapshot_primary = self._rewrite_primary(temp_dir=scratch_dir, primary=repodata["primary"])
self._rewrite_repomd(repomd_xml=repomd_xml, snapshot=snapshot_primary)
self._rewrite_repomd(repomd_xml=repomd_xml, snapshot=snapshot_primary, section_name="primary")
repomd_xml_path = join(scratch_dir, "repomd.xml")
with open(repomd_xml_path, "wb+") as out:
out.write(tostring(repomd_xml, encoding="utf-8"))
@@ -199,7 +340,7 @@ def create_snapshot(self, scratch_dir):
or section.location.endswith("modules.yaml.gz")
):
sync_files.append(urlparse(join(self.base_url, section.location)).path)
return Snapshot(
return RepoDataFiles(
sync_files=sync_files,
upload_files=[repomd_xml_path, snapshot_primary.local_path],
)
@@ -230,7 +371,7 @@ def _rewrite_primary(self, temp_dir, primary: RepodataSection):
with open(local_path, "wb+") as out:
out.write(compressed_xml)

return SnapshotPrimary(
return SectionMetadata(
open_checksum=open_checksum,
checksum=compressed_sha256,
checksum_type="sha256",
@@ -240,14 +381,9 @@ def _rewrite_primary(self, temp_dir, primary: RepodataSection):
location=f"repodata/{basename(local_path)}",
)

def _rewrite_repomd(self, repomd_xml: Element, snapshot: SnapshotPrimary):
for element in repomd_xml.findall("repo:*", namespaces=namespaces):
# We only support *.xml.gz files currently
if element.attrib.get("type", None) not in {"primary", "filelists", "other", "modules", "updateinfo"}:
repomd_xml.remove(element)

def _rewrite_repomd(self, repomd_xml: Element, snapshot: SectionMetadata, section_name: str):
# Rewrite the XML with correct metadata for our changed primary.xml
for element in repomd_xml.find("repo:data[@type='primary']", namespaces=namespaces):
for element in repomd_xml.find(f"repo:data[@type='{section_name}']", namespaces=namespaces):
_, _, key = element.tag.partition("}")
if key == "checksum":
element.text = snapshot.checksum
@@ -259,6 +395,10 @@ def _rewrite_repomd(self, repomd_xml: Element, snapshot: SnapshotPrimary):
element.text = str(snapshot.size)
elif key == "open-size":
element.text = str(snapshot.open_size)
elif key == "header-size" and snapshot.header_size is not None:
element.text = str(snapshot.header_size)
elif key == "header-checksum" and snapshot.header_checksum is not None:
element.text = snapshot.header_checksum

def _extract_package_list(self, primary: RepodataSection) -> PackageList:
with self._req(self.session.get, path=primary.location, stream=True) as request:
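
The heart of the trimming is UpdateInfoSection._strip, which walks each update's pkglist collections and removes package elements whose arch attribute is set but not in the wanted set. A self-contained sketch of the same filtering on a hand-written updateinfo fragment (the XML and target arches below are invented for illustration; the PR itself parses via safe_parse_xml and then re-compresses the result as xz or zck):

```python
from lxml.etree import fromstring, tostring  # pylint: disable=no-name-in-module

# Tiny hand-written updateinfo fragment, invented for this example.
xml_bytes = b"""<updates>
  <update>
    <pkglist>
      <collection>
        <package arch="x86_64" name="foo"/>
        <package arch="aarch64" name="foo"/>
        <package arch="noarch" name="foo-doc"/>
      </collection>
    </pkglist>
  </update>
</updates>"""


def strip_to_arches(root, arches):
    # Same walk as UpdateInfoSection._strip: drop packages whose arch is set
    # and not in the wanted set; arch-less entries are left alone.
    for update_element in root:
        for collection in update_element.find("pkglist"):
            for package in list(collection):
                arch = package.get("arch")
                if arch is not None and arch not in arches:
                    collection.remove(package)


root = fromstring(xml_bytes)
strip_to_arches(root, {"x86_64", "noarch"})
print(tostring(root).decode())  # only the x86_64 and noarch packages remain
```

In the PR, the stripped tree is then re-compressed by the matching UpdateInfoSection subclass and _rewrite_repomd updates repomd.xml with the new checksums and sizes.
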
1 change: 0 additions & 1 deletion rpm_s3_mirror/s3.py
@@ -81,7 +81,6 @@ def sync_packages(
def overwrite_repomd(self, repomd_xml_local_path, base_url):
url = f"{base_url}repodata/repomd.xml"
remote_path = urlparse(url).path
self.log.info("Overwriting repomd.xml")
self.put_object(repomd_xml_local_path, remote_path, cache_age=0)

def archive_repomd(self, base_url, location):