Skip to content

Commit

Permalink
Merge pull request #16 from rouskinlab/0.18.2
Browse files Browse the repository at this point in the history
0.18.2
  • Loading branch information
matthewfallan authored Jun 26, 2024
2 parents 94b5b8d + e78301c commit cba844a
Show file tree
Hide file tree
Showing 9 changed files with 476 additions and 178 deletions.
22 changes: 2 additions & 20 deletions src/seismicrna/align/write.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,26 +169,17 @@ def fq_pipeline(fq_inp: FastqUnit,
path.builddir(*path.CMD_DIR_SEGS,
top=release_dir,
sample=sample,
cmd=CMD_ALIGN)
cmd=path.CMD_ALIGN_DIR)
xam_whole = path.buildpar(*path.XAM_STAGE_SEGS,
top=working_dir,
sample=sample,
cmd=CMD_ALIGN,
cmd=path.CMD_ALIGN_DIR,
stage=path.STAGE_ALIGN_MAP,
ref=refset,
ext=path.BAM_EXT)
reads_align = run_xamgen(fq_inp if fq_cut is None else fq_cut,
xam_whole,
index_pfx=bowtie2_index,
tmp_dir=path.builddir(
path.SampSeg,
path.CmdSeg,
path.StageSeg,
top=working_dir,
sample=sample,
cmd=path.CMD_ALIGN_DIR,
stage=path.STAGE_ALIGN_MAP
),
n_procs=n_procs,
bt2_local=bt2_local,
bt2_discordant=bt2_discordant,
Expand Down Expand Up @@ -272,15 +263,6 @@ def fq_pipeline(fq_inp: FastqUnit,
xam_ref,
ref=ref,
header=ref_headers[ref],
tmp_dir=path.builddir(path.SampSeg,
path.CmdSeg,
path.StageSeg,
path.RefSeg,
top=working_dir,
sample=sample,
cmd=path.CMD_ALIGN_DIR,
stage=path.STAGE_ALIGN_SORT,
ref=ref),
n_procs=n_procs)
except Exception as error:
logger.error(f"Failed to output {ref}: {error}")
Expand Down
44 changes: 19 additions & 25 deletions src/seismicrna/align/xamops.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,8 @@ def parse_match(m: re.Match[str]):
elif prev != count:
# If so, then confirm the count matches the previous.
raise ValueError(
f"Inconsistent counts for '{key}': {prev}{count}")
f"Inconsistent counts for {repr(key)}: {prev}{count}"
)
# Read the next line, defaulting to an empty string.
line = next(lines, "").rstrip()
# Try to match the line with each pattern, until one matches.
Expand All @@ -239,13 +240,15 @@ def parse_match(m: re.Match[str]):


def xamgen_cmd(fq_inp: FastqUnit,
bam_out: Path | None, *,
tmp_dir: Path | None = None,
bam_out: Path, *,
min_mapq: int | None = None,
n_procs: int = 1,
**kwargs):
""" Wrap alignment and post-processing into one pipeline. """
bowtie2_step = bowtie2_cmd(fq_inp, None, n_procs=n_procs, **kwargs)
bowtie2_step = bowtie2_cmd(fq_inp,
None,
n_procs=max(n_procs - 2, 1),
**kwargs)
# Filter out any unaligned or otherwise unsuitable reads.
if fq_inp.paired:
# Require the paired flag.
Expand All @@ -264,7 +267,7 @@ def xamgen_cmd(fq_inp: FastqUnit,
n_procs=1)
sort_xam_step = sort_xam_cmd(None,
bam_out,
tmp_dir=tmp_dir,
tmp_dir=bam_out.parent,
n_procs=1)
return cmds_to_pipe([bowtie2_step, view_xam_step, sort_xam_step])

Expand All @@ -274,41 +277,32 @@ def xamgen_cmd(fq_inp: FastqUnit,
parse_bowtie2)


def export_cmd(xam_in: Path | None,
xam_out: Path | None, *,
def export_cmd(xam_in: Path,
xam_out: Path, *,
ref: str,
header: str,
ref_file: Path | None = None,
tmp_dir: Path | None = None,
n_procs: int = 1):
""" Wrap selecting, sorting, and exporting into one pipeline. """
# Pipe the header line.
""" Select and export one reference to its own XAM file. """
# Pipe the header line containing only this one reference.
echo_step = args_to_cmd([ECHO_CMD, header])
# Select only the reads that aligned to the reference, and ignore
# the original header.
# Select only the reads that aligned to the reference; ignore the
# original header so that the output XAM file contains only the
# one reference in the XAM file.
ref_step = view_xam_cmd(xam_in,
None,
sam=True,
with_header=False,
ref=ref,
n_procs=1)
n_procs=max(n_procs - 1, 1))
# Merge the one header line and the reads for the reference.
merge_step = cmds_to_subshell([echo_step, ref_step])
# Sort reads by name so that mates are adjacent.
sort_step = sort_xam_cmd(None,
None,
tmp_dir=tmp_dir,
name=True,
n_procs=n_procs)
# Export the reads into a XAM file.
export_step = view_xam_cmd(None,
xam_out,
refs_file=ref_file,
n_procs=1)
return cmds_to_pipe([merge_step, sort_step, export_step])
export_step = view_xam_cmd(None, xam_out, refs_file=ref_file)
return cmds_to_pipe([merge_step, export_step])


run_export = ShellCommand("selecting reference, sorting by name, and exporting",
run_export = ShellCommand("selecting reference and exporting",
export_cmd)

########################################################################
Expand Down
16 changes: 13 additions & 3 deletions src/seismicrna/core/batch/count.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,11 +261,18 @@ def calc_rels_per_pos(mutations: dict[int, dict[int, np.ndarray]],
# The number of matches is the coverage minus the number of
# reads with another kind of relationship that is not the
# no-coverage relationship (no coverage is counted later).
counts[MATCH].loc[pos_base] = (cover_per_pos.loc[pos_base]
- num_reads_pos)
num_match = cover_per_pos.loc[pos_base] - num_reads_pos
if np.atleast_1d(num_match)[0] < 0:
raise ValueError("Number of matches must be ≥ 0, "
f"but got {num_match} at position {pos}")
counts[MATCH].loc[pos_base] = num_match
# The number of non-covered positions is the number of reads
# minus the number that cover the position.
counts[NOCOV].loc[pos_base] = num_reads - cover_per_pos.loc[pos_base]
num_nocov = num_reads - cover_per_pos.loc[pos_base]
if np.atleast_1d(num_nocov)[0] < 0:
raise ValueError("Number of non-covered positions must be ≥ 0, "
f"but got {num_nocov} at position {pos}")
counts[NOCOV].loc[pos_base] = num_nocov
return dict(counts)


Expand All @@ -286,6 +293,9 @@ def calc_rels_per_read(mutations: dict[int, dict[int, np.ndarray]],
for mut, reads in mutations[pos].items():
rows = read_indexes[reads]
counts[MATCH].values[rows, column] -= 1
if counts[MATCH].values[rows, column].min(initial=0) < 0:
raise ValueError("Number of matches must be ≥ 0, but got "
f"{counts[MATCH].values[rows, column]}")
counts[mut].values[rows, column] += 1
return dict(counts)

Expand Down
103 changes: 65 additions & 38 deletions src/seismicrna/core/ngs/xam.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@
from pathlib import Path
from subprocess import CompletedProcess

from .. import path
from ..extern import SAMTOOLS_CMD, args_to_cmd, ShellCommand

logger = getLogger(__name__)

# SAM file format specifications
SAM_HEADER = '@'
SAM_DELIM = '\t'
SAM_NOREF = '*'
SAM_HEADER = "@"
SAM_DELIM = "\t"
SAM_NOREF = "*"
SAM_SEQLINE = "@SQ"
SAM_SEQNAME = "SN:"
SAM_SEQLEN = "LN:"
Expand Down Expand Up @@ -41,9 +40,26 @@
FLAG_SUPPLEMENTARY])


def calc_extra_threads(n_procs: int):
""" Calculate the number of extra threads to use (option -@). """
try:
if not isinstance(n_procs, int):
raise TypeError(
f"n_procs must be int, but got {type(n_procs).__name__}"
)
if n_procs < 1:
raise ValueError(f"n_procs must be ≥ 1, but got {n_procs}")
return n_procs - 1
except (TypeError, ValueError) as error:
logger.warning(error)
return 0


def index_xam_cmd(bam: Path, *, n_procs: int = 1):
""" Build an index of a XAM file using `samtools index`. """
return args_to_cmd([SAMTOOLS_CMD, "index", "-@", n_procs - 1, bam])
return args_to_cmd([SAMTOOLS_CMD, "index",
"-@", calc_extra_threads(n_procs),
bam])


run_index_xam = ShellCommand("indexing alignment map",
Expand All @@ -67,7 +83,8 @@ def sort_xam_cmd(xam_inp: Path | None,
name: bool = False,
n_procs: int = 1):
""" Sort a SAM or BAM file using `samtools sort`. """
args = [SAMTOOLS_CMD, "sort", "-@", n_procs - 1]
args = [SAMTOOLS_CMD, "sort",
"-@", calc_extra_threads(n_procs)]
if name:
# Sort by name instead of coordinate.
args.append("-n")
Expand All @@ -78,7 +95,7 @@ def sort_xam_cmd(xam_inp: Path | None,
args.extend(["-o", xam_out])
else:
# To increase speed, do not compress on stdout.
args.extend(["-l", 0])
args.append("-u")
if xam_inp:
args.append(xam_inp)
return args_to_cmd(args)
Expand All @@ -87,6 +104,36 @@ def sort_xam_cmd(xam_inp: Path | None,
sort_xam = ShellCommand("sorting alignment map", sort_xam_cmd)


def collate_xam_cmd(xam_inp: Path | None,
xam_out: Path | None, *,
tmp_dir: Path | None = None,
fast: bool = False,
n_procs: int = 1):
""" Collate a SAM or BAM file using `samtools collate`. """
args = [SAMTOOLS_CMD, "collate",
"-@", calc_extra_threads(n_procs)]
if fast:
# Use fast mode (outputs primary alignments only).
args.append("-f")
if tmp_dir:
# Write temporary files to this directory.
args.extend(["-T", tmp_dir])
if xam_out:
args.extend(["-o", xam_out])
else:
# Output to stdout.
args.append("-O")
# To increase speed, do not compress on stdout.
args.append("-u")
# samtools collate requires an input argument; - means to read from
# standard input.
args.append(xam_inp if xam_inp else "-")
return args_to_cmd(args)


run_collate_xam = ShellCommand("collating alignment map", collate_xam_cmd)


def view_xam_cmd(xam_inp: Path | None,
xam_out: Path | None, *,
sam: bool = False,
Expand All @@ -105,7 +152,8 @@ def view_xam_cmd(xam_inp: Path | None,
""" Convert between SAM and BAM formats, extract reads aligning to a
specific reference/section, and filter by flag and mapping quality
using `samtools view`. """
args = [SAMTOOLS_CMD, "view", "-@", n_procs - 1]
args = [SAMTOOLS_CMD, "view",
"-@", calc_extra_threads(n_procs)]
# Read filters
if min_mapq is not None:
# Require minimum mapping quality.
Expand All @@ -119,6 +167,8 @@ def view_xam_cmd(xam_inp: Path | None,
# Output format
if cram:
args.append("-C")
if sam:
logger.warning("Both SAM and CRAM flags were set: using CRAM")
if bam:
logger.warning("Both BAM and CRAM flags were set: using CRAM")
if not refs_file:
Expand Down Expand Up @@ -164,7 +214,8 @@ def view_xam_cmd(xam_inp: Path | None,

def flagstat_cmd(xam_inp: Path | None, *, n_procs: int = 1):
""" Compute the statistics with `samtools flagstat`. """
args = [SAMTOOLS_CMD, "flagstat", "-@", n_procs - 1]
args = [SAMTOOLS_CMD, "flagstat",
"-@", calc_extra_threads(n_procs)]
if xam_inp:
args.append(xam_inp)
return args_to_cmd(args)
Expand Down Expand Up @@ -252,8 +303,11 @@ def parse_idxstats(process: CompletedProcess):

def ref_header_cmd(xam_inp: Path, *, n_procs: int):
""" Get the header line for each reference. """
return view_xam_cmd(xam_inp, None,
sam=True, only_header=True, n_procs=n_procs)
return view_xam_cmd(xam_inp,
None,
sam=True,
only_header=True,
n_procs=n_procs)


def parse_ref_header(process: CompletedProcess):
Expand All @@ -277,33 +331,6 @@ def parse_ref_header(process: CompletedProcess):
parse_ref_header,
opath=False)


def xam_to_fq_cmd(xam_inp: Path | None,
fq_out: Path | None, *,
interleaved: bool = False,
n_procs: int = 1):
""" Convert a XAM file to a FASTQ file using `samtools fastq`. """
args = [SAMTOOLS_CMD, "fastq", "-@", n_procs - 1, "-n"]
if fq_out:
if xam_paired(run_flagstat(xam_inp, n_procs=n_procs)):
if interleaved:
# Interleave first and second reads in one file.
args.extend(["-o", fq_out.with_suffix(path.FQ_EXTS[0])])
else:
# Output first and second reads in separate files.
args.extend(["-1", fq_out.with_suffix(path.FQ1_EXTS[0]),
"-2", fq_out.with_suffix(path.FQ2_EXTS[0])])
else:
# Output single-end reads in one file.
args.extend(["-0", fq_out.with_suffix(path.FQ_EXTS[0])])
if xam_inp:
args.append(xam_inp)
return args_to_cmd(args)


run_xam_to_fq = ShellCommand("deconstructing alignment map",
xam_to_fq_cmd)

########################################################################
# #
# © Copyright 2024, the Rouskin Lab. #
Expand Down
Loading

0 comments on commit cba844a

Please sign in to comment.