From be72fd986d4d9c324e8f38f871bd0828a8f9de21 Mon Sep 17 00:00:00 2001 From: Manuel Holtgrewe Date: Wed, 30 Oct 2024 10:11:19 +0100 Subject: [PATCH] fix: seqvars aggregate async runtime error (#519) --- src/main.rs | 2 +- src/seqvars/aggregate/mod.rs | 49 +++++++++++++++++++++--------------- 2 files changed, 30 insertions(+), 21 deletions(-) diff --git a/src/main.rs b/src/main.rs index 7be439cc..a7379bc7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -100,7 +100,7 @@ async fn main() -> Result<(), anyhow::Error> { SeqvarsCommands::Aggregate(args) => { // Note that aggregate is not async as it uses Rayon and will // block internally for the read files. - seqvars::aggregate::run(&cli.common, args)?; + seqvars::aggregate::run(&cli.common, args).await?; } SeqvarsCommands::Ingest(args) => { seqvars::ingest::run(&cli.common, args).await?; diff --git a/src/seqvars/aggregate/mod.rs b/src/seqvars/aggregate/mod.rs index bcdbdbe8..132f936b 100644 --- a/src/seqvars/aggregate/mod.rs +++ b/src/seqvars/aggregate/mod.rs @@ -315,28 +315,17 @@ async fn import_vcf( } /// Perform the parallel import of VCF files. -fn vcf_import( +async fn vcf_import( db: &Arc>, path_input: &[&str], cf_counts: &str, cf_carriers: &str, genomebuild: crate::common::GenomeRelease, ) -> Result<(), anyhow::Error> { - path_input - .par_iter() - .map(|path_input| { - // We create a Tokio scheduler for the current file as we need it - // to wait / block for the VCF import running in the current Rayon - // thread. - tokio::runtime::Builder::new_current_thread() - .build() - .map_err(|e| { - anyhow::anyhow!( - "building Tokio runtime for VCF file {} failed: {}", - path_input, - e - ) - })? 
+ let handle = tokio::runtime::Handle::current(); + path_input.par_iter().try_for_each(|path_input| { + tokio::task::block_in_place(|| { + handle .block_on(import_vcf( db, path_input, @@ -346,12 +335,31 @@ fn vcf_import( )) .map_err(|e| anyhow::anyhow!("processing VCF file {} failed: {}", path_input, e)) }) - .collect::<Result<Vec<_>, _>>() - .map(|_| ()) + // // We create a Tokio scheduler for the current file as we need it + // // to wait / block for the VCF import running in the current Rayon + // // thread. + // tokio::runtime::Builder::new_current_thread() + // .build() + // .map_err(|e| { + // anyhow::anyhow!( + // "building Tokio runtime for VCF file {} failed: {}", + // path_input, + // e + // ) + // })? + // .block_on(import_vcf( + // db, + // path_input, + // cf_counts, + // cf_carriers, + // genomebuild, + // )) + // .map_err(|e| anyhow::anyhow!("processing VCF file {} failed: {}", path_input, e)) + }) } /// Main entry point for `seqvars aggregate` sub command. -pub fn run(args_common: &crate::common::Args, args: &Args) -> Result<(), anyhow::Error> { +pub async fn run(args_common: &crate::common::Args, args: &Args) -> Result<(), anyhow::Error> { let before_anything = std::time::Instant::now(); tracing::info!("args_common = {:#?}", &args_common); tracing::info!("args = {:#?}", &args); @@ -420,7 +428,8 @@ pub fn run(args_common: &crate::common::Args, args: &Args) -> Result<(), anyhow: &args.cf_counts, &args.cf_carriers, args.genomebuild, - )?; + ) + .await?; tracing::info!( "... done importing VCF files in {:?}", before_import.elapsed()