Skip to content

Commit

Permalink
Added support for waiting for the zarr archive to be created before s…
Browse files Browse the repository at this point in the history
…tarting the copy.
  • Loading branch information
shinzlet committed Oct 10, 2024
1 parent 010d3e2 commit f71f776
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 6 deletions.
11 changes: 9 additions & 2 deletions softcopy/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@
type=float,
help="time to sleep in each copy process between copies. Can help mitigate down an overwhelemd system",
)
def main(targets_file, verbose, nprocs, sleep_time):
@click.option(
"--wait-for-source",
default=True,
help="If the source does not exist when softcopy is started, wait for it to appear. If false, softcopy will crash if the source does not exist",
)
def main(targets_file, verbose, nprocs, sleep_time, wait_for_source):
"""Tranfer data from source to destination as described in a yaml TARGETS_FILE. Uses low priority io to allow
data to be moved while the microscope is acquiring. The program is zarr-aware and can safely copy an archive
before it is finished being written to."""
Expand Down Expand Up @@ -56,7 +61,9 @@ def main(targets_file, verbose, nprocs, sleep_time):
# If the source ends with .ome.zarr, then infer ome mode for this entry:
is_ome = source.name.endswith(".ome.zarr")
copier_type = OMEZarrCopier if is_ome else ZarrCopier
copier = copier_type(source, destination, nprocs, sleep_time, LOG.getChild(f"Target {target_id}"))
copier = copier_type(
source, destination, nprocs, sleep_time, wait_for_source, LOG.getChild(f"Target {target_id}")
)
copiers.append(copier)
copier.start()

Expand Down
12 changes: 10 additions & 2 deletions softcopy/ome_zarr_copier.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,20 @@ class OMEZarrCopier(AbstractCopier):
_metadata_hashes: dict[str, str]

def __init__(
self, source: Path, destination: Path, n_copy_procs: int, sleep_time: float = 0, log: logging.Logger = LOG
self,
source: Path,
destination: Path,
n_copy_procs: int,
sleep_time: float = 0,
wait_for_source: bool = True,
log: logging.Logger = LOG,
):
super().__init__(source, destination, n_copy_procs, sleep_time, log)
image_0_source = source / "0"
image_0_destination = destination / "0"
self._zarr_copier = ZarrCopier(image_0_source, image_0_destination, n_copy_procs, sleep_time, log)
self._zarr_copier = ZarrCopier(
image_0_source, image_0_destination, n_copy_procs, sleep_time, wait_for_source, log
)
self._metadata_hashes = {}

def start(self):
Expand Down
12 changes: 10 additions & 2 deletions softcopy/zarr_copier.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,13 @@ class ZarrCopier(AbstractCopier):
_dimension_separator: Literal[".", "/"]

def __init__(
self, source: Path, destination: Path, n_copy_procs: int = 1, sleep_time: float = 0, log: Logger = LOG
self,
source: Path,
destination: Path,
n_copy_procs: int = 1,
sleep_time: float = 0,
wait_for_source: bool = True,
log: Logger = LOG,
):
super().__init__(source, destination, n_copy_procs, sleep_time, log)

Expand All @@ -55,6 +61,9 @@ def __init__(
self._queue_draining = Value("b", 0)
self._copy_count = Value("i", 0)

if wait_for_source:
zarr_utils.wait_for_source(source, log)

self._zarr_format = zarr_utils.identify_zarr_format(source, log)
if self._zarr_format is None:
log.critical(f"Could not identify zarr version of source {source}.")
Expand Down Expand Up @@ -341,7 +350,6 @@ def _copy_worker(
while stop.value == 0:
try:
time.sleep(sleep)
print(sleep)
data: PackedName = queue.get(timeout=1)
srcfile = data.get_path(files_nd, source, dimension_separator, zarr_format)
destfile = data.get_path(files_nd, destination, dimension_separator, zarr_format)
Expand Down
15 changes: 15 additions & 0 deletions softcopy/zarr_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
import logging
import operator
import time
from logging import Logger
from pathlib import Path
from typing import Literal, Optional
Expand Down Expand Up @@ -52,3 +53,17 @@ def dtype_string_zarr2(dtype):

dtype_str = f"{endianness}{dtype_kind}{bytesize}"
return dtype_str


def wait_for_source(source: Path, log: Logger = LOG):
"""
Wait for the source to be created. This is useful if you want to start softcopy and a writer at similar times and don't know
how long the writer will take to create the source. This looks for the presence of a metadata file in the source.
"""
zarr_format = identify_zarr_format(source, log)
while zarr_format is None:
time.sleep(1.0)
zarr_format = identify_zarr_format(source, log)
log.debug(f"Waiting for source {source} to be created")

log.debug(f"Source {source} is available")

0 comments on commit f71f776

Please sign in to comment.