class DownloadStats:
    """Subsystem for keeping statistics on downloads.

    Accumulates the time spent on requests and responses, the sizes of both, and the number of
    snapshots downloaded.

    Can be used as a context manager for pretty printing: entering the context yields the bound
    ``hook`` method, and leaving it prints a newline so the final stats line stays in the terminal.
    """

    def __init__(self):
        self.stats = {
            "request_duration": datetime.timedelta(),  # Time from request sent to first byte from the server
            "response_duration": datetime.timedelta(),  # Total time spent receiving and processing a response
            "request_size_bytes": 0,  # Total size of all requests
            "response_size_bytes": 0,  # Total size of all responses
            "snapshots": 0,  # Number of fits snapshots downloaded
        }

    def __enter__(self):
        return self.hook

    def __exit__(self, exc_type, exc_value, traceback):
        print("")  # Print a newline so the final stats line stays in the terminal and looks pretty.

    @staticmethod
    def _rate(amount: float, seconds: float) -> float:
        """Return ``amount / seconds``, or 0.0 when no positive time has elapsed yet.

        Durations can legitimately be zero (nothing recorded yet, or two timestamps that compare
        equal), and a progress display must not crash the download in that case.
        """
        return amount / seconds if seconds > 0 else 0.0

    def _stat_accumulate(self, name: str, value: Union[int, datetime.timedelta]):
        """Accumulate a sum into this instance's stats dict

        Parameters
        ----------
        name : str
            Name of the stat. Assumed to exist in the dict already.
        value : Union[int, datetime.timedelta]
            How much time or count to add to the stat
        """
        self.stats[name] += value

    def _print_stats(self):
        """Print the accumulated stats including bandwidth calculated from duration and sizes

        The line is terminated with a carriage return rather than a newline, in order to create a
        continuously updating line of text during download if your terminal supports it.

        If you use this class as a context manager, the end of context will output a newline,
        preserving the last line of stats in your terminal.

        Rates whose duration is still zero are reported as 0.00 instead of raising.
        """
        stats = self.stats
        mebibyte = 1024**2

        total_dur_s = (stats["request_duration"] + stats["response_duration"]).total_seconds()
        resp_s = stats["response_duration"].total_seconds()
        req_s = stats["request_duration"].total_seconds()

        down_rate_mb_s = self._rate(stats["response_size_bytes"] / mebibyte, resp_s)
        up_rate_mb_s = self._rate(stats["request_size_bytes"] / mebibyte, req_s)
        snapshot_rate = self._rate(stats["snapshots"], total_dur_s)

        # One write and one flush per update; the text is the same as printing the pieces separately.
        line = (
            f"Stats: Duration: {total_dur_s:.2f} s, "
            f"Files: {stats['snapshots']}, "
            f"Upload: {up_rate_mb_s:.2f} MB/s, "
            f"Download: {down_rate_mb_s:.2f} MB/s, "
            f"File rate: {snapshot_rate:.2f} files/s"
        )
        print(line, end="\r", flush=True)

    def hook(
        self,
        request: urllib.request.Request,
        request_start: datetime.datetime,
        response_start: datetime.datetime,
        response_size: int,
        chunk_size: int,
    ):
        """This hook is called on each chunk of snapshots downloaded.

        It is called immediately after the server has finished responding to the
        request, so datetime.datetime.now() is the end moment of the request

        Parameters
        ----------
        request : urllib.request.Request
            The request object relevant to this call
        request_start : datetime.datetime
            The moment the request was handed off to urllib.request.urlopen()
        response_start : datetime.datetime
            The moment there were bytes from the server to process
        response_size : int
            The size of the response from the server in bytes
        chunk_size : int
            The number of cutout files received in this request
        """
        now = datetime.datetime.now()

        # A request without a body has data=None; count it as zero bytes rather than failing.
        request_size = len(request.data) if request.data is not None else 0

        self._stat_accumulate("request_duration", response_start - request_start)
        self._stat_accumulate("response_duration", now - response_start)
        self._stat_accumulate("request_size_bytes", request_size)
        self._stat_accumulate("response_size_bytes", response_size)
        self._stat_accumulate("snapshots", chunk_size)

        self._print_stats()
class FailedChunkCollector:
    """Collection system for chunks of sky locations where the request for a chunk of cutouts failed.

    Keeps track of all variable_fields plus object_id for failed chunks. Each of those columns is
    stored as a list attribute on the instance (for example ``self.object_id``), and entries are
    de-duplicated by object_id.

    save() dumps these chunks using astropy.table.Table.write()

    Can be used as a context manager: entering the context yields the bound ``hook`` method, and
    leaving it calls save(), including when the block is left because of an exception.
    """

    def __init__(self, filepath: Path, **kwargs):
        """Initialize the collector, resuming from ``filepath`` when that file already exists.

        Parameters
        ----------
        filepath : Path
            File to read in if we are resuming a download, and where to save the failed chunks after.
            If the file does not exist yet an empty state is initialized.

        **kwargs : dict
            Keyword args passed to astropy.table.Table.read() and write() in the case that a file is used.
            Should only be used to control file format, not read/write semantics
        """
        # One list per column, stored directly as instance attributes.
        for key in self._column_names():
            setattr(self, key, [])

        self.seen_object_ids = set()
        # Resolved up front so a later change of working directory cannot move the output file.
        self.filepath = filepath.resolve()
        self.format_kwargs = kwargs

        # If there is a failed chunk file from a previous run,
        # Read it in to initialize us
        if self.filepath.exists():
            # Use the same format kwargs as save() so the file is read the way it was written.
            prev_failed_chunks = Table.read(self.filepath, **self.format_kwargs)
            for key in self._column_names():
                getattr(self, key).extend(prev_failed_chunks[key].data.tolist())

            self.seen_object_ids = set(self.object_id)

        self.count = len(self.seen_object_ids)
        print(f"Failed chunk handler initialized with {self.count} objects")

    @staticmethod
    def _column_names() -> list:
        """Return the names of all tracked columns: every entry of variable_fields, then object_id.

        variable_fields is a module-level name defined outside this class.
        """
        return list(variable_fields) + ["object_id"]

    def __enter__(self):
        return self.hook

    def __exit__(self, exc_type, exc_value, traceback):
        # Save unconditionally so collected failures survive an aborted download.
        # Returning None lets any in-flight exception keep propagating.
        self.save()

    def hook(self, rects: list[dC.Rect], exception: Exception, attempts: int):
        """Called when dc.Download fails to download a chunk of rects

        Parameters
        ----------
        rects : list[dC.Rect]
            The list of rect objects that were requested from the server
        exception : Exception
            The exception that was thrown on the final attempt to request this chunk
        attempts : int
            The number of attempts that were made to request this chunk

        """
        for rect in rects:
            # Relies on the name format set up in create_rects to work properly
            object_id = int(rect.name.split("_")[0])

            # Several rects can share one object_id; record each object only once.
            if object_id in self.seen_object_ids:
                continue

            self.seen_object_ids.add(object_id)
            self.object_id.append(object_id)
            for key in variable_fields:
                getattr(self, key).append(getattr(rect, key))
            self.count += 1

        print(f"Failed chunk handler processed {len(rects)} rects and is now of size {self.count}")

    def save(self):
        """
        Saves the current set of failed locations to the path specified.
        If no failed locations were saved by the hook, this function does nothing.
        """
        if self.count == 0:
            return

        # Build the arrays locally instead of rebinding the attributes, so the per-column lists stay
        # lists and hook() can keep appending after a save.
        missed = Table({key: np.array(getattr(self, key)) for key in self._column_names()})

        # note that the choice to do overwrite=True here and to read in the entire fits file in
        # __init__() is necessary because snapshots corresponding to the same object may cross
        # chunk boundaries decided by dC.download.
        #
        # Since we are de-duplicating rects by object_id, we need to read in all rects from a prior
        # run, and we therefore replace the file we were passed.
        missed.write(self.filepath, overwrite=True, **self.format_kwargs)
+ with ( + DownloadStats() as stats_hook, + FailedChunkCollector(Path("failed_locations.fits"), format="fits") as failed_chunk_hook, + ): + dC.download( + rects, + user=user, + password=password, + onmemory=False, + request_hook=stats_hook, + failed_chunk_hook=failed_chunk_hook, + resume=True, + chunksize=10, + ) diff --git a/src/fibad/downloadCutout/downloadCutout.py b/src/fibad/downloadCutout/downloadCutout.py index b4be40e..36947e7 100644 --- a/src/fibad/downloadCutout/downloadCutout.py +++ b/src/fibad/downloadCutout/downloadCutout.py @@ -982,28 +982,51 @@ def download( password: Optional[str] = None, *, onmemory: bool = True, - **kwargs_request, + **kwargs__download, ) -> Union[list, list[list], None]: """ Cut `rects` out of the sky. Parameters ---------- - rects - A `Rect` object or a list of `Rect` objects - user + rects : Union[Rect, list[Rect]] + A `Rect` object or a list of `Rect` objects to download + user : Optional[str] Username. If None, it will be asked interactively. - password + password : Optional [str] Password. If None, it will be asked interactively. - onmemory + onmemory, optional : bool Return `datalist` on memory. If `onmemory` is False, downloaded cut-outs are written to files. - kwargs_request - Additional keyword args are passed through to _download + kwargs__download: dict + Additional keyword args are passed through to _download and eventually to _download_chunk() and + urllib.request.urlopen. + + Some important (but entirely optional!) keyword args processed later in the download callstack are + listed below. Anything urllib.request.urlopen will accept is fair game too! + + resume : bool + Whether to attempt to resume an ongoing download from filesystem data in onmemory=False mode. + Default: False. See _download() for greater detail. + chunksize : int + The number of rects to include in a single http request. Default 990 rects. See _download() + for greater detail. 
+ retries : int + The number of times to retry a failed http request before moving on or attempting to trigger + failed_chunk_handler. Default is 3. See _download() for greater detail. + retrywait : int + Base seconds to wait between retries. Acts as base for exponential retry formula of + base * (2**attempt_num) seconds. Default is 30 s. See _download() for greater detail. + failed_chunk_hook : Callable[[list[Rect], Exception, int], Any] + Hook which is called every time a chunk fails `retries` time. The arguments to the hook are + the rects in the failed chunk, the exception encountered while making the last request, and + the number of attempts. See _download() for greater detail. + request_hook : Callable[[urllib.request.Request, datetime.datetime, datetime.datetime, int, int] + Optional hook called on every request. See _download_chunk() for greater detail Returns ------- - datalist + datalist : Union[list, list[list], None] If onmemory == False, `datalist` is None. If onmemory == True: - If `rects` is a simple `Rect` object, @@ -1020,7 +1043,7 @@ def download( rects = [cast(Rect, rects)] rects = cast(list[Rect], rects) - ret = _download(rects, user, password, onmemory=onmemory, **kwargs_request) + ret = _download(rects, user, password, onmemory=onmemory, **kwargs__download) if isscalar and onmemory: ret = cast(list[list], ret) return ret[0] @@ -1038,7 +1061,8 @@ def _download( resume: bool = False, retries: int = 3, retrywait: int = 30, - **kwargs_request, + failed_chunk_hook: Optional[Callable[[list[Rect], Exception, int], Any]] = None, + **kwargs__download_chunk, ) -> Optional[list[list]]: """ Cut `rects` out of the sky. Implements configurable request size, retries and exponential backoff. @@ -1075,8 +1099,15 @@ def _download( where the retry time for attempts is calculated as retrywait * (2 ** attempt) seconds , with attempt=0 for the first wait. 
- kwargs_request: dict, optional - Additional keyword args are passed through to _download_chunk + failed_chunk_hook: Callable[[list[Rect], Exception, int], Any] + Hook which is called every time a chunk fails `retries` time. The arguments to the hook are + the rects in the failed chunk, the exception encountered while making the last request, and + the number of attempts. + + If this function raises, the entire download stops, but otherwise the download will ocntinue + + kwargs__download_chunk: dict, optional + Additional keyword args are passed through to _download_chunk() Returns ------- @@ -1127,21 +1158,31 @@ def _download( for attempt in range(0, retries): try: ret = _download_chunk( - exploded_rects[i : i + chunksize], user, password, onmemory=onmemory, **kwargs_request + exploded_rects[i : i + chunksize], + user, + password, + onmemory=onmemory, + **kwargs__download_chunk, ) break - except (Exception, KeyboardInterrupt) as chunk_exception: + except KeyboardInterrupt: + print("Keyboard Interrupt recieved.") + failed_rect_index = i + raise + except Exception as exception: # Humans count attempts from 1, this loop counts from zero. 
print( f"Attempt {attempt + 1} of {retries} to request rects [{i}:{i+chunksize}] has error:" ) - print(chunk_exception) - - # Reraise if the final attempt on this chunk has failed, or if we're being terminated - if attempt + 1 == retries or isinstance(chunk_exception, KeyboardInterrupt): - failed_rect_index = i - raise - + print(exception) + + # If the final attempt on this chunk fails, we try to call the failed_chunk_hook + if attempt + 1 == retries: + if failed_chunk_hook is not None: + rect_chunk = [rect for rect, idx in exploded_rects[i : i + chunksize]] + failed_chunk_hook(rects=rect_chunk, exception=exception, attempts=retries) + # If no hook provided, or if the provided hook doesn't raise, we continue the download + break # Otherwise do exponential backoff and try again else: backoff = retrywait * (2**attempt) @@ -1277,7 +1318,7 @@ def _download_chunk( request_hook: Optional[ Callable[[urllib.request.Request, datetime.datetime, datetime.datetime, int, int], Any] ], - **kwargs_request, + **kwargs_urlopen, ) -> Optional[list]: """ Cut `rects` out of the sky. @@ -1300,7 +1341,7 @@ def _download_chunk( request_hook Function that is called with the response of all requests made Intended to support bandwidth instrumentation. - kwargs_request + kwargs_urlopen Additional keyword args are passed through to urllib.request.urlopen Returns @@ -1342,11 +1383,11 @@ def _download_chunk( returnedlist = [] # Set timeout to 1 hour if no timout was set higher up - kwargs_request.setdefault("timeout", 3600) + kwargs_urlopen.setdefault("timeout", 3600) with get_connection_semaphore(): request_started = datetime.datetime.now() - with urllib.request.urlopen(req, **kwargs_request) as fin: + with urllib.request.urlopen(req, **kwargs_urlopen) as fin: response_started = datetime.datetime.now() response_size = 0 with tarfile.open(fileobj=fin, mode="r|") as tar: