Skip to content

Commit

Permalink
Add sleep to allow calmer copying. Added error handling so that the copy processes don't die as easily
Browse files Browse the repository at this point in the history
  • Loading branch information
shinzlet committed Oct 9, 2024
1 parent b797e28 commit 010d3e2
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 9 deletions.
5 changes: 4 additions & 1 deletion softcopy/copier.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@


class AbstractCopier(ABC):
def __init__(self, source: Path, destination: Path, n_copy_procs: int, log: logging.Logger = LOG):
def __init__(
self, source: Path, destination: Path, n_copy_procs: int, sleep_time: float = 1.0, log: logging.Logger = LOG
):
self._source = source
self._destination = destination
self._n_copy_procs = n_copy_procs
self._log = log
self._sleep_time = sleep_time

@abstractmethod
def start(self):
Expand Down
10 changes: 8 additions & 2 deletions softcopy/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,13 @@
@click.argument("targets_file", type=click.File("r"))
@click.option("--verbose", default=False, is_flag=True, help="print debug information while running")
@click.option("--nprocs", default=1, type=int, help="number of processes to use for copying")
def main(targets_file, verbose, nprocs):
@click.option(
"--sleep-time",
default=0.0,
type=float,
help="time to sleep in each copy process between copies. Can help mitigate down an overwhelemd system",
)
def main(targets_file, verbose, nprocs, sleep_time):
"""Tranfer data from source to destination as described in a yaml TARGETS_FILE. Uses low priority io to allow
data to be moved while the microscope is acquiring. The program is zarr-aware and can safely copy an archive
before it is finished being written to."""
Expand Down Expand Up @@ -50,7 +56,7 @@ def main(targets_file, verbose, nprocs):
# If the source ends with .ome.zarr, then infer ome mode for this entry:
is_ome = source.name.endswith(".ome.zarr")
copier_type = OMEZarrCopier if is_ome else ZarrCopier
copier = copier_type(source, destination, nprocs, LOG.getChild(f"Target {target_id}"))
copier = copier_type(source, destination, nprocs, sleep_time, LOG.getChild(f"Target {target_id}"))
copiers.append(copier)
copier.start()

Expand Down
8 changes: 5 additions & 3 deletions softcopy/ome_zarr_copier.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ class OMEZarrCopier(AbstractCopier):
_zarr_copier: ZarrCopier
_metadata_hashes: dict[str, str]

def __init__(self, source: Path, destination: Path, n_copy_procs: int, log: logging.Logger = LOG):
super().__init__(source, destination, n_copy_procs, log)
def __init__(
self, source: Path, destination: Path, n_copy_procs: int, sleep_time: float = 0, log: logging.Logger = LOG
):
super().__init__(source, destination, n_copy_procs, sleep_time, log)
image_0_source = source / "0"
image_0_destination = destination / "0"
self._zarr_copier = ZarrCopier(image_0_source, image_0_destination, n_copy_procs, log)
self._zarr_copier = ZarrCopier(image_0_source, image_0_destination, n_copy_procs, sleep_time, log)
self._metadata_hashes = {}

def start(self):
Expand Down
21 changes: 18 additions & 3 deletions softcopy/zarr_copier.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@ class ZarrCopier(AbstractCopier):
_zarr_format: int
_dimension_separator: Literal[".", "/"]

def __init__(self, source: Path, destination: Path, n_copy_procs: int = 1, log: Logger = LOG):
super().__init__(source, destination, n_copy_procs, log)
def __init__(
self, source: Path, destination: Path, n_copy_procs: int = 1, sleep_time: float = 0, log: Logger = LOG
):
super().__init__(source, destination, n_copy_procs, sleep_time, log)

self._stop = Value("b", 0)
self._observation_finished = Value("b", 0)
Expand Down Expand Up @@ -129,6 +131,7 @@ def start(self):
self._dimension_separator,
self._zarr_format,
self._copy_count,
self._sleep_time,
),
)
proc.start()
Expand Down Expand Up @@ -330,13 +333,19 @@ def _copy_worker(
dimension_separator: Literal[".", "/"],
zarr_format: Literal[2, 3],
count: Synchronized,
sleep: float = 0,
):
srcfile = None
destfile = None

while stop.value == 0:
try:
time.sleep(sleep)
print(sleep)
data: PackedName = queue.get(timeout=1)
srcfile = data.get_path(files_nd, source, dimension_separator, zarr_format)
destfile = data.get_path(files_nd, destination, dimension_separator, zarr_format)
print(f"Copying {srcfile} to {destfile}")
# print(f"Copying {srcfile} to {destfile}")

# TODO: this is just for on-demand folder creation. it slows things down, so we should
# make it optional in the targets.yaml
Expand All @@ -353,6 +362,12 @@ def _copy_worker(
# a due to some other issue, although a 1s timeout is extremely unlikely if the queue is nonempty.
if queue_draining.value == 1 and queue.empty():
break
except PermissionError:
print(
f"Permission error while handling copying {srcfile} to {destfile}. This is rare and will be handled in the integrity check."
)
except Exception as e:
print(f"Unknown exception while copying {srcfile} to {destfile}: {e}")


class ZarrFileEventHandler(FileSystemEventHandler):
Expand Down

0 comments on commit 010d3e2

Please sign in to comment.