Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adding async I/O, espec. for SV caller guessing #230

Merged
merged 7 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
411 changes: 385 additions & 26 deletions Cargo.lock

Large diffs are not rendered by default.

11 changes: 8 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ path = "src/main.rs"
actix-web = "4.4"
annonars = "0.24"
anyhow = "1.0"
async-compression = { version = "0.4", features = ["tokio", "gzip"] }
bgzip = "0.3"
bio = "1.3"
biocommons-bioutils = "0.1.4"
Expand All @@ -41,19 +42,20 @@ csv = "1.3"
derivative = "2.2"
env_logger = "0.10"
flate2 = "1.0"
futures = "0.3.28"
hgvs = "0.12"
indexmap = { version = "2.0", features = ["serde"] }
indicatif = "0.17"
jsonl = "4.0"
lazy_static = "1.4"
log = "0.4"
nom = "7.1"
noodles-bgzf = "0.25"
noodles-bgzf = { version = "0.25", features = ["async"] }
noodles-core = "0.12"
noodles-csi = "0.25"
noodles-fasta = "0.30"
noodles-tabix = "0.30"
noodles-vcf = "0.41"
noodles-tabix = "0.31"
noodles-vcf = { version = "0.43", features = ["async"] }
parse-display = "0.8"
procfs = "0.15"
prost = "0.12"
Expand All @@ -69,6 +71,7 @@ serde_with = { version = "3.3", features = ["indexmap_2"] }
strum = { version = "0.25", features = ["derive"] }
tempfile = "3"
thousands = "0.2"
tokio = { version = "1.33", features = ["full"] }
tracing = { version = "0.1", features = ["log"] }
tracing-subscriber = "0.3"
uuid = { version = "1.4", features = ["fast-rng", "serde"] }
Expand All @@ -78,7 +81,9 @@ zstd = "0.13"
prost-build = "0.12"

[dev-dependencies]
async-std = { version = "1.12", features = ["attributes"] }
csv = "1.3"
hxdmp = "0.2.1"
insta = { version = "1.34", features = ["yaml"] }
pretty_assertions = "1.4"
rstest = "0.18"
Expand Down
1 change: 1 addition & 0 deletions build.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// The custom build script, needed as we use prost.

fn main() {
println!("cargo:rerun-if-changed=src/db/create/txs/data.proto3");
prost_build::compile_protos(&["src/db/create/txs/data.proto3"], &["src/"]).unwrap();
}
8 changes: 2 additions & 6 deletions src/annotate/seqvars/csq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -738,13 +738,9 @@ mod test {
#[case] spdi: &str,
#[case] expected_dist: i32,
) -> Result<(), anyhow::Error> {
crate::common::set_snapshot_suffix!("{}", spdi.replace(":", "-"));
crate::common::set_snapshot_suffix!("{}", spdi.replace(':', "-"));

let spdi = spdi
.split(":")
.into_iter()
.map(|s| s.to_string())
.collect::<Vec<_>>();
let spdi = spdi.split(':').map(|s| s.to_string()).collect::<Vec<_>>();

let tx_path = "tests/data/annotate/db/grch37/txs.bin.zst";
let tx_db = load_tx_db(tx_path)?;
Expand Down
129 changes: 68 additions & 61 deletions src/annotate/strucvars/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use std::path::Path;
use std::str::FromStr;
use std::{fs::File, io::BufWriter};

use crate::common::{open_read_maybe_gz, GenomeRelease};
use crate::common::noodles::{open_vcf_reader, AsyncVcfReader};
use crate::common::GenomeRelease;
use crate::ped::PedigreeByName;
use annonars::common::cli::CANONICAL;
use annonars::freqs::cli::import::reading::guess_assembly;
Expand All @@ -20,6 +21,7 @@ use chrono::Utc;
use clap::{Args as ClapArgs, Parser};
use flate2::write::GzEncoder;
use flate2::Compression;
use futures::TryStreamExt;
use noodles_bgzf::Writer as BgzfWriter;
use noodles_vcf::reader::Builder as VariantReaderBuilder;
use noodles_vcf::record::alternate_bases::Allele;
Expand Down Expand Up @@ -1626,13 +1628,14 @@ impl SvCaller {
}

/// Guess the `SvCaller` from the VCF file at the given path.
pub fn guess_sv_caller(reader: Box<dyn std::io::BufRead>) -> Result<SvCaller, anyhow::Error> {
let mut reader = noodles_vcf::reader::Builder.build_from_reader(reader)?;
let header = reader.read_header()?;
pub async fn guess_sv_caller(reader: &mut AsyncVcfReader) -> Result<SvCaller, anyhow::Error> {
let header = reader.read_header().await?;
let mut records = reader.records(&header);
let record = records
.next()
.ok_or(anyhow::anyhow!("No records found"))??;
.try_next()
.await
.map_err(|e| anyhow::anyhow!("Problem reading VCF records: {}", e))?
.ok_or(anyhow::anyhow!("No records found"))?;

for caller in SvCaller::iter() {
if caller.caller_compatible(&header) {
Expand Down Expand Up @@ -2641,9 +2644,9 @@ pub fn build_vcf_record_converter<T: AsRef<str>>(
/// Convert the records in the VCF reader to the JSONL file per contig in `tmp_dir`.
///
/// Note that we will consider the "25 canonical" contigs only (chr1..chr22, chrX, chrY, chrM).
pub fn run_vcf_to_jsonl(
pub async fn run_vcf_to_jsonl(
pedigree: &PedigreeByName,
reader: &mut noodles_vcf::Reader<Box<dyn std::io::BufRead>>,
reader: &mut AsyncVcfReader,
header: &VcfHeader,
sv_caller: &SvCaller,
tmp_dir: &tempfile::TempDir,
Expand Down Expand Up @@ -2678,11 +2681,16 @@ pub fn run_vcf_to_jsonl(
let mapping = CHROM_TO_CHROM_NO.deref();
let mut uuid_buf = [0u8; 16];

for record in reader.records(header) {
let mut records = reader.records(header);
while let Some(record) = records
.try_next()
.await
.map_err(|e| anyhow::anyhow!("problem reading VCF record: {}", e))?
{
rng.fill_bytes(&mut uuid_buf);
let uuid = Uuid::from_bytes(uuid_buf);

let mut record = converter.convert(pedigree, &record?, uuid, GenomeRelease::Grch37)?;
let mut record = converter.convert(pedigree, &record, uuid, GenomeRelease::Grch37)?;
annotate_cov_mq(&mut record, cov_readers)?;
if let Some(chromosome_no) = mapping.get(&record.chromosome) {
let out_jsonl = &mut tmp_files[*chromosome_no as usize - 1];
Expand Down Expand Up @@ -2854,7 +2862,7 @@ pub fn read_and_cluster_for_contig(
/// * `args`: The command line arguments.
/// * `pedigree`: The pedigree of case.
/// * `header`: The input VCF header.
fn run_with_writer(
async fn run_with_writer(
writer: &mut dyn AnnotatedVcfWriter,
args: &Args,
pedigree: &PedigreeByName,
Expand Down Expand Up @@ -2886,12 +2894,14 @@ fn run_with_writer(
tracing::info!("Input VCF files to temporary files...");
for path_input in args.path_input_vcf.iter() {
tracing::debug!("processing VCF file {}", path_input);
let reader = open_read_maybe_gz(path_input)?;
let sv_caller = guess_sv_caller(reader)?;
let sv_caller = {
let mut reader = open_vcf_reader(path_input).await?;
guess_sv_caller(&mut reader).await?
};
tracing::debug!("guessed caller/version to be {:?}", &sv_caller);

let mut reader = noodles_vcf::reader::Builder.build_from_path(path_input)?;
let header: VcfHeader = reader.read_header()?;
let mut reader = open_vcf_reader(path_input).await?;
let header: VcfHeader = reader.read_header().await?;
run_vcf_to_jsonl(
pedigree,
&mut reader,
Expand All @@ -2900,7 +2910,8 @@ fn run_with_writer(
&tmp_dir,
&mut cov_readers,
&mut rng,
)?;
)
.await?;
}
tracing::info!("... done converting input files.");

Expand Down Expand Up @@ -2942,7 +2953,7 @@ fn run_with_writer(
}

/// Main entry point for `annotate strucvars` sub command.
pub fn run(_common: &crate::common::Args, args: &Args) -> Result<(), anyhow::Error> {
pub async fn run(_common: &crate::common::Args, args: &Args) -> Result<(), anyhow::Error> {
tracing::info!("config = {:#?}", &args);
// Load the pedigree.
tracing::info!("Loading pedigree...");
Expand Down Expand Up @@ -2979,13 +2990,13 @@ pub fn run(_common: &crate::common::Args, args: &Args) -> Result<(), anyhow::Err
);
writer.set_assembly(assembly);
writer.set_pedigree(&pedigree);
run_with_writer(&mut writer, &args, &pedigree, &header)?;
run_with_writer(&mut writer, &args, &pedigree, &header).await?;
} else {
let mut writer =
noodles_vcf::Writer::new(File::create(path_output_vcf).map(BufWriter::new)?);
writer.set_assembly(assembly);
writer.set_pedigree(&pedigree);
run_with_writer(&mut writer, &args, &pedigree, &header)?;
run_with_writer(&mut writer, &args, &pedigree, &header).await?;
}
} else {
let path_output_tsv = args
Expand All @@ -2997,7 +3008,7 @@ pub fn run(_common: &crate::common::Args, args: &Args) -> Result<(), anyhow::Err
writer.set_assembly(assembly);
writer.set_pedigree(&pedigree);

run_with_writer(&mut writer, &args, &pedigree, &header)?;
run_with_writer(&mut writer, &args, &pedigree, &header).await?;
}

Ok(())
Expand Down Expand Up @@ -3150,7 +3161,7 @@ mod test {
VarFishStrucvarTsvRecord,
},
},
common::{open_read_maybe_gz, GenomeRelease},
common::{noodles::open_vcf_reader, GenomeRelease},
ped::{Disease, Individual, PedigreeByName, Sex},
};

Expand Down Expand Up @@ -3375,8 +3386,8 @@ mod test {
)
}

#[test]
fn vcf_to_jsonl_with_detection() -> Result<(), anyhow::Error> {
#[tokio::test]
async fn vcf_to_jsonl_with_detection() -> Result<(), anyhow::Error> {
let keys = &[
"delly2",
"dragen-cnv",
Expand All @@ -3391,7 +3402,8 @@ mod test {
let path_expected_txt = format!("tests/data/annotate/strucvars/{}-min.out.jsonl", key);
let samples = vcf_samples(&path_input_vcf)?;
let pedigree: PedigreeByName = sample_ped(&samples);
let sv_caller = guess_sv_caller(open_read_maybe_gz(&path_input_vcf)?)?;
let mut reader = open_vcf_reader(&path_input_vcf).await?;
let sv_caller = guess_sv_caller(&mut reader).await?;
let converter = build_vcf_record_converter(&sv_caller, &samples);

run_test_vcf_to_jsonl(
Expand All @@ -3405,71 +3417,65 @@ mod test {
Ok(())
}

#[test]
fn guess_sv_caller_delly() -> Result<(), anyhow::Error> {
let sv_caller = guess_sv_caller(open_read_maybe_gz(
"tests/data/annotate/strucvars/delly2-min.vcf",
)?)?;
#[tokio::test]
async fn guess_sv_caller_delly() -> Result<(), anyhow::Error> {
let mut reader = open_vcf_reader("tests/data/annotate/strucvars/delly2-min.vcf").await?;
let sv_caller = guess_sv_caller(&mut reader).await?;
insta::assert_debug_snapshot!(sv_caller);

Ok(())
}

#[test]
fn guess_sv_caller_dragen_sv() -> Result<(), anyhow::Error> {
let sv_caller = guess_sv_caller(open_read_maybe_gz(
"tests/data/annotate/strucvars/dragen-sv-min.vcf",
)?)?;
#[tokio::test]
async fn guess_sv_caller_dragen_sv() -> Result<(), anyhow::Error> {
let mut reader = open_vcf_reader("tests/data/annotate/strucvars/dragen-sv-min.vcf").await?;
let sv_caller = guess_sv_caller(&mut reader).await?;
insta::assert_debug_snapshot!(sv_caller);

Ok(())
}

#[test]
fn guess_sv_caller_dragen_cnv() -> Result<(), anyhow::Error> {
let sv_caller = guess_sv_caller(open_read_maybe_gz(
"tests/data/annotate/strucvars/dragen-cnv-min.vcf",
)?)?;
#[tokio::test]
async fn guess_sv_caller_dragen_cnv() -> Result<(), anyhow::Error> {
let mut reader =
open_vcf_reader("tests/data/annotate/strucvars/dragen-cnv-min.vcf").await?;
let sv_caller = guess_sv_caller(&mut reader).await?;
insta::assert_debug_snapshot!(sv_caller);

Ok(())
}

#[test]
fn guess_sv_caller_gcnv() -> Result<(), anyhow::Error> {
let sv_caller = guess_sv_caller(open_read_maybe_gz(
"tests/data/annotate/strucvars/gcnv-min.vcf",
)?)?;
#[tokio::test]
async fn guess_sv_caller_gcnv() -> Result<(), anyhow::Error> {
let mut reader = open_vcf_reader("tests/data/annotate/strucvars/gcnv-min.vcf").await?;
let sv_caller = guess_sv_caller(&mut reader).await?;
insta::assert_debug_snapshot!(sv_caller);

Ok(())
}

#[test]
fn guess_sv_caller_manta() -> Result<(), anyhow::Error> {
let sv_caller = guess_sv_caller(open_read_maybe_gz(
"tests/data/annotate/strucvars/manta-min.vcf",
)?)?;
#[tokio::test]
async fn guess_sv_caller_manta() -> Result<(), anyhow::Error> {
let mut reader = open_vcf_reader("tests/data/annotate/strucvars/manta-min.vcf").await?;
let sv_caller = guess_sv_caller(&mut reader).await?;
insta::assert_debug_snapshot!(sv_caller);

Ok(())
}

#[test]
fn guess_sv_caller_melt() -> Result<(), anyhow::Error> {
let sv_caller = guess_sv_caller(open_read_maybe_gz(
"tests/data/annotate/strucvars/melt-min.vcf",
)?)?;
#[tokio::test]
async fn guess_sv_caller_melt() -> Result<(), anyhow::Error> {
let mut reader = open_vcf_reader("tests/data/annotate/strucvars/melt-min.vcf").await?;
let sv_caller = guess_sv_caller(&mut reader).await?;
insta::assert_debug_snapshot!(sv_caller);

Ok(())
}

#[test]
fn guess_sv_caller_popdel() -> Result<(), anyhow::Error> {
let sv_caller = guess_sv_caller(open_read_maybe_gz(
"tests/data/annotate/strucvars/popdel-min.vcf",
)?)?;
#[tokio::test]
async fn guess_sv_caller_popdel() -> Result<(), anyhow::Error> {
let mut reader = open_vcf_reader("tests/data/annotate/strucvars/popdel-min.vcf").await?;
let sv_caller = guess_sv_caller(&mut reader).await?;
insta::assert_debug_snapshot!(sv_caller);

Ok(())
Expand Down Expand Up @@ -3814,7 +3820,8 @@ mod test {
#[rstest]
#[case(true)]
#[case(false)]
fn test_with_maelstrom_reader(#[case] is_tsv: bool) -> Result<(), anyhow::Error> {
#[tokio::test]
async fn test_with_maelstrom_reader(#[case] is_tsv: bool) -> Result<(), anyhow::Error> {
let temp = TempDir::default();

let args_common = crate::common::Args {
Expand Down Expand Up @@ -3853,7 +3860,7 @@ mod test {
rng_seed: Some(42),
};

run(&args_common, &args)?;
run(&args_common, &args).await?;

let expected = std::fs::read_to_string(format!(
"tests/data/annotate/strucvars/maelstrom/delly2-min-with-maelstrom{}",
Expand Down
4 changes: 4 additions & 0 deletions src/common/io/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
//! Common, IO-related code.

pub mod std;
pub mod tokio;
Loading
Loading