Commit

wip
holtgrewe committed Dec 30, 2022
1 parent 13e2366 commit 0601888
Showing 18 changed files with 542 additions and 1,134 deletions.
17 changes: 17 additions & 0 deletions snappy_pipeline/workflows/abstract/common.py
@@ -1,5 +1,7 @@
"""Commonly used code and types"""

from itertools import chain
import re
import typing

from snakemake.io import Wildcards
@@ -60,3 +62,18 @@ def get_resource_usage(self, action: str) -> ResourceUsage:
assert self.resource_usage_dict is not None, "resource_usage_dict not set!"
assert action in self.resource_usage_dict, f"No resource usage entry for {action}"
return self.resource_usage_dict[action]


def augment_work_dir_with_output_links(
work_dir_dict: SnakemakeDict, log_files: typing.Optional[typing.List[str]] = None
) -> SnakemakeDict:
"""Augment a dictionary with key/value pairs to work directory with ``"output_links"`` key.
Optionally, the output files will be augmented from the paths in ``log_files``.
"""
result = dict(work_dir_dict)
result["output_links"] = [
re.sub(r"^work/", "output/", work_path)
for work_path in chain(work_dir_dict.values(), log_files or [])
]
return result
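
To make the helper's behaviour concrete, here is a minimal usage sketch; the mapper name ``bwa`` and the library name ``P001-N1-DNA1-WGS1`` are made-up placeholders, not values from this commit:

from snappy_pipeline.workflows.abstract.common import augment_work_dir_with_output_links

# Hypothetical work-directory paths for one merged Delly2 VCF and its log file.
work_files = {
    "vcf": "work/bwa.delly2.P001-N1-DNA1-WGS1/out/bwa.delly2.P001-N1-DNA1-WGS1.vcf.gz",
    "vcf_tbi": "work/bwa.delly2.P001-N1-DNA1-WGS1/out/bwa.delly2.P001-N1-DNA1-WGS1.vcf.gz.tbi",
}
logs = ["work/bwa.delly2.P001-N1-DNA1-WGS1/log/bwa.delly2.P001-N1-DNA1-WGS1.sv_calling.log"]

result = augment_work_dir_with_output_links(work_files, logs)
# The original keys are kept unchanged; result["output_links"] additionally lists every
# work path (including the log file) with the leading "work/" rewritten to "output/".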
1 change: 1 addition & 0 deletions snappy_pipeline/workflows/common/__init__.py
@@ -0,0 +1 @@
"""Code shared between workflows"""
128 changes: 128 additions & 0 deletions snappy_pipeline/workflows/common/delly.py
@@ -0,0 +1,128 @@
"""Workflow step parts for Delly.
These are used in both ``sv_calling_targeted`` and ``sv_calling_wgs``.
"""

from snappy_pipeline.utils import dictify
from snappy_pipeline.workflows.abstract import BaseStepPart
from snappy_pipeline.workflows.abstract.common import (
ForwardResourceUsageMixin,
ForwardSnakemakeFilesMixin,
augment_work_dir_with_output_links,
)
from snappy_pipeline.workflows.common.sv_calling import (
SvCallingGetLogFileMixin,
SvCallingGetResultFilesMixin,
)
from snappy_wrappers.resource_usage import ResourceUsage


class Delly2StepPart(
ForwardSnakemakeFilesMixin,
ForwardResourceUsageMixin,
SvCallingGetResultFilesMixin,
SvCallingGetLogFileMixin,
BaseStepPart,
):
"""Perform SV calling on exomes using Delly2"""

name = "delly2"
actions = ("call", "merge_calls", "genotype", "merge_genotypes")

_cheap_resource_usage = ResourceUsage(
threads=2,
time="4-00:00:00",  # 4 days
memory=f"{7 * 1024 * 2}M",  # 2 * 7 GiB
)
_normal_resource_usage = ResourceUsage(
threads=2,
time="7-00:00:00",  # 7 days
memory=f"{20 * 1024 * 2}M",  # 2 * 20 GiB
)
resource_usage_dict = {
"call": _normal_resource_usage,
"merge_calls": _cheap_resource_usage,
"genotype": _normal_resource_usage,
"merge_genotypes": _cheap_resource_usage,
}

def __init__(self, parent):
super().__init__(parent)

self.index_ngs_library_to_pedigree = {}
for sheet in self.parent.shortcut_sheets:
self.index_ngs_library_to_pedigree.update(sheet.index_ngs_library_to_pedigree)

self.donor_ngs_library_to_pedigree = {}
for sheet in self.parent.shortcut_sheets:
self.donor_ngs_library_to_pedigree.update(sheet.donor_ngs_library_to_pedigree)

@dictify
def _get_input_files_call(self, wildcards):
ngs_mapping = self.parent.sub_workflows["ngs_mapping"]
token = f"{wildcards.mapper}.{wildcards.library_name}"
yield "bam", ngs_mapping(f"output/{token}/out/{token}.bam")

@dictify
def _get_output_files_call(self):
infix = "{mapper}.delly2_call.{library_name}"
yield "bcf", f"work/{infix}/out/{infix}.bcf"
yield "bcf_md5", f"work/{infix}/out/{infix}.bcf.md5"
yield "bcf_csi", f"work/{infix}/out/{infix}.bcf.csi"
yield "bcf_csi_md5", f"work/{infix}/out/{infix}.bcf.csi.md5"

@dictify
def _get_input_files_merge_calls(self, wildcards):
bcfs = []
pedigree = self.index_ngs_library_to_pedigree[wildcards.library_name]
for donor in pedigree.donors:
if donor.dna_ngs_library:
infix = f"{wildcards.mapper}.delly2_call.{donor.dna_ngs_library.name}"
bcfs.append(f"work/{infix}/out/{infix}.bcf")
yield "bcf", bcfs

@dictify
def _get_output_files_merge_calls(self):
infix = "{mapper}.delly2_merge_calls.{library_name}"
yield "bcf", f"work/{infix}/out/{infix}.bcf"
yield "bcf_md5", f"work/{infix}/out/{infix}.bcf.md5"
yield "bcf_csi", f"work/{infix}/out/{infix}.bcf.csi"
yield "bcf_csi_md5", f"work/{infix}/out/{infix}.bcf.csi.md5"

@dictify
def _get_input_files_genotype(self, wildcards):
yield from self._get_input_files_call(wildcards).items()
pedigree = self.donor_ngs_library_to_pedigree[wildcards.library_name]
infix = f"{wildcards.mapper}.delly2_merge_calls.{pedigree.index.dna_ngs_library.name}"
yield "bcf", f"work/{infix}/out/{infix}.bcf"

@dictify
def _get_output_files_genotype(self):
infix = "{mapper}.delly2_genotype.{library_name}"
yield "bcf", f"work/{infix}/out/{infix}.bcf"
yield "bcf_md5", f"work/{infix}/out/{infix}.bcf.md5"
yield "bcf_csi", f"work/{infix}/out/{infix}.bcf.csi"
yield "bcf_csi_md5", f"work/{infix}/out/{infix}.bcf.csi.md5"

@dictify
def _get_input_files_merge_genotypes(self, wildcards):
bcfs = []
pedigree = self.index_ngs_library_to_pedigree[wildcards.library_name]
for donor in pedigree.donors:
if donor.dna_ngs_library:
infix = f"{wildcards.mapper}.delly2_genotype.{donor.dna_ngs_library.name}"
bcfs.append(f"work/{infix}/out/{infix}.bcf")
yield "bcf", bcfs

@dictify
def _get_output_files_merge_genotypes(self):
infix = "{mapper}.delly2.{library_name}"
work_files = {
"vcf": f"work/{infix}/out/{infix}.vcf.gz",
"vcf_md5": f"work/{infix}/out/{infix}.vcf.gz.md5",
"vcf_tbi": f"work/{infix}/out/{infix}.vcf.gz.tbi",
"vcf_tbi_md5": f"work/{infix}/out/{infix}.vcf.gz.tbi.md5",
}
yield from augment_work_dir_with_output_links(
work_files, self.get_log_file("merge_genotypes").values()
).items()
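
As a rough illustration of the action chain above (the mapper ``bwa`` and the pedigree index library ``P001-N1-DNA1-WGS1`` are hypothetical names): per-donor ``call`` BCFs are merged per pedigree (``merge_calls``), re-genotyped per donor (``genotype``), and merged again into the final per-pedigree VCF (``merge_genotypes``), whose output-link template expands like this:

from snakemake.io import expand

# Final merged/genotyped VCF for one pedigree index library (hypothetical names).
final_tpl = "output/{mapper}.delly2.{library_name}/out/{mapper}.delly2.{library_name}.vcf.gz"
print(expand(final_tpl, mapper="bwa", library_name="P001-N1-DNA1-WGS1"))
# -> ['output/bwa.delly2.P001-N1-DNA1-WGS1/out/bwa.delly2.P001-N1-DNA1-WGS1.vcf.gz']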
File renamed without changes.
@@ -5,7 +5,7 @@
from snakemake.io import expand, touch

from snappy_pipeline.utils import dictify, listify
from snappy_pipeline.workflows.gcnv.gcnv_common import GcnvCommonStepPart
from snappy_pipeline.workflows.common.gcnv.gcnv_common import GcnvCommonStepPart


class AnnotateGcMixin:
@@ -13,7 +13,7 @@

from snappy_pipeline.base import InvalidConfiguration, UnsupportedActionException
from snappy_pipeline.utils import dictify, flatten, listify
from snappy_pipeline.workflows.gcnv.gcnv_common import (
from snappy_pipeline.workflows.common.gcnv.gcnv_common import (
GcnvCommonStepPart,
InconsistentLibraryKitsWarning,
)
68 changes: 68 additions & 0 deletions snappy_pipeline/workflows/common/manta.py
@@ -0,0 +1,68 @@
"""Workflow step parts for Manta.
These are used in both ``sv_calling_targeted`` and ``sv_calling_wgs``.
"""

from snappy_pipeline.utils import dictify
from snappy_pipeline.workflows.abstract import BaseStepPart
from snappy_pipeline.workflows.abstract.common import (
ForwardResourceUsageMixin,
ForwardSnakemakeFilesMixin,
augment_work_dir_with_output_links,
)
from snappy_pipeline.workflows.common.sv_calling import (
SvCallingGetLogFileMixin,
SvCallingGetResultFilesMixin,
)
from snappy_wrappers.resource_usage import ResourceUsage


class MantaStepPart(
ForwardSnakemakeFilesMixin,
ForwardResourceUsageMixin,
SvCallingGetResultFilesMixin,
SvCallingGetLogFileMixin,
BaseStepPart,
):
"""Perform SV calling on exomes using Manta"""

name = "manta"
actions = ("run",)

resource_usage_dict = {
"run": ResourceUsage(
threads=16,
time="2-00:00:00",
memory=f"{int(3.75 * 1024 * 16)}M",
)
}

def __init__(self, parent):
super().__init__(parent)
#: Shortcuts from index NGS library name to Pedigree
self.index_ngs_library_to_pedigree = {}
for sheet in self.parent.shortcut_sheets:
self.index_ngs_library_to_pedigree.update(sheet.index_ngs_library_to_pedigree)

@dictify
def _get_input_files_run(self, wildcards):
ngs_mapping = self.parent.sub_workflows["ngs_mapping"]
bams = []
for donor in self.index_ngs_library_to_pedigree[wildcards.library_name].donors:
if donor.dna_ngs_library:
token = f"{wildcards.mapper}.{donor.dna_ngs_library.name}"
bams.append(ngs_mapping(f"output/{token}/out/{token}.bam"))
yield "bam", bams

@dictify
def _get_output_files_run(self):
infix = "{mapper}.delly2.{library_name}"
work_files = {
"vcf": f"work/{infix}/out/{infix}.vcf.gz",
"vcf_md5": f"work/{infix}/out/{infix}.vcf.gz.md5",
"vcf_tbi": f"work/{infix}/out/{infix}.vcf.gz.tbi",
"vcf_tbi_md5": f"work/{infix}/out/{infix}.vcf.gz.tbi.md5",
}
yield from augment_work_dir_with_output_links(
work_files, self.get_log_file("merge_genotypes").values()
).items()
59 changes: 59 additions & 0 deletions snappy_pipeline/workflows/common/sv_calling.py
@@ -0,0 +1,59 @@
"""Code shared by steps in ``sv_calling_{targeted,wgs}``"""

from snakemake.io import expand

from snappy_pipeline.utils import DictQuery, dictify, flatten, listify


class SvCallingGetResultFilesMixin:
"""Mixin that provides ``get_result_files()`` for SV calling steps"""

@listify
def get_result_files(self):
"""Return list of concrete output paths in ``output/``.
The implementation returns all paths with the prefix ``output/`` that are returned by
``self.get_output_files()`` for the final action in ``self.actions``.
"""
ngs_mapping_config = DictQuery(self.w_config).get("step_config/ngs_mapping")
for mapper in ngs_mapping_config["tools"]["dna"]:
# Get list of result path templates.
output_files_tmp = self.get_output_files(self.actions[-1])
if isinstance(output_files_tmp, dict):
output_files = output_files_tmp.values()
else:
output_files = output_files_tmp
result_paths_tpls = list(
filter(
lambda p: p.startswith("output/"),
flatten(output_files),
)
)
# Generate all concrete output paths.
for path_tpl in result_paths_tpls:
for library_name in self.index_ngs_library_to_pedigree.keys():
yield from expand(path_tpl, mapper=[mapper], library_name=library_name)
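
The filter-and-expand logic above can be hard to read at a glance; the following self-contained sketch mimics it with plain ``itertools`` instead of the ``flatten``/``DictQuery`` helpers, using a made-up mapper and index library name:

from itertools import chain

from snakemake.io import expand

# Hypothetical output files of the final action; only "output/" paths count as result files.
output_files = {
    "vcf": "work/{mapper}.delly2.{library_name}/out/{mapper}.delly2.{library_name}.vcf.gz",
    "output_links": [
        "output/{mapper}.delly2.{library_name}/out/{mapper}.delly2.{library_name}.vcf.gz",
        "output/{mapper}.delly2.{library_name}/log/{mapper}.delly2.{library_name}.sv_calling.log",
    ],
}
# Flatten the values, keep only paths below output/, then expand per mapper and index library.
flat = chain.from_iterable(v if isinstance(v, list) else [v] for v in output_files.values())
templates = [path for path in flat if path.startswith("output/")]
for tpl in templates:
    print(expand(tpl, mapper=["bwa"], library_name="P001-N1-DNA1-WGS1"))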


class SvCallingGetLogFileMixin:
"""Mixin that provides ``get_log_files()`` for SV calling steps"""

@dictify
def get_log_file(self, action):
"""Return dict of log files in the "log" directory"""
if action == self.actions[-1]:
infix = self.name
else:
infix = f"{self.name}_{action}"
prefix = f"work/{{mapper}}.{infix}.{{library_name}}/log/{{mapper}}.{infix}.{{library_name}}.sv_calling"
key_ext = (
("log", ".log"),
("conda_info", ".conda_info.txt"),
("conda_list", ".conda_list.txt"),
("wrapper", ".wrapper.py"),
("env_yaml", ".environment.yaml"),
)
for key, ext in key_ext:
yield key, prefix + ext
yield key + "_md5", prefix + ext + ".md5"
@@ -90,7 +90,7 @@

from snappy_pipeline.utils import DictQuery, dictify, listify
from snappy_pipeline.workflows.abstract import BaseStep
from snappy_pipeline.workflows.gcnv.gcnv_build_model import BuildGcnvModelStepPart
from snappy_pipeline.workflows.common.gcnv.gcnv_build_model import BuildGcnvModelStepPart
from snappy_pipeline.workflows.ngs_mapping import NgsMappingWorkflow

#: Default configuration for the helper_gcnv_model_targeted schema
@@ -91,7 +91,7 @@

from snappy_pipeline.utils import dictify, listify
from snappy_pipeline.workflows.abstract import BaseStep
from snappy_pipeline.workflows.gcnv.gcnv_build_model import BuildGcnvModelStepPart
from snappy_pipeline.workflows.common.gcnv.gcnv_build_model import BuildGcnvModelStepPart
from snappy_pipeline.workflows.ngs_mapping import NgsMappingWorkflow

#: Default configuration for the helper_gcnv_model_wgs schema
