Skip to content

Commit

Permalink
chore: Dispatch to the in-memory engine for multifile sources (#20860)
Browse files Browse the repository at this point in the history
  • Loading branch information
coastalwhite authored Jan 23, 2025
1 parent b333ab7 commit 6d01c91
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 4 deletions.
2 changes: 1 addition & 1 deletion crates/polars-mem-engine/src/executors/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use super::*;
///
/// Executors have other executors as input. By having a tree of executors we can execute the
/// physical plan until the last executor is evaluated.
pub trait Executor: Send {
pub trait Executor: Send + Sync {
fn execute(&mut self, cache: &mut ExecutionState) -> PolarsResult<DataFrame>;
}

Expand Down
28 changes: 25 additions & 3 deletions crates/polars-stream/src/physical_plan/lower_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use polars_core::prelude::{InitHashMaps, PlHashMap, PlIndexMap};
use polars_core::schema::Schema;
use polars_error::PolarsResult;
use polars_io::RowIndex;
use polars_mem_engine::create_physical_plan;
use polars_plan::plans::expr_ir::{ExprIR, OutputName};
use polars_plan::plans::{AExpr, FileScan, FunctionIR, IR};
use polars_plan::prelude::{FileType, SinkType};
Expand Down Expand Up @@ -365,13 +366,34 @@ pub fn lower_ir(
PhysNodeKind::InMemorySource {
df: Arc::new(DataFrame::empty_with_schema(output_schema.as_ref())),
}
} else if scan_sources.len() > 1 || hive_parts.is_some() {
// @TODO: At the moment, we materialize scans that have multiple sources or Hive
// partitions by dispatching them to the in-memory engine. I would also not like to do
// this, but at the moment it is this or panicking.
//
// This is very much a hack, forgive me please.
let phys_node = phys_sm.insert(PhysNode {
output_schema: Arc::new(Schema::default()),
kind: PhysNodeKind::InputIndependentSelect {
selectors: Vec::new(),
},
});
let input = PhysStream::first(phys_node);
let in_memory_physical_plan = create_physical_plan(node, ir_arena, expr_arena)?;
let in_memory_physical_plan =
Arc::new(std::sync::Mutex::new(in_memory_physical_plan));

PhysNodeKind::InMemoryMap {
input,
map: Arc::new(move |_| {
let mut in_memory_physical_plan = in_memory_physical_plan.lock().unwrap();
in_memory_physical_plan.execute(&mut Default::default())
}),
}
} else {
#[cfg(feature = "ipc")]
if matches!(scan_type, FileScan::Ipc { .. }) {
// @TODO: All the things the IPC source does not support yet.
if hive_parts.is_some()
|| scan_sources.is_cloud_url()
|| file_options.allow_missing_columns
if scan_sources.is_cloud_url()
|| file_options.slice.is_some_and(|(offset, _)| offset < 0)
{
todo!();
Expand Down

0 comments on commit 6d01c91

Please sign in to comment.