fix: integration of worker latest version #1680

Merged · 4 commits · Jun 10, 2024
Changes from all commits
cases/models.py: 33 additions, 0 deletions
@@ -1,3 +1,4 @@
import json
import typing
import uuid as uuid_object

@@ -171,3 +172,35 @@ def write_pedigree_as_plink(pedigree: Pedigree, outputf: typing.TextIO, family_n
"\t".join(row),
file=outputf,
)


def write_id_mapping_json(
identifier_map: typing.Dict[str, typing.Dict[str, str]],
pedigree: Pedigree,
outputf: typing.TextIO,
):
"""Write a pedigree as a PLINK file.

:param identifier_map: Mapping from file path to dict mapping name in PED to name in VCF.
:param pedigree: The pedigree to write.
:param outputf: The output file.
"""
mappings = []
for path, ped_to_vcf in identifier_map.items():
mappings.append(
{
"path": path,
"entries": [
{
"src": vcf_name,
"dst": ped_name,
}
for ped_name, vcf_name in ped_to_vcf.items()
],
}
)
json.dump(
obj={"mappings": mappings},
fp=outputf,
indent=2,
)
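
A minimal usage sketch of the new helper (the path and sample names are made up for illustration, and the `pedigree` argument is accepted but not read by the implementation above):

import io

from cases.models import write_id_mapping_json

buf = io.StringIO()
write_id_mapping_json(
    identifier_map={"vcf/example.vcf.gz": {"index": "NA12878-N1-DNA1-WGS1"}},
    pedigree=None,  # unused by the helper as written, so a placeholder suffices
    outputf=buf,
)
# buf.getvalue() now holds, roughly:
# {
#   "mappings": [
#     {
#       "path": "vcf/example.vcf.gz",
#       "entries": [{"src": "NA12878-N1-DNA1-WGS1", "dst": "index"}]
#     }
#   ]
# }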
cases_import/models/executors.py: 102 additions, 69 deletions
@@ -18,7 +18,14 @@
from projectroles.models import Project
import pydantic

from cases.models import Disease, Individual, Pedigree, PhenotypicFeature, write_pedigree_as_plink
from cases.models import (
Disease,
Individual,
Pedigree,
PhenotypicFeature,
write_id_mapping_json,
write_pedigree_as_plink,
)
from cases_files.models import (
AbstractFile,
IndividualExternalFile,
@@ -665,6 +672,7 @@ def _import_ngsbits_qc_mappingqc(
class VariantImportExecutorBase(FileImportExecutorBase):
"""Base class for variant import."""

max_vcf_files: typing.Optional[int]
var_type: str

def __init__(self, case: Case, bgjob: CaseImportBackgroundJob):
@@ -674,8 +682,9 @@ def __init__(self, case: Case, bgjob: CaseImportBackgroundJob):
self.case = case
#: The background job, used for logging and getting unique internal paths
self.bgjob = bgjob
#: The `FileSystemOptions` for the internal storage.
# Shortcut to storage settings
storage_settings = settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE
#: The `FileSystemOptions` for the internal storage
self.internal_fs_options = FileSystemOptions(
protocol="s3",
host=storage_settings.host,
@@ -698,7 +707,7 @@ def run(self) -> typing.List[PedigreeInternalFile]:
else:
return []

def copy_external_internal(self) -> typing.Optional[PedigreeInternalFile]:
def copy_external_internal(self) -> typing.List[PedigreeInternalFile]:
"""Copy the external VCF file to the internal storage.

:return: the corresponding `PedigreeInternalFile` object
@@ -712,56 +721,71 @@ def copy_external_internal(self) -> typing.Optional[PedigreeInternalFile]:
file_attributes__variant_type=self.var_type,
mimetype="text/plain+x-bgzip+x-variant-call-format",
)
if extfile_qs.count() > 1:
if self.max_vcf_files is not None and extfile_qs.count() > self.max_vcf_files:
raise ValueError(
f"expected at most one {self.var_type} VCF file, found {extfile_qs.count()}"
f"expected at most {self.max_vcf_files} {self.var_type} VCF file(s), found {extfile_qs.count()}"
)
elif extfile_qs.count() == 0:
return None
extfile = extfile_qs.first()

# Copy the file from the external to the internal storage.
bucket = settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.bucket
path_int = (
f"case-data/{uuid_frag(self.case.sodar_uuid)}/{self.bgjob.sodar_uuid}/"
f"{self.var_type}/external-copy.vcf.gz"
)
path_int_full = f"s3://{bucket}/{path_int}"
with (
self.external_fs.open(extfile.path, "rb") as inputf,
self.internal_fs.open(path_int_full, "wb") as outputf,
):
shutil.copyfileobj(inputf, outputf)

# Create the `PedigreeInternalFile` record after copying is complete.
return PedigreeInternalFile.objects.create(
case=self.case,
path=path_int,
genomebuild=extfile.genomebuild,
mimetype=extfile.mimetype,
file_attributes=extfile.file_attributes,
identifier_map=extfile.identifier_map,
# is copy of the original VCF file
designation=f"variant_calls/{self.var_type}/orig-copy",
# checksum=extfile.checksum, # TODO
pedigree=self.case.pedigree_obj,
)
result = []
for idx, extfile in enumerate(extfile_qs):
# Copy the file from the external to the internal storage.
bucket = settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.bucket
path_int = (
f"case-data/{uuid_frag(self.case.sodar_uuid)}/{self.bgjob.sodar_uuid}/"
f"{self.var_type}/external-copy-{idx}.vcf.gz"
)
path_int_full = f"s3://{bucket}/{path_int}"
path_ext = extfile.path
with (
self.external_fs.open(path_ext, "rb") as inputf,
self.internal_fs.open(path_int_full, "wb") as outputf,
):
shutil.copyfileobj(inputf, outputf)
# Create the `PedigreeInternalFile` record after copying is complete.
result.append(
PedigreeInternalFile.objects.create(
case=self.case,
path=path_int,
genomebuild=extfile.genomebuild,
mimetype=extfile.mimetype,
file_attributes=extfile.file_attributes,
identifier_map=extfile.identifier_map,
# is copy of the original VCF file
designation=f"variant_calls/{self.var_type}/orig-copy",
# checksum=extfile.checksum, # TODO
pedigree=self.case.pedigree_obj,
)
)
return result
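
For orientation, a quick sketch of the internal object keys the loop above builds; the UUID fragments are placeholders, only the key pattern is taken from the code:

# Illustration only: key pattern from the loop above, with placeholder values.
case_frag, bgjob_uuid, var_type = "ab/cd/abcd1234", "0190-bgjob", "strucvars"
for idx in range(2):  # e.g. a case with two strucvars VCF files
    print(f"case-data/{case_frag}/{bgjob_uuid}/{var_type}/external-copy-{idx}.vcf.gz")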

def annotate_outer(self, vcf_on_s3: PedigreeExternalFile) -> typing.List[PedigreeExternalFile]:
def annotate_outer(
self, vcf_on_s3: typing.List[PedigreeExternalFile]
) -> typing.List[PedigreeExternalFile]:
"""Annotate the VCF file from the internal storage.

Will write temporary PLINK PED and ID-mapping JSON files and then call the actual annotation function.
"""
with tempfile.NamedTemporaryFile(mode="w+t") as tmpf:
write_pedigree_as_plink(self.case.pedigree_obj, tmpf)
tmpf.flush()
return self.annotate(vcf_on_s3, path_ped=tmpf.name)
with (
tempfile.NamedTemporaryFile(mode="w+t", suffix=".ped") as tmpf_ped,
tempfile.NamedTemporaryFile(mode="w+t", suffix=".json") as tmpf_id_map,
):
write_pedigree_as_plink(self.case.pedigree_obj, tmpf_ped)
tmpf_ped.flush()
bucket = settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.bucket
write_id_mapping_json(
{f"{bucket}/{entry.path}": entry.identifier_map for entry in vcf_on_s3},
self.case.pedigree_obj,
tmpf_id_map,
)
tmpf_id_map.flush()
return self.annotate(vcf_on_s3, path_ped=tmpf_ped.name, path_id_map=tmpf_id_map.name)

def annotate(
self, vcf_on_s3: PedigreeExternalFile, path_ped: str
self, vcf_on_s3: typing.List[PedigreeExternalFile], path_ped: str, path_id_map: str
) -> typing.List[PedigreeExternalFile]:
_ = vcf_on_s3
_ = path_ped
_ = path_id_map
raise NotImplementedError
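
Taken together, the base class now follows a template-method layout: `copy_external_internal()` copies up to `max_vcf_files` external VCFs into the internal storage, `annotate_outer()` stages the temporary PED and ID-mapping files, and each concrete executor supplies `max_vcf_files`, `var_type`, and an `annotate()` override. A minimal sketch of that override contract (the subclass and its worker arguments are hypothetical, for illustration only):

class ExampleImportExecutor(VariantImportExecutorBase):
    max_vcf_files = 1     # upper bound enforced in copy_external_internal()
    var_type = "example"  # hypothetical; selects which external files are copied

    def annotate(
        self, vcf_on_s3: typing.List[PedigreeExternalFile], path_ped: str, path_id_map: str
    ) -> typing.List[PedigreeExternalFile]:
        # Build worker arguments from the copied VCFs plus the staged PED and
        # ID-mapping files, then run the worker.
        args = ["example", "ingest", "--path-ped", path_ped, "--id-mapping", f"@{path_id_map}"]
        for entry in vcf_on_s3:
            args += ["--path-in", entry.path]
        self.run_worker(args=args)
        return []  # the real executors return the internal file records they create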

def run_worker(self, args: list[str], env: typing.Dict[str, str] | None = None):
@@ -776,6 +800,7 @@ def run_worker(self, args: list[str], env: typing.Dict[str, str] | None = None):
class SeqvarsImportExecutor(VariantImportExecutorBase):
"""Run the import of sequence variant import."""

max_vcf_files = 1
var_type = "seqvars"

def run(self) -> typing.List[PedigreeInternalFile]:
@@ -791,10 +816,12 @@ def run(self) -> typing.List[PedigreeInternalFile]:
return int_on_s3

def annotate(
self, vcf_on_s3: PedigreeExternalFile, path_ped: str
self, vcf_on_s3: typing.List[PedigreeExternalFile], path_ped: str, path_id_map: str
) -> typing.List[PedigreeExternalFile]:
"""Implementation of sequence variant annotation."""
# Path create path of the new fiel.
assert len(vcf_on_s3) == 1, "ensured earlier"

# Create the path of the new output file.
bucket = settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.bucket
path_out = (
f"case-data/{uuid_frag(self.case.sodar_uuid)}/{self.bgjob.sodar_uuid}/"
@@ -809,16 +836,20 @@ def annotate(
"--case-uuid",
str(self.case.sodar_uuid),
"--genomebuild",
vcf_on_s3.genomebuild,
vcf_on_s3[0].genomebuild,
"--path-mehari-db",
f"{settings.WORKER_DB_PATH}/mehari",
"--path-ped",
path_ped,
"--path-in",
vcf_on_s3.path,
f"{bucket}/{vcf_on_s3[0].path}",
"--path-out",
f"{bucket}/{path_out}",
"--id-mapping",
f"@{path_id_map}",
]
# if settings.DEBUG: # XXX remove this
# args += ["--max-var-count", "1000"]
# Setup environment so the worker can access the internal S3 storage.
endpoint_host = settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.host
endpoint_port = settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.port
@@ -828,7 +859,7 @@ def annotate(
"AWS_ACCESS_KEY_ID": settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.access_key,
"AWS_SECRET_ACCESS_KEY": settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.secret_key,
"AWS_ENDPOINT_URL": f"http://{endpoint_host}:{endpoint_port}",
# "AWS_REGION": "us-east-1",
"AWS_REGION": "us-east-1",
}
# Actually execute the worker.
self.run_worker(args=args, env=env)
@@ -837,9 +868,10 @@ def annotate(
PedigreeInternalFile.objects.create(
case=self.case,
path=f"{path_out}{suffix}",
genomebuild=vcf_on_s3.genomebuild,
genomebuild=vcf_on_s3[0].genomebuild,
mimetype=mimetype,
identifier_map=vcf_on_s3.identifier_map,
# NB: no identifier map needed; the ingest step already applied the renaming
identifier_map={},
designation=designation,
file_attributes={},
# checksum=extfile.checksum, # TODO
@@ -863,21 +895,20 @@ def prefilter_seqvars_outer(self, ingested_on_s3: PedigreeInternalFile):
"""Writes out the prefilter configuration JSON to file and then calls the actual
prefiltration.
"""
bucket = settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.bucket
with tempfile.NamedTemporaryFile(mode="w+t") as tmpf:
configs: list[PrefilterConfig] = settings.VARFISH_CASE_IMPORT_SEQVARS_PREFILTER_CONFIGS
out_lst = []
for idx, config in enumerate(configs):
dirname = os.path.dirname(ingested_on_s3.path)
prefilter_path = f"{dirname}/prefiltered-{idx}.vcf.gz"
out_lst.append(
PrefilterConfig(
**{
**config.dict(),
"prefilter_path": prefilter_path,
}
)
prefilter_path = f"{bucket}/{dirname}/prefiltered-{idx}.vcf.gz"
config = PrefilterConfig(
**{
**config.dict(),
"prefilter_path": prefilter_path,
}
)
json.dump([obj.dict() for obj in out_lst], tmpf)
print(config.model_dump_json(), file=tmpf)
tmpf.flush()
self.prefilter_seqvars(
ingested_on_s3=ingested_on_s3, configs=out_lst, path_config=tmpf.name
@@ -888,13 +919,14 @@ def prefilter_seqvars(
):
"""Run prefiltration of sequence variants."""
# Create arguments to use.
bucket = settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.bucket
args = [
"seqvars",
"prefilter",
"--params",
f"@{path_config}",
"--path-in",
ingested_on_s3.path,
f"{bucket}/{ingested_on_s3.path}",
]
# Setup environment so the worker can access the internal S3 storage.
endpoint_host = settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.host
@@ -905,7 +937,7 @@
"AWS_ACCESS_KEY_ID": settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.access_key,
"AWS_SECRET_ACCESS_KEY": settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.secret_key,
"AWS_ENDPOINT_URL": f"http://{endpoint_host}:{endpoint_port}",
# "AWS_REGION": "us-east-1",
"AWS_REGION": "us-east-1",
}
# Actually execute the worker.
self.run_worker(args=args, env=env)
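
For orientation, the `--params` argument (passed as `@` plus the path of the temporary config file) points the worker at the file written in `prefilter_seqvars_outer()` above, which now holds one JSON object per line, one per configured prefilter. A sketch of how such a file comes about, using a stand-in model since only `prefilter_path` is grounded in the code above and the other field is hypothetical:

import pydantic

class ExamplePrefilterConfig(pydantic.BaseModel):
    # Stand-in for PrefilterConfig; only `prefilter_path` is taken from the code above.
    prefilter_path: str
    max_freq: float = 0.01  # hypothetical threshold field

with open("/tmp/prefilter-params.json", "w") as tmpf:
    for idx in range(2):
        config = ExamplePrefilterConfig(
            prefilter_path=f"bucket/case-data/.../seqvars/prefiltered-{idx}.vcf.gz"
        )
        print(config.model_dump_json(), file=tmpf)  # one JSON object per line
# The worker is then invoked with `--params @/tmp/prefilter-params.json`.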
@@ -943,12 +975,15 @@ def prefilter_seqvars(
class StrucvarsImportExecutor(VariantImportExecutorBase):
"""Run the import of structural variant import."""

max_vcf_files = 10
var_type = "strucvars"

def annotate(
self, vcf_on_s3: PedigreeExternalFile, path_ped: str
self, vcf_on_s3: typing.List[PedigreeExternalFile], path_ped: str, path_id_map: str
) -> typing.List[PedigreeExternalFile]:
"""Implementation of structural variant annotation."""
assert len(vcf_on_s3) > 0, "ensured earlier"

# Create the path of the new output file.
bucket = settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.bucket
path_out = (
@@ -964,16 +999,13 @@ def annotate(
"--case-uuid",
str(self.case.sodar_uuid),
"--genomebuild",
vcf_on_s3.genomebuild,
"--path-mehari-db",
f"{settings.WORKER_DB_PATH}/mehari",
vcf_on_s3[0].genomebuild,
"--path-ped",
path_ped,
"--path-in",
vcf_on_s3.path,
"--path-out",
f"{bucket}/{path_out}",
]
for entry in vcf_on_s3:
args += ["--path-in", f"{bucket}/{entry.path}"]
args += ["--path-out", f"{bucket}/{path_out}", "--id-mapping", f"@{path_id_map}"]
# Setup environment so the worker can access the internal S3 storage.
endpoint_host = settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.host
endpoint_port = settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.port
@@ -983,7 +1015,7 @@
"AWS_ACCESS_KEY_ID": settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.access_key,
"AWS_SECRET_ACCESS_KEY": settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.secret_key,
"AWS_ENDPOINT_URL": f"http://{endpoint_host}:{endpoint_port}",
# "AWS_REGION": "us-east-1",
"AWS_REGION": "us-east-1",
}
# Actually execute the worker.
self.run_worker(args=args, env=env)
@@ -992,9 +1024,10 @@
PedigreeInternalFile.objects.create(
case=self.case,
path=f"{path_out}{suffix}",
genomebuild=vcf_on_s3.genomebuild,
genomebuild=vcf_on_s3[0].genomebuild,
mimetype=mimetype,
identifier_map=vcf_on_s3.identifier_map,
# NB: no identifier map needed; the ingest step already applied the renaming
identifier_map={},
designation=designation,
file_attributes={},
# checksum=extfile.checksum, # TODO