
Commit

Skip failed chunks (#27)
- New hook added to downloadCutout._download() to collect failed chunks
- Retries exceeded behavior in downloadCutout._download() altered to default
  to continuing the download. If callers of downloadCutout.download() want to
  stop the download, they can provide a failed_chunk_hook which raises (see the
  sketch after this list).
- Failed chunk collector class keeps failed sky locations and outputs them
  with astropy.table.Table.write()
- Cleanup documentation of downloadCutout._download and
  downloadCutout._download_chunk kwargs so they also exist at the top level in
  downloadCutout.download() for ease of discoverability.
- Cleanup: convert stats system to a context manager class to avoid proliferation
  of globals in download.py, and centralize stat pretty printing.

- Dedup rects from downloadCutout.py on their object_id
- Also read in the old failed_chunks.fits file at the start to
  handle the case where we are resuming a download that has already
  had to skip some chunks.
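As a sketch of the new failure behavior (illustrative only: it assumes downloadCutout.download()
forwards the failed_chunk_hook keyword described above, and that the hook receives the same
(rects, exception, attempts) arguments as FailedChunkCollector.hook in this diff), a caller that
wants the old stop-on-failure behavior could pass a hook that raises:

    import fibad.downloadCutout.downloadCutout as dC


    def abort_on_failed_chunk(rects: list[dC.Rect], exception: Exception, attempts: int):
        """Hypothetical hook: re-raise so the download stops instead of skipping the failed chunk."""
        raise RuntimeError(f"Giving up on a chunk of {len(rects)} rects after {attempts} attempts") from exception


    # dC.download(rects, user=user, password=password, onmemory=False,
    #             failed_chunk_hook=abort_on_failed_chunk, resume=True, chunksize=10)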
mtauraso authored Aug 16, 2024
1 parent d92ffff commit 28c3eb4
Showing 2 changed files with 274 additions and 114 deletions.
295 changes: 207 additions & 88 deletions src/fibad/download.py
@@ -5,6 +5,7 @@
from pathlib import Path
from typing import Union

import numpy as np
from astropy.table import Table, hstack

import fibad.downloadCutout.downloadCutout as dC
@@ -157,102 +158,216 @@ def create_rects(locations: Table, offset: int = 0, default: dC.Rect = None) ->
args["tract"] = str(args["tract"])
# Sets the file name on the rect to be the object_id, also includes other rect fields
# which are interpolated at save time, and are native fields of dc.Rect.
#
# This name is also parsed by FailedChunkCollector.hook to identify the object_id, so don't
# change it without updating code there too.
args["name"] = f"{location['object_id']}_{{type}}_{{ra:.5f}}_{{dec:+.5f}}_{{tract}}_{{filter}}"
rect = dC.Rect.create(default=default, **args)
rects.append(rect)

return rects


stats = {
    "request_duration": datetime.timedelta(),  # Time from request sent to first byte from the server
    "response_duration": datetime.timedelta(),  # Total time spent receiving and processing a response
    "request_size_bytes": 0,  # Total size of all requests
    "response_size_bytes": 0,  # Total size of all responses
    "snapshots": 0,  # Number of fits snapshots downloaded
}


def _stat_accumulate(name: str, value: Union[int, datetime.timedelta]):
    """Accumulate a sum into the global stats dict

    Parameters
    ----------
    name : str
        Name of the stat. Assumed to exist in the dict already.
    value : Union[int, datetime.timedelta]
        How much time or count to add to the stat
    """
    global stats
    stats[name] += value


class DownloadStats:
    """Subsystem for keeping statistics on downloads:

    Accumulates time spent on requests and responses, as well as the sizes of both and the number of
    snapshots downloaded.

    Can be used as a context manager for pretty printing.
    """

    def __init__(self):
        self.stats = {
            "request_duration": datetime.timedelta(),  # Time from request sent to first byte from the server
            "response_duration": datetime.timedelta(),  # Total time spent receiving and processing a response
            "request_size_bytes": 0,  # Total size of all requests
            "response_size_bytes": 0,  # Total size of all responses
            "snapshots": 0,  # Number of fits snapshots downloaded
        }

    def __enter__(self):
        return self.hook

    def __exit__(self, exc_type, exc_value, traceback):
        print("")  # Print a newline so the final stats line stays in the terminal and looks pretty.

    def _stat_accumulate(self, name: str, value: Union[int, datetime.timedelta]):
        """Accumulate a sum into the stats dict

        Parameters
        ----------
        name : str
            Name of the stat. Assumed to exist in the dict already.
        value : Union[int, datetime.timedelta]
            How much time or count to add to the stat
        """
        self.stats[name] += value

    def _print_stats(self):
        """Print the accumulated stats including bandwidth calculated from duration and sizes

        This prints out multiple lines with `\r` at the end in order to create a continuously updating
        line of text during download if your terminal supports it.

        If you use this class as a context manager, the end of context will output a newline, preserving
        the last line of stats in your terminal.
        """
        total_dur_s = (self.stats["request_duration"] + self.stats["response_duration"]).total_seconds()

        resp_s = self.stats["response_duration"].total_seconds()
        down_rate_mb_s = (self.stats["response_size_bytes"] / (1024**2)) / resp_s

        req_s = self.stats["request_duration"].total_seconds()
        up_rate_mb_s = (self.stats["request_size_bytes"] / (1024**2)) / req_s

        snapshot_rate = self.stats["snapshots"] / total_dur_s

        print(f"Stats: Duration: {total_dur_s:.2f} s, ", end="", flush=True)
        print(f"Files: {self.stats['snapshots']}, ", end="", flush=True)
        print(f"Upload: {up_rate_mb_s:.2f} MB/s, ", end="", flush=True)
        print(f"Download: {down_rate_mb_s:.2f} MB/s, ", end="", flush=True)
        print(f"File rate: {snapshot_rate:.2f} files/s", end="\r", flush=True)

    def hook(
        self,
        request: urllib.request.Request,
        request_start: datetime.datetime,
        response_start: datetime.datetime,
        response_size: int,
        chunk_size: int,
    ):
        """This hook is called on each chunk of snapshots downloaded.

        It is called immediately after the server has finished responding to the
        request, so datetime.datetime.now() is the end moment of the request.

        Parameters
        ----------
        request : urllib.request.Request
            The request object relevant to this call
        request_start : datetime.datetime
            The moment the request was handed off to urllib.request.urlopen()
        response_start : datetime.datetime
            The moment there were bytes from the server to process
        response_size : int
            The size of the response from the server in bytes
        chunk_size : int
            The number of cutout files received in this request
        """

        now = datetime.datetime.now()

        self._stat_accumulate("request_duration", response_start - request_start)
        self._stat_accumulate("response_duration", now - response_start)
        self._stat_accumulate("request_size_bytes", len(request.data))
        self._stat_accumulate("response_size_bytes", response_size)
        self._stat_accumulate("snapshots", chunk_size)

        self._print_stats()


def _print_stats():
    """Print the accumulated stats including bandwidth calculated from duration and sizes

    This prints out multiple lines with `\r` at the end in order to create a continuously updating
    line of text during download if your terminal supports it.
    """
    global stats

    total_dur_s = (stats["request_duration"] + stats["response_duration"]).total_seconds()

    resp_s = stats["response_duration"].total_seconds()
    down_rate_mb_s = (stats["response_size_bytes"] / (1024**2)) / resp_s

    req_s = stats["request_duration"].total_seconds()
    up_rate_mb_s = (stats["request_size_bytes"] / (1024**2)) / req_s

    snapshot_rate = stats["snapshots"] / total_dur_s

    print(
        f"Stats: Duration: {total_dur_s:.2f} s, Files: {stats['snapshots']}, \
Upload: {up_rate_mb_s:.2f} MB/s, Download: {down_rate_mb_s:.2f} MB/s File rate: {snapshot_rate:.2f} files/s",
        end="\r",
        flush=True,
    )


def request_hook(
    request: urllib.request.Request,
    request_start: datetime.datetime,
    response_start: datetime.datetime,
    response_size: int,
    chunk_size: int,
):
    """This hook is called on each chunk of snapshots downloaded.

    It is called immediately after the server has finished responding to the
    request, so datetime.datetime.now() is the end moment of the request.

    Parameters
    ----------
    request : urllib.request.Request
        The request object relevant to this call
    request_start : datetime.datetime
        The moment the request was handed off to urllib.request.urlopen()
    response_start : datetime.datetime
        The moment there were bytes from the server to process
    response_size : int
        The size of the response from the server in bytes
    chunk_size : int
        The number of cutout files received in this request
    """

    now = datetime.datetime.now()

    _stat_accumulate("request_duration", response_start - request_start)
    _stat_accumulate("response_duration", now - response_start)
    _stat_accumulate("request_size_bytes", len(request.data))
    _stat_accumulate("response_size_bytes", response_size)
    _stat_accumulate("snapshots", chunk_size)

    _print_stats()


class FailedChunkCollector:
    """Collection system for chunks of sky locations where the request for a chunk of cutouts failed.

    Keeps track of all variable_fields plus object_id for failed chunks.

    save() dumps these chunks using astropy.table.Table.write()
    """

    def __init__(self, filepath: Path, **kwargs):
        """Initialize the collector, reading in any failed chunks recorded by a previous run.

        Parameters
        ----------
        filepath : Path
            File to read in if we are resuming a download, and where to save the failed chunks after.
            If the file does not exist yet an empty state is initialized.
        **kwargs : dict
            Keyword args passed to astropy.table.Table.read() and write() in the case that a file is used.
            Should only be used to control file format, not read/write semantics.
        """
        self.__dict__.update({key: [] for key in variable_fields + ["object_id"]})
        self.seen_object_ids = set()
        self.filepath = filepath.resolve()
        self.format_kwargs = kwargs

        # If there is a failed chunk file from a previous run,
        # read it in to initialize us.
        if filepath.exists():
            prev_failed_chunks = Table.read(filepath)
            for key in variable_fields + ["object_id"]:
                column_as_list = prev_failed_chunks[key].data.tolist()
                self.__dict__[key] += column_as_list
            print(self.object_id)

        self.seen_object_ids = {id for id in self.object_id}

        self.count = len(self.seen_object_ids)
        print(f"Failed chunk handler initialized with {self.count} objects")

    def __enter__(self):
        return self.hook

    def __exit__(self, exc_type, exc_value, traceback):
        self.save()

    def hook(self, rects: list[dC.Rect], exception: Exception, attempts: int):
        """Called when dC.download fails to download a chunk of rects

        Parameters
        ----------
        rects : list[dC.Rect]
            The list of rect objects that were requested from the server
        exception : Exception
            The exception that was thrown on the final attempt to request this chunk
        attempts : int
            The number of attempts that were made to request this chunk
        """

        for rect in rects:
            # Relies on the name format set up in create_rects to work properly
            object_id = int(rect.name.split("_")[0])

            if object_id not in self.seen_object_ids:
                self.seen_object_ids.add(object_id)

                self.object_id.append(object_id)

                for key in variable_fields:
                    self.__dict__[key].append(rect.__dict__[key])

                self.count += 1

        print(f"Failed chunk handler processed {len(rects)} rects and is now of size {self.count}")

    def save(self):
        """Saves the current set of failed locations to the path specified.

        If no failed locations were saved by the hook, this function does nothing.
        """
        if self.count == 0:
            return
        else:
            # Convert our class-member-based representation to an astropy table.
            for key in variable_fields + ["object_id"]:
                self.__dict__[key] = np.array(self.__dict__[key])

            missed = Table({key: self.__dict__[key] for key in variable_fields + ["object_id"]})

            # Note that the choice to do overwrite=True here and to read in the entire fits file in
            # __init__() is necessary because snapshots corresponding to the same object may cross
            # chunk boundaries decided by dC.download.
            #
            # Since we are de-duplicating rects by object_id, we need to read in all rects from a prior
            # run, and we therefore replace the file we were passed.
            missed.write(self.filepath, overwrite=True, **self.format_kwargs)


def download_cutout_group(rects: list[dC.Rect], cutout_dir: Union[str, Path], user, password):
"""Download cutouts to the given directory
Calls downloadCutout.download, so supports long lists of rects beyond the limits of the HSC API
Calls downloadCutout.download, so supports long lists of rects beyond the limits of the HSC web API
Parameters
----------
Expand All @@ -265,15 +380,19 @@ def download_cutout_group(rects: list[dC.Rect], cutout_dir: Union[str, Path], us
password : string
Password for HSC's download service to use
"""

    with working_directory(Path(cutout_dir)):
        dC.download(
            rects,
            user=user,
            password=password,
            onmemory=False,
            request_hook=request_hook,
            resume=True,
            chunksize=10,
        )

    print("")  # Print a newline so the stats stay and look pretty.
    with working_directory(Path(cutout_dir)):
        with (
            DownloadStats() as stats_hook,
            FailedChunkCollector(Path("failed_locations.fits"), format="fits") as failed_chunk_hook,
        ):
            dC.download(
                rects,
                user=user,
                password=password,
                onmemory=False,
                request_hook=stats_hook,
                failed_chunk_hook=failed_chunk_hook,
                resume=True,
                chunksize=10,
            )
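
Usage note (not part of this commit): the failed_locations.fits table that FailedChunkCollector.save()
writes can be read back with astropy to inspect which sky locations were skipped, for example:

    from astropy.table import Table

    # Hypothetical inspection snippet; assumes download_cutout_group() above has written
    # failed_locations.fits into the cutout directory via FailedChunkCollector.save().
    failed = Table.read("failed_locations.fits", format="fits")
    print(f"{len(failed)} objects had failed chunks")
    print(failed["object_id"])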

