From 8e30c466e1fd0f4c80db05ae5f0f90008339a283 Mon Sep 17 00:00:00 2001
From: matthewfallan <31744230+matthewfallan@users.noreply.github.com>
Date: Sat, 13 Jan 2024 20:33:26 -0500
Subject: [PATCH] Make +addclust back up the original files to avoid data
 loss in case adding more clusters fails

---
 src/seismicrna/cluster/update.py          | 162 +++++++++++++---------
 src/seismicrna/core/io/file.py            |  43 ++++++
 src/seismicrna/core/io/tests/__init__.py  |  22 +++
 src/seismicrna/core/io/tests/file_test.py | 157 +++++++++++++++++++++
 src/seismicrna/core/path.py               |  19 ++-
 src/seismicrna/relate/main.py             |  63 ++++-----
 6 files changed, 360 insertions(+), 106 deletions(-)
 create mode 100644 src/seismicrna/core/io/tests/__init__.py
 create mode 100644 src/seismicrna/core/io/tests/file_test.py

diff --git a/src/seismicrna/cluster/update.py b/src/seismicrna/cluster/update.py
index 40be6e6b..69a99d17 100644
--- a/src/seismicrna/cluster/update.py
+++ b/src/seismicrna/cluster/update.py
@@ -1,6 +1,7 @@
 from datetime import datetime
 from logging import getLogger
 from pathlib import Path
+from shutil import rmtree
 
 from click import command
 
@@ -17,11 +18,13 @@
                         docdef,
                         arg_input_path,
                         opt_max_clusters,
+                        opt_temp_dir,
+                        opt_keep_temp,
                         opt_brotli_level,
                         opt_parallel,
                         opt_max_procs)
-from ..core.io import recast_file_path
+from ..core.io import make_temp_backup, recast_file_path, restore_temp_backup
-from ..core.parallel import as_list_of_tuples, dispatch
+from ..core.parallel import as_list_of_tuples, dispatch, lock_temp_dir
 from ..core.report import (calc_dt_minutes,
                            Field,
                            TimeBeganF,
@@ -60,6 +63,8 @@ def update_field(report: ClustReport,
 def add_orders(cluster_report_file: Path,
                max_order: int,
                *,
+               temp_dir: Path,
+               keep_temp: bool,
                brotli_level: int,
                n_procs: int):
     """ Add orders to an existing report and dataset. """
@@ -84,83 +89,105 @@
                                                           ClustReport,
                                                           MaskReport))
         uniq_reads = UniqReads.from_dataset(mask_dataset)
-        # Run clustering for every new order.
-        orders = list(run_orders(uniq_reads,
-                                 original_max_order + 1,
-                                 max_order,
-                                 n_runs,
-                                 prev_bic=prev_bic,
-                                 min_iter=min_iter,
-                                 max_iter=max_iter,
-                                 conv_thresh=conv_thresh,
-                                 n_procs=n_procs,
-                                 top=mask_dataset.top))
-        if orders:
-            best_order = find_best_order(orders)
-            # Update the observed and expected counts for each best run.
-            try:
+        # Make a temporary backup of the original results.
+        backup_dir = make_temp_backup(cluster_report_file.parent,
+                                      mask_dataset.top,
+                                      temp_dir)
+        try:
+            # Run clustering for every new order.
+            orders = list(run_orders(uniq_reads,
+                                     original_max_order + 1,
+                                     max_order,
+                                     n_runs,
+                                     prev_bic=prev_bic,
+                                     min_iter=min_iter,
+                                     max_iter=max_iter,
+                                     conv_thresh=conv_thresh,
+                                     n_procs=n_procs,
+                                     top=mask_dataset.top))
+            if orders:
+                best_order = find_best_order(orders)
+                # Update the expected counts for each best run.
                 update_log_counts(orders,
                                   top=mask_dataset.top,
                                   sample=mask_dataset.sample,
                                   ref=mask_dataset.ref,
                                   sect=mask_dataset.sect)
-            except Exception as error:
-                logger.error(f"Failed to update counts: {error}")
-            # Output the cluster memberships in batches of reads.
-            cluster_dataset = load_cluster_dataset(cluster_report_file)
-            checksums = update_batches(cluster_dataset, orders, brotli_level)
-            ended = datetime.now()
-            new_report = ClustReport(
-                sample=cluster_dataset.sample,
-                ref=cluster_dataset.ref,
-                sect=cluster_dataset.sect,
-                n_uniq_reads=uniq_reads.num_uniq,
-                max_order=max_order,
-                num_runs=n_runs,
-                min_iter=min_iter,
-                max_iter=max_iter,
-                conv_thresh=conv_thresh,
-                checksums={ClustBatchIO.btype(): checksums},
-                n_batches=len(checksums),
-                converged=update_field(original_report,
-                                       ClustsConvF,
-                                       orders,
-                                       "converged"),
-                log_likes=update_field(original_report,
-                                       ClustsLogLikesF,
-                                       orders,
-                                       "log_likes"),
-                clusts_rmsds=update_field(original_report,
-                                          ClustsRMSDsF,
-                                          orders,
-                                          "rmsds"),
-                clusts_meanr=update_field(original_report,
-                                          ClustsMeanRsF,
-                                          orders,
-                                          "meanr"),
-                bic=update_field(original_report,
-                                 ClustsBicF,
-                                 orders,
-                                 "bic"),
-                best_order=best_order,
-                began=original_began,
-                ended=ended,
-                taken=taken + calc_dt_minutes(new_began, ended),
+            # Output the cluster memberships in batches of reads.
+            cluster_dataset = load_cluster_dataset(cluster_report_file)
+            checksums = update_batches(cluster_dataset,
+                                       orders,
+                                       brotli_level)
+            ended = datetime.now()
+            new_report = ClustReport(
+                sample=cluster_dataset.sample,
+                ref=cluster_dataset.ref,
+                sect=cluster_dataset.sect,
+                n_uniq_reads=uniq_reads.num_uniq,
+                max_order=max_order,
+                num_runs=n_runs,
+                min_iter=min_iter,
+                max_iter=max_iter,
+                conv_thresh=conv_thresh,
+                checksums={ClustBatchIO.btype(): checksums},
+                n_batches=len(checksums),
+                converged=update_field(original_report,
+                                       ClustsConvF,
+                                       orders,
+                                       "converged"),
+                log_likes=update_field(original_report,
+                                       ClustsLogLikesF,
+                                       orders,
+                                       "log_likes"),
+                clusts_rmsds=update_field(original_report,
+                                          ClustsRMSDsF,
+                                          orders,
+                                          "rmsds"),
+                clusts_meanr=update_field(original_report,
+                                          ClustsMeanRsF,
+                                          orders,
+                                          "meanr"),
+                bic=update_field(original_report,
+                                 ClustsBicF,
+                                 orders,
+                                 "bic"),
+                best_order=best_order,
+                began=original_began,
+                ended=ended,
+                taken=taken + calc_dt_minutes(new_began, ended),
+            )
+            new_report.save(cluster_dataset.top, force=True)
+        else:
+            best_order = original_best_order
+        n_new = best_order - original_best_order
+        logger.info(
+            f"Ended adding {n_new} cluster(s) to {cluster_report_file}"
+        )
-        )
-        new_report.save(cluster_dataset.top, force=True)
-    else:
-        best_order = original_best_order
-    n_new = best_order - original_best_order
-    logger.info(f"Ended adding {n_new} cluster(s) to {cluster_report_file}")
+        except Exception:
+            # If any error happens, then restore the original results
+            # (as if this function never ran) and re-raise the error.
+            restore_temp_backup(cluster_report_file.parent,
+                                mask_dataset.top,
+                                temp_dir)
+            raise
+        finally:
+            # Delete the backup unless keep_temp is True.
+            if not keep_temp:
+                rmtree(backup_dir, ignore_errors=True)
+                logger.info(f"Deleted backup of {cluster_report_file.parent} "
+                            f"in {backup_dir}")
    else:
        logger.warning(f"New maximum order ({max_order}) is not greater than "
                       f"original ({original_max_order}): nothing to update")
    return cluster_report_file
 
 
+@lock_temp_dir
 @docdef.auto()
 def run(input_path: tuple[str, ...], *,
         max_clusters: int,
+        temp_dir: str,
+        keep_temp: bool,
         brotli_level: int,
         max_procs: int,
         parallel: bool) -> list[Path]:
@@ -179,7 +206,9 @@ def run(input_path: tuple[str, ...], *,
                     pass_n_procs=True,
                     args=as_list_of_tuples(report_files),
                     kwargs=dict(max_order=max_clusters,
-                                brotli_level=brotli_level))
+                                brotli_level=brotli_level,
+                                temp_dir=Path(temp_dir),
+                                keep_temp=keep_temp))
 
 
 params = [
@@ -187,6 +216,9 @@
     arg_input_path,
     # Clustering options
     opt_max_clusters,
+    # Backup
+    opt_temp_dir,
+    opt_keep_temp,
     # Compression
     opt_brotli_level,
     # Parallelization
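
Note on the update.py hunks above: add_orders() now runs the whole clustering
update inside a backup guard — back up the results, attempt the update,
restore the originals if anything raises, and drop the backup afterward
unless keep_temp is set. A minimal, self-contained sketch of the same pattern
(assuming this patch is applied so the two helpers are importable;
guarded_update and its arguments are hypothetical stand-ins, not part of the
patch):

    from pathlib import Path
    from shutil import rmtree

    from seismicrna.core.io.file import make_temp_backup, restore_temp_backup

    def guarded_update(results_dir: Path,
                       out_dir: Path,
                       temp_dir: Path,
                       update,
                       keep_temp: bool = False):
        # Copy results_dir into temp_dir before attempting the update.
        backup_dir = make_temp_backup(results_dir, out_dir, temp_dir)
        try:
            update()  # any callable that modifies results_dir in place
        except Exception:
            # Put the original files back, then re-raise the error.
            restore_temp_backup(results_dir, out_dir, temp_dir)
            raise
        finally:
            # Discard the backup (if it still exists) once the dust settles.
            if not keep_temp:
                rmtree(backup_dir, ignore_errors=True)
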
diff --git a/src/seismicrna/core/io/file.py b/src/seismicrna/core/io/file.py
index 21c21ba2..2e7fb148 100644
--- a/src/seismicrna/core/io/file.py
+++ b/src/seismicrna/core/io/file.py
@@ -2,6 +2,7 @@
 from functools import cached_property
 from logging import getLogger
 from pathlib import Path
+from shutil import copy2, copytree, move, rmtree
 from typing import Any, Iterable
 
 from .brickle import load_brickle, save_brickle
@@ -167,6 +168,48 @@
                      output_type.seg_types(),
                      **(override | output_type.auto_fields()))
 
+
+def make_temp_backup(source_path: str | Path,
+                     out_dir: str | Path,
+                     temp_dir: str | Path):
+    """ Make a temporary backup of `source_path` in `temp_dir`. """
+    # Determine the path of the backup.
+    backup_path = path.transpath(temp_dir, out_dir, source_path)
+    # Copy the source files to the backup.
+    if source_path.is_dir():
+        if backup_path.exists():
+            rmtree(backup_path)
+            logger.debug(f"Deleted existing backup in {backup_path}")
+        copytree(source_path, backup_path)
+        logger.debug(f"Copied directory {source_path} to {backup_path}")
+    else:
+        backup_path.parent.mkdir(parents=True, exist_ok=True)
+        copy2(source_path, backup_path)
+        logger.debug(f"Copied file {source_path} to {backup_path}")
+    logger.info(f"Backed up {source_path} to {backup_path}")
+    return backup_path
+
+
+def restore_temp_backup(source_path: str | Path,
+                        out_dir: str | Path,
+                        temp_dir: str | Path):
+    """ Restore the original files from a temporary backup. """
+    # Determine the path of the backup.
+    backup_path = path.transpath(temp_dir, out_dir, source_path)
+    # Replace the source files with the backup.
+    if backup_path.is_dir():
+        if source_path.exists():
+            rmtree(source_path)
+            logger.debug(f"Deleted original source in {source_path}")
+        move(backup_path, source_path.parent)
+        logger.debug(f"Moved directory {backup_path} to {source_path}")
+    else:
+        source_path.parent.mkdir(parents=True, exist_ok=True)
+        move(backup_path, source_path)
+        logger.debug(f"Moved file {backup_path} to {source_path}")
+    logger.info(f"Restored {source_path} from backup in {backup_path}")
+    return backup_path
+
 
 ########################################################################
 #                                                                      #
 # © Copyright 2024, the Rouskin Lab.                                   #
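
The two new helpers in file.py mirror each other through path.transpath():
make_temp_backup() copies out_dir-relative files into temp_dir, and
restore_temp_backup() moves them back, so restoring consumes the backup. A
small round trip, runnable once this patch is applied (the directory and file
names here are illustrative only):

    from pathlib import Path
    from tempfile import mkdtemp

    from seismicrna.core.io.file import make_temp_backup, restore_temp_backup

    out_dir = Path(mkdtemp())   # where the real results live
    temp_dir = Path(mkdtemp())  # where backups are parked
    results = out_dir / "sample"
    results.mkdir()
    (results / "report.json").write_text("{}")

    # Back up out_dir/sample to temp_dir/sample.
    backup = make_temp_backup(results, out_dir, temp_dir)
    assert backup == temp_dir / "sample"

    # Simulate a botched update, then restore the original contents.
    (results / "report.json").write_text("corrupted")
    restore_temp_backup(results, out_dir, temp_dir)
    assert (results / "report.json").read_text() == "{}"

Note that both helpers call Path methods such as is_dir() on source_path, so
despite the str | Path annotation they expect a Path in practice; every
caller in this patch passes one.
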
diff --git a/src/seismicrna/core/io/tests/__init__.py b/src/seismicrna/core/io/tests/__init__.py
new file mode 100644
index 00000000..2719edc3
--- /dev/null
+++ b/src/seismicrna/core/io/tests/__init__.py
@@ -0,0 +1,22 @@
+
+
+########################################################################
+#                                                                      #
+# © Copyright 2024, the Rouskin Lab.                                   #
+#                                                                      #
+# This file is part of SEISMIC-RNA.                                    #
+#                                                                      #
+# SEISMIC-RNA is free software; you can redistribute it and/or modify  #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation; either version 3 of the License, or    #
+# (at your option) any later version.                                  #
+#                                                                      #
+# SEISMIC-RNA is distributed in the hope that it will be useful, but   #
+# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANT- #
+# ABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General     #
+# Public License for more details.                                     #
+#                                                                      #
+# You should have received a copy of the GNU General Public License    #
+# along with SEISMIC-RNA; if not, see <https://www.gnu.org/licenses>.  #
+#                                                                      #
+########################################################################
diff --git a/src/seismicrna/core/io/tests/file_test.py b/src/seismicrna/core/io/tests/file_test.py
new file mode 100644
index 00000000..73fdda3e
--- /dev/null
+++ b/src/seismicrna/core/io/tests/file_test.py
@@ -0,0 +1,157 @@
+import unittest as ut
+from pathlib import Path
+from shutil import rmtree
+from tempfile import mkdtemp, mkstemp
+
+import numpy as np
+
+from seismicrna.core.io.file import (make_temp_backup,
+                                     restore_temp_backup)
+
+rng = np.random.default_rng()
+
+FILE_SIZE = 64
+FILE_SUFFIX = ".file"
+
+
+class SourceBackupDirs(object):
+
+    def __init__(self):
+        self._source_dir = None
+        self._backup_dir = None
+
+    @property
+    def source_dir(self) -> Path:
+        if self._source_dir is None:
+            raise TypeError("Source directory is not set")
+        return self._source_dir
+
+    @property
+    def backup_dir(self) -> Path:
+        if self._backup_dir is None:
+            raise TypeError("Backup directory is not set")
+        return self._backup_dir
+
+    def __enter__(self):
+        # Make temporary source and backup directories.
+        self._source_dir = Path(mkdtemp()).resolve(strict=True)
+        self._backup_dir = Path(mkdtemp()).resolve(strict=True)
+        return self.source_dir, self.backup_dir
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        # Delete the temporary source and backup directories.
+        rmtree(self.source_dir, ignore_errors=True)
+        rmtree(self.backup_dir, ignore_errors=True)
+        self._source_dir = None
+        self._backup_dir = None
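
The new test module below exercises both helpers on single files and on whole
directories. One way to run just this module programmatically, assuming
seismicrna is installed with this patch applied (standard-library unittest
only):

    import unittest

    # Load and run the tests added by this patch.
    suite = unittest.defaultTestLoader.loadTestsFromName(
        "seismicrna.core.io.tests.file_test"
    )
    unittest.TextTestRunner(verbosity=2).run(suite)
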
+
+
+class TestMakeTempBackup(ut.TestCase):
+
+    @staticmethod
+    def make_file(in_dir: Path):
+        _, path = mkstemp(dir=in_dir, suffix=FILE_SUFFIX)
+        with open(path, "wb") as f:
+            f.write(content := rng.bytes(FILE_SIZE))
+        return Path(path), content
+
+    @staticmethod
+    def make_dir(in_dir: Path):
+        return Path(mkdtemp(dir=in_dir))
+
+    def test_backup_file(self):
+        with SourceBackupDirs() as (source_dir, backup_dir):
+            source_file, content = self.make_file(source_dir)
+            backup_file = make_temp_backup(source_file,
+                                           source_dir,
+                                           backup_dir)
+            self.assertEqual(backup_file.parent, backup_dir)
+            self.assertEqual(backup_file.name, source_file.name)
+            with open(backup_file, "rb") as f:
+                self.assertEqual(f.read(), content)
+
+    def test_backup_dir(self):
+        with SourceBackupDirs() as (source_dir, backup_dir):
+            source_subdir = self.make_dir(source_dir)
+            source_file1, content1 = self.make_file(source_subdir)
+            source_file2, content2 = self.make_file(source_subdir)
+            backup_subdir = make_temp_backup(source_subdir,
+                                             source_dir,
+                                             backup_dir)
+            self.assertEqual(backup_subdir,
+                             backup_dir.joinpath(source_subdir.name))
+            backup_file1 = backup_subdir.joinpath(source_file1.name)
+            backup_file2 = backup_subdir.joinpath(source_file2.name)
+            with open(backup_file1, "rb") as f:
+                self.assertEqual(f.read(), content1)
+            with open(backup_file2, "rb") as f:
+                self.assertEqual(f.read(), content2)
+
+    def test_restore_file(self):
+        with SourceBackupDirs() as (source_dir, backup_dir):
+            source_file, content = self.make_file(source_dir)
+            backup_file = make_temp_backup(source_file,
+                                           source_dir,
+                                           backup_dir)
+            self.assertTrue(source_file.is_file())
+            source_file.unlink()
+            self.assertFalse(source_file.is_file())
+            self.assertEqual(restore_temp_backup(source_file,
+                                                 source_dir,
+                                                 backup_dir),
+                             backup_file)
+            self.assertTrue(source_file.is_file())
+            with open(source_file, "rb") as f:
+                self.assertEqual(f.read(), content)
+
+    def test_restore_dir(self):
+        with SourceBackupDirs() as (source_dir, backup_dir):
+            source_subdir = self.make_dir(source_dir)
+            source_file1, content1 = self.make_file(source_subdir)
+            source_file2, content2 = self.make_file(source_subdir)
+            backup_subdir = make_temp_backup(source_subdir,
+                                             source_dir,
+                                             backup_dir)
+            self.assertTrue(source_subdir.is_dir())
+            self.assertTrue(source_file1.is_file())
+            self.assertTrue(source_file2.is_file())
+            rmtree(source_subdir)
+            self.assertFalse(source_subdir.is_dir())
+            self.assertFalse(source_file1.is_file())
+            self.assertFalse(source_file2.is_file())
+            self.assertEqual(restore_temp_backup(source_subdir,
+                                                 source_dir,
+                                                 backup_dir),
+                             backup_subdir)
+            self.assertTrue(source_subdir.is_dir())
+            self.assertTrue(source_file1.is_file())
+            self.assertTrue(source_file2.is_file())
+            with open(source_file1, "rb") as f:
+                self.assertEqual(f.read(), content1)
+            with open(source_file2, "rb") as f:
+                self.assertEqual(f.read(), content2)
+
+
+if __name__ == "__main__":
+    ut.main()
+
+########################################################################
+#                                                                      #
+# © Copyright 2024, the Rouskin Lab.                                   #
+#                                                                      #
+# This file is part of SEISMIC-RNA.                                    #
+#                                                                      #
+# SEISMIC-RNA is free software; you can redistribute it and/or modify  #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation; either version 3 of the License, or    #
+# (at your option) any later version.                                  #
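
The reworded transpath() docstring in the path.py hunks that follow describes
pure path arithmetic: the path is re-rooted from from_dir onto to_dir without
touching the file system, which is exactly how the backup helpers compute
backup locations. A small illustration (the literal paths are made up; with
the default strict=False, none of them need to exist):

    from seismicrna.core import path

    backup = path.transpath("/tmp/backup",                   # to_dir
                            "/data/out",                     # from_dir
                            "/data/out/sample/report.json")  # path
    print(backup)  # expected: /tmp/backup/sample/report.json
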
+#                                                                      #
+# SEISMIC-RNA is distributed in the hope that it will be useful, but   #
+# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANT- #
+# ABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General     #
+# Public License for more details.                                     #
+#                                                                      #
+# You should have received a copy of the GNU General Public License    #
+# along with SEISMIC-RNA; if not, see <https://www.gnu.org/licenses>.  #
+#                                                                      #
+########################################################################
diff --git a/src/seismicrna/core/path.py b/src/seismicrna/core/path.py
index 1387e392..4898cef4 100644
--- a/src/seismicrna/core/path.py
+++ b/src/seismicrna/core/path.py
@@ -688,8 +688,7 @@ def deduplicated(paths: Iterable[str | pl.Path]):
 
 
 def parse(path: str | pl.Path, /, *segment_types: Segment):
-    """ Return the fields of a path given as a `str` based on the
-    segment types. """
+    """ Return the fields of a path based on the segment types. """
     return create_path_type(*segment_types).parse(path)
 
 
@@ -814,10 +813,10 @@ def transpath(to_dir: str | pl.Path,
               from_dir: str | pl.Path,
               path: str | pl.Path,
               strict: bool = False):
-    """ Return the path that would result by moving `path` from `indir`
-    to `outdir` (but do not actually move the path on the file system).
-    This function does not require that any of the given paths exist
-    unless `strict` is True.
+    """ Return the path that would be produced by moving `path` from
+    `from_dir` to `to_dir` (but do not actually move the path on the
+    file system). This function does not require that any of the given
+    paths exist, unless `strict` is True.
 
     Parameters
     ----------
@@ -851,10 +850,10 @@
 def transpaths(to_dir: str | pl.Path,
                *paths: str | pl.Path,
                strict: bool = False):
-    """ Return all paths that would result by moving the paths in `path`
-    from their longest common sub-path to `outdir` (but do not actually
-    move the paths on the file system). This function does not require
-    that any of the given paths exist unless `strict` is True.
+    """ Return all paths that would be produced by moving all paths in
+    `paths` from their longest common sub-path to `to_dir` (but do not
+    actually move the paths on the file system). This function does not
+    require that any of the given paths exist, unless `strict` is True.
 
     Parameters
     ----------
""" - return run(**kwargs) - @lock_temp_dir @docdef.auto() @@ -103,6 +72,38 @@ def run(fasta: str, force=force, keep_temp=keep_temp) + +# Parameters for command line interface +params = [ + # Input files + arg_fasta, + arg_input_path, + # Output directories + opt_out_dir, + opt_temp_dir, + # SAM options + opt_min_mapq, + opt_phred_enc, + opt_min_phred, + # Relate options + opt_min_reads, + opt_batch_size, + opt_ambrel, + opt_brotli_level, + # Parallelization + opt_max_procs, + opt_parallel, + # File generation + opt_force, + opt_keep_temp, +] + + +@command(CMD_REL, params=params) +def cli(**kwargs): + """ Compute relationships between references and aligned reads. """ + return run(**kwargs) + ######################################################################## # # # © Copyright 2024, the Rouskin Lab. #