Skip to content

Commit

Permalink
feat: adding async I/O, espec. for SV caller guessing (#230)
Browse files Browse the repository at this point in the history
  • Loading branch information
holtgrewe authored Oct 23, 2023
1 parent dcc4152 commit 33c0e8e
Show file tree
Hide file tree
Showing 33 changed files with 25,751 additions and 168 deletions.
411 changes: 385 additions & 26 deletions Cargo.lock

Large diffs are not rendered by default.

11 changes: 8 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ path = "src/main.rs"
actix-web = "4.4"
annonars = "0.24"
anyhow = "1.0"
async-compression = { version = "0.4", features = ["tokio", "gzip"] }
bgzip = "0.3"
bio = "1.3"
biocommons-bioutils = "0.1.4"
Expand All @@ -41,19 +42,20 @@ csv = "1.3"
derivative = "2.2"
env_logger = "0.10"
flate2 = "1.0"
futures = "0.3.28"
hgvs = "0.12"
indexmap = { version = "2.0", features = ["serde"] }
indicatif = "0.17"
jsonl = "4.0"
lazy_static = "1.4"
log = "0.4"
nom = "7.1"
noodles-bgzf = "0.25"
noodles-bgzf = { version = "0.25", features = ["async"] }
noodles-core = "0.12"
noodles-csi = "0.25"
noodles-fasta = "0.30"
noodles-tabix = "0.30"
noodles-vcf = "0.41"
noodles-tabix = "0.31"
noodles-vcf = { version = "0.43", features = ["async"] }
parse-display = "0.8"
procfs = "0.15"
prost = "0.12"
Expand All @@ -69,6 +71,7 @@ serde_with = { version = "3.3", features = ["indexmap_2"] }
strum = { version = "0.25", features = ["derive"] }
tempfile = "3"
thousands = "0.2"
tokio = { version = "1.33", features = ["full"] }
tracing = { version = "0.1", features = ["log"] }
tracing-subscriber = "0.3"
uuid = { version = "1.4", features = ["fast-rng", "serde"] }
Expand All @@ -78,7 +81,9 @@ zstd = "0.13"
prost-build = "0.12"

[dev-dependencies]
async-std = { version = "1.12", features = ["attributes"] }
csv = "1.3"
hxdmp = "0.2.1"
insta = { version = "1.34", features = ["yaml"] }
pretty_assertions = "1.4"
rstest = "0.18"
Expand Down
1 change: 1 addition & 0 deletions build.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// The custom build script, needed as we use prost.

fn main() {
println!("cargo:rerun-if-changed=src/db/create/txs/data.proto3");
prost_build::compile_protos(&["src/db/create/txs/data.proto3"], &["src/"]).unwrap();
}
8 changes: 2 additions & 6 deletions src/annotate/seqvars/csq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -738,13 +738,9 @@ mod test {
#[case] spdi: &str,
#[case] expected_dist: i32,
) -> Result<(), anyhow::Error> {
crate::common::set_snapshot_suffix!("{}", spdi.replace(":", "-"));
crate::common::set_snapshot_suffix!("{}", spdi.replace(':', "-"));

let spdi = spdi
.split(":")
.into_iter()
.map(|s| s.to_string())
.collect::<Vec<_>>();
let spdi = spdi.split(':').map(|s| s.to_string()).collect::<Vec<_>>();

let tx_path = "tests/data/annotate/db/grch37/txs.bin.zst";
let tx_db = load_tx_db(tx_path)?;
Expand Down
129 changes: 68 additions & 61 deletions src/annotate/strucvars/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use std::path::Path;
use std::str::FromStr;
use std::{fs::File, io::BufWriter};

use crate::common::{open_read_maybe_gz, GenomeRelease};
use crate::common::noodles::{open_vcf_reader, AsyncVcfReader};
use crate::common::GenomeRelease;
use crate::ped::PedigreeByName;
use annonars::common::cli::CANONICAL;
use annonars::freqs::cli::import::reading::guess_assembly;
Expand All @@ -20,6 +21,7 @@ use chrono::Utc;
use clap::{Args as ClapArgs, Parser};
use flate2::write::GzEncoder;
use flate2::Compression;
use futures::TryStreamExt;
use noodles_bgzf::Writer as BgzfWriter;
use noodles_vcf::reader::Builder as VariantReaderBuilder;
use noodles_vcf::record::alternate_bases::Allele;
Expand Down Expand Up @@ -1626,13 +1628,14 @@ impl SvCaller {
}

/// Guess the `SvCaller` from the VCF file at the given path.
pub fn guess_sv_caller(reader: Box<dyn std::io::BufRead>) -> Result<SvCaller, anyhow::Error> {
let mut reader = noodles_vcf::reader::Builder.build_from_reader(reader)?;
let header = reader.read_header()?;
pub async fn guess_sv_caller(reader: &mut AsyncVcfReader) -> Result<SvCaller, anyhow::Error> {
let header = reader.read_header().await?;
let mut records = reader.records(&header);
let record = records
.next()
.ok_or(anyhow::anyhow!("No records found"))??;
.try_next()
.await
.map_err(|e| anyhow::anyhow!("Problem reading VCF records: {}", e))?
.ok_or(anyhow::anyhow!("No records found"))?;

for caller in SvCaller::iter() {
if caller.caller_compatible(&header) {
Expand Down Expand Up @@ -2641,9 +2644,9 @@ pub fn build_vcf_record_converter<T: AsRef<str>>(
/// Convert the records in the VCF reader to the JSONL file per contig in `tmp_dir`.
///
/// Note that we will consider the "25 canonical" contigs only (chr1..chr22, chrX, chrY, chrM).
pub fn run_vcf_to_jsonl(
pub async fn run_vcf_to_jsonl(
pedigree: &PedigreeByName,
reader: &mut noodles_vcf::Reader<Box<dyn std::io::BufRead>>,
reader: &mut AsyncVcfReader,
header: &VcfHeader,
sv_caller: &SvCaller,
tmp_dir: &tempfile::TempDir,
Expand Down Expand Up @@ -2678,11 +2681,16 @@ pub fn run_vcf_to_jsonl(
let mapping = CHROM_TO_CHROM_NO.deref();
let mut uuid_buf = [0u8; 16];

for record in reader.records(header) {
let mut records = reader.records(header);
while let Some(record) = records
.try_next()
.await
.map_err(|e| anyhow::anyhow!("problem reading VCF record: {}", e))?
{
rng.fill_bytes(&mut uuid_buf);
let uuid = Uuid::from_bytes(uuid_buf);

let mut record = converter.convert(pedigree, &record?, uuid, GenomeRelease::Grch37)?;
let mut record = converter.convert(pedigree, &record, uuid, GenomeRelease::Grch37)?;
annotate_cov_mq(&mut record, cov_readers)?;
if let Some(chromosome_no) = mapping.get(&record.chromosome) {
let out_jsonl = &mut tmp_files[*chromosome_no as usize - 1];
Expand Down Expand Up @@ -2854,7 +2862,7 @@ pub fn read_and_cluster_for_contig(
/// * `args`: The command line arguments.
/// * `pedigree`: The pedigree of case.
/// * `header`: The input VCF header.
fn run_with_writer(
async fn run_with_writer(
writer: &mut dyn AnnotatedVcfWriter,
args: &Args,
pedigree: &PedigreeByName,
Expand Down Expand Up @@ -2886,12 +2894,14 @@ fn run_with_writer(
tracing::info!("Input VCF files to temporary files...");
for path_input in args.path_input_vcf.iter() {
tracing::debug!("processing VCF file {}", path_input);
let reader = open_read_maybe_gz(path_input)?;
let sv_caller = guess_sv_caller(reader)?;
let sv_caller = {
let mut reader = open_vcf_reader(path_input).await?;
guess_sv_caller(&mut reader).await?
};
tracing::debug!("guessed caller/version to be {:?}", &sv_caller);

let mut reader = noodles_vcf::reader::Builder.build_from_path(path_input)?;
let header: VcfHeader = reader.read_header()?;
let mut reader = open_vcf_reader(path_input).await?;
let header: VcfHeader = reader.read_header().await?;
run_vcf_to_jsonl(
pedigree,
&mut reader,
Expand All @@ -2900,7 +2910,8 @@ fn run_with_writer(
&tmp_dir,
&mut cov_readers,
&mut rng,
)?;
)
.await?;
}
tracing::info!("... done converting input files.");

Expand Down Expand Up @@ -2942,7 +2953,7 @@ fn run_with_writer(
}

/// Main entry point for `annotate strucvars` sub command.
pub fn run(_common: &crate::common::Args, args: &Args) -> Result<(), anyhow::Error> {
pub async fn run(_common: &crate::common::Args, args: &Args) -> Result<(), anyhow::Error> {
tracing::info!("config = {:#?}", &args);
// Load the pedigree.
tracing::info!("Loading pedigree...");
Expand Down Expand Up @@ -2979,13 +2990,13 @@ pub fn run(_common: &crate::common::Args, args: &Args) -> Result<(), anyhow::Err
);
writer.set_assembly(assembly);
writer.set_pedigree(&pedigree);
run_with_writer(&mut writer, &args, &pedigree, &header)?;
run_with_writer(&mut writer, &args, &pedigree, &header).await?;
} else {
let mut writer =
noodles_vcf::Writer::new(File::create(path_output_vcf).map(BufWriter::new)?);
writer.set_assembly(assembly);
writer.set_pedigree(&pedigree);
run_with_writer(&mut writer, &args, &pedigree, &header)?;
run_with_writer(&mut writer, &args, &pedigree, &header).await?;
}
} else {
let path_output_tsv = args
Expand All @@ -2997,7 +3008,7 @@ pub fn run(_common: &crate::common::Args, args: &Args) -> Result<(), anyhow::Err
writer.set_assembly(assembly);
writer.set_pedigree(&pedigree);

run_with_writer(&mut writer, &args, &pedigree, &header)?;
run_with_writer(&mut writer, &args, &pedigree, &header).await?;
}

Ok(())
Expand Down Expand Up @@ -3150,7 +3161,7 @@ mod test {
VarFishStrucvarTsvRecord,
},
},
common::{open_read_maybe_gz, GenomeRelease},
common::{noodles::open_vcf_reader, GenomeRelease},
ped::{Disease, Individual, PedigreeByName, Sex},
};

Expand Down Expand Up @@ -3375,8 +3386,8 @@ mod test {
)
}

#[test]
fn vcf_to_jsonl_with_detection() -> Result<(), anyhow::Error> {
#[tokio::test]
async fn vcf_to_jsonl_with_detection() -> Result<(), anyhow::Error> {
let keys = &[
"delly2",
"dragen-cnv",
Expand All @@ -3391,7 +3402,8 @@ mod test {
let path_expected_txt = format!("tests/data/annotate/strucvars/{}-min.out.jsonl", key);
let samples = vcf_samples(&path_input_vcf)?;
let pedigree: PedigreeByName = sample_ped(&samples);
let sv_caller = guess_sv_caller(open_read_maybe_gz(&path_input_vcf)?)?;
let mut reader = open_vcf_reader(&path_input_vcf).await?;
let sv_caller = guess_sv_caller(&mut reader).await?;
let converter = build_vcf_record_converter(&sv_caller, &samples);

run_test_vcf_to_jsonl(
Expand All @@ -3405,71 +3417,65 @@ mod test {
Ok(())
}

#[test]
fn guess_sv_caller_delly() -> Result<(), anyhow::Error> {
let sv_caller = guess_sv_caller(open_read_maybe_gz(
"tests/data/annotate/strucvars/delly2-min.vcf",
)?)?;
#[tokio::test]
async fn guess_sv_caller_delly() -> Result<(), anyhow::Error> {
let mut reader = open_vcf_reader("tests/data/annotate/strucvars/delly2-min.vcf").await?;
let sv_caller = guess_sv_caller(&mut reader).await?;
insta::assert_debug_snapshot!(sv_caller);

Ok(())
}

#[test]
fn guess_sv_caller_dragen_sv() -> Result<(), anyhow::Error> {
let sv_caller = guess_sv_caller(open_read_maybe_gz(
"tests/data/annotate/strucvars/dragen-sv-min.vcf",
)?)?;
#[tokio::test]
async fn guess_sv_caller_dragen_sv() -> Result<(), anyhow::Error> {
let mut reader = open_vcf_reader("tests/data/annotate/strucvars/dragen-sv-min.vcf").await?;
let sv_caller = guess_sv_caller(&mut reader).await?;
insta::assert_debug_snapshot!(sv_caller);

Ok(())
}

#[test]
fn guess_sv_caller_dragen_cnv() -> Result<(), anyhow::Error> {
let sv_caller = guess_sv_caller(open_read_maybe_gz(
"tests/data/annotate/strucvars/dragen-cnv-min.vcf",
)?)?;
#[tokio::test]
async fn guess_sv_caller_dragen_cnv() -> Result<(), anyhow::Error> {
let mut reader =
open_vcf_reader("tests/data/annotate/strucvars/dragen-cnv-min.vcf").await?;
let sv_caller = guess_sv_caller(&mut reader).await?;
insta::assert_debug_snapshot!(sv_caller);

Ok(())
}

#[test]
fn guess_sv_caller_gcnv() -> Result<(), anyhow::Error> {
let sv_caller = guess_sv_caller(open_read_maybe_gz(
"tests/data/annotate/strucvars/gcnv-min.vcf",
)?)?;
#[tokio::test]
async fn guess_sv_caller_gcnv() -> Result<(), anyhow::Error> {
let mut reader = open_vcf_reader("tests/data/annotate/strucvars/gcnv-min.vcf").await?;
let sv_caller = guess_sv_caller(&mut reader).await?;
insta::assert_debug_snapshot!(sv_caller);

Ok(())
}

#[test]
fn guess_sv_caller_manta() -> Result<(), anyhow::Error> {
let sv_caller = guess_sv_caller(open_read_maybe_gz(
"tests/data/annotate/strucvars/manta-min.vcf",
)?)?;
#[tokio::test]
async fn guess_sv_caller_manta() -> Result<(), anyhow::Error> {
let mut reader = open_vcf_reader("tests/data/annotate/strucvars/manta-min.vcf").await?;
let sv_caller = guess_sv_caller(&mut reader).await?;
insta::assert_debug_snapshot!(sv_caller);

Ok(())
}

#[test]
fn guess_sv_caller_melt() -> Result<(), anyhow::Error> {
let sv_caller = guess_sv_caller(open_read_maybe_gz(
"tests/data/annotate/strucvars/melt-min.vcf",
)?)?;
#[tokio::test]
async fn guess_sv_caller_melt() -> Result<(), anyhow::Error> {
let mut reader = open_vcf_reader("tests/data/annotate/strucvars/melt-min.vcf").await?;
let sv_caller = guess_sv_caller(&mut reader).await?;
insta::assert_debug_snapshot!(sv_caller);

Ok(())
}

#[test]
fn guess_sv_caller_popdel() -> Result<(), anyhow::Error> {
let sv_caller = guess_sv_caller(open_read_maybe_gz(
"tests/data/annotate/strucvars/popdel-min.vcf",
)?)?;
#[tokio::test]
async fn guess_sv_caller_popdel() -> Result<(), anyhow::Error> {
let mut reader = open_vcf_reader("tests/data/annotate/strucvars/popdel-min.vcf").await?;
let sv_caller = guess_sv_caller(&mut reader).await?;
insta::assert_debug_snapshot!(sv_caller);

Ok(())
Expand Down Expand Up @@ -3814,7 +3820,8 @@ mod test {
#[rstest]
#[case(true)]
#[case(false)]
fn test_with_maelstrom_reader(#[case] is_tsv: bool) -> Result<(), anyhow::Error> {
#[tokio::test]
async fn test_with_maelstrom_reader(#[case] is_tsv: bool) -> Result<(), anyhow::Error> {
let temp = TempDir::default();

let args_common = crate::common::Args {
Expand Down Expand Up @@ -3853,7 +3860,7 @@ mod test {
rng_seed: Some(42),
};

run(&args_common, &args)?;
run(&args_common, &args).await?;

let expected = std::fs::read_to_string(format!(
"tests/data/annotate/strucvars/maelstrom/delly2-min-with-maelstrom{}",
Expand Down
4 changes: 4 additions & 0 deletions src/common/io/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
//! Common, IO-related code.
pub mod std;
pub mod tokio;
Loading

0 comments on commit 33c0e8e

Please sign in to comment.