Skip to content

Commit

Permalink
fix: seqvars aggregate async runtime error (#519)
Browse files Browse the repository at this point in the history
  • Loading branch information
holtgrewe committed Oct 30, 2024
1 parent dc7ab84 commit be72fd9
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 21 deletions.
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ async fn main() -> Result<(), anyhow::Error> {
SeqvarsCommands::Aggregate(args) => {
// Note that aggregate is not async as it uses Rayon and will
// block internally for the read files.
seqvars::aggregate::run(&cli.common, args)?;
seqvars::aggregate::run(&cli.common, args).await?;
}
SeqvarsCommands::Ingest(args) => {
seqvars::ingest::run(&cli.common, args).await?;
Expand Down
49 changes: 29 additions & 20 deletions src/seqvars/aggregate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,28 +315,17 @@ async fn import_vcf(
}

/// Perform the parallel import of VCF files.
fn vcf_import(
async fn vcf_import(
db: &Arc<rocksdb::TransactionDB<rocksdb::MultiThreaded>>,
path_input: &[&str],
cf_counts: &str,
cf_carriers: &str,
genomebuild: crate::common::GenomeRelease,
) -> Result<(), anyhow::Error> {
path_input
.par_iter()
.map(|path_input| {
// We create a Tokio scheduler for the current file as we need it
// to wait / block for the VCF import running in the current Rayon
// thread.
tokio::runtime::Builder::new_current_thread()
.build()
.map_err(|e| {
anyhow::anyhow!(
"building Tokio runtime for VCF file {} failed: {}",
path_input,
e
)
})?
let handle = tokio::runtime::Handle::current();
path_input.par_iter().try_for_each(|path_input| {
tokio::task::block_in_place(|| {
handle
.block_on(import_vcf(
db,
path_input,
Expand All @@ -346,12 +335,31 @@ fn vcf_import(
))
.map_err(|e| anyhow::anyhow!("processing VCF file {} failed: {}", path_input, e))
})
.collect::<Result<Vec<_>, _>>()
.map(|_| ())
// // We create a Tokio scheduler for the current file as we need it
// // to wait / block for the VCF import running in the current Rayon
// // thread.
// tokio::runtime::Builder::new_current_thread()
// .build()
// .map_err(|e| {
// anyhow::anyhow!(
// "building Tokio runtime for VCF file {} failed: {}",
// path_input,
// e
// )
// })?
// .block_on(import_vcf(
// db,
// path_input,
// cf_counts,
// cf_carriers,
// genomebuild,
// ))
// .map_err(|e| anyhow::anyhow!("processing VCF file {} failed: {}", path_input, e))
})
}

/// Main entry point for `seqvars aggregate` sub command.
pub fn run(args_common: &crate::common::Args, args: &Args) -> Result<(), anyhow::Error> {
pub async fn run(args_common: &crate::common::Args, args: &Args) -> Result<(), anyhow::Error> {
let before_anything = std::time::Instant::now();
tracing::info!("args_common = {:#?}", &args_common);
tracing::info!("args = {:#?}", &args);
Expand Down Expand Up @@ -420,7 +428,8 @@ pub fn run(args_common: &crate::common::Args, args: &Args) -> Result<(), anyhow:
&args.cf_counts,
&args.cf_carriers,
args.genomebuild,
)?;
)
.await?;
tracing::info!(
"... done importing VCF files in {:?}",
before_import.elapsed()
Expand Down

0 comments on commit be72fd9

Please sign in to comment.