This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Commit

Fix diffuse bugs with parameters and docdef.py
matthewfallan committed Mar 4, 2023
1 parent 20c725f commit 33a4cb9
Showing 14 changed files with 250 additions and 273 deletions.
2 changes: 1 addition & 1 deletion dreem/aggregate/main.py
@@ -51,7 +51,7 @@ def run(bv_files: list, library: str, samples: str, sample: str, clustering_file
         Use RNAstructure probability to predict free energy.
     verbose: bool
         Verbose output.
-    coords: tuple
+    initial_coords: tuple
         coordinates for reference: '-c ref-name first last'
     primers: tuple
         primers for reference: '-p ref-name fwd rev'
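The docstring fix above matters because docdef.auto() attaches documentation to a function by matching entries to the names of its parameters, so an entry named coords stops being attached once the parameter itself is renamed initial_coords. Below is a minimal sketch of how such name-based doc binding can work; the PARAM_DOCS registry and this auto() are illustrative stand-ins, not dreem's actual docdef.py.

    import inspect

    # Hypothetical registry of reusable parameter docs, keyed by name.
    PARAM_DOCS = {
        "initial_coords": "coordinates for reference: '-c ref-name first last'",
        "primers": "primers for reference: '-p ref-name fwd rev'",
    }

    def auto(return_doc: str = ""):
        """Append docs for each parameter whose name is in PARAM_DOCS."""
        def decorator(func):
            lines = [inspect.getdoc(func) or "", "", "Parameters", "----------"]
            for name in inspect.signature(func).parameters:
                if name in PARAM_DOCS:
                    lines.append(f"{name}\n    {PARAM_DOCS[name]}")
            if return_doc:
                lines += ["", "Returns", "-------", return_doc]
            func.__doc__ = "\n".join(lines)
            return func
        return decorator

    @auto()
    def run(initial_coords: tuple = (), primers: tuple = ()):
        """Aggregate bit vectors."""

    print(run.__doc__)  # an entry under a stale name like "coords" would be skipped

A parameter documented under a stale name fails silently (its entry is simply never matched), which is why signature renames like this one must be mirrored in the docstring tables.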
119 changes: 58 additions & 61 deletions dreem/align/align.py
@@ -34,8 +34,8 @@ def check_for_duplicates(fq_units: list[FastqUnit]):
     return dups
 
 
-def write_temp_ref_files(temp_path: path.TopDirPath,
-                         refset_path: path.RefsetSeqInFilePath,
+def write_temp_ref_files(temp_dir: str,
+                         fasta: str,
                          fq_units: list[FastqUnit]):
     """ Write temporary FASTA files, each containing one reference that
     corresponds to a FASTQ file from demultiplexing. """
@@ -45,12 +45,12 @@ def write_temp_ref_files(temp_path: path.TopDirPath,
     refs = {fq_unit.ref for fq_unit in fq_units if fq_unit.demult}
     if refs:
         # Parse the FASTA only if there are any references to write.
-        for ref, seq in FastaParser(refset_path.path).parse():
+        for ref, seq in FastaParser(fasta).parse():
             if ref in refs:
                 # Write the reference sequence to a temporary FASTA file
                 # only if at least one demultiplexed FASTQ file uses it.
                 ref_path = path.OneRefSeqStepFilePath(
-                    top=temp_path.top,
+                    top=temp_dir,
                     module=path.Module.ALIGN.value,
                     step=path.Step.ALIGN_ALIGN.value,
                     ref=ref,
@@ -62,29 +62,27 @@ def write_temp_ref_files(temp_path: path.TopDirPath,
     return ref_paths
 
 
-def infer_outputs(out_path: path.TopDirPath,
-                  fasta: path.RefsetSeqInFilePath | path.OneRefSeqStepFilePath,
-                  fastq: FastqUnit):
+def infer_outputs(out_dir: str, fasta: str, fq_unit: FastqUnit):
     translator = path.AlignmentInToAlignmentOut.inverse()
     return [
         translator.trans_inst(path.OneRefAlignmentOutFilePath(
-            top=out_path.top,
+            top=out_dir,
             module=path.Module.ALIGN,
-            sample=fastq.sample,
+            sample=fq_unit.sample,
             ref=ref,
             ext=path.BAM_EXT))
-        for ref, _ in FastaParser(fasta.path).parse()
+        for ref, _ in FastaParser(fasta).parse()
     ]
 
 
 @docdef.auto(return_doc="List of paths to binary alignment map (BAM) files")
-def run_steps_fq(out_dir: path.TopDirPath,
-                 temp_dir: path.TopDirPath,
-                 save_temp: bool,
-                 num_cpus: int,
+def run_steps_fq(fq_unit: FastqUnit,
                  fasta: path.RefsetSeqInFilePath | path.OneRefSeqStepFilePath,
-                 fastq: FastqUnit,
-                 /, *,
+                 *,
+                 n_procs: int,
+                 out_dir: str,
+                 temp_dir: str,
+                 save_temp: bool,
                  rerun: bool,
                  resume: bool,
                  fastqc: bool,
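Dropping the positional-only marker (/, *,) in favor of a bare * changes the calling contract: only fq_unit and fasta may still be passed positionally, and every option after the * must be spelled out by name. With six leading positional parameters, the old signature let a caller scramble out_dir, temp_dir, and save_temp without any error; the new one turns that mistake into an immediate TypeError. A simplified illustration with stand-in signatures, not the full parameter lists:

    def run_old(out_dir, temp_dir, save_temp, num_cpus, fasta, fastq, /, *, rerun):
        return fastq

    def run_new(fq_unit, fasta, *, n_procs, out_dir, temp_dir, save_temp, rerun):
        return fq_unit

    # Old: six positionals whose order is easy to scramble silently.
    run_old("out", "temp", False, 4, "refs.fa", "sample.fq", rerun=False)

    # New: only the essentials are positional; every option must be named,
    # so swapping out_dir and temp_dir can no longer bind silently.
    run_new("sample.fq", "refs.fa", n_procs=4, out_dir="out",
            temp_dir="temp", save_temp=False, rerun=False)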
@@ -126,40 +124,40 @@ def run_steps_fq(out_dir: path.TopDirPath,
     if not rerun:
         # If alignment is not required to be rerun, then check if all
         # the expected output files already exist.
-        outputs = infer_outputs(out_dir, fasta, fastq)
+        outputs = infer_outputs(out_dir, fasta.path, fq_unit)
         if all(output.path.is_file() for output in outputs):
             # If all the output files already exist, just return them.
             logging.warning(
-                f"Skipping alignment for {' and '.join(fastq.str_paths)} "
+                f"Skipping alignment for {' and '.join(fq_unit.str_paths)} "
                 "because all expected output files already exist. "
                 "Add --rerun to rerun.")
             return outputs
     # Trim the FASTQ file(s).
-    trimmer = FastqTrimmer(top_dir=temp_dir, num_cpus=num_cpus, fastq=fastq,
+    trimmer = FastqTrimmer(top_dir=temp_dir, n_procs=n_procs, fq_unit=fq_unit,
                            save_temp=save_temp, resume=resume)
     if fastqc:
         trimmer.qc_input(fastqc_extract)
     if cut:
         if resume and all(p.is_file() for p in trimmer.output.paths):
-            fastq = trimmer.output
+            fq_unit = trimmer.output
         else:
-            fastq = trimmer.run(cut_q1=cut_q1,
-                                cut_q2=cut_q2,
-                                cut_g1=cut_g1,
-                                cut_a1=cut_a1,
-                                cut_g2=cut_g2,
-                                cut_a2=cut_a2,
-                                cut_o=cut_o,
-                                cut_e=cut_e,
-                                cut_indels=cut_indels,
-                                cut_nextseq=cut_nextseq,
-                                cut_discard_trimmed=cut_discard_trimmed,
-                                cut_discard_untrimmed=cut_discard_untrimmed,
-                                cut_m=cut_m)
+            fq_unit = trimmer.run(cut_q1=cut_q1,
+                                  cut_q2=cut_q2,
+                                  cut_g1=cut_g1,
+                                  cut_a1=cut_a1,
+                                  cut_g2=cut_g2,
+                                  cut_a2=cut_a2,
+                                  cut_o=cut_o,
+                                  cut_e=cut_e,
+                                  cut_indels=cut_indels,
+                                  cut_nextseq=cut_nextseq,
+                                  cut_discard_trimmed=cut_discard_trimmed,
+                                  cut_discard_untrimmed=cut_discard_untrimmed,
+                                  cut_m=cut_m)
     if fastqc:
         trimmer.qc_output(fastqc_extract)
     # Align the FASTQ to the reference.
-    aligner = FastqAligner(top_dir=temp_dir, num_cpus=num_cpus, fastq=fastq,
+    aligner = FastqAligner(top_dir=temp_dir, n_procs=n_procs, fq_unit=fq_unit,
                            fasta=fasta, save_temp=save_temp, resume=resume)
     xam_path = aligner.run(bt2_local=bt2_local,
                            bt2_discordant=bt2_discordant,
@@ -180,23 +178,23 @@ def run_steps_fq(out_dir: path.TopDirPath,
         trimmer.clean()
     # Remove equally mapping reads.
     remover = SamRemoveEqualMappers(top_dir=temp_dir,
-                                    num_cpus=num_cpus,
+                                    n_procs=n_procs,
                                     xam=xam_path,
                                     save_temp=save_temp,
                                     resume=resume)
     xam_path = remover.run(rem_buffer=rem_buffer)
     aligner.clean()
     # Sort the SAM file and output a BAM file.
     sorter = BamAlignSorter(top_dir=temp_dir,
-                            num_cpus=num_cpus,
+                            n_procs=n_procs,
                             xam=xam_path,
                             save_temp=save_temp,
                             resume=resume)
     xam_path = sorter.run()
     remover.clean()
     # Split the BAM file into one file for each reference.
     splitter = BamSplitter(top_dir=temp_dir,
-                           num_cpus=num_cpus,
+                           n_procs=n_procs,
                            xam=xam_path,
                            fasta=fasta,
                            save_temp=save_temp,
@@ -205,7 +203,7 @@ def run_steps_fq(out_dir: path.TopDirPath,
         sorter.clean()
     # Output the BAM files and generate an index for each.
     bams = [BamOutputter(top_dir=out_dir,
-                         num_cpus=num_cpus,
+                         n_procs=n_procs,
                          xam=bam,
                          save_temp=save_temp,
                          resume=resume).run()
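Every stage above follows one contract: build the step on the previous step's output, call run() to get the new file path, and call clean() on the predecessor only after its output has been consumed, so intermediate files never outlive their last reader (unless save_temp preserves them). A minimal sketch of that contract, assuming a hypothetical Step class rather than dreem's actual FastqTrimmer/BamSplitter hierarchy:

    import shutil
    import tempfile
    from pathlib import Path

    class Step:
        """One pipeline stage: consumes an input path, produces an output."""

        def __init__(self, xam: Path, save_temp: bool = False):
            self.xam = xam
            self.save_temp = save_temp
            self.workdir = Path(tempfile.mkdtemp())

        def run(self) -> Path:
            out = self.workdir / self.xam.name
            shutil.copy(self.xam, out)  # stand-in for the real transformation
            return out

        def clean(self):
            """Delete this step's temporary files unless asked to keep them."""
            if not self.save_temp:
                shutil.rmtree(self.workdir, ignore_errors=True)

    src = Path(tempfile.mkdtemp()) / "reads.sam"
    src.write_text("@HD\tVN:1.6\n")
    sorter = Step(src)
    xam = sorter.run()      # sorter's output now exists on disk
    splitter = Step(xam)
    xam = splitter.run()    # ...and has been consumed by the next step,
    sorter.clean()          # so sorter's temporary files can go
    splitter.clean()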
@@ -215,14 +213,14 @@
 
 
 @docdef.auto()
-def run_steps_fqs(fasta: str,
-                  fq_units: list[FastqUnit],
-                  /, *,
+def run_steps_fqs(fq_units: list[FastqUnit],
+                  fasta: str,
+                  *,
+                  max_procs: int,
+                  parallel: bool,
                   out_dir: str,
                   temp_dir: str,
                   save_temp: bool,
-                  parallel: bool,
-                  max_procs: int,
                   **kwargs) -> tuple[path.OneRefAlignmentInFilePath, ...]:
     """ Run all steps of alignment for one or more FASTQ files or pairs
     of mated FASTQ files. """
@@ -237,45 +235,44 @@ def run_steps_fqs(fasta: str,
     if max_procs < 1:
         logging.warning("Maximum CPUs must be ≥ 1: setting to 1")
         max_procs = 1
-    # Generate the paths for the output, temporary files, and the input
-    # FASTA file.
-    out_path = path.TopDirPath.parse(out_dir)
-    temp_path = path.TopDirPath.parse(temp_dir)
-    refset_path = path.RefsetSeqInFilePath.parse(fasta)
     # Write the temporary FASTA files for demultiplexed FASTQs.
-    temp_ref_paths = write_temp_ref_files(temp_path, refset_path, fq_units)
+    temp_ref_paths = write_temp_ref_files(temp_dir, fasta, fq_units)
     try:
         # Determine how to parallelize each alignment task.
-        n_tasks_parallel, procs_per_task = get_num_parallel(n_fqs,
-                                                            max_procs,
-                                                            parallel,
-                                                            hybrid=True)
+        n_tasks_parallel, n_procs_per_task = get_num_parallel(n_fqs,
                                                              max_procs,
                                                              parallel,
                                                              hybrid=True)
         # One alignment task will be created for each FASTQ unit.
-        # Get the arguments for each task, including procs_per_task.
+        # Get the arguments for each task, including n_procs_per_task.
         iter_args = list()
         for fq in fq_units:
             if fq.demult:
                 # If the FASTQ came from demultiplexing (so contains
                 # reads from only one reference), then align to the
                 # FASTA of only that reference.
                 try:
-                    fasta_path = temp_ref_paths[fq.ref]
+                    fasta_arg = temp_ref_paths[fq.ref]
                 except KeyError:
                     # If the FASTA with that reference does not exist,
                     # then log an error and skip this FASTQ.
-                    logging.error(f"Skipping {', '.join(fq.str_paths)} because "
-                                  f"reference '{fq.ref}' was not found in "
-                                  f"FASTA file: {refset_path}")
+                    logging.error(f"Skipped FASTQ(s) {', '.join(fq.str_paths)} "
+                                  f"because reference '{fq.ref}' was not found "
+                                  f"in FASTA file {fasta}")
                     continue
             else:
                 # If the FASTQ may contain reads from ≥ 1 references,
                 # then align to the FASTA file with all references.
-                fasta_path = refset_path
+                fasta_arg = refset_path
             # Add these arguments to the lists of arguments that will be
             # passed to run_steps_fq.
-            iter_args.append((out_path, temp_path, save_temp, procs_per_task,
-                              fasta_path, fq))
-        partial_run_steps_fq = partial(run_steps_fq, **kwargs)
+            iter_args.append((fq, fasta_arg))
+        partial_run_steps_fq = partial(run_steps_fq,
+                                       **{**dict(n_procs=n_procs_per_task,
+                                                 out_dir=out_dir,
+                                                 temp_dir=temp_dir),
+                                          **kwargs})
         if n_tasks_parallel > 1:
             # Process multiple FASTQ files simultaneously.
             with Pool(n_tasks_parallel) as pool:
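Two details of the rewritten run_steps_fqs are worth spelling out. First, hybrid parallelization splits the CPU budget along two axes: with, say, max_procs=8 and four FASTQ units, get_num_parallel(4, 8, parallel, hybrid=True) can plausibly run four tasks at once with two processes each (the exact split is up to that function). Second, the new partial(...) call pre-binds computed defaults while letting the caller's kwargs win, because the right-hand dict takes precedence in a {**a, **b} merge. A self-contained sketch of the merge, with a simplified stand-in for run_steps_fq:

    from functools import partial

    def run_steps_fq_sketch(fq_unit, fasta, *, n_procs, out_dir, temp_dir, **extra):
        return f"{fq_unit}: {n_procs} proc(s) -> {out_dir}"

    kwargs = {"rerun": False}  # options forwarded from the CLI layer

    # dict(n_procs=..., out_dir=..., temp_dir=...) supplies computed defaults;
    # anything already present in kwargs overrides them (right side wins).
    partial_run = partial(run_steps_fq_sketch,
                          **{**dict(n_procs=2, out_dir="out", temp_dir="temp"),
                             **kwargs})

    for fq, fasta_arg in [("a.fq", "refs.fa"), ("b.fq", "refs.fa")]:
        print(partial_run(fq, fasta_arg))

This is also why iter_args shrank to (fq, fasta_arg): everything identical across tasks now rides inside the partial instead of being repeated in every argument tuple.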
16 changes: 8 additions & 8 deletions dreem/align/main.py
@@ -86,26 +86,26 @@
     opt_bt2_orient,
     opt_rem_buffer,
 ])
-# Pass context object
+# Pass context object.
 @pass_obj
-# Turn into DREEM command
+# Turn into DREEM command.
 @dreem_command(imports=("fastqs_dir", "fastqi_dir", "fastq12_dir"),
-               exports=("fasta", "bamf"))
+               exports=("fasta", "phred_enc"),
+               result_key="bamf")
 def cli(*args, **kwargs):
     return run(*args, **kwargs)
 
 
 @docdef.auto()
-def run(fasta: str,
-        /, *,
-        phred_enc: int,
+def run(*,
         fastqs: tuple[str],
         fastqi: tuple[str],
         fastq1: tuple[str],
         fastq2: tuple[str],
         fastqs_dir: tuple[str],
         fastqi_dir: tuple[str],
         fastq12_dir: tuple[str],
+        phred_enc: int,
         **kwargs):
     """
     Run the alignment module.
@@ -136,7 +136,7 @@ def run(fasta: str,
     """
 
     # FASTQ files of read sequences may come from up to seven different
-    # sources (i.e. each argument beginning with "fastq"). This step
+    # sources (i.e. each argument beginning with "fq_unit"). This step
     # collects all of them into one list (fq_units) and also bundles
     # together pairs of FASTQ files containing mate 1 and mate 2 reads.
     fq_units = list(FastqUnit.from_strs(fastqs=fastqs,
@@ -150,4 +150,4 @@ def run(fasta: str,
                                         no_dup_samples=True))
 
     # Run the alignment pipeline on every FASTQ.
-    return fasta, run_steps_fqs(fasta, fq_units, **kwargs)
+    return run_steps_fqs(fq_units, **kwargs)
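The decorator change above pairs with the new return statement: run() no longer returns (fasta, bams); instead the result lands under result_key="bamf" while fasta and phred_enc pass through as exports. A sketch of how a dreem_command-style decorator could implement that contract — this is an assumption about its behavior, not the actual dreem code:

    def dreem_command(imports=(), exports=(), result_key=None):
        """Thread shared options between modules: pull 'imports' from the
        context object, copy 'exports' into the result, and store the
        wrapped function's return value under 'result_key'."""
        def decorator(func):
            def wrapper(ctx_obj, /, **kwargs):
                for key in imports:  # fill gaps from the click context
                    kwargs.setdefault(key, ctx_obj.get(key))
                result = {key: kwargs[key] for key in exports if key in kwargs}
                if result_key is not None:
                    result[result_key] = func(**kwargs)
                return result
            return wrapper
        return decorator

    @dreem_command(imports=("fastqs_dir",),
                   exports=("fasta", "phred_enc"),
                   result_key="bamf")
    def cli_sketch(*, fasta, phred_enc, fastqs_dir, **kwargs):
        return [f"{fastqs_dir}/sample.bam"]

    print(cli_sketch({"fastqs_dir": "fq"}, fasta="refs.fa", phred_enc=33))
    # {'fasta': 'refs.fa', 'phred_enc': 33, 'bamf': ['fq/sample.bam']}

Under this reading, exporting phred_enc instead of bamf, and moving bamf to result_key, lets downstream modules receive both the FASTA path and the Phred encoding without the alignment function having to return them explicitly.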