Skip to content

Commit

Permalink
feat: integrate seqvars inhouse rocksdb in worker call (#2069) (#2070)
Browse files Browse the repository at this point in the history
  • Loading branch information
holtgrewe authored Nov 1, 2024
1 parent 65cbca2 commit 0d5e380
Show file tree
Hide file tree
Showing 10 changed files with 307 additions and 27 deletions.
1 change: 1 addition & 0 deletions backend/config/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"*.clear_*": {"queue": "maintenance"},
"*.compute_*": {"queue": "maintenance"},
"*.sync_*": {"queue": "maintenance"},
"*.run_*inhousedbbuild*": {"queue": "maintenance"},
}

# Explicitly set the name of the default queue to default (is celery).
Expand Down
3 changes: 3 additions & 0 deletions backend/config/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,9 @@
# Path to database for the worker (base database with sub entries for mehari etc.).
WORKER_DB_PATH = env.str("VARFISH_WORKER_DB_PATH", "/data/varfish-static/data")

# Writeable path to database for the worker (e.g., for in-house data).
WORKER_RW_PATH = env.str("VARFISH_WORKER_RW_PATH", "/data/varfish-dynamic/data")

# Path to executable for worker.
WORKER_EXE_PATH = env.str("VARFISH_WORKER_EXE_PATH", "varfish-server-worker")

Expand Down
34 changes: 34 additions & 0 deletions backend/seqvars/management/commands/buildseqvarsinhousedb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""Manually (re-)build the seqvars inhouse database."""

import traceback

from django.core.management.base import BaseCommand, CommandError

from seqvars.models.base import SeqvarsInhouseDbBuildBackgroundJob
from seqvars.models.executors import run_seqvarsbuildinhousedbbackgroundjob


class Command(BaseCommand):
    """Management command that (re-)builds the seqvars inhouse database."""

    #: Help message displayed on the command line.
    help = "(Re-) build the seqvars inhouse database."

    def handle(self, *_args, **options):
        """Entrypoint from command line.

        Delegates to ``_handle()`` and converts any failure into a
        ``CommandError`` (non-zero exit) after printing the traceback.
        """
        try:
            self._handle()
        except Exception as e:
            # Broad catch is intentional at this CLI boundary: surface the full
            # traceback for diagnosis, then fail the command explicitly.
            self.stderr.write(
                self.style.ERROR(
                    "A problem occurred (see below).\n\n--- BEGIN TRACEBACK ---\n"
                    f"{traceback.format_exc()}--- END TRACEBACK ---\n"
                )
            )
            raise CommandError("Could not re-build the seqvars inhouse db.") from e

    def _handle(self):
        """Create a new background job record and execute it synchronously."""
        job = SeqvarsInhouseDbBuildBackgroundJob.objects.create_full()
        self.stderr.write(self.style.SUCCESS("Executing seqvars inhouse db build job..."))
        run_seqvarsbuildinhousedbbackgroundjob(pk=job.pk)
        self.stderr.write(self.style.SUCCESS("... done executing job"))
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Generated by Django 3.2.25 on 2024-10-30 07:38

import uuid

import bgjobs.models
from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):
    """Add the ``SeqvarsInhouseDbBuildBackgroundJob`` model.

    The model only carries a UUID and a foreign key to the generic
    ``bgjobs.BackgroundJob`` that tracks state/messages for the build.
    """

    dependencies = [
        ("bgjobs", "0006_auto_20200526_1657"),
        ("seqvars", "0014_seqvarsquerypresetsset_is_factory_default"),
    ]

    operations = [
        migrations.CreateModel(
            name="SeqvarsInhouseDbBuildBackgroundJob",
            fields=[
                (
                    "id",
                    models.AutoField(
                        auto_created=True, primary_key=True, serialize=False, verbose_name="ID"
                    ),
                ),
                # Public identifier used in URLs/APIs instead of the integer PK.
                ("sodar_uuid", models.UUIDField(default=uuid.uuid4, unique=True)),
                (
                    "bg_job",
                    models.ForeignKey(
                        help_text="Background job for state etc.",
                        on_delete=django.db.models.deletion.CASCADE,
                        related_name="seqvarsinhousedbbuildbackgroundjob",
                        to="bgjobs.backgroundjob",
                    ),
                ),
            ],
            options={
                "ordering": ["-pk"],
            },
            # Mixin provides the log-message helpers used by executors.
            bases=(bgjobs.models.JobModelMessageMixin, models.Model),
        ),
    ]
58 changes: 56 additions & 2 deletions backend/seqvars/models/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import uuid as uuid_object

from bgjobs.models import BackgroundJob, JobModelMessageMixin
from django.conf import settings
from django.contrib.auth import get_user_model
from django.db import models, transaction
from django_pydantic_field.v2.fields import PydanticSchemaField as SchemaField
Expand Down Expand Up @@ -2431,7 +2432,7 @@ class Meta:


class SeqvarsQueryExecutionBackgroundJobManager(models.Manager):
"""Custom manager class that allows to create a ``SeqvarsQueryExeuctionBackgroundJob``
"""Custom manager class that allows to create a ``SeqvarsQueryExecutionBackgroundJob``
together with the backing ``BackgroundJob``.
"""

Expand Down Expand Up @@ -2486,7 +2487,7 @@ class SeqvarsQueryExecutionBackgroundJob(JobModelMessageMixin, models.Model):
on_delete=models.CASCADE,
)

#: The case import action to perform.
#: The query execution to run for.
seqvarsqueryexecution = models.ForeignKey(
SeqvarsQueryExecution, on_delete=models.CASCADE, null=False
)
Expand All @@ -2496,3 +2497,56 @@ def get_human_readable_type(self):

class Meta:
ordering = ["-pk"]


class SeqvarsInhouseDbBuildBackgroundJobManager(models.Manager):
    """Custom manager class that allows to create a ``SeqvarsInhouseDbBuildBackgroundJob``
    together with the backing ``BackgroundJob``.
    """

    @transaction.atomic
    def create_full(self):
        """Create the job together with its backing ``BackgroundJob`` atomically."""
        admin_user = User.objects.get(username=settings.PROJECTROLES_ADMIN_OWNER)
        background_job = BackgroundJob.objects.create(
            name="Building seqvars inhouse DB",
            project=None,
            job_type=SeqvarsInhouseDbBuildBackgroundJob.spec_name,
            user=admin_user,
        )
        return super().create(bg_job=background_job)


class SeqvarsInhouseDbBuildBackgroundJob(JobModelMessageMixin, models.Model):
    """Background job for re-building the seqvars inhouse DB."""

    # We use a custom manager that provides creation together with the ``BackgroundJob``.
    objects: SeqvarsInhouseDbBuildBackgroundJobManager = SeqvarsInhouseDbBuildBackgroundJobManager()

    #: Task description for logging.  Fixed from "Seqvars Query Execution", which was
    #: copy-pasted from ``SeqvarsQueryExecutionBackgroundJob`` and mislabeled this job.
    task_desc = "Seqvars Inhouse DB Build"

    #: String identifying model in BackgroundJob.
    spec_name = "seqvars.runinhousedbbuild"

    #: The SODAR UUID.
    sodar_uuid = models.UUIDField(
        default=uuid_object.uuid4,
        unique=True,
    )

    #: The background job for state management etc.
    bg_job = models.ForeignKey(
        BackgroundJob,
        null=False,
        related_name="seqvarsinhousedbbuildbackgroundjob",
        help_text="Background job for state etc.",
        on_delete=models.CASCADE,
    )

    def get_human_readable_type(self):
        """Return the human-readable description used in job listings/logs."""
        return self.task_desc

    class Meta:
        ordering = ["-pk"]
164 changes: 144 additions & 20 deletions backend/seqvars/models/executors.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import datetime
import os
import pathlib
import subprocess
import sys
import tempfile
Expand All @@ -8,12 +10,14 @@
from django.conf import settings
from django.utils import timezone
from google.protobuf.json_format import MessageToJson, Parse
from projectroles.templatetags.projectroles_common_tags import get_app_setting

from cases_files.models import PedigreeInternalFile
from cases_import.models.executors import FileSystemOptions, FileSystemWrapper, uuid_frag
from seqvars.models.base import (
DataSourceInfoPydantic,
DataSourceInfosPydantic,
SeqvarsInhouseDbBuildBackgroundJob,
SeqvarsQueryExecution,
SeqvarsQueryExecutionBackgroundJob,
SeqvarsResultRow,
Expand All @@ -25,10 +29,33 @@
seqvars_output_record_from_protobuf,
)
from seqvars.protos.output_pb2 import OutputHeader, OutputRecord
from variants.models.case import Case


class CaseImportBackgroundJobExecutor:
"""Implementation of ``CaseImportBackgroundJob`` execution."""
def aws_config_env_internal() -> dict[str, str]:
    """Build AWS config directory fragment for internal storage."""
    storage = settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE
    endpoint_url = f"http://{storage.host}:{storage.port}"
    return {
        "AWS_ACCESS_KEY_ID": storage.access_key,
        "AWS_SECRET_ACCESS_KEY": storage.secret_key,
        "AWS_ENDPOINT_URL": endpoint_url,
        "AWS_REGION": "us-east-1",
    }


def run_worker(*, args: list[str], env: typing.Dict[str, str] | None = None):
    """Run the worker with the given arguments.
    The worker will create a new VCF file and a TBI file.
    """
    subprocess.check_call([settings.WORKER_EXE_PATH] + list(args), env=env)


class SeqvarsQueryExecutionBackgroundJobExecutor:
"""Implementation of ``SeqvarsQueryExecutionBackgroundJob`` execution."""

def __init__(self, job_pk: int):
#: Job record primary key.
Expand Down Expand Up @@ -128,37 +155,36 @@ def execute_query(self, tmpdir: str) -> str:
"--path-output",
f"{bucket}/{self.path_internal_results}",
]
# Extend the arguments with the in-house database if it exists.
worker_rw_path = pathlib.Path(settings.WORKER_DB_PATH)
path_inhouse = (
worker_rw_path
/ "worker"
/ "seqvars"
/ "inhouse"
/ vcf_genome_release
/ "active"
/ "rocksdb"
)
if path_inhouse.exists():
args.extend(["--path-inhouse-db", str(path_inhouse)])
# Setup environment so the worker can access the internal S3 storage.
env = {
**dict(os.environ.items()),
"LC_ALL": "C",
"AWS_ACCESS_KEY_ID": settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.access_key,
"AWS_SECRET_ACCESS_KEY": settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.secret_key,
"AWS_ENDPOINT_URL": (
f"http://{settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.host}"
f":{settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.port}"
),
"AWS_REGION": "us-east-1",
**aws_config_env_internal(),
}
# Actually execute the query with the worker.
try:
self.run_worker(
run_worker(
args=args,
env=env,
)
except Exception:
pass
print("Error while executing worker / importing results", file=sys.stderr)

return vcf_genome_release

def run_worker(self, *, args: list[str], env: typing.Dict[str, str] | None = None):
"""Run the worker with the given arguments.
The worker will create a new VCF file and a TBI file.
"""
cmd = [settings.WORKER_EXE_PATH, *args]
subprocess.check_call(cmd, env=env)

def load_results(self, *, genome_release: str):
"""Load the results from the internal storage and store in database."""
bucket = settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.bucket
Expand Down Expand Up @@ -205,5 +231,103 @@ def load_results(self, *, genome_release: str):

def run_seqvarsqueryexecutionbackgroundjob(*, pk: int):
    """Execute the work for a ``SeqvarsQueryExecutionBackgroundJob``."""
    SeqvarsQueryExecutionBackgroundJobExecutor(pk).run()


class InhouseDbBuildBackgroundJobExecutor:
    """Implementation of ``SeqvarsInhouseDbBuildBackgroundJob`` execution."""

    def __init__(self, job_pk: int):
        #: Job record primary key.
        self.job_pk = job_pk
        #: The ``SeqvarsInhouseDbBuildBackgroundJob`` object itself.
        self.bgjob: SeqvarsInhouseDbBuildBackgroundJob = (
            SeqvarsInhouseDbBuildBackgroundJob.objects.get(pk=self.job_pk)
        )

    def run(self):
        """Execute building the inhouse database for both genome releases."""
        self.run_for_genome_release(genome_release="grch37")
        self.run_for_genome_release(genome_release="grch38")

    def run_for_genome_release(self, *, genome_release: typing.Literal["grch37", "grch38"]):
        """Execute building the inhouse database for the given genome release."""
        # Ensure the output path is present.
        #
        # NOTE(review): variable says "rw" but this reads ``WORKER_DB_PATH``; this
        # commit also introduces ``settings.WORKER_RW_PATH`` as the writeable
        # location for in-house data -- confirm which setting is intended here.
        worker_rw_path = pathlib.Path(settings.WORKER_DB_PATH)
        name = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        output_path = worker_rw_path / "worker" / "seqvars" / "inhouse" / genome_release / name
        output_path.mkdir(parents=True, exist_ok=True)
        # Prepare the file with paths in S3 for the worker.
        num_cases = self.prepare_paths_for_genome_release(
            genome_release=genome_release, output_path=output_path
        )
        if not num_cases:
            print(f"No cases to process for {genome_release}, skipping inhouse database build.")
            return
        # Create arguments to use.
        args = [
            "seqvars",
            "aggregate",
            "--genomebuild",
            genome_release,
            "--path-out-rocksdb",
            str(output_path / "rocksdb"),
            "--path-input",
            f"@{output_path / 'paths.txt'}",
        ]
        # Setup environment so the worker can access the internal S3 storage.
        env = {
            **dict(os.environ.items()),
            "LC_ALL": "C",
            **aws_config_env_internal(),
        }
        # Actually execute the aggregation with the worker.
        try:
            print(" ".join(args))
            run_worker(
                args=args,
                env=env,
            )
        except Exception:
            print("Error while executing worker / importing results", file=sys.stderr)
            return

        # Atomically update the "active" symlink for the release using Unix `rename(2)`.
        # This will not work on Windows.  We first create a uniquely-named symlink next
        # to the output directory, then rename it over the "active" name; the previous
        # log messages described different operations than were performed.
        print("Updating active symlink...")
        tmp_symlink = output_path.with_suffix(".active")
        active_symlink = output_path.with_name("active")
        print(f"ln -s {output_path.name} {tmp_symlink}")
        tmp_symlink.symlink_to(output_path.relative_to(output_path.parent))
        print(f"rename {tmp_symlink} {active_symlink}")
        tmp_symlink.rename(active_symlink)

    def prepare_paths_for_genome_release(
        self, *, genome_release: typing.Literal["grch37", "grch38"], output_path: pathlib.Path
    ) -> int:
        """Write the ``paths.txt`` input file for the worker and return the path count.

        Loops over all V2 cases whose ingested seqvars VCF matches *genome_release*
        and that are not excluded from the in-house DB via project app setting.
        """
        bucket = settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.bucket
        paths = []
        for case in Case.objects.filter(case_version=2).prefetch_related("project").iterator():
            if not get_app_setting("variants", "exclude_from_inhouse_db", project=case.project):
                # NOTE(review): indexing ``[0]`` raises IndexError for a case without
                # an ingested seqvars VCF -- confirm V2 cases always have one.
                seqvars_file = PedigreeInternalFile.objects.filter(
                    case=case,
                    designation="variant_calls/seqvars/ingested-vcf",
                )[0]
                if seqvars_file.genomebuild == genome_release:
                    paths.append(f"{bucket}/{seqvars_file.path}")
        # Sort for a deterministic worker input file.
        paths.sort()

        with open(output_path / "paths.txt", "wt") as f:
            f.write("\n".join(paths) + "\n")

        return len(paths)


def run_seqvarsbuildinhousedbbackgroundjob(*, pk: int):
    """Execute the work for a ``SeqvarsInhouseDbBuildBackgroundJob``."""
    InhouseDbBuildBackgroundJobExecutor(pk).run()
Loading

0 comments on commit 0d5e380

Please sign in to comment.