This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Run pipeline via command "dreem"; simplify reads.py
matthewfallan committed Mar 6, 2023
1 parent 33a4cb9 commit 1b9ef26
Showing 17 changed files with 1,441 additions and 1,097 deletions.
2 changes: 1 addition & 1 deletion docs/source/plots/create_a_plot.rst
@@ -269,4 +269,4 @@ Make sure that it looks good!
 
 11. Commit your changes and push them to GitHub. The docs will be automatically updated on GitHub Pages. Make sure that the docstrings are displayed and that the plot looks good.
 
-12. Send us a pull request to the DREEM repository!
+12. Send us a pull request to the DREEM repository!
2 changes: 1 addition & 1 deletion dreem/align/__init__.py
@@ -1 +1 @@
-from .main import cli, run
+from .main import cli, params, run
49 changes: 29 additions & 20 deletions dreem/align/align.py
@@ -18,7 +18,7 @@ def check_for_duplicates(fq_units: list[FastqUnit]):
     samples = defaultdict(int)
     sample_refs = defaultdict(lambda: defaultdict(int))
     for fq_unit in fq_units:
-        if fq_unit.demult:
+        if fq_unit.by_ref:
             sample_refs[fq_unit.sample][fq_unit.ref] += 1
         else:
             samples[fq_unit.sample] += 1
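
The duplicate check above tallies whole-sample FASTQ units and per-reference (by_ref) FASTQ units separately with nested defaultdicts. A minimal, self-contained sketch of that counting pattern, using hypothetical (sample, ref) tuples in place of real FastqUnit objects:

    from collections import defaultdict

    # Hypothetical (sample, ref) records; ref is None for whole-sample FASTQs.
    units = [("sample1", "refA"), ("sample1", "refA"), ("sample2", None)]

    samples = defaultdict(int)
    sample_refs = defaultdict(lambda: defaultdict(int))
    for sample, ref in units:
        if ref is not None:  # analogous to fq_unit.by_ref
            sample_refs[sample][ref] += 1
        else:
            samples[sample] += 1

    # Any count above 1 marks a duplicate FASTQ unit.
    dups = [s for s, n in samples.items() if n > 1]
    dups += [(s, r) for s, refs in sample_refs.items()
             for r, n in refs.items() if n > 1]
    print(dups)  # [('sample1', 'refA')]
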
@@ -42,7 +42,7 @@ def write_temp_ref_files(temp_dir: str,
     ref_paths: dict[str, path.OneRefSeqStepFilePath] = dict()
     # Only the reference sequences of FASTQ files that have come from
     # demultiplexing need to be written.
-    refs = {fq_unit.ref for fq_unit in fq_units if fq_unit.demult}
+    refs = {fq_unit.ref for fq_unit in fq_units if fq_unit.by_ref}
     if refs:
         # Parse the FASTA only if there are any references to write.
         for ref, seq in FastaParser(fasta).parse():
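
write_temp_ref_files only materializes temporary FASTA files for the references that per-reference (by_ref) FASTQ units actually use. A rough stand-alone sketch of that idea, with a plain dict of sequences in place of FastaParser and a hypothetical directory layout:

    from pathlib import Path

    # Hypothetical reference sequences in place of FastaParser(fasta).parse().
    ref_seqs = {"refA": "ACGTACGT", "refB": "GGCCTTAA"}
    # Only references used by per-reference FASTQ units need temporary files.
    needed_refs = {"refA"}

    temp_dir = Path("temp")
    ref_paths = {}
    for ref, seq in ref_seqs.items():
        if ref in needed_refs:
            fasta_path = temp_dir / "alignment" / ref / f"{ref}.fasta"
            fasta_path.parent.mkdir(parents=True, exist_ok=True)
            fasta_path.write_text(f">{ref}\n{seq}\n")
            ref_paths[ref] = fasta_path
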
@@ -63,19 +63,21 @@ def write_temp_ref_files(temp_dir: str,
 
 
 def infer_outputs(out_dir: str, fasta: str, fq_unit: FastqUnit):
-    translator = path.AlignmentInToAlignmentOut.inverse()
+    """ Infer the paths of the BAM files that are expected to be output
+    from the alignment. """
     return [
-        translator.trans_inst(path.OneRefAlignmentOutFilePath(
+        path.OneRefAlignmentOutFilePath(
             top=out_dir,
             module=path.Module.ALIGN,
             sample=fq_unit.sample,
             ref=ref,
-            ext=path.BAM_EXT))
+            ext=path.BAM_EXT
+        ).path
         for ref, _ in FastaParser(fasta).parse()
     ]
 
 
-@docdef.auto(return_doc="List of paths to binary alignment map (BAM) files")
+@docdef.auto()
 def run_steps_fq(fq_unit: FastqUnit,
                  fasta: path.RefsetSeqInFilePath | path.OneRefSeqStepFilePath,
                  *,
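
The rewritten infer_outputs constructs the expected per-reference BAM paths directly (one per reference in the FASTA) and returns plain path objects via .path instead of going through a path translator. A pathlib-only sketch of the same idea, assuming a hypothetical out_dir/alignment/sample/ref.bam layout rather than the project's path module:

    from pathlib import Path

    def infer_expected_bams(out_dir: str, sample: str, refs: list[str]) -> list[Path]:
        # One expected BAM file per reference for this sample (hypothetical layout).
        return [Path(out_dir) / "alignment" / sample / f"{ref}.bam" for ref in refs]

    print(infer_expected_bams("out", "sample1", ["refA", "refB"]))
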
@@ -117,26 +119,26 @@ def run_steps_fq(fq_unit: FastqUnit,
                  bt2_r: int,
                  bt2_dpad: int,
                  bt2_orient: str,
-                 rem_buffer: int) -> list[path.OneRefAlignmentInFilePath]:
+                 rem_buffer: int) -> list[str]:
     """ Run all steps of alignment for one FASTQ file or one pair of
     mated FASTQ files. """
     # Determine whether alignment needs to be run.
     if not rerun:
         # If alignment is not required to be rerun, then check if all
         # the expected output files already exist.
-        outputs = infer_outputs(out_dir, fasta.path, fq_unit)
-        if all(output.path.is_file() for output in outputs):
+        expected_output_paths = infer_outputs(out_dir, fasta.path, fq_unit)
+        if all(out_path.is_file() for out_path in expected_output_paths):
             # If all the output files already exist, just return them.
             logging.warning(
                 f"Skipping alignment for {' and '.join(fq_unit.str_paths)} "
                 "because all expected output files already exist. "
                 "Add --rerun to rerun.")
-            return outputs
+            return list(map(str, expected_output_paths))
     # Trim the FASTQ file(s).
     trimmer = FastqTrimmer(top_dir=temp_dir, n_procs=n_procs, fq_unit=fq_unit,
                            save_temp=save_temp, resume=resume)
     if fastqc:
-        trimmer.qc_input(fastqc_extract)
+        trimmer.qc(fastqc_extract)
     if cut:
         if resume and all(p.is_file() for p in trimmer.output.paths):
             fq_unit = trimmer.output
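
The guard above skips alignment when every expected output already exists (unless --rerun is given) and now returns the existing outputs as strings rather than path objects. The general pattern, sketched with pathlib and a hypothetical helper name:

    import logging
    from pathlib import Path

    def outputs_if_skippable(expected: list[Path], rerun: bool) -> list[str] | None:
        # Return existing outputs as strings if the step can be skipped, else None.
        if not rerun and all(p.is_file() for p in expected):
            logging.warning("Skipping alignment: all expected outputs exist. "
                            "Add --rerun to rerun.")
            return list(map(str, expected))
        return None
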
@@ -155,7 +157,12 @@ def run_steps_fq(fq_unit: FastqUnit,
                              cut_discard_untrimmed=cut_discard_untrimmed,
                              cut_m=cut_m)
     if fastqc:
-        trimmer.qc_output(fastqc_extract)
+        trimmed_qc = FastqTrimmer(top_dir=temp_dir,
+                                  n_procs=n_procs,
+                                  fq_unit=fq_unit,
+                                  save_temp=save_temp,
+                                  resume=resume)
+        trimmed_qc.qc(fastqc_extract)
     # Align the FASTQ to the reference.
     aligner = FastqAligner(top_dir=temp_dir, n_procs=n_procs, fq_unit=fq_unit,
                            fasta=fasta, save_temp=save_temp, resume=resume)
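
After trimming, quality control is now run through a fresh FastqTrimmer built on the trimmed FASTQ, whose qc() method presumably wraps a FastQC invocation. A hedged sketch of such a wrapper using subprocess; the -o and --extract options are standard FastQC flags, but verify them against your installed version:

    import subprocess
    from pathlib import Path

    def run_fastqc(fastq_files: list[str], out_dir: str, extract: bool = False) -> None:
        # Run FastQC on one or more FASTQ files (assumes fastqc is on PATH).
        Path(out_dir).mkdir(parents=True, exist_ok=True)
        cmd = ["fastqc", "-o", out_dir]
        if extract:
            cmd.append("--extract")  # unpack the zipped report directories
        cmd.extend(fastq_files)
        subprocess.run(cmd, check=True)
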
@@ -179,23 +186,23 @@ def run_steps_fq(fq_unit: FastqUnit,
     # Remove equally mapping reads.
     remover = SamRemoveEqualMappers(top_dir=temp_dir,
                                     n_procs=n_procs,
-                                    xam=xam_path,
+                                    input_path=xam_path,
                                     save_temp=save_temp,
                                     resume=resume)
     xam_path = remover.run(rem_buffer=rem_buffer)
     aligner.clean()
     # Sort the SAM file and output a BAM file.
     sorter = BamAlignSorter(top_dir=temp_dir,
                             n_procs=n_procs,
-                            xam=xam_path,
+                            input_path=xam_path,
                             save_temp=save_temp,
                             resume=resume)
     xam_path = sorter.run()
     remover.clean()
     # Split the BAM file into one file for each reference.
     splitter = BamSplitter(top_dir=temp_dir,
                            n_procs=n_procs,
-                           xam=xam_path,
+                           input_path=xam_path,
                            fasta=fasta,
                            save_temp=save_temp,
                            resume=resume)
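
Renaming the xam keyword to input_path gives the post-alignment steps (remove equal mappers, sort, split, output) a uniform constructor signature, so the output path of each step can be fed straight into the next. A toy illustration with hypothetical step classes, not the real ones:

    from pathlib import Path

    class RemoveEqualMappers:
        # Toy stand-in: accepts input_path, returns the path of its output.
        def __init__(self, *, input_path: Path):
            self.input_path = Path(input_path)

        def run(self) -> Path:
            return self.input_path.with_suffix(".rem.sam")

    class SortAlignments:
        def __init__(self, *, input_path: Path):
            self.input_path = Path(input_path)

        def run(self) -> Path:
            return self.input_path.with_suffix(".bam")

    # Because every step takes the same keyword, chaining is uniform.
    xam = Path("sample.sam")
    for step_cls in (RemoveEqualMappers, SortAlignments):
        xam = step_cls(input_path=xam).run()
    print(xam)  # sample.rem.bam
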
@@ -204,12 +211,12 @@ def run_steps_fq(fq_unit: FastqUnit,
     # Output the BAM files and generate an index for each.
     bams = [BamOutputter(top_dir=out_dir,
                          n_procs=n_procs,
-                         xam=bam,
+                         input_path=bam,
                          save_temp=save_temp,
                          resume=resume).run()
             for bam in bams]
     splitter.clean()
-    return bams
+    return list(map(str, bams))
 
 
 @docdef.auto()
@@ -221,7 +228,7 @@ def run_steps_fqs(fq_units: list[FastqUnit],
                   out_dir: str,
                   temp_dir: str,
                   save_temp: bool,
-                  **kwargs) -> tuple[path.OneRefAlignmentInFilePath, ...]:
+                  **kwargs) -> tuple[str, ...]:
     """ Run all steps of alignment for one or more FASTQ files or pairs
     of mated FASTQ files. """
     n_fqs = len(fq_units)
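
Both run_steps_fq and run_steps_fqs now return plain strings instead of path objects, which keeps the results trivially serializable when they are handed to downstream modules. For example (a sketch, not the project's own serialization):

    import json
    from pathlib import Path

    bam_paths = (Path("out/alignment/sample1/refA.bam"),
                 Path("out/alignment/sample1/refB.bam"))

    # Path objects are not JSON-serializable; their string forms are.
    print(json.dumps(list(map(str, bam_paths))))
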
@@ -248,7 +255,7 @@ def run_steps_fqs(fq_units: list[FastqUnit],
     # Get the arguments for each task, including n_procs_per_task.
     iter_args = list()
     for fq in fq_units:
-        if fq.demult:
+        if fq.by_ref:
             # If the FASTQ came from demultiplexing (so contains
             # reads from only one reference), then align to the
             # FASTA of only that reference.
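
As the comments above explain, a FASTQ unit produced by demultiplexing (by_ref) is aligned only against the temporary FASTA of its own reference, while whole-sample FASTQs are aligned against the full reference set. A hypothetical helper expressing that choice:

    def choose_fasta(fq, refset_fasta, temp_ref_fastas):
        # temp_ref_fastas: mapping of reference name -> one-reference FASTA path.
        return temp_ref_fastas[fq.ref] if fq.by_ref else refset_fasta
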
@@ -268,10 +275,12 @@ def run_steps_fqs(fq_units: list[FastqUnit],
         # Add these arguments to the lists of arguments that will be
         # passed to run_steps_fq.
         iter_args.append((fq, fasta_arg))
+    # Pass the keyword arguments to every call of run_steps_fq.
     partial_run_steps_fq = partial(run_steps_fq,
                                    **{**dict(n_procs=n_procs_per_task,
                                              out_dir=out_dir,
-                                             temp_dir=temp_dir),
+                                             temp_dir=temp_dir,
+                                             save_temp=save_temp),
                                    **kwargs})
     if n_tasks_parallel > 1:
         # Process multiple FASTQ files simultaneously.
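
Binding the shared keyword arguments with functools.partial (now including save_temp) leaves a callable that can be mapped over the per-FASTQ argument tuples, either serially or in a process pool. A generic sketch of that dispatch pattern with a hypothetical task function:

    from functools import partial
    from multiprocessing import Pool

    def run_one(fq, fasta, *, n_procs, out_dir, temp_dir, save_temp):
        # Hypothetical per-FASTQ task standing in for run_steps_fq.
        return f"{out_dir}/{fq}.bam"

    if __name__ == "__main__":
        iter_args = [("sample1.fq", "refs.fasta"), ("sample2.fq", "refs.fasta")]
        # Bind the keyword arguments shared by every task.
        task = partial(run_one, n_procs=4, out_dir="out",
                       temp_dir="temp", save_temp=False)
        n_tasks_parallel = 2
        if n_tasks_parallel > 1:
            with Pool(n_tasks_parallel) as pool:
                results = pool.starmap(task, iter_args)
        else:
            results = [task(*args) for args in iter_args]
        print(results)
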
14 changes: 9 additions & 5 deletions dreem/align/main.py
@@ -27,7 +27,8 @@
 from ..util import docdef
 
 
-@command(DreemCommandName.ALIGN.value, params=[
+# Parameters for command line interface
+params = [
     # Input files
     opt_fasta,
     opt_fastqs,
@@ -84,16 +85,19 @@
     opt_bt2_r,
     opt_bt2_dpad,
     opt_bt2_orient,
-    opt_rem_buffer,
-])
+    opt_rem_buffer
+]
+
+
+@command(DreemCommandName.ALIGN.value, params=params)
 # Pass context object.
 @pass_obj
 # Turn into DREEM command.
 @dreem_command(imports=("fastqs_dir", "fastqi_dir", "fastq12_dir"),
                exports=("fasta", "phred_enc"),
                result_key="bamf")
-def cli(*args, **kwargs):
-    return run(*args, **kwargs)
+def cli(**kwargs):
+    return run(**kwargs)
 
 
 @docdef.auto()
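
Moving the option list into a module-level params variable (re-exported from dreem/align/__init__.py) lets other parts of the pipeline reuse the same option definitions, while @command(..., params=params) still attaches them to the CLI entry point. A minimal Click sketch of the pattern with hypothetical options:

    from click import Option, command

    # Module-level list of options, reusable by other commands and modules.
    params = [
        Option(["--fasta"], type=str, help="Reference FASTA file."),
        Option(["--out-dir"], type=str, default="out", help="Output directory."),
    ]

    @command("align", params=params)
    def cli(**kwargs):
        # Hypothetical CLI entry point mirroring dreem/align/main.py.
        return run(**kwargs)

    def run(**kwargs):
        print(kwargs)

    if __name__ == "__main__":
        cli()
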
