Skip to content

Commit

Permalink
feat: integration of worker "strucvars ingest" (#1190)
Browse files Browse the repository at this point in the history
  • Loading branch information
holtgrewe authored Oct 27, 2023
1 parent ead404a commit 79cc5e7
Show file tree
Hide file tree
Showing 5 changed files with 315 additions and 57 deletions.
188 changes: 152 additions & 36 deletions cases_import/models/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -661,8 +661,10 @@ def _import_ngsbits_qc_mappingqc(
}


class SeqvarImportExecutor(FileImportExecutorBase):
"""Run the import of sequence variant import."""
class VariantImportExecutorBase(FileImportExecutorBase):
"""Base class for variant import."""

var_type: str

def __init__(self, case: Case, bgjob: CaseImportBackgroundJob):
super().__init__(case.project)
Expand All @@ -684,32 +686,35 @@ def __init__(self, case: Case, bgjob: CaseImportBackgroundJob):
#: The `FileSystemWrapper` for the internal storage.
self.internal_fs = FileSystemWrapper(self.internal_fs_options)

def run(self):
"""Perform the import."""
ext_vcf_on_s3 = self._copy_external_internal()
def run(self) -> typing.List[PedigreeInternalFile]:
"""Perform the import.
:returns: the `PedigreeInternalFile` objects resulting from the import
"""
ext_vcf_on_s3 = self.copy_external_internal()
if ext_vcf_on_s3:
int_on_s3 = self._annotate(ext_vcf_on_s3)
int_vcf_on_s3 = [
obj for obj in int_on_s3 if obj.designation == "variant_calls/seqvars/ingested-vcf"
]
if int_vcf_on_s3:
self._prefilter(int_vcf_on_s3[0])

def _copy_external_internal(self) -> typing.Optional[PedigreeInternalFile]:
return self.annotate_outer(ext_vcf_on_s3)
else:
return []

def copy_external_internal(self) -> typing.Optional[PedigreeInternalFile]:
"""Copy the external VCF file to the internal storage.
:return: whether a file was copied
:return: the corresponding `PedigreeInternalFile` object
:raises ValueError: if more than one file is found
"""
# Find files with the correct designation, variant_type, and mimetype; ensure that there
# is only one such file and bail out otherwise.
extfile_qs = PedigreeExternalFile.objects.filter(
pedigree=self.case.pedigree_obj,
designation=ExternalFileDesignation.VARIANT_CALLS.value,
file_attributes__variant_type=self.var_type,
mimetype="text/plain+x-bgzip+x-variant-call-format",
)
if extfile_qs.count() > 1:
raise ValueError(f"expected at most one seqvar VCF file, found {extfile_qs.count()}")
raise ValueError(
f"expected at most one {self.var_type} VCF file, found {extfile_qs.count()}"
)
elif extfile_qs.count() == 0:
return None
extfile = extfile_qs.first()
Expand All @@ -718,7 +723,7 @@ def _copy_external_internal(self) -> typing.Optional[PedigreeInternalFile]:
bucket = settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.bucket
path_int = (
f"case-data/{uuid_frag(self.case.sodar_uuid)}/{self.bgjob.sodar_uuid}/"
"seqvar/external-copy.vcf.gz"
f"{self.var_type}/external-copy.vcf.gz"
)
path_int_full = f"s3://{bucket}/{path_int}"
with (
Expand All @@ -735,27 +740,64 @@ def _copy_external_internal(self) -> typing.Optional[PedigreeInternalFile]:
mimetype=extfile.mimetype,
file_attributes=extfile.file_attributes,
identifier_map=extfile.identifier_map,
# is copy of the original seqvar VCF file
designation="variant_calls/seqvars/orig-copy",
# is copy of the original VCF file
designation=f"variant_calls/{self.var_type}/orig-copy",
# checksum=extfile.checksum, # TODO
pedigree=self.case.pedigree_obj,
)

def _annotate(self, vcf_on_s3: PedigreeExternalFile) -> typing.List[PedigreeExternalFile]:
"""Annotate the VCF file from the internal storage."""
def annotate_outer(self, vcf_on_s3: PedigreeExternalFile) -> typing.List[PedigreeExternalFile]:
"""Annotate the VCF file from the internal storage.
Will write temporary PLINK PED file and then call the actual annotation functin.
"""
with tempfile.NamedTemporaryFile(mode="w+t") as tmpf:
write_pedigree_as_plink(self.case.pedigree_obj, tmpf)
tmpf.flush()
return self._annotate_inner(vcf_on_s3, path_ped=tmpf.name)
return self.annotate(vcf_on_s3, path_ped=tmpf.name)

def annotate(
self, vcf_on_s3: PedigreeExternalFile, path_ped: str
) -> typing.List[PedigreeExternalFile]:
_ = vcf_on_s3
_ = path_ped
raise NotImplementedError

def run_worker(self, args: list[str], env: typing.Dict[str, str] | None = None):
"""Run the worker with the given arguments.
The worker will create a new VCF file and a TBI file.
"""
cmd = [settings.WORKER_EXE_PATH, *args]
subprocess.check_call(cmd, env=env)


class SeqvarsImportExecutor(VariantImportExecutorBase):
"""Run the import of sequence variant import."""

var_type = "seqvars"

def _annotate_inner(
def run(self) -> typing.List[PedigreeInternalFile]:
"""Override superclass behaviour to also prefilter the VCF file."""
int_on_s3 = super().run()
int_vcf_on_s3 = [
obj
for obj in int_on_s3
if obj.designation == f"variant_calls/{self.var_type}/ingested-vcf"
]
if int_vcf_on_s3:
self.prefilter_seqvars_outer(int_vcf_on_s3[0])
return int_on_s3

def annotate(
self, vcf_on_s3: PedigreeExternalFile, path_ped: str
) -> typing.List[PedigreeExternalFile]:
"""Implementation of sequence variant annotation."""
# Path create path of the new fiel.
bucket = settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.bucket
path_out = (
f"case-data/{uuid_frag(self.case.sodar_uuid)}/{self.bgjob.sodar_uuid}/"
"seqvar/ingested.vcf.gz"
"seqvars/ingested.vcf.gz"
)
# Create arguments to use.
args = [
Expand Down Expand Up @@ -788,7 +830,7 @@ def _annotate_inner(
# "AWS_REGION": "us-east-1",
}
# Actually execute the worker.
self._run_worker(args=args, env=env)
self.run_worker(args=args, env=env)
# Create the `PedigreeInternalFile` record after ingest is complete.
return [
PedigreeInternalFile.objects.create(
Expand Down Expand Up @@ -816,9 +858,12 @@ def _annotate_inner(
)
]

def _prefilter(self, ingested_on_s3: PedigreeInternalFile):
def prefilter_seqvars_outer(self, ingested_on_s3: PedigreeInternalFile):
"""Writes out the prefilter configuration JSON to file and then calls the actual
prefiltration.
"""
with tempfile.NamedTemporaryFile(mode="w+t") as tmpf:
configs: list[PrefilterConfig] = settings.VARFISH_CASE_IMPORT_SEQVAR_PREFILTER_CONFIGS
configs: list[PrefilterConfig] = settings.VARFISH_CASE_IMPORT_SEQVARS_PREFILTER_CONFIGS
out_lst = []
for idx, config in enumerate(configs):
dirname = os.path.dirname(ingested_on_s3.path)
Expand All @@ -833,13 +878,14 @@ def _prefilter(self, ingested_on_s3: PedigreeInternalFile):
)
json.dump([obj.dict() for obj in out_lst], tmpf)
tmpf.flush()
self._prefilter_inner(
self.prefilter_seqvars(
ingested_on_s3=ingested_on_s3, configs=out_lst, path_config=tmpf.name
)

def _prefilter_inner(
def prefilter_seqvars(
self, ingested_on_s3: PedigreeInternalFile, configs: list[PrefilterConfig], path_config: str
):
"""Run prefiltration of sequence variants."""
# Create arguments to use.
args = [
"seqvars",
Expand All @@ -861,7 +907,7 @@ def _prefilter_inner(
# "AWS_REGION": "us-east-1",
}
# Actually execute the worker.
self._run_worker(args=args, env=env)
self.run_worker(args=args, env=env)
# Create the `PedigreeInternalFile` records after prefilter is complete.
return [
PedigreeInternalFile.objects.create(
Expand Down Expand Up @@ -892,13 +938,80 @@ def _prefilter_inner(
for config in configs
]

def _run_worker(self, args: list[str], env: typing.Dict[str, str] | None = None):
"""Run the worker with the given arguments.

The worker will create a new VCF file and a TBI file.
"""
cmd = [settings.WORKER_EXE_PATH, *args]
subprocess.check_call(cmd, env=env)
class StrucvarsImportExecutor(VariantImportExecutorBase):
"""Run the import of structural variant import."""

var_type = "strucvars"

def annotate(
self, vcf_on_s3: PedigreeExternalFile, path_ped: str
) -> typing.List[PedigreeExternalFile]:
"""Implementation of structural variant annotation."""
# Path create path of the new fiel.
bucket = settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.bucket
path_out = (
f"case-data/{uuid_frag(self.case.sodar_uuid)}/{self.bgjob.sodar_uuid}/"
"strucvars/ingested.vcf.gz"
)
# Create arguments to use.
args = [
"strucvars",
"ingest",
"--file-date",
timezone.now().strftime("%Y%m%d"),
"--case-uuid",
str(self.case.sodar_uuid),
"--genomebuild",
vcf_on_s3.genomebuild,
"--path-mehari-db",
f"{settings.WORKER_DB_PATH}/mehari",
"--path-ped",
path_ped,
"--path-in",
vcf_on_s3.path,
"--path-out",
f"{bucket}/{path_out}",
]
# Setup environment so the worker can access the internal S3 storage.
endpoint_host = settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.host
endpoint_port = settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.port
env = {
**dict(os.environ.items()),
"LC_ALL": "C",
"AWS_ACCESS_KEY_ID": settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.access_key,
"AWS_SECRET_ACCESS_KEY": settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.secret_key,
"AWS_ENDPOINT_URL": f"http://{endpoint_host}:{endpoint_port}",
# "AWS_REGION": "us-east-1",
}
# Actually execute the worker.
self.run_worker(args=args, env=env)
# Create the `PedigreeInternalFile` record after ingest is complete.
return [
PedigreeInternalFile.objects.create(
case=self.case,
path=f"{path_out}{suffix}",
genomebuild=vcf_on_s3.genomebuild,
mimetype=mimetype,
identifier_map=vcf_on_s3.identifier_map,
designation=designation,
file_attributes={},
# checksum=extfile.checksum, # TODO
pedigree=self.case.pedigree_obj,
)
for mimetype, designation, suffix in (
(
"text/plain+x-bgzip+x-variant-call-format",
"variant_calls/strucvars/ingested-vcf",
"",
),
(
"application/octet-stream+x-tabix-tbi-index",
"variant_calls/strucvars/ingested-tbi",
".tbi",
),
)
]


class CaseImportBackgroundJobExecutor:
Expand Down Expand Up @@ -1246,8 +1359,11 @@ def _run_qc_file_import(self, case: Case):

def _run_seqvars_import(self, case: Case):
self.caseimportbackgroundjob.add_log_entry("running sequence variant import...")
SeqvarImportExecutor(case, bgjob=self.caseimportbackgroundjob).run()
SeqvarsImportExecutor(case, bgjob=self.caseimportbackgroundjob).run()
self.caseimportbackgroundjob.add_log_entry("... done with sequence variant import")
self.caseimportbackgroundjob.add_log_entry("running structural variant import...")
StrucvarsImportExecutor(case, bgjob=self.caseimportbackgroundjob).run()
self.caseimportbackgroundjob.add_log_entry("... done with structural variant import")

def _run_strucvars_import(self, case: Case):
self.caseimportbackgroundjob.add_log_entry("strucvars annotation not implemented yet")
Expand Down
3 changes: 3 additions & 0 deletions cases_import/tests/data/singleton_strucvars.yaml
Git LFS file not shown
Loading

0 comments on commit 79cc5e7

Please sign in to comment.