From f63b97246ba9ee4a11baf15ff1001333f230b39b Mon Sep 17 00:00:00 2001 From: Artem Medvedev Date: Tue, 7 Feb 2023 11:54:43 +0100 Subject: [PATCH] fix(MemTable): make it cancel-safe and fix parallelism (#5197) * fix(MemTable): make it cancel-safe and fix parallelism * refactor: use external error for join-error --- datafusion/core/src/datasource/memory.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs index 0b7cd6d5b185..38588fd870e2 100644 --- a/datafusion/core/src/datasource/memory.rs +++ b/datafusion/core/src/datasource/memory.rs @@ -32,6 +32,7 @@ use crate::error::{DataFusionError, Result}; use crate::execution::context::SessionState; use crate::logical_expr::Expr; use crate::physical_plan::common; +use crate::physical_plan::common::AbortOnDropSingle; use crate::physical_plan::memory::MemoryExec; use crate::physical_plan::ExecutionPlan; use crate::physical_plan::{repartition::RepartitionExec, Partitioning}; @@ -76,10 +77,12 @@ impl MemTable { .map(|part_i| { let task = state.task_ctx(); let exec = exec.clone(); - tokio::spawn(async move { + let task = tokio::spawn(async move { let stream = exec.execute(part_i, task)?; common::collect(stream).await - }) + }); + + AbortOnDropSingle::new(task) }) // this collect *is needed* so that the join below can // switch between tasks @@ -87,9 +90,9 @@ impl MemTable { let mut data: Vec> = Vec::with_capacity(exec.output_partitioning().partition_count()); - for task in tasks { - let result = task.await.expect("MemTable::load could not join task")?; - data.push(result); + + for result in futures::future::join_all(tasks).await { + data.push(result.map_err(|e| DataFusionError::External(Box::new(e)))??) } let exec = MemoryExec::try_new(&data, schema.clone(), None)?;