Skip to content

Commit

Permalink
fix(MemTable): make it cancel-safe and fix parallelism (#5197)
Browse files Browse the repository at this point in the history
* fix(MemTable): make it cancel-safe and fix parallelism

* refactor: use external error for join-error
  • Loading branch information
DDtKey authored Feb 7, 2023
1 parent 816a0f8 commit f63b972
Showing 1 changed file with 8 additions and 5 deletions.
13 changes: 8 additions & 5 deletions datafusion/core/src/datasource/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::error::{DataFusionError, Result};
use crate::execution::context::SessionState;
use crate::logical_expr::Expr;
use crate::physical_plan::common;
use crate::physical_plan::common::AbortOnDropSingle;
use crate::physical_plan::memory::MemoryExec;
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::{repartition::RepartitionExec, Partitioning};
Expand Down Expand Up @@ -76,20 +77,22 @@ impl MemTable {
.map(|part_i| {
let task = state.task_ctx();
let exec = exec.clone();
tokio::spawn(async move {
let task = tokio::spawn(async move {
let stream = exec.execute(part_i, task)?;
common::collect(stream).await
})
});

AbortOnDropSingle::new(task)
})
// this collect *is needed* so that the join below can
// switch between tasks
.collect::<Vec<_>>();

let mut data: Vec<Vec<RecordBatch>> =
Vec::with_capacity(exec.output_partitioning().partition_count());
for task in tasks {
let result = task.await.expect("MemTable::load could not join task")?;
data.push(result);

for result in futures::future::join_all(tasks).await {
data.push(result.map_err(|e| DataFusionError::External(Box::new(e)))??)
}

let exec = MemoryExec::try_new(&data, schema.clone(), None)?;
Expand Down

0 comments on commit f63b972

Please sign in to comment.