Bootstrap automatically #19

Merged
merged 3 commits on Nov 2, 2021
22 changes: 9 additions & 13 deletions rpm_s3_mirror/__main__.py
@@ -2,9 +2,8 @@

import argparse
import logging
from time import sleep

import sys
import time

from rpm_s3_mirror.config import JSONConfig, ENVConfig
from rpm_s3_mirror.mirror import Mirror
@@ -16,8 +15,11 @@

def run_forever(mirror, poll_seconds):
while True:
mirror.sync(bootstrap=False)
sleep(poll_seconds)
start_time = time.monotonic()
mirror.sync()
sleep_time = poll_seconds + start_time - time.monotonic()
if sleep_time > 0:
time.sleep(sleep_time)


def main():
@@ -36,12 +38,6 @@ def main():
operation_group.add_argument(
"--sync-snapshot", help="Sync snapshot metadata from one s3 mirror to another", default=False, type=str
)
operation_group.add_argument(
"--bootstrap",
help="Bootstrap an empty s3 mirror",
action="store_true",
default=False,
)

config_group = parser.add_mutually_exclusive_group(required=True)
config_group.add_argument("--config", help="Path to config file")
@@ -53,8 +49,8 @@
)

args = parser.parse_args()
if args.poll_seconds and (args.snapshot or args.sync_snapshot or args.bootstrap):
print("--poll-seconds and [--snapshot|--sync-snapshot|--bootstrap] are mutually exclusive", file=sys.stderr)
if args.poll_seconds and (args.snapshot or args.sync_snapshot):
print("--poll-seconds and [--snapshot|--sync-snapshot] are mutually exclusive", file=sys.stderr)
sys.exit(1)

if args.config:
@@ -76,7 +72,7 @@ def main():
if args.poll_seconds:
run_forever(mirror, args.poll_seconds)
else:
success = mirror.sync(bootstrap=args.bootstrap)
success = mirror.sync()
sys.exit(0 if success else 1)


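For reference, the reworked run_forever() keeps an approximately fixed polling cadence: it measures how long a sync took and sleeps only for the remainder of the interval. A minimal standalone sketch of that behaviour, with a stub sync() in place of Mirror.sync() and an illustrative 30-second interval:

import time

def sync():
    # Stand-in for Mirror.sync(); pretend a sync takes a couple of seconds.
    time.sleep(2)

def run_forever(poll_seconds):
    while True:
        start_time = time.monotonic()
        sync()
        # Sleep only for what is left of the poll interval, so a slow sync
        # does not push every subsequent run later and later.
        sleep_time = poll_seconds + start_time - time.monotonic()
        if sleep_time > 0:
            time.sleep(sleep_time)

# run_forever(poll_seconds=30)  # uncomment to try it; loops until interrupted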
12 changes: 7 additions & 5 deletions rpm_s3_mirror/mirror.py
@@ -44,13 +44,13 @@ def __init__(self, config):
)
self.repositories = [RPMRepository(base_url=url) for url in config.upstream_repositories]

def sync(self, bootstrap=False):
def sync(self):
""" Sync upstream repositories to our s3 mirror """
start = time.monotonic()
sync_success = True
for upstream_repository in self.repositories:
try:
self._sync_repository(bootstrap, upstream_repository)
self._sync_repository(upstream_repository)
self.stats.gauge(metric="s3_mirror_failed", value=0, tags={"repo": upstream_repository.path})
except Exception as e: # pylint: disable=broad-except
self.log.exception("Failed to sync: %s", upstream_repository.base_url, exc_info=e)
@@ -62,18 +62,20 @@ def sync(self, bootstrap=False):
self.stats.gauge(metric="s3_mirror_sync_seconds_total", value=time.monotonic() - start)
return sync_success

def _sync_repository(self, bootstrap, upstream_repository):
def _sync_repository(self, upstream_repository):
mirror_start = time.monotonic()
update_time = now()
upstream_metadata = upstream_repository.parse_metadata()

mirror_repository = RPMRepository(base_url=self._build_s3_url(upstream_repository.base_url))
bootstrap = not mirror_repository.exists()
if bootstrap:
self.log.info("Bootstrapping repository: %s", upstream_repository.base_url)
new_packages = upstream_metadata.package_list
else:
self.log.info("Syncing repository: %s", upstream_repository.base_url)
# If the upstream repomd.xml file was updated after the last time we updated our
# mirror repomd.xml file then there is probably some work to do.
mirror_repository = RPMRepository(base_url=self._build_s3_url(upstream_repository.base_url))
last_check_time = self.s3.repomd_update_time(base_url=mirror_repository.base_url)
if not upstream_repository.has_updates(since=last_check_time):
self.log.info("Skipping repository with no updates since: %s", last_check_time)
@@ -125,7 +127,7 @@ def _sync_repository(self, bootstrap, upstream_repository):
)

def _build_manifest_location(self, base_url):
sync_directory = now(microsecond=True).replace(tzinfo=None).isoformat()
sync_directory = now(microsecond=True).replace(tzinfo=None).isoformat() # like '2021-10-25T01:57:04'
manifest_location = join(urlparse(base_url).path, MANIFEST_DIRECTORY, sync_directory)
return manifest_location

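With the --bootstrap flag gone, the choice between a full bootstrap and an incremental sync is made inside _sync_repository(): bootstrap when the mirror has no repomd.xml yet, skip when upstream has not changed since the last sync, otherwise sync the new packages. A minimal sketch of that decision under illustrative inputs; exists(), repomd_update_time() and has_updates() are the real methods, everything else here is a stand-in:

from datetime import datetime, timezone

def decide_action(mirror_exists, upstream_repomd_mtime, mirror_repomd_mtime):
    if not mirror_exists:
        # No repomd.xml on the mirror yet: copy the full upstream package list.
        return "bootstrap"
    if upstream_repomd_mtime <= mirror_repomd_mtime:
        # Upstream metadata has not changed since our last sync; nothing to do.
        return "skip"
    # Otherwise mirror only the packages that are new relative to our snapshot.
    return "incremental sync"

print(decide_action(
    mirror_exists=True,
    upstream_repomd_mtime=datetime(2021, 10, 25, 2, 0, tzinfo=timezone.utc),
    mirror_repomd_mtime=datetime(2021, 10, 25, 1, 57, 4, tzinfo=timezone.utc),
))  # -> incremental sync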
16 changes: 14 additions & 2 deletions rpm_s3_mirror/repository.py
@@ -16,6 +16,7 @@
import gzip

from requests import Response
from requests.exceptions import HTTPError

from rpm_s3_mirror.util import get_requests_session, validate_checksum, sha256

@@ -143,6 +144,8 @@ def __init__(self, base_url: str):
base_url += "/"
self.base_url = base_url
self.path = urlparse(base_url).path
if "//" in self.path:
raise ValueError("Consecutive slashes detected in URL path")
self.session = get_requests_session()

def has_updates(self, since: datetime) -> bool:
@@ -157,6 +160,14 @@ def parse_metadata(self) -> Metadata:
package_list = self._extract_package_list(primary=repodata["primary"])
return Metadata(package_list=package_list, repodata=repodata, base_url=self.base_url)

def exists(self):
# S3 responds with HTTP 403 (Access Denied) rather than 404 when an object does not exist
# and is fetched with an unauthenticated HTTP GET (public access).
# Also accept 404 for "normal" web server behaviour.
response = self._req(self.session.get, "repodata/repomd.xml", acceptible_status_code={200, 403, 404})
if response.status_code == 200:
return True
return False

def get_repodata(self):
response = self._req(self.session.get, "repodata/repomd.xml")
repodata = self.parse_repomd(xml=safe_parse_xml(response.content))
@@ -273,9 +284,10 @@ def parse_repomd(self, xml: Element) -> Dict[str, RepodataSection]:
)
return sections

def _req(self, method, path, *, json=None, params=None, **kwargs) -> Response:
def _req(self, method, path, *, json=None, params=None, acceptible_status_code=None, **kwargs) -> Response:
acceptible_status_code = acceptible_status_code if acceptible_status_code else {200}
url = f"{self.base_url}{path}"
response = method(url, json=json, params=params, **kwargs)
if response.status_code != 200:
if response.status_code not in acceptible_status_code:
response.raise_for_status()
return response
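The new exists() probe works because _req() now accepts a caller-supplied set of status codes: a public S3 bucket answers 403 (Access Denied) rather than 404 for keys that do not exist, while an ordinary web server returns 404. A standalone sketch of that probe using plain requests; the bucket URL is a placeholder:

import requests

def probe(url, acceptable_status_codes=frozenset({200})):
    response = requests.get(url, timeout=30)
    if response.status_code not in acceptable_status_codes:
        response.raise_for_status()
    return response

response = probe(
    "https://example-mirror.s3.eu-west-1.amazonaws.com/fedora/33/x86_64/repodata/repomd.xml",
    acceptable_status_codes={200, 403, 404},  # 403: missing key on S3; 404: plain web server
)
print("mirror already bootstrapped" if response.status_code == 200 else "mirror is empty, bootstrap needed")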