From 0d5e380336644b66781310cc945ef4ac826291fa Mon Sep 17 00:00:00 2001
From: Manuel Holtgrewe
Date: Fri, 1 Nov 2024 09:37:24 +0100
Subject: [PATCH] feat: integrate seqvars inhouse rocksdb in worker call
 (#2069) (#2070)

---
 backend/config/celery.py                      |   1 +
 backend/config/settings/base.py               |   3 +
 .../commands/buildseqvarsinhousedb.py         |  34 ++++
 ...0015_seqvarsinhousedbbuildbackgroundjob.py |  43 +++++
 backend/seqvars/models/base.py                |  58 ++++++-
 backend/seqvars/models/executors.py           | 164 +++++++++++++++---
 backend/seqvars/plugins.py                    |   6 +-
 backend/seqvars/tasks.py                      |  19 ++
 .../views/SeqvarsQuery/SeqvarsQuery.vue       |   4 +-
 utils/docker/Dockerfile                       |   2 +-
 10 files changed, 307 insertions(+), 27 deletions(-)
 create mode 100644 backend/seqvars/management/commands/buildseqvarsinhousedb.py
 create mode 100644 backend/seqvars/migrations/0015_seqvarsinhousedbbuildbackgroundjob.py

diff --git a/backend/config/celery.py b/backend/config/celery.py
index 9dc07e506..befe962df 100644
--- a/backend/config/celery.py
+++ b/backend/config/celery.py
@@ -32,6 +32,7 @@
     "*.clear_*": {"queue": "maintenance"},
     "*.compute_*": {"queue": "maintenance"},
     "*.sync_*": {"queue": "maintenance"},
+    "*.run_*inhousedbbuild*": {"queue": "maintenance"},
 }

 # Explicitly set the name of the default queue to "default" (celery's default is "celery").

diff --git a/backend/config/settings/base.py b/backend/config/settings/base.py
index ea570342e..5c4d45c40 100644
--- a/backend/config/settings/base.py
+++ b/backend/config/settings/base.py
@@ -502,6 +502,9 @@
 # Path to database for the worker (base database with sub entries for mehari etc.).
 WORKER_DB_PATH = env.str("VARFISH_WORKER_DB_PATH", "/data/varfish-static/data")

+# Writeable path to database for the worker (e.g., for in-house data).
+WORKER_RW_PATH = env.str("VARFISH_WORKER_RW_PATH", "/data/varfish-dynamic/data")
+
 # Path to executable for worker.
 WORKER_EXE_PATH = env.str("VARFISH_WORKER_EXE_PATH", "varfish-server-worker")

diff --git a/backend/seqvars/management/commands/buildseqvarsinhousedb.py b/backend/seqvars/management/commands/buildseqvarsinhousedb.py
new file mode 100644
index 000000000..57edab2cb
--- /dev/null
+++ b/backend/seqvars/management/commands/buildseqvarsinhousedb.py
@@ -0,0 +1,34 @@
+"""Manually (re-)build the seqvars inhouse database."""
+
+import traceback
+
+from django.core.management.base import BaseCommand, CommandError
+
+from seqvars.models.base import SeqvarsInhouseDbBuildBackgroundJob
+from seqvars.models.executors import run_seqvarsbuildinhousedbbackgroundjob
+
+
+class Command(BaseCommand):
+    #: Help message displayed on the command line.
+    help = "(Re-)build the seqvars inhouse database."
+
+    def handle(self, *_args, **options):
+        """Entrypoint from command line."""
+
+        try:
+            self._handle()
+        except Exception as e:
+            self.stderr.write(
+                self.style.ERROR(
+                    "A problem occurred (see below).\n\n--- BEGIN TRACEBACK ---\n"
+                    f"{traceback.format_exc()}--- END TRACEBACK ---\n"
+                )
+            )
+            raise CommandError("Could not re-build the seqvars inhouse db.") from e
+
+    def _handle(self):
+        # Create a new job to execute.
+        job = SeqvarsInhouseDbBuildBackgroundJob.objects.create_full()
+        self.stderr.write(self.style.SUCCESS("Executing seqvars inhouse db build job..."))
+        run_seqvarsbuildinhousedbbackgroundjob(pk=job.pk)
+        self.stderr.write(self.style.SUCCESS("... done executing job"))
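The new Celery route above matches on the task name. As a quick sanity check (a sketch, assuming Celery applies fnmatch-style globbing to string routes, which is how glob routes are documented to behave), the rebuild task defined later in this patch does land on the maintenance queue:

    # Sketch: check that the new route glob catches the task name defined in
    # backend/seqvars/tasks.py below; fnmatch-style matching is an assumption
    # about Celery's string-route semantics.
    from fnmatch import fnmatch

    task_name = "seqvars.tasks.run_seqvarsinhousedbbuildbackgroundjob"
    assert fnmatch(task_name, "*.run_*inhousedbbuild*")

The management command can also be run by hand via ``python manage.py buildseqvarsinhousedb``; note that it executes the build synchronously rather than dispatching a Celery task.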
diff --git a/backend/seqvars/migrations/0015_seqvarsinhousedbbuildbackgroundjob.py b/backend/seqvars/migrations/0015_seqvarsinhousedbbuildbackgroundjob.py
new file mode 100644
index 000000000..43ba19ce2
--- /dev/null
+++ b/backend/seqvars/migrations/0015_seqvarsinhousedbbuildbackgroundjob.py
@@ -0,0 +1,43 @@
+# Generated by Django 3.2.25 on 2024-10-30 07:38
+
+import uuid
+
+import bgjobs.models
+from django.db import migrations, models
+import django.db.models.deletion
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ("bgjobs", "0006_auto_20200526_1657"),
+        ("seqvars", "0014_seqvarsquerypresetsset_is_factory_default"),
+    ]
+
+    operations = [
+        migrations.CreateModel(
+            name="SeqvarsInhouseDbBuildBackgroundJob",
+            fields=[
+                (
+                    "id",
+                    models.AutoField(
+                        auto_created=True, primary_key=True, serialize=False, verbose_name="ID"
+                    ),
+                ),
+                ("sodar_uuid", models.UUIDField(default=uuid.uuid4, unique=True)),
+                (
+                    "bg_job",
+                    models.ForeignKey(
+                        help_text="Background job for state etc.",
+                        on_delete=django.db.models.deletion.CASCADE,
+                        related_name="seqvarsinhousedbbuildbackgroundjob",
+                        to="bgjobs.backgroundjob",
+                    ),
+                ),
+            ],
+            options={
+                "ordering": ["-pk"],
+            },
+            bases=(bgjobs.models.JobModelMessageMixin, models.Model),
+        ),
+    ]

diff --git a/backend/seqvars/models/base.py b/backend/seqvars/models/base.py
index 2ad46eb03..c57d57871 100644
--- a/backend/seqvars/models/base.py
+++ b/backend/seqvars/models/base.py
@@ -22,6 +22,7 @@
 import uuid as uuid_object

 from bgjobs.models import BackgroundJob, JobModelMessageMixin
+from django.conf import settings
 from django.contrib.auth import get_user_model
 from django.db import models, transaction
 from django_pydantic_field.v2.fields import PydanticSchemaField as SchemaField
@@ -2431,7 +2432,7 @@ class Meta:


 class SeqvarsQueryExecutionBackgroundJobManager(models.Manager):
-    """Custom manager class that allows to create a ``SeqvarsQueryExeuctionBackgroundJob``
+    """Custom manager class that allows to create a ``SeqvarsQueryExecutionBackgroundJob``
     together with the backing ``BackgroundJob``.
     """

@@ -2486,7 +2487,7 @@ class SeqvarsQueryExecutionBackgroundJob(JobModelMessageMixin, models.Model):
         on_delete=models.CASCADE,
     )

-    #: The case import action to perform.
+    #: The query execution to run.
     seqvarsqueryexecution = models.ForeignKey(
         SeqvarsQueryExecution, on_delete=models.CASCADE, null=False
     )
@@ -2496,3 +2497,56 @@ def get_human_readable_type(self):

     class Meta:
         ordering = ["-pk"]
+
+
+class SeqvarsInhouseDbBuildBackgroundJobManager(models.Manager):
+    """Custom manager class that allows to create a ``SeqvarsInhouseDbBuildBackgroundJob``
+    together with the backing ``BackgroundJob``.
+    """
+
+    @transaction.atomic
+    def create_full(self):
+        bg_job = BackgroundJob.objects.create(
+            name="Building seqvars inhouse DB",
+            project=None,
+            job_type=SeqvarsInhouseDbBuildBackgroundJob.spec_name,
+            user=User.objects.get(username=settings.PROJECTROLES_ADMIN_OWNER),
+        )
+        instance = super().create(
+            bg_job=bg_job,
+        )
+        return instance
+
+
+class SeqvarsInhouseDbBuildBackgroundJob(JobModelMessageMixin, models.Model):
+    """Background job for re-building the seqvars inhouse DB."""
+
+    # We use a custom manager that provides creation together with the ``BackgroundJob``.
+    objects: SeqvarsInhouseDbBuildBackgroundJobManager = SeqvarsInhouseDbBuildBackgroundJobManager()
+
+    #: Task description for logging.
+    task_desc = "Seqvars Inhouse DB Build"
+
+    #: String identifying model in BackgroundJob.
+    spec_name = "seqvars.runinhousedbbuild"
+
+    #: The SODAR UUID.
+    sodar_uuid = models.UUIDField(
+        default=uuid_object.uuid4,
+        unique=True,
+    )
+
+    #: The background job for state management etc.
+    bg_job = models.ForeignKey(
+        BackgroundJob,
+        null=False,
+        related_name="seqvarsinhousedbbuildbackgroundjob",
+        help_text="Background job for state etc.",
+        on_delete=models.CASCADE,
+    )
+
+    def get_human_readable_type(self):
+        return self.task_desc
+
+    class Meta:
+        ordering = ["-pk"]
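For orientation, a minimal usage sketch of the new manager and task (all names as defined in this patch): ``create_full()`` creates the backing ``BackgroundJob`` and the specialized job row in a single transaction, after which the primary key is handed to Celery:

    # Minimal usage sketch; both rows are created atomically by create_full().
    from seqvars.models.base import SeqvarsInhouseDbBuildBackgroundJob
    from seqvars.tasks import run_seqvarsinhousedbbuildbackgroundjob

    job = SeqvarsInhouseDbBuildBackgroundJob.objects.create_full()
    run_seqvarsinhousedbbuildbackgroundjob.delay(
        seqvarsinhousedbbuildbackgroundjob_pk=job.pk
    )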
diff --git a/backend/seqvars/models/executors.py b/backend/seqvars/models/executors.py
index 55dc6269d..eb97e8673 100644
--- a/backend/seqvars/models/executors.py
+++ b/backend/seqvars/models/executors.py
@@ -1,4 +1,6 @@
+import datetime
 import os
+import pathlib
 import subprocess
 import sys
 import tempfile
@@ -8,12 +10,14 @@
 from django.conf import settings
 from django.utils import timezone
 from google.protobuf.json_format import MessageToJson, Parse
+from projectroles.templatetags.projectroles_common_tags import get_app_setting

 from cases_files.models import PedigreeInternalFile
 from cases_import.models.executors import FileSystemOptions, FileSystemWrapper, uuid_frag
 from seqvars.models.base import (
     DataSourceInfoPydantic,
     DataSourceInfosPydantic,
+    SeqvarsInhouseDbBuildBackgroundJob,
     SeqvarsQueryExecution,
     SeqvarsQueryExecutionBackgroundJob,
     SeqvarsResultRow,
@@ -25,10 +29,33 @@
     seqvars_output_record_from_protobuf,
 )
 from seqvars.protos.output_pb2 import OutputHeader, OutputRecord
+from variants.models.case import Case


-class CaseImportBackgroundJobExecutor:
-    """Implementation of ``CaseImportBackgroundJob`` execution."""
+def aws_config_env_internal() -> dict[str, str]:
+    """Build AWS configuration environment fragment for the internal storage."""
+    return {
+        "AWS_ACCESS_KEY_ID": settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.access_key,
+        "AWS_SECRET_ACCESS_KEY": settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.secret_key,
+        "AWS_ENDPOINT_URL": (
+            f"http://{settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.host}"
+            f":{settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.port}"
+        ),
+        "AWS_REGION": "us-east-1",
+    }
+
+
+def run_worker(*, args: list[str], env: typing.Dict[str, str] | None = None):
+    """Run the worker executable with the given arguments."""
+    cmd = [settings.WORKER_EXE_PATH, *args]
+    subprocess.check_call(cmd, env=env)
+
+
+class SeqvarsQueryExecutionBackgroundJobExecutor:
+    """Implementation of ``SeqvarsQueryExecutionBackgroundJob`` execution."""

     def __init__(self, job_pk: int):
         #: Job record primary key.
@@ -128,37 +155,36 @@ def execute_query(self, tmpdir: str) -> str:
             "--path-output",
             f"{bucket}/{self.path_internal_results}",
         ]
+        # Extend with the inhouse database if it exists.
+        worker_rw_path = pathlib.Path(settings.WORKER_RW_PATH)
+        path_inhouse = (
+            worker_rw_path
+            / "worker"
+            / "seqvars"
+            / "inhouse"
+            / vcf_genome_release
+            / "active"
+            / "rocksdb"
+        )
+        if path_inhouse.exists():
+            args.extend(["--path-inhouse-db", str(path_inhouse)])
         # Setup environment so the worker can access the internal S3 storage.
         env = {
             **dict(os.environ.items()),
             "LC_ALL": "C",
-            "AWS_ACCESS_KEY_ID": settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.access_key,
-            "AWS_SECRET_ACCESS_KEY": settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.secret_key,
-            "AWS_ENDPOINT_URL": (
-                f"http://{settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.host}"
-                f":{settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.port}"
-            ),
-            "AWS_REGION": "us-east-1",
+            **aws_config_env_internal(),
         }
-        # Actually execute query with worker.
+        # Actually execute the query with the worker.
         try:
-            self.run_worker(
+            run_worker(
                 args=args,
                 env=env,
             )
         except Exception:
-            pass
+            print("Error while executing worker / importing results", file=sys.stderr)
         return vcf_genome_release

-    def run_worker(self, *, args: list[str], env: typing.Dict[str, str] | None = None):
-        """Run the worker with the given arguments.
-
-        The worker will create a new VCF file and a TBI file.
-        """
-        cmd = [settings.WORKER_EXE_PATH, *args]
-        subprocess.check_call(cmd, env=env)
-
     def load_results(self, *, genome_release: str):
         """Load the results from the internal storage and store in database."""
         bucket = settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.bucket
@@ -205,5 +231,103 @@
 def run_seqvarsqueryexecutionbackgroundjob(*, pk: int):
     """Execute the work for a ``SeqvarsQueryExecutionBackgroundJob``."""
-    executor = CaseImportBackgroundJobExecutor(pk)
+    executor = SeqvarsQueryExecutionBackgroundJobExecutor(pk)
     executor.run()
+
+
+class InhouseDbBuildBackgroundJobExecutor:
+    """Implementation of ``SeqvarsInhouseDbBuildBackgroundJob`` execution."""
+
+    def __init__(self, job_pk: int):
+        #: Job record primary key.
+        self.job_pk = job_pk
+        #: The ``SeqvarsInhouseDbBuildBackgroundJob`` object itself.
+        self.bgjob: SeqvarsInhouseDbBuildBackgroundJob = (
+            SeqvarsInhouseDbBuildBackgroundJob.objects.get(pk=self.job_pk)
+        )
+
+    def run(self):
+        """Execute building the inhouse database."""
+        self.run_for_genome_release(genome_release="grch37")
+        self.run_for_genome_release(genome_release="grch38")
+
+    def run_for_genome_release(self, *, genome_release: typing.Literal["grch37", "grch38"]):
+        """Execute building the inhouse database for the given genome release."""
+        # Ensure the output path is present.
+        worker_rw_path = pathlib.Path(settings.WORKER_RW_PATH)
+        name = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
+        output_path = worker_rw_path / "worker" / "seqvars" / "inhouse" / genome_release / name
+        output_path.mkdir(parents=True, exist_ok=True)
+        # Prepare the file with paths in S3 for the worker.
+        num_cases = self.prepare_paths_for_genome_release(
+            genome_release=genome_release, output_path=output_path
+        )
+        if not num_cases:
+            print(f"No cases to process for {genome_release}, skipping inhouse database build.")
+            return
+        # Create arguments to use.
+        args = [
+            "seqvars",
+            "aggregate",
+            "--genomebuild",
+            genome_release,
+            "--path-out-rocksdb",
+            str(output_path / "rocksdb"),
+            "--path-input",
+            f"@{output_path / 'paths.txt'}",
+        ]
+        # Setup environment so the worker can access the internal S3 storage.
+        env = {
+            **dict(os.environ.items()),
+            "LC_ALL": "C",
+            **aws_config_env_internal(),
+        }
+        # Actually execute the aggregation with the worker.
+        try:
+            print(" ".join(args))
+            run_worker(
+                args=args,
+                env=env,
+            )
+        except Exception:
+            print("Error while executing worker / building inhouse DB", file=sys.stderr)
+            return
+
+        # Atomically update the "active" symlink for the release using Unix `rename(2)`.
+        # This will not work on Windows.
+        print("Updating symlinks...")
+        output_path_with_suffix = output_path.with_suffix(".active")
+        print(f"ln -sr {output_path} {output_path_with_suffix}")
+        output_path_with_suffix.symlink_to(output_path.relative_to(output_path.parent))
+        print(f"rename {output_path_with_suffix} {output_path.with_name('active')}")
+        output_path_with_suffix.rename(output_path.with_name("active"))
+
+    def prepare_paths_for_genome_release(
+        self, *, genome_release: typing.Literal["grch37", "grch38"], output_path: pathlib.Path
+    ) -> int:
+        """Prepare the paths file for the worker.
+
+        For this, we loop over all V2 cases that have the matching genome release.
+        """
+        bucket = settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.bucket
+        paths = []
+        for case in Case.objects.filter(case_version=2).prefetch_related("project").iterator():
+            if not get_app_setting("variants", "exclude_from_inhouse_db", project=case.project):
+                seqvars_file = PedigreeInternalFile.objects.filter(
+                    case=case,
+                    designation="variant_calls/seqvars/ingested-vcf",
+                )[0]
+                if seqvars_file.genomebuild == genome_release:
+                    paths.append(f"{bucket}/{seqvars_file.path}")
+        paths.sort()
+
+        with open(output_path / "paths.txt", "wt") as f:
+            f.write("\n".join(paths) + "\n")
+
+        return len(paths)
+
+
+def run_seqvarsbuildinhousedbbackgroundjob(*, pk: int):
+    """Execute the work for a ``SeqvarsInhouseDbBuildBackgroundJob``."""
+    executor = InhouseDbBuildBackgroundJobExecutor(pk)
+    executor.run()
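The activation step at the end of ``run_for_genome_release`` is the classic atomic symlink swap: build into a fresh timestamped directory, create a temporary relative symlink next to it, then rename the symlink over ``active``. In isolation, the idiom looks as follows (a sketch under the same POSIX-only assumption as the code above):

    # Sketch of the swap idiom: rename(2) atomically replaces an existing
    # "active" symlink, so concurrent readers see either the old or the new
    # target, never a missing one.
    import pathlib

    def activate(output_path: pathlib.Path) -> None:
        tmp_link = output_path.with_suffix(".active")  # e.g. "<timestamp>.active"
        tmp_link.symlink_to(output_path.name)  # relative target keeps the tree relocatable
        tmp_link.rename(output_path.with_name("active"))

Because ``execute_query`` only passes ``--path-inhouse-db`` when ``.../active/rocksdb`` exists, queries simply run without in-house counts until the first successful build.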
diff --git a/backend/seqvars/plugins.py b/backend/seqvars/plugins.py
index 4560fd18c..a6321a0ce 100644
--- a/backend/seqvars/plugins.py
+++ b/backend/seqvars/plugins.py
@@ -1,6 +1,9 @@
 from bgjobs.plugins import BackgroundJobsPluginPoint

-from seqvars.models.base import SeqvarsQueryExecutionBackgroundJob
+from seqvars.models.base import (
+    SeqvarsInhouseDbBuildBackgroundJob,
+    SeqvarsQueryExecutionBackgroundJob,
+)


 class BackgroundJobsPlugin(BackgroundJobsPluginPoint):
@@ -11,6 +14,7 @@ class BackgroundJobsPlugin(BackgroundJobsPluginPoint):

     job_specs = {
         SeqvarsQueryExecutionBackgroundJob.spec_name: SeqvarsQueryExecutionBackgroundJob,
+        SeqvarsInhouseDbBuildBackgroundJob.spec_name: SeqvarsInhouseDbBuildBackgroundJob,
     }

     def get_extra_data_link(self, _extra_data, _name):
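The ``job_specs`` mapping is what lets the background-jobs machinery resolve a generic ``BackgroundJob`` row back to its specialized record. A hypothetical sketch of such a lookup (``resolve_specialized_job`` is illustrative and not part of bgjobs; it assumes ``BackgroundJob.job_type`` carries the ``spec_name``, as set by ``create_full()`` above):

    from bgjobs.models import BackgroundJob
    from seqvars.plugins import BackgroundJobsPlugin

    def resolve_specialized_job(bg_job: BackgroundJob):
        # job_specs maps spec_name -> specialized job model class.
        model = BackgroundJobsPlugin.job_specs.get(bg_job.job_type)
        return model.objects.filter(bg_job=bg_job).first() if model else None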
+ print("Updating symlinks...") + output_path_with_suffix = output_path.with_suffix(".active") + print(f"ln -sr {output_path_with_suffix} {output_path}") + output_path_with_suffix.symlink_to(output_path.relative_to(output_path.parent)) + print(f"rename {output_path_with_suffix} {output_path}") + output_path_with_suffix.rename(output_path.with_name("active")) + + def prepare_paths_for_genome_release( + self, *, genome_release: typing.Literal["grch37", "grch38"], output_path: pathlib.Path + ) -> int: + """Prepare the paths file for the worker. + + For this, we loop over all V2 cases that have the matching inhouse release. + """ + bucket = settings.VARFISH_CASE_IMPORT_INTERNAL_STORAGE.bucket + paths = [] + for case in Case.objects.filter(case_version=2).prefetch_related("project").iterator(): + if not get_app_setting("variants", "exclude_from_inhouse_db", project=case.project): + seqvars_file = PedigreeInternalFile.objects.filter( + case=case, + designation="variant_calls/seqvars/ingested-vcf", + )[0] + if seqvars_file.genomebuild == genome_release: + paths.append(f"{bucket}/{seqvars_file.path}") + paths.sort() + + with open(output_path / "paths.txt", "wt") as f: + f.write("\n".join(paths) + "\n") + + return len(paths) + + +def run_seqvarsbuildinhousedbbackgroundjob(*, pk: int): + """Execute the work for a ``SeqvarsInhouseDbBuildBackgroundJob``.""" + executor = InhouseDbBuildBackgroundJobExecutor(pk) executor.run() diff --git a/backend/seqvars/plugins.py b/backend/seqvars/plugins.py index 4560fd18c..a6321a0ce 100644 --- a/backend/seqvars/plugins.py +++ b/backend/seqvars/plugins.py @@ -1,6 +1,9 @@ from bgjobs.plugins import BackgroundJobsPluginPoint -from seqvars.models.base import SeqvarsQueryExecutionBackgroundJob +from seqvars.models.base import ( + SeqvarsInhouseDbBuildBackgroundJob, + SeqvarsQueryExecutionBackgroundJob, +) class BackgroundJobsPlugin(BackgroundJobsPluginPoint): @@ -11,6 +14,7 @@ class BackgroundJobsPlugin(BackgroundJobsPluginPoint): job_specs = { SeqvarsQueryExecutionBackgroundJob.spec_name: SeqvarsQueryExecutionBackgroundJob, + SeqvarsInhouseDbBuildBackgroundJob.spec_name: SeqvarsInhouseDbBuildBackgroundJob, } def get_extra_data_link(self, _extra_data, _name): diff --git a/backend/seqvars/tasks.py b/backend/seqvars/tasks.py index 81512805a..a79444dde 100644 --- a/backend/seqvars/tasks.py +++ b/backend/seqvars/tasks.py @@ -1,3 +1,5 @@ +from celery.schedules import crontab + from config.celery import app from seqvars.models import executors @@ -8,3 +10,20 @@ def run_seqvarsqueryexecutionbackgroundjob(_self, *, seqvarsqueryexecutionbackgr return executors.run_seqvarsqueryexecutionbackgroundjob( pk=seqvarsqueryexecutionbackgroundjob_pk ) + + +@app.task(bind=True) +def run_seqvarsinhousedbbuildbackgroundjob(_self, *, seqvarsinhousedbbuildbackgroundjob_pk: int): + """Task to execute a ``cases_import.models.SeqvarsInhouseDbBuildBackgroundJob``.""" + return executors.run_seqvarsinhousedbbuildbackgroundjob( + pk=seqvarsinhousedbbuildbackgroundjob_pk + ) + + +@app.on_after_finalize.connect +def setup_periodic_tasks(sender, **_kwargs): + """Register periodic tasks""" + # Regularly rebuild the in-house database. 
diff --git a/frontend/src/seqvars/views/SeqvarsQuery/SeqvarsQuery.vue b/frontend/src/seqvars/views/SeqvarsQuery/SeqvarsQuery.vue
index 95a0701d4..5b6e6600a 100644
--- a/frontend/src/seqvars/views/SeqvarsQuery/SeqvarsQuery.vue
+++ b/frontend/src/seqvars/views/SeqvarsQuery/SeqvarsQuery.vue
@@ -63,9 +63,7 @@ const sessionUuid = computed(
 const selectedPresetSetVersionDetails = computed<
   SeqvarsQueryPresetsSetVersionDetails | undefined
 >(() => {
-  return Array.from(seqvarsPresetsStore.presetSetVersions.values()).filter(
-    (entry) => !entry.presetsset.is_factory_default,
-  )[0]
+  return Array.from(seqvarsPresetsStore.presetSetVersions.values())[0]
 })

 /** The UUID of the currently selected query in the query results view. */
diff --git a/utils/docker/Dockerfile b/utils/docker/Dockerfile
index fbfa58255..0d86f65d8 100644
--- a/utils/docker/Dockerfile
+++ b/utils/docker/Dockerfile
@@ -145,7 +145,7 @@ ADD https://github.com/ufoscout/docker-compose-wait/releases/download/2.7.3/wait
 RUN chmod +x /usr/local/bin/wait

 # Copy the worker from the dedicated worker build.
-COPY --from=ghcr.io/varfish-org/varfish-server-worker:0.16.0 /usr/local/bin/varfish-server-worker /usr/local/bin/varfish-server-worker
+COPY --from=ghcr.io/varfish-org/varfish-server-worker:0.17.0 /usr/local/bin/varfish-server-worker /usr/local/bin/varfish-server-worker

 # Copy virtual env from python-deps stage
 COPY --from=python-deps /usr/src/app/.venv /usr/src/app/.venv
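Taken together, the build side and the query side share a directory contract under ``WORKER_RW_PATH`` (the timestamp below is an example value):

    worker/seqvars/inhouse/grch38/2024-11-01_09-37-24/paths.txt
    worker/seqvars/inhouse/grch38/2024-11-01_09-37-24/rocksdb/
    worker/seqvars/inhouse/grch38/active -> 2024-11-01_09-37-24

``paths.txt`` holds one ``bucket/key`` S3 path per line, as written by ``prepare_paths_for_genome_release``; the ``@file`` form of ``--path-input`` presumably tells the worker to read its input list from that file. The query-side lookup can be summarized as follows (a hypothetical helper mirroring the inline logic in ``execute_query``):

    import pathlib

    def active_inhouse_db(worker_rw_path: str, genome_release: str) -> pathlib.Path | None:
        """Return the active inhouse RocksDB path for the release, if built."""
        path = (
            pathlib.Path(worker_rw_path)
            / "worker" / "seqvars" / "inhouse" / genome_release / "active" / "rocksdb"
        )
        return path if path.exists() else None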