Skip to content

Commit

Permalink
Retry and resume functionality for downloader (#17)
Browse files Browse the repository at this point in the history
Basic implementation of retry and resume within downloadCutout.py.

Retry means that when a request fails we will try again (defaults to 3 attempts). This is intended to address connection drops and the like. We use configurable exponential backoff to avoid a thundering herd if load from our client is causing some backend failure.

Resume describes the situation where the download fails for unrecoverable reasons (HSC infra goes down) or is terminated (e.g. downloads only occur at night). This generates a resume_download.toml file in the download directory, which allows the exact same download to resume from the chunk that was in progress when the interruption occurred. This resume functionality is off by default to preserve the download() interface used by downloadCutout.py's CLI which does not support resume.
  • Loading branch information
mtauraso authored Aug 14, 2024
1 parent 1615ee6 commit 7bfb7db
Show file tree
Hide file tree
Showing 2 changed files with 197 additions and 14 deletions.
14 changes: 11 additions & 3 deletions src/fibad/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,12 +264,20 @@ def download_cutout_group(rects: list[dC.Rect], cutout_dir: Union[str, Path], us
The rects we would like to download
cutout_dir : Union[str, Path]
The directory to put the files
user : _type_
user : string
Username for HSC's download service to use
password : _type_
password : string
Password for HSC's download service to use
"""
with working_directory(Path(cutout_dir)):
dC.download(rects, user=user, password=password, onmemory=True, request_hook=request_hook)
dC.download(
rects,
user=user,
password=password,
onmemory=False,
request_hook=request_hook,
resume=True,
chunksize=10,
)

print("") # Print a newline so the stats stay and look pretty.
197 changes: 186 additions & 11 deletions src/fibad/downloadCutout/downloadCutout.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
import datetime
import errno
import getpass
import hashlib
import io
import json
import math
import os
import re
Expand All @@ -18,8 +20,11 @@
import urllib.request
import urllib.response
from collections.abc import Generator
from pathlib import Path
from typing import IO, Any, Callable, Optional, Union, cast

import toml

__all__ = []


Expand Down Expand Up @@ -470,6 +475,15 @@ def explode(self) -> list["Rect"]:
return [Rect.create(default=self)]


class RectEncoder(json.JSONEncoder):
    """JSON encoder aware of Rect objects.

    A Rect is serialized as its attribute dictionary; all other unserializable
    objects fall through to the base encoder, which raises TypeError.
    """

    # TODO: this is a placeholder implementation; it needs to do something very
    # particular in order to work correctly for its intended use.
    def default(self, obj):
        if isinstance(obj, Rect):
            return obj.__dict__
        return super().default(obj)


@export
def read_rects(
file: Union[str, IO], default: Optional[Rect] = None, type: Optional[str] = None
Expand Down Expand Up @@ -1021,10 +1035,19 @@ def _download(
*,
onmemory: bool,
chunksize: int = 990,
resume: bool = False,
retries: int = 3,
retrywait: int = 30,
**kwargs_request,
) -> Optional[list[list]]:
"""
Cut `rects` out of the sky.
Cut `rects` out of the sky. Implements configurable request size, retries and exponential backoff.
When `onmemory == False` this function saves snapshots to the current working directory.
onmemory has no default and must be specified.
When `onmemory == False ` and the current working directory contains resume data, and
`resume = True` (default: False), then the download will be resumed.
Parameters
----------
Expand All @@ -1034,12 +1057,25 @@ def _download(
Username. If None, it will be asked interactively.
password
Password. If None, it will be asked interactively.
onmemory
onmemory: bool
Return `datalist` on memory.
If `onmemory` is False, downloaded cut-outs are written to files.
chunksize
If `onmemory` is False, downloaded cut-outs are written to files in the current working directory.
chunksize: int, optional
Number of cutout lines to pack into a single request. Defaults to 990 if unspecified.
kwargs_request
resume: bool, optional
When `onmemory == False`, uses resume data in the current working directory to continue a failed download.
Noop when onmemory=True. Defaults to False if unspecified.
Passing resume=True is safe when no resume data exists.
_download() will simply start downloading from the beginning of rects.
retries: int, optional
Number of attempts to make to fetch each chunk. Defaults to 3 if unspecified.
retrywait: int, optional
Base number of seconds to wait between retries. Retry waits are computed using an exponential backoff
where the retry time for each attempt is calculated as retrywait * (2 ** attempt) seconds, with attempt=0
for the first wait.
kwargs_request: dict, optional
Additional keyword args are passed through to _download_chunk
Returns
Expand Down Expand Up @@ -1078,21 +1114,160 @@ def _download(

datalist: list[tuple[int, dict, bytes]] = []

for i in range(0, len(exploded_rects), chunksize):
ret = _download_chunk(
exploded_rects[i : i + chunksize], user, password, onmemory=onmemory, **kwargs_request
)
if onmemory:
datalist += cast(list, ret)
failed_rect_index = 0

start_rect_index = 0
if not onmemory and resume:
start_rect_index = _read_resume_data(exploded_rects)

try:
# Chunk loop
for i in range(start_rect_index, len(exploded_rects), chunksize):
# Retry loop
for attempt in range(0, retries):
try:
ret = _download_chunk(
exploded_rects[i : i + chunksize], user, password, onmemory=onmemory, **kwargs_request
)
break
except (Exception, KeyboardInterrupt) as chunk_exception:
# Humans count attempts from 1, this loop counts from zero.
print(
f"Attempt {attempt + 1} of {retries} to request rects [{i}:{i+chunksize}] has error:"
)
print(chunk_exception)

# Reraise if the final attempt on this chunk has failed, or if we're being terminated
if attempt + 1 == retries or isinstance(chunk_exception, KeyboardInterrupt):
failed_rect_index = i
raise

# Otherwise do exponential backoff and try again
else:
backoff = retrywait * (2**attempt)
if backoff != 0:
print(f"Retrying in {backoff} seconds... ", end="", flush=True)
time.sleep(backoff)
print("Retrying now.")
continue
if onmemory:
datalist += cast(list, ret)

# Retries have failed or we are being killed
except (Exception, KeyboardInterrupt):
# Write out resume data if we're saving to filesystem and there's been any progress
if (not onmemory) and failed_rect_index != 0:
_write_resume_data(exploded_rects, failed_rect_index)

# Reraise so exception can reach top level, very important for KeyboardInterrupt
raise

if onmemory:
returnedlist: list[list[tuple[dict, bytes]]] = [[] for i in range(len(rects))]
for index, metadata, data in datalist:
returnedlist[index].append((metadata, data))

# On success we remove resume data
if not onmemory and resume and os.path.exists(resume_data_filename):
os.remove(resume_data_filename)

return returnedlist if onmemory else None


# TODO multiple connections resume data will need to be instanced by connection
# That will require some interface so the connection number can make it here
resume_data_filename = "resume_download.toml"


def _read_resume_data(rects: list[Rect]) -> int:
    """Read the resume data from the current working directory.

    Parameters
    ----------
    rects : list[Rect]
        List of rects we intend to process, needed for checksum to ensure the download we are resuming
        is the same one that output resume data.

    Returns
    -------
    int
        Index in the rect list where the resuming download should start.
        If no resume data file is found in the current working directory, 0 is returned.

    Raises
    ------
    RuntimeError
        "Resume data in <path> corrupt" when the file is not a parseable toml file, or is toml that
        does not contain the keys 'checksum' and 'start_rect_index'
    RuntimeError
        "Resume data failed checksum ..." when the rect list has changed from when the resume data file was
        written
    """
    # No resume file means there is nothing to resume; start from the first rect.
    if not os.path.exists(resume_data_filename):
        return 0

    print(f"Resuming failed download from {Path.cwd() / resume_data_filename}")
    try:
        with open(resume_data_filename, "r") as f:
            resumedata = toml.load(f)
    except toml.TomlDecodeError as e:
        # Surface parse failures as the documented RuntimeError rather than a toml-internal error.
        raise RuntimeError(f"Resume data in {Path.cwd() / resume_data_filename} corrupt.") from e

    if "start_rect_index" not in resumedata or "checksum" not in resumedata:
        raise RuntimeError(f"Resume data in {Path.cwd() / resume_data_filename} corrupt.")

    start_rect_index = resumedata["start_rect_index"]

    # The checksum covers only the already-completed prefix of rects; a mismatch
    # means the caller's rect list differs from the one the resume file was written for.
    checksum = _calc_rect_list_checksum(rects[0:start_rect_index])
    if resumedata["checksum"] != checksum:
        message = f"""Resume data failed checksum.
     Has the list of sky locations changed? If so, remove {Path.cwd() / resume_data_filename}"""
        raise RuntimeError(message)

    return start_rect_index


def _write_resume_data(rects: list[Rect], failed_rect_index: int) -> None:
    """Persist resume state for a partially-completed download.

    Parameters
    ----------
    rects : list[Rect]
        Full list of Rects we were intending to download; a checksum of the completed
        prefix is written so a later resume can verify it matches.
    failed_rect_index : int
        The index of the beginning of the first chunk of rects to fail.
    """
    print("\nWriting resume data")
    # Record the restart index plus a checksum over everything before it. Whatever
    # was downloaded as part of the failing chunk is treated as corrupt and will be
    # re-fetched on resume.
    completed_prefix = rects[0:failed_rect_index]
    resumedata = {
        "start_rect_index": failed_rect_index,
        "checksum": _calc_rect_list_checksum(completed_prefix),
    }
    with open(resume_data_filename, mode="w") as f:
        toml.dump(resumedata, f)
    print("Done writing resume data")


def _calc_rect_list_checksum(rects: list[Rect]) -> str:
    """Return a sha256 checksum identifying a list of Rects, for the purpose of
    matching a resumed download against the list that produced its resume data.

    The list is dumped to JSON (keys sorted so the serialization is stable) and
    the sha256 of that JSON is returned.

    Parameters
    ----------
    rects : list[Rect]
        List of rects that we will checksum

    Returns
    -------
    str
        Sha256 hex digest of the list of rects.
    """
    serialized = json.dumps(rects, sort_keys=True, cls=RectEncoder)
    return hashlib.sha256(serialized.encode("utf-8")).hexdigest()


def _download_chunk(
rects: list[tuple[Rect, Any]],
user: str,
Expand Down

0 comments on commit 7bfb7db

Please sign in to comment.