Skip to content

Commit

Permalink
feat: adding async I/O, espec. for SV caller guessing
Browse files Browse the repository at this point in the history
  • Loading branch information
holtgrewe committed Oct 23, 2023
1 parent dcc4152 commit b1adffc
Show file tree
Hide file tree
Showing 23 changed files with 12,673 additions and 145 deletions.
353 changes: 342 additions & 11 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 6 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ path = "src/main.rs"
actix-web = "4.4"
annonars = "0.24"
anyhow = "1.0"
async-compression = { version = "0.4", features = ["tokio", "gzip"] }
bgzip = "0.3"
bio = "1.3"
biocommons-bioutils = "0.1.4"
Expand All @@ -41,6 +42,7 @@ csv = "1.3"
derivative = "2.2"
env_logger = "0.10"
flate2 = "1.0"
futures = "0.3.28"
hgvs = "0.12"
indexmap = { version = "2.0", features = ["serde"] }
indicatif = "0.17"
Expand All @@ -52,8 +54,8 @@ noodles-bgzf = "0.25"
noodles-core = "0.12"
noodles-csi = "0.25"
noodles-fasta = "0.30"
noodles-tabix = "0.30"
noodles-vcf = "0.41"
noodles-tabix = "0.31"
noodles-vcf = { version = "0.42", features = ["async"] }
parse-display = "0.8"
procfs = "0.15"
prost = "0.12"
Expand All @@ -69,6 +71,7 @@ serde_with = { version = "3.3", features = ["indexmap_2"] }
strum = { version = "0.25", features = ["derive"] }
tempfile = "3"
thousands = "0.2"
tokio = { version = "1.33", features = ["full"] }
tracing = { version = "0.1", features = ["log"] }
tracing-subscriber = "0.3"
uuid = { version = "1.4", features = ["fast-rng", "serde"] }
Expand All @@ -78,6 +81,7 @@ zstd = "0.13"
prost-build = "0.12"

[dev-dependencies]
async-std = { version = "1.12", features = ["attributes"] }
csv = "1.3"
insta = { version = "1.34", features = ["yaml"] }
pretty_assertions = "1.4"
Expand Down
8 changes: 2 additions & 6 deletions src/annotate/seqvars/csq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -738,13 +738,9 @@ mod test {
#[case] spdi: &str,
#[case] expected_dist: i32,
) -> Result<(), anyhow::Error> {
crate::common::set_snapshot_suffix!("{}", spdi.replace(":", "-"));
crate::common::set_snapshot_suffix!("{}", spdi.replace(':', "-"));

let spdi = spdi
.split(":")
.into_iter()
.map(|s| s.to_string())
.collect::<Vec<_>>();
let spdi = spdi.split(':').map(|s| s.to_string()).collect::<Vec<_>>();

let tx_path = "tests/data/annotate/db/grch37/txs.bin.zst";
let tx_db = load_tx_db(tx_path)?;
Expand Down
107 changes: 53 additions & 54 deletions src/annotate/strucvars/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use std::path::Path;
use std::str::FromStr;
use std::{fs::File, io::BufWriter};

use crate::common::{open_read_maybe_gz, GenomeRelease};
use crate::common::noodles::{open_vcf_reader, AsyncVcfReader};
use crate::common::GenomeRelease;
use crate::ped::PedigreeByName;
use annonars::common::cli::CANONICAL;
use annonars::freqs::cli::import::reading::guess_assembly;
Expand All @@ -20,6 +21,7 @@ use chrono::Utc;
use clap::{Args as ClapArgs, Parser};
use flate2::write::GzEncoder;
use flate2::Compression;
use futures::TryStreamExt;
use noodles_bgzf::Writer as BgzfWriter;
use noodles_vcf::reader::Builder as VariantReaderBuilder;
use noodles_vcf::record::alternate_bases::Allele;
Expand Down Expand Up @@ -1626,13 +1628,14 @@ impl SvCaller {
}

/// Guess the `SvCaller` from the VCF file at the given path.
pub fn guess_sv_caller(reader: Box<dyn std::io::BufRead>) -> Result<SvCaller, anyhow::Error> {
let mut reader = noodles_vcf::reader::Builder.build_from_reader(reader)?;
let header = reader.read_header()?;
pub async fn guess_sv_caller(reader: &mut AsyncVcfReader) -> Result<SvCaller, anyhow::Error> {
let header = reader.read_header().await?;
let mut records = reader.records(&header);
let record = records
.next()
.ok_or(anyhow::anyhow!("No records found"))??;
.try_next()
.await
.map_err(|e| anyhow::anyhow!("Problem reading VCF records: {}", e))?

Check warning on line 1637 in src/annotate/strucvars/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/annotate/strucvars/mod.rs#L1637

Added line #L1637 was not covered by tests
.ok_or(anyhow::anyhow!("No records found"))?;

for caller in SvCaller::iter() {
if caller.caller_compatible(&header) {
Expand Down Expand Up @@ -2854,7 +2857,7 @@ pub fn read_and_cluster_for_contig(
/// * `args`: The command line arguments.
/// * `pedigree`: The pedigree of case.
/// * `header`: The input VCF header.
fn run_with_writer(
async fn run_with_writer(
writer: &mut dyn AnnotatedVcfWriter,
args: &Args,
pedigree: &PedigreeByName,
Expand Down Expand Up @@ -2886,8 +2889,8 @@ fn run_with_writer(
tracing::info!("Input VCF files to temporary files...");
for path_input in args.path_input_vcf.iter() {
tracing::debug!("processing VCF file {}", path_input);
let reader = open_read_maybe_gz(path_input)?;
let sv_caller = guess_sv_caller(reader)?;
let mut reader = open_vcf_reader(path_input).await?;
let sv_caller = guess_sv_caller(&mut reader).await?;
tracing::debug!("guessed caller/version to be {:?}", &sv_caller);

let mut reader = noodles_vcf::reader::Builder.build_from_path(path_input)?;
Expand Down Expand Up @@ -2942,7 +2945,7 @@ fn run_with_writer(
}

/// Main entry point for `annotate strucvars` sub command.
pub fn run(_common: &crate::common::Args, args: &Args) -> Result<(), anyhow::Error> {
pub async fn run(_common: &crate::common::Args, args: &Args) -> Result<(), anyhow::Error> {
tracing::info!("config = {:#?}", &args);
// Load the pedigree.
tracing::info!("Loading pedigree...");
Expand Down Expand Up @@ -2979,13 +2982,13 @@ pub fn run(_common: &crate::common::Args, args: &Args) -> Result<(), anyhow::Err
);
writer.set_assembly(assembly);
writer.set_pedigree(&pedigree);
run_with_writer(&mut writer, &args, &pedigree, &header)?;
run_with_writer(&mut writer, &args, &pedigree, &header).await?;

Check warning on line 2985 in src/annotate/strucvars/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/annotate/strucvars/mod.rs#L2985

Added line #L2985 was not covered by tests
} else {
let mut writer =
noodles_vcf::Writer::new(File::create(path_output_vcf).map(BufWriter::new)?);
writer.set_assembly(assembly);
writer.set_pedigree(&pedigree);
run_with_writer(&mut writer, &args, &pedigree, &header)?;
run_with_writer(&mut writer, &args, &pedigree, &header).await?;
}
} else {
let path_output_tsv = args
Expand All @@ -2997,7 +3000,7 @@ pub fn run(_common: &crate::common::Args, args: &Args) -> Result<(), anyhow::Err
writer.set_assembly(assembly);
writer.set_pedigree(&pedigree);

run_with_writer(&mut writer, &args, &pedigree, &header)?;
run_with_writer(&mut writer, &args, &pedigree, &header).await?;
}

Ok(())
Expand Down Expand Up @@ -3150,7 +3153,7 @@ mod test {
VarFishStrucvarTsvRecord,
},
},
common::{open_read_maybe_gz, GenomeRelease},
common::{noodles::open_vcf_reader, GenomeRelease},
ped::{Disease, Individual, PedigreeByName, Sex},
};

Expand Down Expand Up @@ -3375,8 +3378,8 @@ mod test {
)
}

#[test]
fn vcf_to_jsonl_with_detection() -> Result<(), anyhow::Error> {
#[tokio::test]
async fn vcf_to_jsonl_with_detection() -> Result<(), anyhow::Error> {
let keys = &[
"delly2",
"dragen-cnv",
Expand All @@ -3391,7 +3394,8 @@ mod test {
let path_expected_txt = format!("tests/data/annotate/strucvars/{}-min.out.jsonl", key);
let samples = vcf_samples(&path_input_vcf)?;
let pedigree: PedigreeByName = sample_ped(&samples);
let sv_caller = guess_sv_caller(open_read_maybe_gz(&path_input_vcf)?)?;
let mut reader = open_vcf_reader(&path_input_vcf).await?;
let sv_caller = guess_sv_caller(&mut reader).await?;
let converter = build_vcf_record_converter(&sv_caller, &samples);

run_test_vcf_to_jsonl(
Expand All @@ -3405,71 +3409,65 @@ mod test {
Ok(())
}

#[test]
fn guess_sv_caller_delly() -> Result<(), anyhow::Error> {
let sv_caller = guess_sv_caller(open_read_maybe_gz(
"tests/data/annotate/strucvars/delly2-min.vcf",
)?)?;
#[tokio::test]
async fn guess_sv_caller_delly() -> Result<(), anyhow::Error> {
let mut reader = open_vcf_reader("tests/data/annotate/strucvars/delly2-min.vcf").await?;
let sv_caller = guess_sv_caller(&mut reader).await?;
insta::assert_debug_snapshot!(sv_caller);

Ok(())
}

#[test]
fn guess_sv_caller_dragen_sv() -> Result<(), anyhow::Error> {
let sv_caller = guess_sv_caller(open_read_maybe_gz(
"tests/data/annotate/strucvars/dragen-sv-min.vcf",
)?)?;
#[tokio::test]
async fn guess_sv_caller_dragen_sv() -> Result<(), anyhow::Error> {
let mut reader = open_vcf_reader("tests/data/annotate/strucvars/dragen-sv-min.vcf").await?;
let sv_caller = guess_sv_caller(&mut reader).await?;
insta::assert_debug_snapshot!(sv_caller);

Ok(())
}

#[test]
fn guess_sv_caller_dragen_cnv() -> Result<(), anyhow::Error> {
let sv_caller = guess_sv_caller(open_read_maybe_gz(
"tests/data/annotate/strucvars/dragen-cnv-min.vcf",
)?)?;
#[tokio::test]
async fn guess_sv_caller_dragen_cnv() -> Result<(), anyhow::Error> {
let mut reader =
open_vcf_reader("tests/data/annotate/strucvars/dragen-cnv-min.vcf").await?;
let sv_caller = guess_sv_caller(&mut reader).await?;
insta::assert_debug_snapshot!(sv_caller);

Ok(())
}

#[test]
fn guess_sv_caller_gcnv() -> Result<(), anyhow::Error> {
let sv_caller = guess_sv_caller(open_read_maybe_gz(
"tests/data/annotate/strucvars/gcnv-min.vcf",
)?)?;
#[tokio::test]
async fn guess_sv_caller_gcnv() -> Result<(), anyhow::Error> {
let mut reader = open_vcf_reader("tests/data/annotate/strucvars/gcnv-min.vcf").await?;
let sv_caller = guess_sv_caller(&mut reader).await?;
insta::assert_debug_snapshot!(sv_caller);

Ok(())
}

#[test]
fn guess_sv_caller_manta() -> Result<(), anyhow::Error> {
let sv_caller = guess_sv_caller(open_read_maybe_gz(
"tests/data/annotate/strucvars/manta-min.vcf",
)?)?;
#[tokio::test]
async fn guess_sv_caller_manta() -> Result<(), anyhow::Error> {
let mut reader = open_vcf_reader("tests/data/annotate/strucvars/manta-min.vcf").await?;
let sv_caller = guess_sv_caller(&mut reader).await?;
insta::assert_debug_snapshot!(sv_caller);

Ok(())
}

#[test]
fn guess_sv_caller_melt() -> Result<(), anyhow::Error> {
let sv_caller = guess_sv_caller(open_read_maybe_gz(
"tests/data/annotate/strucvars/melt-min.vcf",
)?)?;
#[tokio::test]
async fn guess_sv_caller_melt() -> Result<(), anyhow::Error> {
let mut reader = open_vcf_reader("tests/data/annotate/strucvars/melt-min.vcf").await?;
let sv_caller = guess_sv_caller(&mut reader).await?;
insta::assert_debug_snapshot!(sv_caller);

Ok(())
}

#[test]
fn guess_sv_caller_popdel() -> Result<(), anyhow::Error> {
let sv_caller = guess_sv_caller(open_read_maybe_gz(
"tests/data/annotate/strucvars/popdel-min.vcf",
)?)?;
#[tokio::test]
async fn guess_sv_caller_popdel() -> Result<(), anyhow::Error> {
let mut reader = open_vcf_reader("tests/data/annotate/strucvars/popdel-min.vcf").await?;
let sv_caller = guess_sv_caller(&mut reader).await?;
insta::assert_debug_snapshot!(sv_caller);

Ok(())
Expand Down Expand Up @@ -3814,7 +3812,8 @@ mod test {
#[rstest]
#[case(true)]
#[case(false)]
fn test_with_maelstrom_reader(#[case] is_tsv: bool) -> Result<(), anyhow::Error> {
#[tokio::test]
async fn test_with_maelstrom_reader(#[case] is_tsv: bool) -> Result<(), anyhow::Error> {
let temp = TempDir::default();

let args_common = crate::common::Args {
Expand Down Expand Up @@ -3853,7 +3852,7 @@ mod test {
rng_seed: Some(42),
};

run(&args_common, &args)?;
run(&args_common, &args).await?;

let expected = std::fs::read_to_string(format!(
"tests/data/annotate/strucvars/maelstrom/delly2-min-with-maelstrom{}",
Expand Down
4 changes: 4 additions & 0 deletions src/common/io/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
//! Common, IO-related code.
pub mod std;
pub mod tokio;
Loading

0 comments on commit b1adffc

Please sign in to comment.