diff --git a/rust/datafusion/src/execution/physical_plan/mod.rs b/rust/datafusion/src/execution/physical_plan/mod.rs index 6b953245661b3..7aedd7902eb9e 100644 --- a/rust/datafusion/src/execution/physical_plan/mod.rs +++ b/rust/datafusion/src/execution/physical_plan/mod.rs @@ -47,7 +47,7 @@ pub trait BatchIterator: Send + Sync { /// Expression that can be evaluated against a RecordBatch pub trait PhysicalExpr: Send + Sync { /// Evaluate an expression against a RecordBatch - fn evaluate(&self, batch: RecordBatch) -> Result; + fn evaluate(&self, batch: &RecordBatch) -> Result; } pub mod projection; diff --git a/rust/datafusion/src/execution/physical_plan/projection.rs b/rust/datafusion/src/execution/physical_plan/projection.rs index 0028dd8c0207d..3e01668a10a4b 100644 --- a/rust/datafusion/src/execution/physical_plan/projection.rs +++ b/rust/datafusion/src/execution/physical_plan/projection.rs @@ -27,6 +27,7 @@ use crate::execution::physical_plan::{ BatchIterator, ExecutionPlan, Partition, PhysicalExpr, }; use arrow::datatypes::Schema; +use arrow::record_batch::RecordBatch; /// Execution plan for a projection pub struct ProjectionExec { @@ -53,6 +54,7 @@ impl ExecutionPlan for ProjectionExec { .map(|p| { let expr = self.expr.clone(); let projection: Arc = Arc::new(ProjectionPartition { + schema: self.schema.clone(), expr, input: p.clone() as Arc, }); @@ -67,6 +69,7 @@ impl ExecutionPlan for ProjectionExec { /// Represents a single partition of a projection execution plan struct ProjectionPartition { + schema: Arc, expr: Vec>, input: Arc, } @@ -74,10 +77,31 @@ struct ProjectionPartition { impl Partition for ProjectionPartition { /// Execute the projection fn execute(&self) -> Result> { - // execute the input partition and get an iterator - let it = self.input.execute()?; - //TODO wrap the iterator in a new one that performs the projection by evaluating the - // expressions against the batches - Ok(it) + Ok(Arc::new(ProjectionIterator { + schema: self.schema.clone(), + expr: self.expr.clone(), + input: self.input.execute()?, + })) + } +} + +/// Projection iterator +struct ProjectionIterator { + schema: Arc, + expr: Vec>, + input: Arc, +} + +impl BatchIterator for ProjectionIterator { + /// Get the next batch + fn next(&self) -> Result> { + match self.input.next()? { + Some(batch) => { + let arrays: Result> = + self.expr.iter().map(|expr| expr.evaluate(&batch)).collect(); + Ok(Some(RecordBatch::try_new(self.schema.clone(), arrays?)?)) + } + None => Ok(None), + } } }