From f71f776171ad0c30ad6eee477d2504f37672cc49 Mon Sep 17 00:00:00 2001 From: Seth Hinz Date: Thu, 10 Oct 2024 10:23:27 -0700 Subject: [PATCH] Added support for waiting for the zarr archive to be created before starting the copy. --- softcopy/main.py | 11 +++++++++-- softcopy/ome_zarr_copier.py | 12 ++++++++++-- softcopy/zarr_copier.py | 12 ++++++++++-- softcopy/zarr_utils.py | 15 +++++++++++++++ 4 files changed, 44 insertions(+), 6 deletions(-) diff --git a/softcopy/main.py b/softcopy/main.py index 66288df..1a895b6 100644 --- a/softcopy/main.py +++ b/softcopy/main.py @@ -26,7 +26,12 @@ type=float, help="time to sleep in each copy process between copies. Can help mitigate down an overwhelemd system", ) -def main(targets_file, verbose, nprocs, sleep_time): +@click.option( + "--wait-for-source", + default=True, + help="If the source does not exist when softcopy is started, wait for it to appear. If false, softcopy will crash if the source does not exist", +) +def main(targets_file, verbose, nprocs, sleep_time, wait_for_source): """Tranfer data from source to destination as described in a yaml TARGETS_FILE. Uses low priority io to allow data to be moved while the microscope is acquiring. The program is zarr-aware and can safely copy an archive before it is finished being written to.""" @@ -56,7 +61,9 @@ def main(targets_file, verbose, nprocs, sleep_time): # If the source ends with .ome.zarr, then infer ome mode for this entry: is_ome = source.name.endswith(".ome.zarr") copier_type = OMEZarrCopier if is_ome else ZarrCopier - copier = copier_type(source, destination, nprocs, sleep_time, LOG.getChild(f"Target {target_id}")) + copier = copier_type( + source, destination, nprocs, sleep_time, wait_for_source, LOG.getChild(f"Target {target_id}") + ) copiers.append(copier) copier.start() diff --git a/softcopy/ome_zarr_copier.py b/softcopy/ome_zarr_copier.py index 207c3a6..718e0ab 100644 --- a/softcopy/ome_zarr_copier.py +++ b/softcopy/ome_zarr_copier.py @@ -29,12 +29,20 @@ class OMEZarrCopier(AbstractCopier): _metadata_hashes: dict[str, str] def __init__( - self, source: Path, destination: Path, n_copy_procs: int, sleep_time: float = 0, log: logging.Logger = LOG + self, + source: Path, + destination: Path, + n_copy_procs: int, + sleep_time: float = 0, + wait_for_source: bool = True, + log: logging.Logger = LOG, ): super().__init__(source, destination, n_copy_procs, sleep_time, log) image_0_source = source / "0" image_0_destination = destination / "0" - self._zarr_copier = ZarrCopier(image_0_source, image_0_destination, n_copy_procs, sleep_time, log) + self._zarr_copier = ZarrCopier( + image_0_source, image_0_destination, n_copy_procs, sleep_time, wait_for_source, log + ) self._metadata_hashes = {} def start(self): diff --git a/softcopy/zarr_copier.py b/softcopy/zarr_copier.py index 05d004b..1e80b9d 100644 --- a/softcopy/zarr_copier.py +++ b/softcopy/zarr_copier.py @@ -46,7 +46,13 @@ class ZarrCopier(AbstractCopier): _dimension_separator: Literal[".", "/"] def __init__( - self, source: Path, destination: Path, n_copy_procs: int = 1, sleep_time: float = 0, log: Logger = LOG + self, + source: Path, + destination: Path, + n_copy_procs: int = 1, + sleep_time: float = 0, + wait_for_source: bool = True, + log: Logger = LOG, ): super().__init__(source, destination, n_copy_procs, sleep_time, log) @@ -55,6 +61,9 @@ def __init__( self._queue_draining = Value("b", 0) self._copy_count = Value("i", 0) + if wait_for_source: + zarr_utils.wait_for_source(source, log) + self._zarr_format = zarr_utils.identify_zarr_format(source, log) if self._zarr_format is None: log.critical(f"Could not identify zarr version of source {source}.") @@ -341,7 +350,6 @@ def _copy_worker( while stop.value == 0: try: time.sleep(sleep) - print(sleep) data: PackedName = queue.get(timeout=1) srcfile = data.get_path(files_nd, source, dimension_separator, zarr_format) destfile = data.get_path(files_nd, destination, dimension_separator, zarr_format) diff --git a/softcopy/zarr_utils.py b/softcopy/zarr_utils.py index fb15b86..17fa8f8 100644 --- a/softcopy/zarr_utils.py +++ b/softcopy/zarr_utils.py @@ -2,6 +2,7 @@ import json import logging import operator +import time from logging import Logger from pathlib import Path from typing import Literal, Optional @@ -52,3 +53,17 @@ def dtype_string_zarr2(dtype): dtype_str = f"{endianness}{dtype_kind}{bytesize}" return dtype_str + + +def wait_for_source(source: Path, log: Logger = LOG): + """ + Wait for the source to be created. This is useful if you want to start softcopy and a writer at similar times and don't know + how long the writer will take to create the source. This looks for the presence of a metadata file in the source. + """ + zarr_format = identify_zarr_format(source, log) + while zarr_format is None: + time.sleep(1.0) + zarr_format = identify_zarr_format(source, log) + log.debug(f"Waiting for source {source} to be created") + + log.debug(f"Source {source} is available")