
Commit

fix: integrate changes to worker ingest
holtgrewe committed Jun 6, 2024
1 parent e209961 commit cfc17a9
Showing 6 changed files with 685 additions and 207 deletions.
33 changes: 33 additions & 0 deletions cases/models.py
@@ -1,3 +1,4 @@
import json
import typing
import uuid as uuid_object

@@ -171,3 +172,35 @@ def write_pedigree_as_plink(pedigree: Pedigree, outputf: typing.TextIO, family_n
"\t".join(row),
file=outputf,
)


def write_id_mapping_json(
identifier_map: typing.Dict[str, typing.Dict[str, str]],
pedigree: Pedigree,
outputf: typing.TextIO,
):
"""Write a pedigree as a PLINK file.
:param identifier_map: Mapping from file path to dict mapping name in PED to name in VCF.
:param pedigree: The pedigree that the identifier map refers to.
:param outputf: The output file.
"""
mappings = []
for path, ped_to_vcf in identifier_map.items():
mappings.append(
{
"path": path,
"entries": [
{
"src": vcf_name,
"dst": ped_name,
}
for ped_name, vcf_name in ped_to_vcf.items()
],
}
)
json.dump(
obj={"mappings": mappings},
fp=outputf,
indent=2,
)
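For reference, a minimal standalone sketch of the JSON document this helper emits; the file path and sample names below are hypothetical, and the comprehension mirrors the loop in write_id_mapping_json above ("src" is the name in the VCF, "dst" the name in the PED):

import json

# Hypothetical identifier map: one VCF whose sample "SAMPLE-001" corresponds
# to the pedigree member named "index" in the PED file.
identifier_map = {
    "bucket/case-data/example.vcf.gz": {"index": "SAMPLE-001"},
}

# Rebuild the structure that write_id_mapping_json() dumps to its output file.
mappings = [
    {
        "path": path,
        "entries": [
            {"src": vcf_name, "dst": ped_name}
            for ped_name, vcf_name in ped_to_vcf.items()
        ],
    }
    for path, ped_to_vcf in identifier_map.items()
]
print(json.dumps({"mappings": mappings}, indent=2))

This prints a single {"mappings": [...]} object with one entry per VCF file, which is the file the worker later receives via the --id-mapping argument (see executors.py below).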
171 changes: 102 additions & 69 deletions cases_import/models/executors.py
@@ -18,7 +18,14 @@
from projectroles.models import Project
import pydantic

from cases.models import Disease, Individual, Pedigree, PhenotypicFeature, write_pedigree_as_plink
from cases.models import (
Disease,
Individual,
Pedigree,
PhenotypicFeature,
write_id_mapping_json,
write_pedigree_as_plink,
)
from cases_files.models import (
AbstractFile,
IndividualExternalFile,
@@ -665,6 +672,7 @@ def _import_ngsbits_qc_mappingqc(
class VariantImportExecutorBase(FileImportExecutorBase):
"""Base class for variant import."""

max_vcf_files: typing.Optional[int]
var_type: str

def __init__(self, case: Case, bgjob: CaseImportBackgroundJob):
@@ -674,8 +682,9 @@ def __init__(self, case: Case, bgjob: CaseImportBackgroundJob):
self.case = case
#: The background job, used for logging and getting unique internal paths
self.bgjob = bgjob
#: The `FileSystemOptions` for the internal storage.
# Shortcut to storage settings
storage_settings = settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE
#: The `FileSystemOptions` for the internal storage
self.internal_fs_options = FileSystemOptions(
protocol="s3",
host=storage_settings.host,
@@ -698,7 +707,7 @@ def run(self) -> typing.List[PedigreeInternalFile]:
else:
return []

def copy_external_internal(self) -> typing.Optional[PedigreeInternalFile]:
def copy_external_internal(self) -> typing.List[PedigreeInternalFile]:
"""Copy the external VCF file to the internal storage.
:return: the corresponding `PedigreeInternalFile` object
@@ -712,56 +721,71 @@ def copy_external_internal(self) -> typing.Optional[PedigreeInternalFile]:
file_attributes__variant_type=self.var_type,
mimetype="text/plain+x-bgzip+x-variant-call-format",
)
if extfile_qs.count() > 1:
if self.max_vcf_files is not None and extfile_qs.count() > self.max_vcf_files:
raise ValueError(
f"expected at most one {self.var_type} VCF file, found {extfile_qs.count()}"
f"expected at most {self.max_vcf_files} {self.var_type} VCF file(s), found {extfile_qs.count()}"
)
elif extfile_qs.count() == 0:
return None
extfile = extfile_qs.first()

# Copy the file from the external to the internal storage.
bucket = settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.bucket
path_int = (
f"case-data/{uuid_frag(self.case.sodar_uuid)}/{self.bgjob.sodar_uuid}/"
f"{self.var_type}/external-copy.vcf.gz"
)
path_int_full = f"s3://{bucket}/{path_int}"
with (
self.external_fs.open(extfile.path, "rb") as inputf,
self.internal_fs.open(path_int_full, "wb") as outputf,
):
shutil.copyfileobj(inputf, outputf)

# Create the `PedigreeInternalFile` record after copying is complete.
return PedigreeInternalFile.objects.create(
case=self.case,
path=path_int,
genomebuild=extfile.genomebuild,
mimetype=extfile.mimetype,
file_attributes=extfile.file_attributes,
identifier_map=extfile.identifier_map,
# is copy of the original VCF file
designation=f"variant_calls/{self.var_type}/orig-copy",
# checksum=extfile.checksum, # TODO
pedigree=self.case.pedigree_obj,
)
result = []
for idx, extfile in enumerate(extfile_qs):
# Copy the file from the external to the internal storage.
bucket = settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.bucket
path_int = (
f"case-data/{uuid_frag(self.case.sodar_uuid)}/{self.bgjob.sodar_uuid}/"
f"{self.var_type}/external-copy-{idx}.vcf.gz"
)
path_int_full = f"s3://{bucket}/{path_int}"
path_ext = extfile.path
with (
self.external_fs.open(path_ext, "rb") as inputf,
self.internal_fs.open(path_int_full, "wb") as outputf,
):
shutil.copyfileobj(inputf, outputf)
# Create the `PedigreeInternalFile` record after copying is complete.
result.append(
PedigreeInternalFile.objects.create(
case=self.case,
path=path_int,
genomebuild=extfile.genomebuild,
mimetype=extfile.mimetype,
file_attributes=extfile.file_attributes,
identifier_map=extfile.identifier_map,
# is copy of the original VCF file
designation=f"variant_calls/{self.var_type}/orig-copy",
# checksum=extfile.checksum, # TODO
pedigree=self.case.pedigree_obj,
)
)
return result

def annotate_outer(self, vcf_on_s3: PedigreeExternalFile) -> typing.List[PedigreeExternalFile]:
def annotate_outer(
self, vcf_on_s3: typing.List[PedigreeExternalFile]
) -> typing.List[PedigreeExternalFile]:
"""Annotate the VCF file from the internal storage.
Will write temporary PLINK PED file and then call the actual annotation functin.
"""
with tempfile.NamedTemporaryFile(mode="w+t") as tmpf:
write_pedigree_as_plink(self.case.pedigree_obj, tmpf)
tmpf.flush()
return self.annotate(vcf_on_s3, path_ped=tmpf.name)
with (
tempfile.NamedTemporaryFile(mode="w+t", suffix=".ped") as tmpf_ped,
tempfile.NamedTemporaryFile(mode="w+t", suffix=".json") as tmpf_id_map,
):
write_pedigree_as_plink(self.case.pedigree_obj, tmpf_ped)
tmpf_ped.flush()
bucket = settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.bucket
write_id_mapping_json(
{f"{bucket}/{entry.path}": entry.identifier_map for entry in vcf_on_s3},
self.case.pedigree_obj,
tmpf_id_map,
)
tmpf_id_map.flush()
return self.annotate(vcf_on_s3, path_ped=tmpf_ped.name, path_id_map=tmpf_id_map.name)

def annotate(
self, vcf_on_s3: PedigreeExternalFile, path_ped: str
self, vcf_on_s3: typing.List[PedigreeExternalFile], path_ped: str, path_id_map: str
) -> typing.List[PedigreeExternalFile]:
_ = vcf_on_s3
_ = path_ped
_ = path_id_map
raise NotImplementedError

def run_worker(self, args: list[str], env: typing.Dict[str, str] | None = None):
@@ -776,6 +800,7 @@ def run_worker(self, args: list[str], env: typing.Dict[str, str] | None = None):
class SeqvarsImportExecutor(VariantImportExecutorBase):
"""Run the import of sequence variant import."""

max_vcf_files = 1
var_type = "seqvars"

def run(self) -> typing.List[PedigreeInternalFile]:
@@ -791,10 +816,12 @@ def run(self) -> typing.List[PedigreeInternalFile]:
return int_on_s3

def annotate(
self, vcf_on_s3: PedigreeExternalFile, path_ped: str
self, vcf_on_s3: typing.List[PedigreeExternalFile], path_ped: str, path_id_map: str
) -> typing.List[PedigreeExternalFile]:
"""Implementation of sequence variant annotation."""
# Path create path of the new fiel.
assert len(vcf_on_s3) == 1, "ensured earlier"

# Create the path of the new file.
bucket = settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.bucket
path_out = (
f"case-data/{uuid_frag(self.case.sodar_uuid)}/{self.bgjob.sodar_uuid}/"
@@ -809,16 +836,20 @@ def annotate(
"--case-uuid",
str(self.case.sodar_uuid),
"--genomebuild",
vcf_on_s3.genomebuild,
vcf_on_s3[0].genomebuild,
"--path-mehari-db",
f"{settings.WORKER_DB_PATH}/mehari",
"--path-ped",
path_ped,
"--path-in",
vcf_on_s3.path,
f"{bucket}/{vcf_on_s3[0].path}",
"--path-out",
f"{bucket}/{path_out}",
"--id-mapping",
f"@{path_id_map}",
]
# if settings.DEBUG: # XXX remove this
# args += ["--max-var-count", "1000"]
# Setup environment so the worker can access the internal S3 storage.
endpoint_host = settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.host
endpoint_port = settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.port
@@ -828,7 +859,7 @@ def annotate(
"AWS_ACCESS_KEY_ID": settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.access_key,
"AWS_SECRET_ACCESS_KEY": settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.secret_key,
"AWS_ENDPOINT_URL": f"http://{endpoint_host}:{endpoint_port}",
# "AWS_REGION": "us-east-1",
"AWS_REGION": "us-east-1",
}
# Actually execute the worker.
self.run_worker(args=args, env=env)
@@ -837,9 +868,10 @@ def annotate(
PedigreeInternalFile.objects.create(
case=self.case,
path=f"{path_out}{suffix}",
genomebuild=vcf_on_s3.genomebuild,
genomebuild=vcf_on_s3[0].genomebuild,
mimetype=mimetype,
identifier_map=vcf_on_s3.identifier_map,
# NB: no identifier map as the ingest already fixed the identifiers
identifier_map={},
designation=designation,
file_attributes={},
# checksum=extfile.checksum, # TODO
@@ -863,21 +895,20 @@ def prefilter_seqvars_outer(self, ingested_on_s3: PedigreeInternalFile):
"""Writes out the prefilter configuration JSON to file and then calls the actual
prefiltration.
"""
bucket = settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.bucket
with tempfile.NamedTemporaryFile(mode="w+t") as tmpf:
configs: list[PrefilterConfig] = settings.VARFISH_CASE_IMPORT_SEQVARS_PREFILTER_CONFIGS
out_lst = []
for idx, config in enumerate(configs):
dirname = os.path.dirname(ingested_on_s3.path)
prefilter_path = f"{dirname}/prefiltered-{idx}.vcf.gz"
out_lst.append(
PrefilterConfig(
**{
**config.dict(),
"prefilter_path": prefilter_path,
}
)
prefilter_path = f"{bucket}/{dirname}/prefiltered-{idx}.vcf.gz"
config = PrefilterConfig(
**{
**config.dict(),
"prefilter_path": prefilter_path,
}
)
json.dump([obj.dict() for obj in out_lst], tmpf)
print(config.model_dump_json(), file=tmpf)
tmpf.flush()
self.prefilter_seqvars(
ingested_on_s3=ingested_on_s3, configs=out_lst, path_config=tmpf.name
@@ -888,13 +919,14 @@ def prefilter_seqvars(
):
"""Run prefiltration of sequence variants."""
# Create arguments to use.
bucket = settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.bucket
args = [
"seqvars",
"prefilter",
"--params",
f"@{path_config}",
"--path-in",
ingested_on_s3.path,
f"{bucket}/{ingested_on_s3.path}",
]
# Setup environment so the worker can access the internal S3 storage.
endpoint_host = settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.host
@@ -905,7 +937,7 @@ def prefilter_seqvars(
"AWS_ACCESS_KEY_ID": settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.access_key,
"AWS_SECRET_ACCESS_KEY": settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.secret_key,
"AWS_ENDPOINT_URL": f"http://{endpoint_host}:{endpoint_port}",
# "AWS_REGION": "us-east-1",
"AWS_REGION": "us-east-1",
}
# Actually execute the worker.
self.run_worker(args=args, env=env)
@@ -943,12 +975,15 @@ def prefilter_seqvars(
class StrucvarsImportExecutor(VariantImportExecutorBase):
"""Run the import of structural variant import."""

max_vcf_files = 10
var_type = "strucvars"

def annotate(
self, vcf_on_s3: PedigreeExternalFile, path_ped: str
self, vcf_on_s3: typing.List[PedigreeExternalFile], path_ped: str, path_id_map: str
) -> typing.List[PedigreeExternalFile]:
"""Implementation of structural variant annotation."""
assert len(vcf_on_s3) > 0, "ensured earlier"

# Create the path of the new file.
bucket = settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.bucket
path_out = (
@@ -964,16 +999,13 @@ def annotate(
"--case-uuid",
str(self.case.sodar_uuid),
"--genomebuild",
vcf_on_s3.genomebuild,
"--path-mehari-db",
f"{settings.WORKER_DB_PATH}/mehari",
vcf_on_s3[0].genomebuild,
"--path-ped",
path_ped,
"--path-in",
vcf_on_s3.path,
"--path-out",
f"{bucket}/{path_out}",
]
for entry in vcf_on_s3:
args += ["--path-in", f"{bucket}/{entry.path}"]
args += ["--path-out", f"{bucket}/{path_out}", "--id-mapping", f"@{path_id_map}"]
# Setup environment so the worker can access the internal S3 storage.
endpoint_host = settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.host
endpoint_port = settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.port
@@ -983,7 +1015,7 @@ def annotate(
"AWS_ACCESS_KEY_ID": settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.access_key,
"AWS_SECRET_ACCESS_KEY": settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.secret_key,
"AWS_ENDPOINT_URL": f"http://{endpoint_host}:{endpoint_port}",
# "AWS_REGION": "us-east-1",
"AWS_REGION": "us-east-1",
}
# Actually execute the worker.
self.run_worker(args=args, env=env)
@@ -992,9 +1024,10 @@ def annotate(
PedigreeInternalFile.objects.create(
case=self.case,
path=f"{path_out}{suffix}",
genomebuild=vcf_on_s3.genomebuild,
genomebuild=vcf_on_s3[0].genomebuild,
mimetype=mimetype,
identifier_map=vcf_on_s3.identifier_map,
# NB: no identifier map as the ingest already fixed the identifiers
identifier_map={},
designation=designation,
file_attributes={},
# checksum=extfile.checksum, # TODO
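To summarize the worker interface wired up above: every input VCF is copied into the internal bucket and passed with its own --path-in argument (paths are now prefixed with the bucket name), the PED-to-VCF identifier mapping written by write_id_mapping_json is handed over via --id-mapping with an "@"-prefixed file path, and AWS_REGION is now set explicitly. A condensed sketch with hypothetical bucket, credentials, identifiers, and paths; only argument and environment variable names appearing in the diff are used:

# Hypothetical values standing in for the internal storage settings.
bucket = "varfish-import"
vcf_paths = [
    "case-data/1234abcd/job-1/strucvars/external-copy-0.vcf.gz",
    "case-data/1234abcd/job-1/strucvars/external-copy-1.vcf.gz",
]
path_out = "case-data/1234abcd/job-1/strucvars/ingested.vcf.gz"

args = [
    "--case-uuid", "00000000-0000-0000-0000-000000000000",
    "--genomebuild", "grch37",
    "--path-ped", "/tmp/case.ped",
]
# One --path-in per input VCF, each prefixed with the internal bucket.
for path in vcf_paths:
    args += ["--path-in", f"{bucket}/{path}"]
# The id-mapping JSON file is referenced with an "@" prefix, as in the diff above.
args += ["--path-out", f"{bucket}/{path_out}", "--id-mapping", "@/tmp/id-map.json"]

# Environment so the worker can reach the internal S3 storage.
env = {
    "AWS_ACCESS_KEY_ID": "minioadmin",  # hypothetical credentials
    "AWS_SECRET_ACCESS_KEY": "minioadmin",
    "AWS_ENDPOINT_URL": "http://localhost:9000",
    "AWS_REGION": "us-east-1",
}
# In the executors, these are then handed to self.run_worker(args=args, env=env).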
