feat: change varfish-annotator to mehari (#392) #393

Merged

snappy_pipeline/workflows/varfish_export/Snakefile (70 changes: 35 additions & 35 deletions)

@@ -45,67 +45,67 @@ rule varfish_export_write_pedigree_run:
         wf.substep_dispatch("write_pedigree", "run", wildcards, output)


-# Run varfish-annotator-cli annotate ------------------------------------------
+# Run varfish-annotator-cli annotate-seqvars -----------------------------------


-rule varfish_export_varfish_annotator_annotate:
+rule varfish_export_mehari_annotate_seqvars:
     input:
-        unpack(wf.get_input_files("varfish_annotator", "annotate")),
+        unpack(wf.get_input_files("mehari", "annotate_seqvars")),
     output:
-        **wf.get_output_files("varfish_annotator", "annotate"),
-    threads: wf.get_resource("varfish_annotator", "annotate", "threads")
+        **wf.get_output_files("mehari", "annotate_seqvars"),
+    threads: wf.get_resource("mehari", "annotate_seqvars", "threads")
     resources:
-        time=wf.get_resource("varfish_annotator", "annotate", "time"),
-        memory=wf.get_resource("varfish_annotator", "annotate", "memory"),
-        partition=wf.get_resource("varfish_annotator", "annotate", "partition"),
-        tmpdir=wf.get_resource("varfish_annotator", "annotate", "tmpdir"),
+        time=wf.get_resource("mehari", "annotate_seqvars", "time"),
+        memory=wf.get_resource("mehari", "annotate_seqvars", "memory"),
+        partition=wf.get_resource("mehari", "annotate_seqvars", "partition"),
+        tmpdir=wf.get_resource("mehari", "annotate_seqvars", "tmpdir"),
     log:
-        **wf.get_log_file("varfish_annotator", "annotate"),
+        **wf.get_log_file("mehari", "annotate_seqvars"),
     params:
-        **{"args": wf.get_params("varfish_annotator", "annotate")},
+        **{"args": wf.get_params("mehari", "annotate_seqvars")},
     wrapper:
-        wf.wrapper_path("varfish_annotator/annotate")
+        wf.wrapper_path("mehari/annotate_seqvars")


-# Run varfish-annotator-cli annotate-svs ---------------------------------------
+# Run varfish-annotator-cli annotate-strucvars ---------------------------------


-rule varfish_export_varfish_annotator_annotate_svs:
+rule varfish_export_mehari_annotate_strucvars:
     input:
-        unpack(wf.get_input_files("varfish_annotator", "annotate_svs")),
+        unpack(wf.get_input_files("mehari", "annotate_strucvars")),
     output:
-        **wf.get_output_files("varfish_annotator", "annotate_svs"),
-    threads: wf.get_resource("varfish_annotator", "annotate_svs", "threads")
+        **wf.get_output_files("mehari", "annotate_strucvars"),
+    threads: wf.get_resource("mehari", "annotate_strucvars", "threads")
     resources:
-        time=wf.get_resource("varfish_annotator", "annotate_svs", "time"),
-        memory=wf.get_resource("varfish_annotator", "annotate_svs", "memory"),
-        partition=wf.get_resource("varfish_annotator", "annotate_svs", "partition"),
-        tmpdir=wf.get_resource("varfish_annotator", "annotate_svs", "tmpdir"),
+        time=wf.get_resource("mehari", "annotate_strucvars", "time"),
+        memory=wf.get_resource("mehari", "annotate_strucvars", "memory"),
+        partition=wf.get_resource("mehari", "annotate_strucvars", "partition"),
+        tmpdir=wf.get_resource("mehari", "annotate_strucvars", "tmpdir"),
     log:
-        **wf.get_log_file("varfish_annotator", "annotate_svs"),
+        **wf.get_log_file("mehari", "annotate_strucvars"),
     params:
-        **{"args": wf.get_params("varfish_annotator", "annotate_svs")},
+        **{"args": wf.get_params("mehari", "annotate_strucvars")},
     wrapper:
-        wf.wrapper_path("varfish_annotator/annotate_svs")
+        wf.wrapper_path("mehari/annotate_strucvars")


 # Gather statistics about the alignment ---------------------------------------


-rule varfish_export_varfish_annotator_bam_qc:
+rule varfish_export_mehari_bam_qc:
     input:
-        unpack(wf.get_input_files("varfish_annotator", "bam_qc")),
+        unpack(wf.get_input_files("mehari", "bam_qc")),
     output:
-        **wf.get_output_files("varfish_annotator", "bam_qc"),
-    threads: wf.get_resource("varfish_annotator", "bam_qc", "threads")
+        **wf.get_output_files("mehari", "bam_qc"),
+    threads: wf.get_resource("mehari", "bam_qc", "threads")
     resources:
-        time=wf.get_resource("varfish_annotator", "bam_qc", "time"),
-        memory=wf.get_resource("varfish_annotator", "bam_qc", "memory"),
-        partition=wf.get_resource("varfish_annotator", "bam_qc", "partition"),
-        tmpdir=wf.get_resource("varfish_annotator", "bam_qc", "tmpdir"),
+        time=wf.get_resource("mehari", "bam_qc", "time"),
+        memory=wf.get_resource("mehari", "bam_qc", "memory"),
+        partition=wf.get_resource("mehari", "bam_qc", "partition"),
+        tmpdir=wf.get_resource("mehari", "bam_qc", "tmpdir"),
     log:
-        **wf.get_log_file("varfish_annotator", "bam_qc"),
+        **wf.get_log_file("mehari", "bam_qc"),
     params:
-        **{"args": wf.get_params("varfish_annotator", "bam_qc")},
+        **{"args": wf.get_params("mehari", "bam_qc")},
     wrapper:
-        wf.wrapper_path("varfish_annotator/bam_qc")
+        wf.wrapper_path("mehari/bam_qc")

snappy_pipeline/workflows/varfish_export/__init__.py (72 changes: 33 additions & 39 deletions)

@@ -121,26 +121,19 @@
     release: GRCh37 # REQUIRED: default 'GRCh37'
     # Path to BED file with exons; used for reducing data to near-exon small variants.
     path_exon_bed: null # REQUIRED: exon BED file to use
-    # Path to Jannovar RefSeq ``.ser`` file for annotation
-    path_refseq_ser: REQUIRED # REQUIRED: path to RefSeq .ser file
-    # Path to Jannovar ENSEMBL ``.ser`` file for annotation
-    path_ensembl_ser: REQUIRED # REQUIRED: path to ENSEMBL .ser file
-    # Path to VarFish annotator database file to use for annotating.
-    path_db: REQUIRED # REQUIRED: spath to varfish-annotator DB file to use
+    # Path to mehari database.
+    path_mehari_db: REQUIRED # REQUIRED: path to mehari database
 """


-class VarfishAnnotatorAnnotateStepPart(VariantCallingGetLogFileMixin, BaseStepPart):
-    """This step part is responsible for annotating the variants with VarFish Annotator"""
+class MehariStepPart(VariantCallingGetLogFileMixin, BaseStepPart):
+    """This step part is responsible for annotating the variants with Mehari"""

-    name = "varfish_annotator"
-    actions = ("annotate", "annotate_svs", "bam_qc")
+    name = "mehari"
+    actions = ("annotate_seqvars", "annotate_strucvars", "bam_qc")

     def __init__(self, parent):
         super().__init__(parent)
-        self.base_path_out = (
-            "work/{mapper}.{var_caller}.varfish_annotated.{index_ngs_library}/out/.done"
-        )
         # Build shortcut from index library name to pedigree
         self.index_ngs_library_to_pedigree = {}
         for sheet in self.parent.shortcut_sheets:
@@ -159,7 +152,7 @@ def get_log_file(self, action: str) -> SnakemakeDictItemsGenerator:
         self._validate_action(action)
         prefix = (
             "work/{mapper}.varfish_export.{index_ngs_library}/log/"
-            f"{{mapper}}.varfish_annotator_{action}.{{index_ngs_library}}"
+            f"{{mapper}}.mehari_{action}.{{index_ngs_library}}"
         )
         key_ext = (
             ("wrapper", ".wrapper.py"),
@@ -187,16 +180,16 @@ def get_resource_usage(self, action: str) -> ResourceUsage:
     @listify
     def get_result_files(self, action):
         # Generate templates to the output paths from action's result files.
-        if action == "annotate":
-            raw_path_tpls = self._get_output_files_annotate().values()
-        elif action == "annotate_svs":
-            # Only annotate SVs if path to step for calling them is configured.
+        if action == "annotate_seqvars":
+            raw_path_tpls = self._get_output_files_annotate_seqvars().values()
+        elif action == "annotate_strucvars":
+            # Only annotate_seqvars SVs if path to step for calling them is configured.
             if (
                 not self.parent.config["path_sv_calling_targeted"]
                 and not self.parent.config["path_sv_calling_wgs"]
             ):
                 return
-            raw_path_tpls = self._get_output_files_annotate_svs().values()
+            raw_path_tpls = self._get_output_files_annotate_strucvars().values()
         elif action == "bam_qc":
             raw_path_tpls = self._get_output_files_bam_qc().values()
         # Filter the templates to the paths in the output directory.
@@ -205,7 +198,8 @@ def get_result_files(self, action):
         # Create concrete paths for all pedigrees in the sample sheet.
         index_ngs_libraries = self._get_index_ngs_libraries(
             require_consistent_pedigree_kits=(
-                bool(self.parent.config["path_sv_calling_targeted"]) and (action == "annotate_svs")
+                bool(self.parent.config["path_sv_calling_targeted"])
+                and (action == "annotate_strucvars")
             )
         )
         kwargs = {
@@ -249,7 +243,7 @@ def _is_pedigree_good(self, pedigree: Pedigree) -> bool:
         return not msg

     @dictify
-    def _get_input_files_annotate(self, wildcards):
+    def _get_input_files_annotate_seqvars(self, wildcards):
         yield "ped", "work/write_pedigree.{index_ngs_library}/out/{index_ngs_library}.ped"

         variant_calling = self.parent.sub_workflows["variant_calling"]
@@ -271,13 +265,13 @@ def _get_input_files_annotate(self, wildcards):
         yield "vcf", vcfs

     @dictify
-    def _get_output_files_annotate(self):
+    def _get_output_files_annotate_seqvars(self):
         # Generate paths in "work/" directory
         prefix = (
             "work/{mapper}.varfish_export.{index_ngs_library}/out/"
-            "{mapper}.varfish_annotator_annotate.{index_ngs_library}"
+            "{mapper}.mehari_annotate_seqvars.{index_ngs_library}"
         )
-        work_paths = {  # annotate will write out PED file
+        work_paths = {  # annotate_seqvars will write out PED file
             "ped": f"{prefix}.ped",
             "ped_md5": f"{prefix}.ped.md5",
             "gts": f"{prefix}.gts.tsv.gz",
@@ -289,10 +283,12 @@ def _get_output_files_annotate(self):
         # Generate paths in "output/" directory
         yield "output_links", [
             re.sub(r"^work/", "output/", work_path)
-            for work_path in chain(work_paths.values(), self.get_log_file("annotate").values())
+            for work_path in chain(
+                work_paths.values(), self.get_log_file("annotate_seqvars").values()
+            )
         ]

-    def _get_params_annotate(self, wildcards: Wildcards) -> typing.Dict[str, typing.Any]:
+    def _get_params_annotate_seqvars(self, wildcards: Wildcards) -> typing.Dict[str, typing.Any]:
         pedigree = self.index_ngs_library_to_pedigree[wildcards.index_ngs_library]
         for donor in pedigree.donors:
             if (
@@ -303,7 +299,7 @@ def _get_params_annotate(self, wildcards: Wildcards) -> typing.Dict[str, typing.
         return {"step_name": "varfish_export"}

     @dictify
-    def _get_input_files_annotate_svs(self, wildcards):
+    def _get_input_files_annotate_strucvars(self, wildcards):
         yield "ped", "work/write_pedigree.{index_ngs_library}/out/{index_ngs_library}.ped"

         if self.parent.config["path_sv_calling_targeted"]:
@@ -387,28 +383,28 @@ def _get_input_files_annotate_svs(self, wildcards):
         yield "vcf_cov", cov_vcfs

     @dictify
-    def _get_output_files_annotate_svs(self):
+    def _get_output_files_annotate_strucvars(self):
         prefix = (
             "work/{mapper}.varfish_export.{index_ngs_library}/out/"
-            "{mapper}.varfish_annotator_annotate_svs.{index_ngs_library}"
+            "{mapper}.mehari_annotate_strucvars.{index_ngs_library}"
         )
         work_paths = {
             "gts": f"{prefix}.gts.tsv.gz",
             "gts_md5": f"{prefix}.gts.tsv.gz.md5",
             "feature_effects": f"{prefix}.feature-effects.tsv.gz",
             "feature_effects_md5": f"{prefix}.feature-effects.tsv.gz.md5",
             "db_infos": f"{prefix}.db-infos.tsv.gz",
             "db_infos_md5": f"{prefix}.db-infos.tsv.gz.md5",
         }
         yield from work_paths.items()
         # Generate paths in "output/" directory
         yield "output_links", [
             re.sub(r"^work/", "output/", work_path)
-            for work_path in chain(work_paths.values(), self.get_log_file("annotate_svs").values())
+            for work_path in chain(
+                work_paths.values(), self.get_log_file("annotate_strucvars").values()
+            )
         ]

     #: Alias the get params function.
-    _get_params_annotate_svs = _get_params_annotate
+    _get_params_annotate_strucvars = _get_params_annotate_seqvars

     @dictify
     def _get_input_files_bam_qc(self, wildcards):
@@ -439,7 +435,7 @@ def _get_input_files_bam_qc(self, wildcards):
     def _get_output_files_bam_qc(self) -> SnakemakeDictItemsGenerator:
         prefix = (
             "work/{mapper}.varfish_export.{index_ngs_library}/out/"
-            "{mapper}.varfish_annotator_bam_qc.{index_ngs_library}"
+            "{mapper}.mehari_bam_qc.{index_ngs_library}"
         )
         work_paths = {
             "bam_qc": f"{prefix}.bam-qc.tsv.gz",
@@ -502,9 +498,7 @@ def __init__(self, workflow, config, config_lookup_paths, config_paths, workdir)
         )

         # Register sub step classes so the sub steps are available
-        self.register_sub_step_classes(
-            (WritePedigreeStepPart, VarfishAnnotatorAnnotateStepPart, LinkOutStepPart)
-        )
+        self.register_sub_step_classes((WritePedigreeStepPart, MehariStepPart, LinkOutStepPart))

         # Register sub workflows
         self.register_sub_workflow("variant_calling", self.config["path_variant_calling"])
@@ -564,8 +558,8 @@ def get_result_files(self):

         We will process all primary DNA libraries and perform joint calling within pedigrees
         """
-        for action in self.sub_steps["varfish_annotator"].actions:
-            yield from self.sub_steps["varfish_annotator"].get_result_files(action)
+        for action in self.sub_steps["mehari"].actions:
+            yield from self.sub_steps["mehari"].get_result_files(action)

     def check_config(self):
         self.ensure_w_config(
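
For orientation only, the block below is a minimal sketch of what the varfish_export configuration stanza might look like after this change. The keys release, path_exon_bed, and path_mehari_db come from the updated DEFAULT_CONFIG in this diff; path_variant_calling, path_sv_calling_targeted, and path_sv_calling_wgs are keys the step code in this diff reads; all concrete paths are placeholders and any values not shown in the diff are assumptions, not the pipeline's documented defaults.

    step_config:
      varfish_export:
        # Upstream steps. The SV calling paths may be left empty, in which case
        # get_result_files() skips the annotate_strucvars action (see diff above).
        path_variant_calling: ../variant_calling
        path_sv_calling_targeted: ""
        path_sv_calling_wgs: ""
        release: GRCh37
        path_exon_bed: /path/to/exons.bed    # placeholder
        # Single Mehari database path; replaces path_refseq_ser, path_ensembl_ser,
        # and path_db from the previous varfish-annotator-based setup.
        path_mehari_db: /path/to/mehari-db   # placeholder

One practical consequence of the switch is that the three separate annotation inputs of varfish-annotator (Jannovar RefSeq and ENSEMBL .ser files plus the annotator DB) collapse into the single path_mehari_db entry.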