diff --git a/crates/polars-mem-engine/src/executors/executor.rs b/crates/polars-mem-engine/src/executors/executor.rs index 5a13cd03ed1..75524fc49cd 100644 --- a/crates/polars-mem-engine/src/executors/executor.rs +++ b/crates/polars-mem-engine/src/executors/executor.rs @@ -7,7 +7,7 @@ use super::*; /// /// Executors have other executors as input. By having a tree of executors we can execute the /// physical plan until the last executor is evaluated. -pub trait Executor: Send { +pub trait Executor: Send + Sync { fn execute(&mut self, cache: &mut ExecutionState) -> PolarsResult; } diff --git a/crates/polars-stream/src/physical_plan/lower_ir.rs b/crates/polars-stream/src/physical_plan/lower_ir.rs index f2f4d31aad8..c1d583c0812 100644 --- a/crates/polars-stream/src/physical_plan/lower_ir.rs +++ b/crates/polars-stream/src/physical_plan/lower_ir.rs @@ -5,6 +5,7 @@ use polars_core::prelude::{InitHashMaps, PlHashMap, PlIndexMap}; use polars_core::schema::Schema; use polars_error::PolarsResult; use polars_io::RowIndex; +use polars_mem_engine::create_physical_plan; use polars_plan::plans::expr_ir::{ExprIR, OutputName}; use polars_plan::plans::{AExpr, FileScan, FunctionIR, IR}; use polars_plan::prelude::{FileType, SinkType}; @@ -365,13 +366,34 @@ pub fn lower_ir( PhysNodeKind::InMemorySource { df: Arc::new(DataFrame::empty_with_schema(output_schema.as_ref())), } + } else if scan_sources.len() > 1 || hive_parts.is_some() { + // @TODO: At the moment, we materialize for Hive. I would also not like to do this, + // but at the moment it is this or panicking. + // + // This is very much a hack, forgive me please. + let phys_node = phys_sm.insert(PhysNode { + output_schema: Arc::new(Schema::default()), + kind: PhysNodeKind::InputIndependentSelect { + selectors: Vec::new(), + }, + }); + let input = PhysStream::first(phys_node); + let in_memory_physical_plan = create_physical_plan(node, ir_arena, expr_arena)?; + let in_memory_physical_plan = + Arc::new(std::sync::Mutex::new(in_memory_physical_plan)); + + PhysNodeKind::InMemoryMap { + input, + map: Arc::new(move |_| { + let mut in_memory_physical_plan = in_memory_physical_plan.lock().unwrap(); + in_memory_physical_plan.execute(&mut Default::default()) + }), + } } else { #[cfg(feature = "ipc")] if matches!(scan_type, FileScan::Ipc { .. }) { // @TODO: All the things the IPC source does not support yet. - if hive_parts.is_some() - || scan_sources.is_cloud_url() - || file_options.allow_missing_columns + if scan_sources.is_cloud_url() || file_options.slice.is_some_and(|(offset, _)| offset < 0) { todo!();