feat: adjust varfish-server to unified gnomAD SV dbs (#1395)

holtgrewe committed Feb 27, 2024
1 parent 5dbddde commit e78ff45
Showing 14 changed files with 992 additions and 644 deletions.
3 changes: 2 additions & 1 deletion Makefile
@@ -51,7 +51,8 @@ docs:

.PHONY: celery
celery:
-	celery -A config.celery_app worker -l info --concurrency=4 --beat
pipenv run watchmedo auto-restart --directory=./ --pattern=*.py --recursive -- \
celery -A config.celery_app worker -l info --concurrency=4 --beat

.PHONY: geticons
geticons:
1 change: 1 addition & 0 deletions Pipfile
@@ -79,6 +79,7 @@ tblib = "~=3.0.0"
# packages for local development
ipdb = "*"
jedi = "==0.19.1"
watchdog = "*"
werkzeug = "~=3.0.0"

[ldap-packages]
1,143 changes: 620 additions & 523 deletions Pipfile.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion config/settings/base.py
@@ -484,7 +484,7 @@
SV_CLEANUP_BUILDING_SV_SETS = env.int("VARFISH_SV_CLEANUP_BUILDING_SV_SETS", 48)

# Path to database for the worker (base database with sub entries for mehari etc.).
-WORKER_DB_PATH = env.str("VARFISH_WORKER_DB_PATH", "")
WORKER_DB_PATH = env.str("VARFISH_WORKER_DB_PATH", "/data/varfish-static/data")

# Path to executable for worker.
WORKER_EXE_PATH = env.str("VARFISH_WORKER_EXE_PATH", "varfish-server-worker")
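A note on the new default: env.str falls back to its second argument when the variable is unset, so deployments keep overriding the path via VARFISH_WORKER_DB_PATH. A minimal sketch, assuming django-environ's usual semantics:

import environ

env = environ.Env()

# Uses the environment value when VARFISH_WORKER_DB_PATH is set, otherwise
# falls back to the new default shipped with this commit.
worker_db_path = env.str("VARFISH_WORKER_DB_PATH", "/data/varfish-static/data")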
5 changes: 5 additions & 0 deletions config/settings/local.py
@@ -85,6 +85,11 @@
# ------------------------------------------------------------------------------
TEST_RUNNER = "snapshottest.django.TestRunner"

# VarFish Worker
# ------------------------------------------------------------------------------

WORKER_DB_PATH = env.str("VARFISH_WORKER_DB_PATH", ".dev/volumes/varfish-static/data")

# Your local stuff: Below this line define 3rd party library settings
# ------------------------------------------------------------------------------

16 changes: 16 additions & 0 deletions svs/migrations/0024_clear_sv_queries.py
@@ -0,0 +1,16 @@
# Generated by Django 3.2.24 on 2024-02-27 13:41

from django.db import migrations


class Migration(migrations.Migration):
dependencies = [
("svs", "0023_int_to_bigint"),
]

operations = [
migrations.RunSQL(
r"""TRUNCATE TABLE "svs_svquery" CASCADE""",
"",
)
]
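For reference, the second positional argument of migrations.RunSQL is the reverse SQL; the empty string used above is exactly what Django ships as migrations.RunSQL.noop, so the truncation can still be "unapplied" as a deliberate no-op. The same intent, spelled more explicitly:

migrations.RunSQL(
    r"""TRUNCATE TABLE "svs_svquery" CASCADE""",
    reverse_sql=migrations.RunSQL.noop,
)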
286 changes: 257 additions & 29 deletions svs/models/jobs.py
@@ -7,6 +7,7 @@
import subprocess
from tempfile import TemporaryDirectory
import traceback
from typing import Dict, List
import uuid
import uuid as uuid_object

@@ -16,6 +17,7 @@
from django.db import models, transaction
from django.utils import timezone
from projectroles.models import Project
from pydantic import BaseModel
from sqlalchemy import and_

from svs.models.queries import SvQuery, SvQueryResultRow, SvQueryResultSet
@@ -91,6 +93,75 @@ def create_sv_query_bg_job(case, svquery, user):
)


class Contig(BaseModel):
"""A genome contig."""

name: str
length: int


CONTIGS_GRCH38: Dict[str, Contig] = {
"1": Contig(name="chr1", length=248956422),
"2": Contig(name="chr2", length=242193529),
"3": Contig(name="chr3", length=198295559),
"4": Contig(name="chr4", length=190214555),
"5": Contig(name="chr5", length=181538259),
"6": Contig(name="chr6", length=170805979),
"7": Contig(name="chr7", length=159345973),
"8": Contig(name="chr8", length=145138636),
"9": Contig(name="chr9", length=138394717),
"10": Contig(name="chr10", length=133797422),
"11": Contig(name="chr11", length=135086622),
"12": Contig(name="chr12", length=133275309),
"13": Contig(name="chr13", length=114364328),
"14": Contig(name="chr14", length=107043718),
"15": Contig(name="chr15", length=101991189),
"16": Contig(name="chr16", length=90338345),
"17": Contig(name="chr17", length=83257441),
"18": Contig(name="chr18", length=80373285),
"19": Contig(name="chr19", length=58617616),
"20": Contig(name="chr20", length=64444167),
"21": Contig(name="chr21", length=46709983),
"22": Contig(name="chr22", length=50818468),
"X": Contig(name="chrX", length=156040895),
"Y": Contig(name="chrY", length=57227415),
"M": Contig(name="chrM", length=16569),
}

CONTIGS_GRCH37: Dict[str, Contig] = {
"1": Contig(name="1", length=249250621),
"2": Contig(name="2", length=243199373),
"3": Contig(name="3", length=198022430),
"4": Contig(name="4", length=191154276),
"5": Contig(name="5", length=180915260),
"6": Contig(name="6", length=171115067),
"7": Contig(name="7", length=159138663),
"8": Contig(name="8", length=146364022),
"9": Contig(name="9", length=141213431),
"10": Contig(name="10", length=135534747),
"11": Contig(name="11", length=135006516),
"12": Contig(name="12", length=133851895),
"13": Contig(name="13", length=115169878),
"14": Contig(name="14", length=107349540),
"15": Contig(name="15", length=102531392),
"16": Contig(name="16", length=90354753),
"17": Contig(name="17", length=81195210),
"18": Contig(name="18", length=78077248),
"19": Contig(name="19", length=59128983),
"20": Contig(name="20", length=63025520),
"21": Contig(name="21", length=48129895),
"22": Contig(name="22", length=51304566),
"X": Contig(name="X", length=155270560),
"Y": Contig(name="Y", length=59373566),
"M": Contig(name="MT", length=16569),
}

CONTIGS: Dict[str, Dict[str, Contig]] = {
"GRCh37": CONTIGS_GRCH37,
"GRCh38": CONTIGS_GRCH38,
}
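# Illustrative lookups: keys are release-agnostic contig labels, values carry
# the release-specific name and length (GRCh38 uses chr-prefixed names and
# "chrM"; GRCh37 uses bare names and "MT"):
#
#     CONTIGS["GRCh37"]["M"].name   -> "MT"
#     CONTIGS["GRCh38"]["M"].name   -> "chrM"
#     CONTIGS["GRCh38"]["1"].length -> 248956422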


SV_RECORDS_HEADER = (
"release",
"chromosome",
@@ -109,6 +180,100 @@ def create_sv_query_bg_job(case, svquery, user):
)


def _write_header(outputf, case, callers):
"""Write out header to to the given output file for the given release."""
worker_version = (
subprocess.check_output([settings.WORKER_EXE_PATH, "--version"])
.decode("utf-8")
.split()[-1]
.strip()
)

sample_lines = []
for entry in case.pedigree:
        # Map PLINK-style pedigree codes (1/2) to VCF header values; unknown
        # codes fall back to "Unknown". This assumes pedigree entries carry
        # integer "sex" and "affected" fields; looking up the literal strings
        # "sex"/"disease" would always yield "Unknown".
        sex = {
            1: "Male",
            2: "Female",
        }.get(entry.get("sex"), "Unknown")
        disease = {
            1: "Unaffected",
            2: "Affected",
        }.get(entry.get("affected"), "Unknown")
sample_lines.append(f"##SAMPLE=<ID={entry['patient']},Sex={sex},Disease={disease}>")

pedigree_lines = []
for entry in case.pedigree:
if entry.get("father", "0") != "0":
father = f",Father={entry['father']}"
else:
father = ""
if entry.get("mother", "0") != "0":
mother = f",Mother={entry['mother']}"
else:
mother = ""
pedigree_lines.append(f"##PEDIGREE=<ID={entry['patient']}{father}{mother}>")

caller_lines = []
for caller in callers:
        # Split on the first "v" only; a second "v" anywhere in the string
        # would otherwise make the two-way unpacking raise ValueError.
        name, version = caller.split("v", 1)
caller_lines.append(
f'##x-varfish-version=<ID=orig-caller-{name},Name="{name}",Version="{version}">'
)

print(
"\n".join(
[
"##fileformat=VCFv4.4",
"##fileDate=%s" % datetime.today().strftime("%Y%m%d"),
'##INFO=<ID=IMPRECISE,Number=0,Type=Flag,Description="Imprecise structural variation">',
'##INFO=<ID=END,Number=1,Type=Integer,Description="End position of the longest variant described in this record">',
'##INFO=<ID=SVTYPE,Number=1,Type=String,Description="Type of structural variant">',
'##INFO=<ID=SVLEN,Number=A,Type=Integer,Description="Length of structural variant">',
'##INFO=<ID=SVCLAIM,Number=A,Type=String,Description="Claim made by the structural variant call. Valid values are D, J, DJ for abundance, adjacency and both respectively">',
'##INFO=<ID=callers,Number=.,Type=String,Description="Callers that called the variant">',
'##INFO=<ID=chr2,Number=1,Type=String,Description="Second chromosome, if not equal to CHROM">',
"##INFO=<ID=annsv,Number=1,Type=String,Description=\"Effect annotations: 'Allele | Annotation | Gene_Name | Gene_ID'\">",
'##FILTER=<ID=PASS,Description="All filters passed">',
'##FORMAT=<ID=GQ,Number=1,Type=Integer,Description="Conditional genotype quality">',
'##FORMAT=<ID=GT,Number=1,Type=String,Description="Genotype">',
'##FORMAT=<ID=pec,Number=1,Type=Integer,Description="Total coverage with paired-end reads">',
'##FORMAT=<ID=pev,Number=1,Type=Integer,Description="Paired-end reads supporting the variant">',
'##FORMAT=<ID=src,Number=1,Type=Integer,Description="Total coverage with split reads">',
'##FORMAT=<ID=srv,Number=1,Type=Integer,Description="Split reads supporting the variant">',
'##FORMAT=<ID=amq,Number=1,Type=Float,Description="Average mapping quality over the variant">',
'##FORMAT=<ID=cn,Number=1,Type=Integer,Description="Copy number of the variant in the sample">',
'##FORMAT=<ID=anc,Number=1,Type=Float,Description="Average normalized coverage over the variant in the sample">',
'##FORMAT=<ID=pc,Number=1,Type=Integer,Description="Point count (windows/targets/probes)">',
'##ALT=<ID=DEL,Description="Deletion">',
'##ALT=<ID=DUP,Description="Duplication">',
'##ALT=<ID=INS,Description="Insertion">',
'##ALT=<ID=CNV,Description="Copy Number Variation">',
'##ALT=<ID=INV,Description="Inversion">',
]
+ [
f'##contig=<ID={contig.name},length={contig.length},assembly="{case.release}",species="Homo sapiens">'
for contig in CONTIGS[case.release].values()
]
+ sample_lines
+ pedigree_lines
+ [
f"##x-varfish-genome-build={case.release}",
f"##x-varfish-case-uuid={case.sodar_uuid}",
f'##x-varfish-version=<ID=varfish-server-worker,Version="{worker_version}">',
]
+ caller_lines
+ [
"\t".join(
["#CHROM", "POS", "ID", "REF", "ALT", "QUAL", "FILTER", "INFO", "FORMAT"]
+ [member["patient"] for member in case.pedigree]
)
]
),
file=outputf,
)
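# For a hypothetical trio (sample "index" with parents "father" and "mother"),
# the pedigree loops above contribute header lines of this shape:
#
#     ##SAMPLE=<ID=index,Sex=Male,Disease=Affected>
#     ##PEDIGREE=<ID=index,Father=father,Mother=mother>
#     ##PEDIGREE=<ID=father>
#     ##PEDIGREE=<ID=mother>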


def run_sv_query_bg_job(pk):
"""Execute a query for SVs."""
filter_job = FilterSvBgJob.objects.select_related("case", "svquery").get(id=pk)
@@ -149,57 +314,120 @@ def _inner(tmpdir):
"""Actual implementation moved into function so we can easily wrap this into try/catch"""
filter_job.add_log_entry("Starting SV database query")

-        #: Dump the SVs to a TSV file for processing by the worker
        # Mapping from database JSON genotype keys to VCF FORMAT keys.
format_db_to_vcf = {
"gq": "GQ",
"gt": "GT",
"pec": "pec",
"pev": "pev",
"src": "src",
"srv": "srv",
"amq": "amq",
"cn": "cn",
"anc": "anc",
"pc": "pc",
}

# Dump the SVs to a TSV file for processing by the worker
filter_job.add_log_entry("Dumping SVs and query to temporary files ...")
with open(os.path.join(tmpdir, "query.json"), "wt") as outputf:
-            # Replace empty value strings by None, works around issue with "" rather
-            # than numbers.
            # Replace empty value strings by None; this works around the worker
            # receiving "" where a number is expected.
query_settings = {
key: None if value == "" else value
for key, value in query_model.query_settings.items()
}
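            # E.g. {"sv_size_min": ""} becomes {"sv_size_min": None}, so the
            # worker sees JSON null instead of a non-numeric string.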
print(json.dumps(query_settings), file=outputf)
with open(os.path.join(tmpdir, "input.tsv"), "wt") as outputf:
print(
"\t".join(SV_RECORDS_HEADER),
file=outputf,
)

samples = [member["patient"] for member in filter_job.case.pedigree]

with open(os.path.join(tmpdir, "input.vcf"), "wt") as outputf:
case_id = filter_job.case.id
variant_set_id = filter_job.case.latest_structural_variant_set_id
-            for record in StructuralVariant.objects.filter(case_id=case_id, set_id=variant_set_id):
-                arr = (
-                    record.release,
records = StructuralVariant.objects.filter(case_id=case_id, set_id=variant_set_id)
callers = set()
for record in records:
for caller in record.caller.split(";"):
callers.add(caller)

_write_header(outputf, filter_job.case, list(sorted(callers)))
for record in records:
keys_in_row = set()
for gt in record.genotype.values():
keys_in_row.update(gt.keys())
keys_in_row = ["gt"] + [
x for x in sorted(keys_in_row & set(format_db_to_vcf.keys())) if x != "gt"
]

info = f"SVTYPE={record.sv_type};END={record.end};callers={record.caller.replace(';', ',')}"
if record.sv_type == "DEL":
info = f"{info};SVLEN={record.end - record.start};SVCLAIM=DJ"
alt = "<DEL>"
elif record.sv_type == "DUP":
info = f"{info};SVLEN={record.end - record.start};SVCLAIM=DJ"
alt = "<DUP>"
elif record.sv_type == "INV":
info = f"{info};SVLEN={record.end - record.start};SVCLAIM=J"
alt = "<INV>"
elif record.sv_type == "INS":
info = f"{info};SVLEN={record.end - record.start};SVCLAIM=J"
alt = "<INS>"
elif record.sv_type == "CNV":
info = f"{info};SVLEN={record.end - record.start};SVCLAIM=D"
alt = "<CNV>"
elif record.sv_type == "BND":
info = f"{info};chr2={record.chromosome2};SVCLAIM=J"
pe_orientation = record.pe_orientation or "NtoN"
if pe_orientation == "3to3":
alt = f"[{record.chromosome2}:{record.end}[N"
elif pe_orientation == "5to5":
alt = f"N]{record.chromosome2}:{record.end}]"
elif pe_orientation == "3to5" or pe_orientation == "NtoN":
alt = f"]{record.chromosome2}:{record.end}]N"
elif pe_orientation == "5to3":
alt = f"N[{record.chromosome2}:{record.end}["
else:
raise ValueError(f"Unexpected PE orientation: {pe_orientation}")
else:
raise ValueError(f"Unexpected SV type: {record.sv_type}")

arr = [
record.chromosome,
-                    str(record.chromosome_no),
-                    str(record.bin),
-                    str(record.start),
-                    record.caller,
-                    record.sv_type,
-                    record.sv_sub_type or record.sv_type,
-                    record.chromosome2 or record.chromosome,
-                    str(record.chromosome_no2 or record.chromosome_no),
-                    str(record.bin2 or record.bin),
-                    str(record.end),
-                    record.pe_orientation or "NtoN",
-                    json.dumps(record.genotype),
-                )
-                print("\t".join(arr), file=outputf)
record.start,
".",
"N",
alt,
".",
".",
info,
":".join([format_db_to_vcf[key] for key in keys_in_row]),
] + [
":".join([str(record.genotype[sample].get(key, ".")) for key in keys_in_row])
for sample in samples
]
print("\t".join(map(str, arr)), file=outputf)
with open(os.path.join(tmpdir, "input.vcf"), "rt") as inputf:
data = inputf.read()
print(data)
with open("/tmp/x.vcf", "wt") as outputf:
outputf.write(data)
filter_job.add_log_entry("... done dumping the SVs and query")

#: Actually run the worker
filter_job.add_log_entry("Run the worker on the SVs ...")
start_time = timezone.now()
cmd = [
settings.WORKER_EXE_PATH,
"sv",
"-vvv",
"strucvars",
"query",
"--path-db",
settings.WORKER_DB_PATH,
"--path-query-json",
os.path.join(tmpdir, "query.json"),
"--path-input-svs",
os.path.join(tmpdir, "input.tsv"),
"--path-output-svs",
"--path-input",
os.path.join(tmpdir, "input.vcf"),
"--path-output",
os.path.join(tmpdir, "output.tsv"),
"--genome-release",
filter_job.case.release.lower(),
Remaining diffs are not rendered.
