diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index aded5f979e39b..5364cab8bc25b 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -37,7 +37,9 @@ use crate::execution::filter::FilterRelation; use crate::execution::limit::LimitRelation; use crate::execution::physical_plan::common; use crate::execution::physical_plan::datasource::DatasourceExec; -use crate::execution::physical_plan::expressions::{BinaryExpr, CastExpr, Column, Literal,}; +use crate::execution::physical_plan::expressions::{ + BinaryExpr, CastExpr, Column, Literal, Sum, +}; use crate::execution::physical_plan::hash_aggregate::HashAggregateExec; use crate::execution::physical_plan::merge::MergeExec; use crate::execution::physical_plan::projection::ProjectionExec; @@ -587,6 +589,29 @@ mod tests { Ok(()) } + #[test] + fn parallel_selection() -> Result<()> { + let tmp_dir = TempDir::new("parallel_selection")?; + let partition_count = 4; + let mut ctx = create_ctx(&tmp_dir, partition_count)?; + + let logical_plan = + ctx.create_logical_plan("SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3")?; + let logical_plan = ctx.optimize(&logical_plan)?; + + let physical_plan = ctx.create_physical_plan(&logical_plan, 1024)?; + + let results = ctx.collect(physical_plan.as_ref())?; + + // there should be one batch per partition + assert_eq!(results.len(), partition_count); + + let row_count: usize = results.iter().map(|batch| batch.num_rows()).sum(); + assert_eq!(row_count, 20); + + Ok(()) + } + #[test] fn aggregate() -> Result<()> { let results = execute("SELECT SUM(c1), SUM(c2) FROM test", 4)?; @@ -628,28 +653,6 @@ mod tests { ctx.collect(physical_plan.as_ref()) } - fn parallel_selection() -> Result<()> { - let tmp_dir = TempDir::new("parallel_selection")?; - let partition_count = 4; - let mut ctx = create_ctx(&tmp_dir, partition_count)?; - - let logical_plan = - ctx.create_logical_plan("SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3")?; - let logical_plan = ctx.optimize(&logical_plan)?; - - let physical_plan = ctx.create_physical_plan(&logical_plan, 1024)?; - - let results = ctx.collect(physical_plan.as_ref())?; - - // there should be one batch per partition - assert_eq!(results.len(), partition_count); - - let row_count: usize = results.iter().map(|batch| batch.num_rows()).sum(); - assert_eq!(row_count, 20); - - Ok(()) - } - /// Generate a partitioned CSV file and register it with an execution context fn create_ctx(tmp_dir: &TempDir, partition_count: usize) -> Result { let mut ctx = ExecutionContext::new(); diff --git a/rust/datafusion/src/execution/physical_plan/expressions.rs b/rust/datafusion/src/execution/physical_plan/expressions.rs index 52bc1427268bf..ecd1234f38d13 100644 --- a/rust/datafusion/src/execution/physical_plan/expressions.rs +++ b/rust/datafusion/src/execution/physical_plan/expressions.rs @@ -22,10 +22,21 @@ use std::rc::Rc; use std::sync::Arc; use crate::error::{ExecutionError, Result}; -use crate::execution::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr}; -use crate::logicalplan::{ScalarValue, Operator}; -use arrow::array::{ArrayRef, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, UInt16Array, UInt32Array, UInt64Array, UInt8Array, UInt32Builder}; -use arrow::compute::kernels::comparison::{lt, lt_eq, gt, gt_eq, eq, neq}; +use crate::execution::physical_plan::{ + Accumulator, AggregateExpr, PhysicalExpr, PhysicalExprRef, +}; +use crate::logicalplan::{Operator, ScalarValue}; +use arrow::array::{ + ArrayRef, BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array, + Int64Array, Int8Array, UInt16Array, UInt32Array, UInt64Array, UInt8Array, +}; +use arrow::array::{ + Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder, + Int8Builder, UInt16Builder, UInt32Builder, UInt64Builder, UInt8Builder, +}; +use arrow::compute::kernels::boolean::{and, or}; +use arrow::compute::kernels::cast::cast; +use arrow::compute::kernels::comparison::{eq, gt, gt_eq, lt, lt_eq, neq}; use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; @@ -478,11 +489,6 @@ impl PhysicalExpr for CastExpr { } } -/// Create a column expression -pub fn col(i: usize) -> PhysicalExprRef { - Arc::new(Column::new(i)) -} - /// Create a literal expression pub fn lit(value: ScalarValue) -> PhysicalExprRef { Arc::new(Literal::new(value))