Merge pull request #2 from aiven/will-add-snapshot-functionality
mirror: support snapshotting
facetoe authored May 6, 2020
2 parents 6e04e39 + a8684c1 commit 504d61d
Showing 7 changed files with 249 additions and 47 deletions.
38 changes: 33 additions & 5 deletions rpm_s3_mirror/__main__.py
@@ -9,22 +9,50 @@
logging.getLogger("boto").setLevel(logging.WARNING)
logging.getLogger("botocore").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.basicConfig(level=logging.DEBUG)


def main():
parser = argparse.ArgumentParser()
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("--config", help="Path to config file")
group.add_argument("--env", help="Read configuration from environment variables", action="store_true")
parser.add_argument(
"--verbose",
help="Verbose logging",
action="store_true",
default=False,
)

operation_group = parser.add_mutually_exclusive_group(required=False)
operation_group.add_argument(
"--snapshot",
help="Create a snapshot of current repository state",
action="store_true",
default=False,
)
operation_group.add_argument(
"--bootstrap",
help="Bootstrap an empty s3 mirror",
action="store_true",
default=False,
)
config_group = parser.add_mutually_exclusive_group(required=True)
config_group.add_argument("--config", help="Path to config file")
config_group.add_argument("--env", help="Read configuration from environment variables", action="store_true")

args = parser.parse_args()
if args.config:
config = JSONConfig(path=args.config)
elif args.env:
config = ENVConfig()

if args.verbose:
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)

mirror = Mirror(config=config)
mirror.sync()
if args.snapshot:
mirror.snapshot()
else:
mirror.sync(bootstrap=args.bootstrap)


if __name__ == "__main__":
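
With this change, bootstrap and snapshot become run-time switches instead of configuration. A rough sketch of what the new flag combinations translate to, assuming JSONConfig is importable from rpm_s3_mirror.config and using an illustrative config path:

from rpm_s3_mirror.config import JSONConfig
from rpm_s3_mirror.mirror import Mirror

config = JSONConfig(path="/etc/rpm_s3_mirror.json")  # illustrative path
mirror = Mirror(config=config)

# python -m rpm_s3_mirror --config <path> --bootstrap
mirror.sync(bootstrap=True)

# python -m rpm_s3_mirror --config <path>  (plain sync)
mirror.sync(bootstrap=False)

# python -m rpm_s3_mirror --config <path> --snapshot
snapshot_id = mirror.snapshot()  # returns the uuid identifying the new snapshot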
4 changes: 0 additions & 4 deletions rpm_s3_mirror/config.py
@@ -16,7 +16,6 @@
DEFAULTS = {
"scratch_dir": "/var/tmp/",
"max_workers": 4,
"bootstrap": False,
}


@@ -32,7 +31,6 @@ class Config:
scratch_dir = None
max_workers = None
upstream_repositories = None
bootstrap = None
_config = DEFAULTS

def __init__(self):
@@ -69,8 +67,6 @@ def _populate_required(self):
value = value.split(",")
elif key == "max_workers":
value = int(value)
elif key == "bootstrap":
value = value.lower() == "true"
self._config[key] = value


59 changes: 55 additions & 4 deletions rpm_s3_mirror/mirror.py
@@ -1,6 +1,10 @@
# Copyright (c) 2020 Aiven, Helsinki, Finland. https://aiven.io/

import logging
import uuid
from os.path import basename, join
from tempfile import TemporaryDirectory

import time
from collections import namedtuple
from urllib.parse import urlparse
@@ -31,14 +35,14 @@ def __init__(self, config):
)
self.repositories = [RPMRepository(base_url=url) for url in config.upstream_repositories]

def sync(self):
def sync(self, bootstrap):
start = time.monotonic()
for upstream_repository in self.repositories:
mirror_start = time.monotonic()
update_time = now()
upstream_metadata = upstream_repository.parse_metadata()

if self.config.bootstrap:
if bootstrap:
self.log.info("Bootstrapping repository: %s", upstream_repository.base_url)
new_packages = upstream_metadata.package_list
else:
@@ -66,11 +70,11 @@ def sync(self):
# we have completed bootstrapping and are just running a sync we don't benefit from checking as it
# slows things down for no good reason (we expect the packages to be there already and if not
# it is a bug of some kind).
skip_existing=self.config.bootstrap
skip_existing=bootstrap
)

# If we are not bootstrapping, store a manifest that describes the changes synced in this run
if not self.config.bootstrap:
if not bootstrap:
archive_location = self.s3.archive_repomd(
update_time=update_time,
base_url=upstream_repository.base_url,
@@ -97,6 +101,53 @@ def sync(self):
self.log.info("Synced %s repos in %s seconds", len(self.repositories), time.monotonic() - start)
self.stats.gauge(metric="s3_mirror_sync_seconds_total", value=time.monotonic() - start)

def snapshot(self):
snapshot_id = uuid.uuid4()
self.log.debug("Creating snapshot: %s", snapshot_id)
with TemporaryDirectory(prefix=self.config.scratch_dir) as temp_dir:
for upstream_repository in self.repositories:
try:
self._snapshot_repository(
snapshot_id=snapshot_id,
temp_dir=temp_dir,
upstream_repository=upstream_repository,
)
except Exception as e:
self._try_remove_snapshots(snapshot_id=snapshot_id)
raise Exception("Failed to snapshot repositories") from e
return snapshot_id

def _snapshot_repository(self, snapshot_id, temp_dir, upstream_repository):
self.log.debug("Snapshotting repository: %s", upstream_repository.base_url)
repository = RPMRepository(base_url=self._build_s3_url(upstream_repository))
snapshot = repository.create_snapshot(scratch_dir=temp_dir)
base_path = urlparse(repository.base_url).path[1:] # need to strip the leading slash
for file_path in snapshot.sync_files:
self.s3.copy_object(
source=file_path,
destination=self._snapshot_path(base_path, snapshot_id, file_path),
)
for file_path in snapshot.upload_files:
self.s3.put_object(
local_path=file_path,
key=self._snapshot_path(base_path, snapshot_id, file_path),
)

def _try_remove_snapshots(self, snapshot_id):
for repository in self.repositories:
snapshot_dir = self._snapshot_directory(base_path=repository.base_url, snapshot_id=snapshot_id)
try:
self.s3.delete_subdirectory(subdir=snapshot_dir)
self.log.debug("Deleted: %s", snapshot_dir)
except: # pylint: disable=bare-except
self.log.warning("Failed to remove snapshot: %s", snapshot_dir)

def _snapshot_path(self, base_path, snapshot_id, file_path):
return join(self._snapshot_directory(base_path, snapshot_id), "repodata", basename(file_path))

def _snapshot_directory(self, base_path, snapshot_id):
return join(base_path, "snapshots", str(snapshot_id))

def _build_s3_url(self, upstream_repository) -> str:
dest_path = urlparse(upstream_repository.base_url).path
# For some reason, s3 buckets in us-east-1 have a different URL structure to all the rest...
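
The helpers above place each snapshot under the mirrored repository's own prefix. A worked example of the keys they produce, using an illustrative base path (only join, basename and uuid are involved):

from os.path import basename, join
import uuid

snapshot_id = uuid.uuid4()
base_path = "pub/fedora/linux/releases/31/Everything/x86_64/os"  # illustrative

snapshot_dir = join(base_path, "snapshots", str(snapshot_id))
# -> .../x86_64/os/snapshots/<snapshot_id>

repomd_key = join(snapshot_dir, "repodata", basename("/tmp/scratch/repomd.xml"))
# -> .../x86_64/os/snapshots/<snapshot_id>/repodata/repomd.xml

# Package "location" hrefs rewritten to "../../Packages/..." therefore resolve from
# snapshots/<snapshot_id>/ back up to the Packages/ tree under base_path, so a snapshot
# stores only repodata and reuses the packages already mirrored to S3.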
123 changes: 112 additions & 11 deletions rpm_s3_mirror/repository.py
@@ -5,19 +5,19 @@
from typing import Iterator, Dict
from urllib.parse import urlparse

from lxml.etree import fromstring, Element # pylint: disable=no-name-in-module
from lxml.etree import fromstring, Element, tostring # pylint: disable=no-name-in-module
from lxml.etree import XMLParser # pylint: disable=no-name-in-module
from dateutil.parser import parse
from tempfile import TemporaryDirectory
import os
import shutil
from os.path import join
from os.path import join, basename

import gzip

from requests import Response

from rpm_s3_mirror.util import get_requests_session, validate_checksum
from rpm_s3_mirror.util import get_requests_session, validate_checksum, sha256

namespaces = {
"common": "http://linux.duke.edu/metadata/common",
@@ -26,9 +26,9 @@
}


def safe_parse_xml(xml_string: bytes) -> Element:
def safe_parse_xml(xml_bytes: bytes) -> Element:
safe_parser = XMLParser(resolve_entities=False)
return fromstring(xml_string, parser=safe_parser)
return fromstring(xml_bytes, parser=safe_parser)


def download_repodata_section(section, request, destination_dir) -> str:
@@ -108,8 +108,31 @@ def __iter__(self) -> Iterator[Package]:
yield Package(base_url=self.base_url, destination_path=self.path, package_element=package_element)


Metadata = namedtuple("Metadata", ["package_list", "repodata", "base_url"])
RepodataSection = namedtuple("RepodataSection", ["url", "location", "destination", "checksum_type", "checksum"])
Metadata = namedtuple("Metadata", [
"package_list",
"repodata",
"base_url",
])
RepodataSection = namedtuple("RepodataSection", [
"url",
"location",
"destination",
"checksum_type",
"checksum",
])
SnapshotPrimary = namedtuple(
"SnapshotPrimary", [
"open_checksum",
"checksum",
"checksum_type",
"size",
"open_size",
"local_path",
"location",
]
)

Snapshot = namedtuple("Snapshot", ["sync_files", "upload_files"])


class RPMRepository:
@@ -130,20 +153,98 @@ def has_updates(self, since: datetime) -> bool:
return parse(last_modified_header) > since

def parse_metadata(self) -> Metadata:
response = self._req(self.session.get, "repodata/repomd.xml")
repodata = self.parse_repomd(xml=response.content)
repodata = self.get_repodata()
package_list = self._extract_package_list(primary=repodata["primary"])
return Metadata(package_list=package_list, repodata=repodata, base_url=self.base_url)

def get_repodata(self):
response = self._req(self.session.get, "repodata/repomd.xml")
repodata = self.parse_repomd(xml=safe_parse_xml(response.content))
return repodata

def create_snapshot(self, scratch_dir):
response = self._req(self.session.get, "repodata/repomd.xml")
repomd_xml = safe_parse_xml(response.content)
repodata = self.parse_repomd(xml=repomd_xml)
snapshot_primary = self._rewrite_primary(temp_dir=scratch_dir, primary=repodata["primary"])
self._rewrite_repomd(repomd_xml=repomd_xml, snapshot=snapshot_primary)
repomd_xml_path = join(scratch_dir, "repomd.xml")
with open(repomd_xml_path, "wb+") as out:
out.write(tostring(repomd_xml))

sync_files = []
for section in repodata.values():
if section.location.endswith(".xml.gz"):
sync_files.append(urlparse(join(self.base_url, section.location)).path)
return Snapshot(
sync_files=sync_files,
upload_files=[repomd_xml_path, snapshot_primary.local_path],
)

def _rewrite_primary(self, temp_dir, primary: RepodataSection):
with self._req(self.session.get, path=primary.location, stream=True) as request:
local_path = download_repodata_section(primary, request, temp_dir)
with gzip.open(local_path) as f:
file_bytes = f.read()
primary_xml = safe_parse_xml(xml_bytes=file_bytes)
open_checksum = sha256(content=file_bytes)
open_size = len(file_bytes)
for package_element in primary_xml:
location = package_element.find("common:location", namespaces=namespaces)
# As our S3 structure is https://<base-repo>/snapshots/<snapshot-uuid>/, and the "location"
# attribute of the packages in primary.xml references a path relative to the root like:
# "Packages/v/vim.rmp", we need to rewrite this location to point to back a few directories
# from our snapshot root.
relative_location = f"../../{location.get('href')}"
location.set("href", relative_location)

# Now that we have rewritten our XML file, the checksums no longer match, so calculate new ones (along with
# size etc. from above).
compressed_xml = gzip.compress(tostring(primary_xml))
compressed_sha256 = sha256(compressed_xml)
compressed_size = len(compressed_xml)
local_path = f"{temp_dir}/{compressed_sha256}-primary.xml.gz"
with open(local_path, "wb+") as out:
out.write(compressed_xml)

return SnapshotPrimary(
open_checksum=open_checksum,
checksum=compressed_sha256,
checksum_type="sha256",
size=compressed_size,
open_size=open_size,
local_path=local_path,
location=f"repodata/{basename(local_path)}"
)

def _rewrite_repomd(self, repomd_xml, snapshot: SnapshotPrimary):
for element in repomd_xml.findall(f"repo:*", namespaces=namespaces):
# We only support *.xml.gz files currently
if element.attrib.get("type", None) not in {"primary", "filelists", "other"}:
repomd_xml.remove(element)

# Rewrite the XML with correct metadata for our changed primary.xml
for element in repomd_xml.find(f"repo:data[@type='primary']", namespaces=namespaces):
_, _, key = element.tag.partition("}")
if key == "checksum":
element.text = snapshot.checksum
elif key == "open-checksum":
element.text = snapshot.open_checksum
elif key == "location":
element.set("href", snapshot.location)
elif key == "size":
element.text = str(snapshot.size)
elif key == "open-size":
element.text = str(snapshot.open_size)

def _extract_package_list(self, primary: RepodataSection) -> PackageList:
with self._req(self.session.get, path=primary.location, stream=True) as request:
with TemporaryDirectory(prefix="/var/tmp/") as temp_dir:
local_path = download_repodata_section(primary, request, temp_dir)
with gzip.open(local_path) as f:
return PackageList(base_url=self.base_url, packages_xml=f.read())

def parse_repomd(self, xml: bytes) -> Dict[str, RepodataSection]:
xml = safe_parse_xml(xml)
def parse_repomd(self, xml: Element) -> Dict[str, RepodataSection]:
sections = {}
for data_element in xml.findall(f"repo:data", namespaces=namespaces):
section_type = data_element.attrib["type"]
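
_rewrite_primary and _rewrite_repomd rely on a sha256 helper imported from rpm_s3_mirror.util, which is outside this diff. A minimal sketch of the recompute step they perform, with an assumed implementation of that helper (its real body may differ):

import gzip
import hashlib


def sha256(content: bytes) -> str:
    # Assumed shape of rpm_s3_mirror.util.sha256: hex digest of raw bytes.
    return hashlib.sha256(content).hexdigest()


# Once the <location href="..."> attributes in primary.xml are rewritten, both digests
# recorded in repomd.xml are stale and must be recalculated: "open-checksum"/"open-size"
# describe the uncompressed XML, "checksum"/"size" describe the gzipped file that is uploaded.
rewritten_xml = b"<metadata>...</metadata>"  # stand-in for tostring(primary_xml)
open_checksum, open_size = sha256(rewritten_xml), len(rewritten_xml)
compressed = gzip.compress(rewritten_xml)
checksum, size = sha256(compressed), len(compressed)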
