diff --git a/src/seismicrna/align/write.py b/src/seismicrna/align/write.py index dfaef94d..5aaf653e 100644 --- a/src/seismicrna/align/write.py +++ b/src/seismicrna/align/write.py @@ -185,6 +185,15 @@ def fq_pipeline(fq_inp: FastqUnit, reads_align = run_xamgen(fq_inp if fq_cut is None else fq_cut, xam_whole, index_pfx=bowtie2_index, + temp_dir=path.builddir( + path.SampSeg, + path.CmdSeg, + path.StepSeg, + top=temp_dir, + sample=sample, + cmd=path.CMD_ALIGN_DIR, + step=path.STEP_ALIGN_MAP + ), n_procs=n_procs, bt2_local=bt2_local, bt2_discordant=bt2_discordant, @@ -280,7 +289,20 @@ def fq_pipeline(fq_inp: FastqUnit, ext=(path.CRAM_EXT if cram else path.BAM_EXT)) if xam_ref.parent != xams_out_dir: raise path.PathValueError(f"{xam_ref} is not in {xams_out_dir}") - exp_kwargs = dict(ref=ref, header=ref_headers[ref], n_procs=n_procs) + exp_kwargs = dict( + ref=ref, + header=ref_headers[ref], + temp_dir=path.builddir(path.SampSeg, + path.CmdSeg, + path.StepSeg, + path.RefSeg, + top=temp_dir, + sample=sample, + cmd=path.CMD_ALIGN_DIR, + step=path.STEP_ALIGN_SORT, + ref=ref), + n_procs=n_procs + ) if cram: # Write the one reference sequence to a temporary FASTA. # Do NOT use overwrite=True because if refs_file is a diff --git a/src/seismicrna/align/xamops.py b/src/seismicrna/align/xamops.py index f1a71124..b4d53da1 100644 --- a/src/seismicrna/align/xamops.py +++ b/src/seismicrna/align/xamops.py @@ -240,8 +240,9 @@ def parse_match(m: re.Match[str]): def xamgen_cmd(fq_inp: FastqUnit, bam_out: Path | None, *, - min_mapq: int, - n_procs: int, + temp_dir: Path | None = None, + min_mapq: int | None = None, + n_procs: int = 1, **kwargs): """ Wrap alignment and post-processing into one pipeline. """ bowtie2_step = bowtie2_cmd(fq_inp, None, n_procs=n_procs, **kwargs) @@ -261,7 +262,10 @@ def xamgen_cmd(fq_inp: FastqUnit, flags_exc=flags_exc, bam=True, n_procs=1) - sort_xam_step = sort_xam_cmd(None, bam_out, n_procs=1) + sort_xam_step = sort_xam_cmd(None, + bam_out, + temp_dir=temp_dir, + n_procs=1) return cmds_to_pipe([bowtie2_step, view_xam_step, sort_xam_step]) @@ -275,6 +279,7 @@ def export_cmd(xam_in: Path | None, ref: str, header: str, ref_file: Path | None = None, + temp_dir: Path | None = None, n_procs: int = 1): """ Wrap selecting, sorting, and exporting into one pipeline. """ # Pipe the header line. @@ -290,7 +295,11 @@ def export_cmd(xam_in: Path | None, # Merge the one header line and the reads for the reference. merge_step = cmds_to_subshell([echo_step, ref_step]) # Sort reads by name so that mates are adjacent. - sort_step = sort_xam_cmd(None, None, name=True, n_procs=n_procs) + sort_step = sort_xam_cmd(None, + None, + temp_dir=temp_dir, + name=True, + n_procs=n_procs) # Export the reads into a XAM file. export_step = view_xam_cmd(None, xam_out, diff --git a/src/seismicrna/cluster/em.py b/src/seismicrna/cluster/em.py index b2d43dd6..32dcd855 100644 --- a/src/seismicrna/cluster/em.py +++ b/src/seismicrna/cluster/em.py @@ -444,8 +444,8 @@ def run(self, props_seed: int | None = None, resps_seed: int | None = None): logger.warning(f"{self}, iteration {self.iter} returned a " f"smaller log likelihood ({self.log_like}) than " f"the previous iteration ({self.log_like_prev})") - elif (self.delta_log_like < self.conv_thresh - and self.iter >= self.min_iter): + if (self.delta_log_like < self.conv_thresh + and self.iter >= self.min_iter): # Converge if the increase in log likelihood is # smaller than the convergence cutoff and at least # the minimum number of iterations have been run. diff --git a/src/seismicrna/core/batch/muts.py b/src/seismicrna/core/batch/muts.py index 1ea654ac..f1c6418c 100644 --- a/src/seismicrna/core/batch/muts.py +++ b/src/seismicrna/core/batch/muts.py @@ -86,11 +86,11 @@ def pos_nums(self) -> np.ndarray: @property def mid5s(self): - return self._mid5s + return self._mid5s # compatibility @property def mid3s(self): - return self._mid3s + return self._mid3s # compatibility @property def read_weights(self) -> pd.DataFrame | None: diff --git a/src/seismicrna/core/extern/shell.py b/src/seismicrna/core/extern/shell.py index d1c10a61..1a6d2295 100644 --- a/src/seismicrna/core/extern/shell.py +++ b/src/seismicrna/core/extern/shell.py @@ -16,6 +16,7 @@ ECHO_CMD = "echo" FASTQC_CMD = "fastqc" RNASTRUCTURE_FOLD_CMD = "Fold" +RNASTRUCTURE_FOLD_SMP_CMD = "Fold-smp" GUNZIP_CMD = "gunzip" SAMTOOLS_CMD = "samtools" WORD_COUNT_CMD = "wc" diff --git a/src/seismicrna/core/mu/unbias.py b/src/seismicrna/core/mu/unbias.py index bdc2502b..3774cbb6 100644 --- a/src/seismicrna/core/mu/unbias.py +++ b/src/seismicrna/core/mu/unbias.py @@ -235,6 +235,37 @@ def _triu_norm(a: np.ndarray): return a / _triu_sum(a) +@jit() +def _triu_mul(factor1: np.ndarray, factor2: np.ndarray): + """ Multiply the upper triangles of `numer` and `denom`. + + This function is meant to be called by another function that has + validated the arguments; hence, this function makes assumptions: + + - `factor1` has at least 2 dimensions. + - The first two dimensions of `factor1` have equal length. + - `factor2` has the same first 2 dimensions as `factor1`. + - `factor1` and `factor2` can be broadcast to each other. + + Parameters + ---------- + factor1: np.ndarray + Factor 1. + factor2: np.ndarray + Factor 2. + + Returns + ------- + np.ndarray + Product of the upper triangles; values below the main diagonal + are undefined. + """ + product = np.empty(np.broadcast_shapes(factor1.shape, factor2.shape)) + for j in range(factor1.shape[0]): + product[j, j:] = factor1[j, j:] * factor2[j, j:] + return product + + @jit() def _triu_div(numer: np.ndarray, denom: np.ndarray): """ Divide the upper triangles of `numer` and `denom`. @@ -545,7 +576,9 @@ def _calc_p_mut_given_span_noclose(p_mut_given_span: np.ndarray, * np.expand_dims(p_ends, 2)) # Compute the mutation rates given no two mutations are too close # one position (j) at a time. - p_mut_given_span_noclose = p_mut_given_span / p_noclose_given_span + p_mut_given_span_noclose = np.nan_to_num( + p_mut_given_span / p_noclose_given_span + ) for j in range(npos): nrows = j + 1 ncols = npos - j @@ -1064,7 +1097,7 @@ def calc_p_ends_given_noclose(p_ends: np.ndarray, nonzero=True) # Calculate the proportion of total reads that would have each # pair of end coordinates. - return _triu_norm(p_ends[:, :, np.newaxis] * p_noclose_given_ends) + return _triu_norm(_triu_mul(p_ends[:, :, np.newaxis], p_noclose_given_ends)) @jit() diff --git a/src/seismicrna/core/ngs/xam.py b/src/seismicrna/core/ngs/xam.py index e732abe8..e6c18ad2 100755 --- a/src/seismicrna/core/ngs/xam.py +++ b/src/seismicrna/core/ngs/xam.py @@ -63,6 +63,7 @@ def index_fasta_cmd(fasta: Path): def sort_xam_cmd(xam_inp: Path | None, xam_out: Path | None, *, + temp_dir: Path | None = None, name: bool = False, n_procs: int = 1): """ Sort a SAM or BAM file using `samtools sort`. """ @@ -70,6 +71,9 @@ def sort_xam_cmd(xam_inp: Path | None, if name: # Sort by name instead of coordinate. args.append("-n") + if temp_dir: + # Write temporary files to this directory. + args.extend(["-T", temp_dir]) if xam_out: args.extend(["-o", xam_out]) else: diff --git a/src/seismicrna/core/path.py b/src/seismicrna/core/path.py index 5d11695a..95e15e88 100644 --- a/src/seismicrna/core/path.py +++ b/src/seismicrna/core/path.py @@ -80,6 +80,7 @@ STEP_ALIGN_INDEX_DEMULT = "index-demult" STEP_ALIGN_TRIM = "trim" STEP_ALIGN_MAP = "map" +STEP_ALIGN_SORT = "sort" STEPS_REL_SAMS = "sams" @@ -89,6 +90,7 @@ STEP_ALIGN_INDEX_DEMULT, STEP_ALIGN_TRIM, STEP_ALIGN_MAP, + STEP_ALIGN_SORT, STEPS_REL_SAMS) # Tables diff --git a/src/seismicrna/core/report.py b/src/seismicrna/core/report.py index 73f1bd17..3ac066dc 100644 --- a/src/seismicrna/core/report.py +++ b/src/seismicrna/core/report.py @@ -436,10 +436,12 @@ def oconv_datetime(dtime: datetime): int) NumReadsLoNCovF = Field("n_reads_min_ncov", "Number of Reads Cut -- Too Few Covered Positions", - int) + int, + default=0) # compatibility NumReadsDiscontigF = Field("n_reads_discontig", "Number of Reads Cut -- Discontiguous Mates", - int) + int, + default=0) # compatibility NumReadsLoInfoF = Field("n_reads_min_finfo", "Number of Reads Cut -- Too Few Informative Positions", int) @@ -454,7 +456,8 @@ def oconv_datetime(dtime: datetime): int) NumUniqReadKeptF = Field("n_uniq_reads", "Number of Unique Reads", - int) + int, + default=0) # compatibility # Cluster fields diff --git a/src/seismicrna/core/version.py b/src/seismicrna/core/version.py index 18175d26..b5800333 100644 --- a/src/seismicrna/core/version.py +++ b/src/seismicrna/core/version.py @@ -6,7 +6,7 @@ logger = getLogger(__name__) -__version__ = "0.15.0" +__version__ = "0.15.1" VERSION_DELIM = "." diff --git a/src/seismicrna/fold/main.py b/src/seismicrna/fold/main.py index 26486c3c..1e45dc4c 100644 --- a/src/seismicrna/fold/main.py +++ b/src/seismicrna/fold/main.py @@ -124,6 +124,7 @@ def run(input_path: tuple[str, ...], return list(chain(dispatch(fold_profile, max_procs, parallel, + pass_n_procs=True, args=[(loader, ref_sections.list(loader.ref)) for loader in tables], kwargs=dict(temp_dir=Path(temp_dir), @@ -139,26 +140,26 @@ def run(input_path: tuple[str, ...], fold_mfe=fold_mfe, fold_max=fold_max, fold_percent=fold_percent, - force=force), - pass_n_procs=True))) + force=force)))) def fold_profile(table: MaskPosTable | ClustPosTable, sections: list[Section], - n_procs: int, quantile: float, + n_procs: int, **kwargs): """ Fold an RNA molecule from one table of reactivities. """ return dispatch(fold_section, n_procs, parallel=True, + hybrid=True, + pass_n_procs=True, args=as_list_of_tuples(table.iter_profiles( sections=sections, quantile=quantile) ), kwargs=dict(out_dir=table.top, quantile=quantile, - **kwargs), - pass_n_procs=False) + **kwargs)) def fold_section(rna: RNAProfile, @@ -171,6 +172,7 @@ def fold_section(rna: RNAProfile, fold_max: int, fold_percent: float, force: bool, + n_procs: int, **kwargs): """ Fold a section of an RNA from one mutational profile. """ began = datetime.now() @@ -184,6 +186,7 @@ def fold_section(rna: RNAProfile, fold_max=fold_max, fold_percent=fold_percent, force=force, + n_procs=n_procs, **kwargs) ct2dot(ct_file) ended = datetime.now() diff --git a/src/seismicrna/fold/rnastructure.py b/src/seismicrna/fold/rnastructure.py index 1e59658b..9d920be8 100644 --- a/src/seismicrna/fold/rnastructure.py +++ b/src/seismicrna/fold/rnastructure.py @@ -5,6 +5,7 @@ https://rna.urmc.rochester.edu/RNAstructure.html """ +import os import re from logging import getLogger from pathlib import Path @@ -13,6 +14,7 @@ from ..core.extern import (RNASTRUCTURE_CT2DOT_CMD, RNASTRUCTURE_DOT2CT_CMD, RNASTRUCTURE_FOLD_CMD, + RNASTRUCTURE_FOLD_SMP_CMD, args_to_cmd, run_cmd) from ..core.rna import RNAProfile, renumber_ct @@ -20,6 +22,8 @@ logger = getLogger(__name__) +FOLD_SMP_NUM_THREADS = "OMP_NUM_THREADS" + def fold(rna: RNAProfile, *, fold_temp: float, @@ -31,24 +35,34 @@ def fold(rna: RNAProfile, *, out_dir: Path, temp_dir: Path, keep_temp: bool, - force: bool): - """ Run the 'Fold' program of RNAstructure. """ + force: bool, + n_procs: int): + """ Run the 'Fold' or 'Fold-smp' program of RNAstructure. """ ct_file = rna.get_ct_file(out_dir) if need_write(ct_file, force): - cmd = [RNASTRUCTURE_FOLD_CMD, "--temperature", fold_temp] - # Constraints. + if n_procs > 1: + # Fold with multiple threads using the Fold-smp program. + cmd = [RNASTRUCTURE_FOLD_SMP_CMD] + os.environ[FOLD_SMP_NUM_THREADS] = str(n_procs) + else: + # Fold with one thread using the Fold program. + cmd = [RNASTRUCTURE_FOLD_CMD] + # Temperature of folding (Kelvin). + cmd.extend(["--temperature", fold_temp]) if fold_constraint is not None: + # File of constraints. cmd.extend(["--constraint", fold_constraint]) - # Maximum distance between paired bases. if fold_md > 0: + # Maximum distance between paired bases. cmd.extend(["--maxdistance", fold_md]) - # Predict only the minimum free energy structure. if fold_mfe: + # Predict only the minimum free energy structure. cmd.append("--MFE") - # Maximum number of structures. - cmd.extend(["--maximum", fold_max]) - # Maximum % difference between free energies of structures. - cmd.extend(["--percent", fold_percent]) + else: + # Maximum number of structures. + cmd.extend(["--maximum", fold_max]) + # Maximum % difference between free energies of structures. + cmd.extend(["--percent", fold_percent]) # DMS reactivities file for the RNA. cmd.extend(["--DMS", dms_file := rna.to_dms(temp_dir)]) # Temporary FASTA file for the RNA. diff --git a/src/seismicrna/mask/write.py b/src/seismicrna/mask/write.py index fd644f96..4524050d 100644 --- a/src/seismicrna/mask/write.py +++ b/src/seismicrna/mask/write.py @@ -84,15 +84,7 @@ def __init__(self, self.max_fmut_read = max_fmut_read self._n_reads = defaultdict(int) # Set the parameters for filtering positions. - if not min_ninfo_pos >= 0: - raise ValueError( - f"min_ninfo_pos must be ≥ 0, but got {min_ninfo_pos}" - ) self.min_ninfo_pos = min_ninfo_pos - if not 0. <= max_fmut_pos < 1.: - raise ValueError( - f"max_fmut_pos Must be ≥ 0, < 1, but got {max_fmut_pos}" - ) self.max_fmut_pos = max_fmut_pos # Set the parameters for saving files. self.top = dataset.top @@ -189,8 +181,7 @@ def _filter_discontig_read(self, batch: RefseqMutsBatch): """ Filter out reads with discontiguous mates. """ # Find the reads with contiguous mates. reads = batch.read_nums[batch.contiguous_reads] - logger.debug(f"{self} kept {reads.size} reads with coverage " - f"≥ {self.min_ncov_read} in {batch}") + logger.debug(f"{self} kept {reads.size} contiguous reads in {batch}") # Return a new batch of only those reads. return apply_mask(batch, reads) @@ -307,13 +298,21 @@ def _filter_batch_reads(self, batch: RefseqMutsBatch): def _filter_positions(self, info: pd.Series, muts: pd.Series): """ Remove the positions that do not pass the filters. """ # Mask the positions with insufficient informative reads. + if not 1 <= self.min_ninfo_pos: + raise ValueError("min_ninfo_pos must be ≥ 1, " + f"but got {self.min_ninfo_pos}") self.section.add_mask( self.MASK_POS_NINFO, - index_to_pos(info.index[info < self.min_ninfo_pos])) + index_to_pos(info.index[info < self.min_ninfo_pos]) + ) # Mask the positions with excessive mutation fractions. + if not 0. <= self.max_fmut_pos <= 1.: + raise ValueError("max_fmut_pos must be ≥ 0 and ≤ 1, " + f"but got {self.max_fmut_pos}") self.section.add_mask( self.MASK_POS_FMUT, - index_to_pos(info.index[(muts / info) > self.max_fmut_pos])) + index_to_pos(info.index[(muts / info) > self.max_fmut_pos]) + ) def mask(self): # Exclude positions based on the parameters.