Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: make reading from S3 work and bump mehari #452

Merged
merged 3 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
370 changes: 180 additions & 190 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ ext-sort = { version = "0.1", features = ["memory-limit", "bytesize"] }
fastrand = "2.1"
flate2 = "1.0"
futures = "0.3.30"
hgvs = "0.17"
hgvs = "0.17.2"
indexmap = { version = "2.5", features = ["serde"] }
itertools = "0.13"
log = "0.4"
mehari = "0.26.1"
mehari = "0.28.1"
multimap = "0.10"
pbjson = "0.7"
pbjson-types = "0.7"
Expand Down
64 changes: 29 additions & 35 deletions src/seqvars/aggregate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

pub mod ds;

use mehari::common::noodles::open_vcf_reader;
use futures::TryStreamExt as _;
use mehari::common::noodles::NoodlesVariantReader as _;
use noodles::vcf;
use rayon::prelude::*;
use std::{str::FromStr as _, sync::Arc};
Expand Down Expand Up @@ -196,7 +197,7 @@ async fn import_vcf(
cf_carriers: &str,
genomebuild: crate::common::GenomeRelease,
) -> Result<(), anyhow::Error> {
let mut input_reader = open_vcf_reader(path_input)
let mut input_reader = common::noodles::open_vcf_reader(path_input)
.await
.map_err(|e| anyhow::anyhow!("could not open file {} for reading: {}", path_input, e))?;
let input_header = input_reader.read_header().await?;
Expand All @@ -206,16 +207,9 @@ async fn import_vcf(

let (pedigree, case_uuid) = common::extract_pedigree_and_case_uuid(&input_header)?;
let mut prev = std::time::Instant::now();
let mut record_buf = vcf::variant::RecordBuf::default();
loop {
let bytes_read = input_reader
.read_record_buf(&input_header, &mut record_buf)
.await
.map_err(|e| anyhow::anyhow!("problem reading VCF file {}: {}", path_input, e))?;
if bytes_read == 0 {
break; // EOF
}

let mut records = input_reader.records(&input_header).await;
while let Some(record_buf) = records.try_next().await? {
// Obtain counts from the current variant.
let (this_counts_data, this_carrier_data) = handle_record(
&record_buf,
Expand All @@ -238,37 +232,37 @@ async fn import_vcf(

// Read data for variant from database.
let mut db_counts_data = transaction.get_cf(&cf_counts, key.clone()).map_err(|e| {
anyhow::anyhow!(
"problem acessing counts data for variant {:?}: {} (non-existing would be fine)",
&vcf_var,
e
)
})?.map(|buffer| ds::Counts::from_vec(&buffer)).unwrap_or_default();
anyhow::anyhow!(
"problem acessing counts data for variant {:?}: {} (non-existing would be fine)",
&vcf_var,
e
)
})?.map(|buffer| ds::Counts::from_vec(&buffer)).unwrap_or_default();
let mut db_carrier_data = transaction.get_cf(&cf_carriers, key.clone()).map_err(|e| {
anyhow::anyhow!(
"problem acessing carrier data for variant {:?}: {} (non-existing would be fine)",
&vcf_var,
e
)
})?
.map(|buffer| ds::CarrierList::try_from(buffer.as_slice()))
.transpose()
.map_err(|e| {
anyhow::anyhow!(
"problem decoding carrier data for variant {:?}: {}",
&vcf_var,
e
)
})?
.unwrap_or_default();
anyhow::anyhow!(
"problem acessing carrier data for variant {:?}: {} (non-existing would be fine)",
&vcf_var,
e
)
})?
.map(|buffer| ds::CarrierList::try_from(buffer.as_slice()))
.transpose()
.map_err(|e| {
anyhow::anyhow!(
"problem decoding carrier data for variant {:?}: {}",
&vcf_var,
e
)
})?
.unwrap_or_default();

// Aggregate the data.
db_counts_data.aggregate(this_counts_data);
db_carrier_data.aggregate(this_carrier_data);

// Write data for variant back to database.
transaction
.put_cf(&cf_counts, key.clone(), &db_counts_data.to_vec())
.put_cf(&cf_counts, key.clone(), db_counts_data.to_vec())
.map_err(|e| {
anyhow::anyhow!(
"problem writing counts data for variant {:?}: {}",
Expand All @@ -277,7 +271,7 @@ async fn import_vcf(
)
})?;
transaction
.put_cf(&cf_carriers, key.clone(), &db_carrier_data.to_vec())
.put_cf(&cf_carriers, key.clone(), db_carrier_data.to_vec())
.map_err(|e| {
anyhow::anyhow!(
"problem writing carrier data for variant {:?}: {}",
Expand Down
60 changes: 16 additions & 44 deletions src/seqvars/ingest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@ use crate::{
common::{self, genotype_to_string, strip_gt_leading_slash, worker_version, GenomeRelease},
flush_and_shutdown,
};
use mehari::annotate::seqvars::provider::Provider as MehariProvider;
use mehari::common::noodles::{open_vcf_reader, open_vcf_writer, AsyncVcfReader, AsyncVcfWriter};
use futures::TryStreamExt as _;
use mehari::common::noodles::{open_vcf_writer, AsyncVcfWriter};
use mehari::{
annotate::seqvars::provider::Provider as MehariProvider,
common::noodles::{NoodlesVariantReader as _, VariantReader},
};
use noodles::vcf;
use thousands::Separable;
use tokio::io::AsyncWriteExt;
Expand Down Expand Up @@ -281,7 +285,7 @@ fn copy_format(
/// Process the variants from `input_reader` to `output_writer`.
async fn process_variants(
output_writer: &mut AsyncVcfWriter,
input_reader: &mut AsyncVcfReader,
input_reader: &mut VariantReader,
output_header: &vcf::Header,
input_header: &vcf::Header,
id_mapping: &Option<indexmap::IndexMap<String, String>>,
Expand All @@ -302,10 +306,7 @@ async fn process_variants(
["meta", "autosomal", "gonosomal", "mitochondrial"],
false,
)?;

let cf_autosomal = db_freq.cf_handle("autosomal").unwrap();
let cf_gonosomal = db_freq.cf_handle("gonosomal").unwrap();
let cf_mtdna = db_freq.cf_handle("mitochondrial").unwrap();
let freq_anno = mehari::annotate::seqvars::FrequencyAnnotator::new(db_freq);

// Open the ClinVar RocksDB database in read only mode.
tracing::info!("Opening ClinVar database");
Expand All @@ -318,8 +319,7 @@ async fn process_variants(
let options = rocksdb::Options::default();
let db_clinvar =
rocksdb::DB::open_cf_for_read_only(&options, &rocksdb_path, ["meta", "clinvar"], false)?;

let cf_clinvar = db_clinvar.cf_handle("clinvar").unwrap();
let clinvar_anno = mehari::annotate::seqvars::ClinvarAnnotator::new(db_clinvar);

// Open the serialized transcripts.
tracing::info!("Opening transcript database");
Expand Down Expand Up @@ -366,17 +366,9 @@ async fn process_variants(
let start = std::time::Instant::now();
let mut prev = std::time::Instant::now();
let mut total_written = 0usize;
let mut input_record = vcf::variant::RecordBuf::default();
let known_format_keys = KNOWN_FORMAT_KEYS.get_or_init(Default::default);
loop {
let bytes_read = input_reader
.read_record_buf(input_header, &mut input_record)
.await
.map_err(|e| anyhow::anyhow!("problem reading VCF file: {}", e))?;
if bytes_read == 0 {
break; // EOF
}

let mut records = input_reader.records(input_header).await;
while let Some(input_record) = records.try_next().await? {
for (allele_no, alt_allele) in input_record.alternate_bases().as_ref().iter().enumerate() {
let allele_no = allele_no + 1;
// Construct record with first few fields describing one variant allele.
Expand Down Expand Up @@ -424,26 +416,11 @@ async fn process_variants(

// Annotate with frequency.
if mehari::annotate::seqvars::CHROM_AUTO.contains(vcf_var.chrom.as_str()) {
mehari::annotate::seqvars::annotate_record_auto(
&db_freq,
&cf_autosomal,
&key,
&mut output_record,
)?;
freq_anno.annotate_record_auto(&key, &mut output_record)?;
} else if mehari::annotate::seqvars::CHROM_XY.contains(vcf_var.chrom.as_str()) {
mehari::annotate::seqvars::annotate_record_xy(
&db_freq,
&cf_gonosomal,
&key,
&mut output_record,
)?;
freq_anno.annotate_record_xy(&key, &mut output_record)?;
} else if mehari::annotate::seqvars::CHROM_MT.contains(vcf_var.chrom.as_str()) {
mehari::annotate::seqvars::annotate_record_mt(
&db_freq,
&cf_mtdna,
&key,
&mut output_record,
)?;
freq_anno.annotate_record_mt(&key, &mut output_record)?;
} else {
tracing::debug!(
"Record @{:?} on non-canonical chromosome, skipping.",
Expand All @@ -452,12 +429,7 @@ async fn process_variants(
}

// Annotate with ClinVar information.
mehari::annotate::seqvars::annotate_record_clinvar(
&db_clinvar,
&cf_clinvar,
&key,
&mut output_record,
)?;
clinvar_anno.annotate_record_clinvar(&key, &mut output_record)?;
}

let annonars::common::keys::Var {
Expand Down Expand Up @@ -527,7 +499,7 @@ pub async fn run(args_common: &crate::common::Args, args: &Args) -> Result<(), a
tracing::info!("pedigre = {:#?}", &pedigree);

tracing::info!("opening input file...");
let mut input_reader = open_vcf_reader(&args.path_in)
let mut input_reader = common::noodles::open_vcf_reader(&args.path_in)
.await
.map_err(|e| anyhow::anyhow!("could not build VCF reader: {}", e))?;

Expand Down
18 changes: 6 additions & 12 deletions src/seqvars/prefilter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

use std::io::BufRead;

use futures::TryStreamExt as _;
use mehari::annotate::seqvars::ann::AnnField;
use mehari::common::noodles::{open_vcf_reader, open_vcf_writer, AsyncVcfReader, AsyncVcfWriter};
use mehari::common::noodles::{open_vcf_writer, AsyncVcfWriter, NoodlesVariantReader as _};
use noodles::vcf;
use thousands::Separable;
use tokio::io::AsyncWriteExt;
Expand Down Expand Up @@ -140,7 +141,7 @@ fn get_freq_and_distance(

/// Perform the actual prefiltration.
async fn run_filtration(
input_reader: &mut AsyncVcfReader,
input_reader: &mut mehari::common::noodles::VariantReader,
input_header: &vcf::Header,
output_writers: &mut [AsyncVcfWriter],
output_headers: &[vcf::Header],
Expand All @@ -149,16 +150,9 @@ async fn run_filtration(
let start = std::time::Instant::now();
let mut prev = std::time::Instant::now();
let mut total_written = 0usize;
let mut input_record = vcf::variant::RecordBuf::default();
loop {
let bytes_read = input_reader
.read_record_buf(input_header, &mut input_record)
.await
.map_err(|e| anyhow::anyhow!("problem reading VCF file: {}", e))?;
if bytes_read == 0 {
break; // EOF
}

let mut records = input_reader.records(input_header).await;
while let Some(input_record) = records.try_next().await? {
let (frequency, exon_distance) = get_freq_and_distance(&input_record)?;
if let Some(exon_distance) = exon_distance {
for ((writer_params, output_writer), output_header) in params
Expand Down Expand Up @@ -204,7 +198,7 @@ pub async fn run(args_common: &crate::common::Args, args: &Args) -> Result<(), a
tracing::info!("loading prefilter params...");
let params_list = load_params(&args.params)?;
tracing::info!("opening input file...");
let mut reader = open_vcf_reader(&args.path_in)
let mut reader = common::noodles::open_vcf_reader(&args.path_in)
.await
.map_err(|e| anyhow::anyhow!("could not open input file: {}", e))?;
let header = reader
Expand Down
26 changes: 9 additions & 17 deletions src/seqvars/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ use clap::{command, Parser};
use ext_sort::LimitedBufferBuilder;
use ext_sort::{ExternalSorter, ExternalSorterBuilder};

use futures::TryStreamExt as _;
use itertools::Itertools as _;
use mehari::annotate::seqvars::CHROM_TO_CHROM_NO;
use noodles::vcf;
use mehari::common::noodles::NoodlesVariantReader as _;
use rand_core::{RngCore, SeedableRng};
use schema::data::{TryFromVcf as _, VariantRecord};
use schema::query::{CaseQuery, GenotypeChoice, RecessiveMode, SampleGenotypeChoice};
Expand All @@ -27,7 +28,6 @@ use crate::common;
use crate::pbs::varfish::v1::seqvars::output as pbs_output;
use crate::pbs::varfish::v1::seqvars::query as pbs_query;
use crate::{common::trace_rss_now, common::GenomeRelease};
use mehari::common::noodles::open_vcf_reader;

use self::annonars::Annotator;
use self::sorting::{ByCoordinate, ByHgncId};
Expand Down Expand Up @@ -271,9 +271,11 @@ async fn run_query(
let mut uuid_buf = [0u8; 16];

// Open VCF file, create reader, and read header.
let mut input_reader = open_vcf_reader(&args.path_input).await.map_err(|e| {
anyhow::anyhow!("could not open file {} for reading: {}", args.path_input, e)
})?;
let mut input_reader = common::noodles::open_vcf_reader(&args.path_input)
.await
.map_err(|e| {
anyhow::anyhow!("could not open file {} for reading: {}", args.path_input, e)
})?;
let input_header = input_reader.read_header().await?;

let path_unsorted = tmp_dir.path().join("unsorted.jsonl");
Expand All @@ -289,18 +291,8 @@ async fn run_query(
.map(std::io::BufWriter::new)
.map_err(|e| anyhow::anyhow!("could not create temporary unsorted file: {}", e))?;

let mut record_buf = vcf::variant::RecordBuf::default();
loop {
let bytes_read = input_reader
.read_record_buf(&input_header, &mut record_buf)
.await
.map_err(|e| {
anyhow::anyhow!("problem reading VCF file {}: {}", &args.path_input, e)
})?;
if bytes_read == 0 {
break; // EOF
}

let mut records = input_reader.records(&input_header).await;
while let Some(record_buf) = records.try_next().await? {
stats.count_total += 1;
let record_seqvar = VariantRecord::try_from_vcf(&record_buf, &input_header)
.map_err(|e| anyhow::anyhow!("could not parse VCF record: {}", e))?;
Expand Down
Loading
Loading