Skip to content

Commit

Permalink
Implement projection logic
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Aug 6, 2019
1 parent 1875902 commit d1ede3c
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 6 deletions.
2 changes: 1 addition & 1 deletion rust/datafusion/src/execution/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub trait BatchIterator: Send + Sync {
/// Expression that can be evaluated against a RecordBatch
pub trait PhysicalExpr: Send + Sync {
    /// Evaluate an expression against a RecordBatch.
    ///
    /// The batch is borrowed rather than consumed, so a caller can evaluate
    /// several expressions against the same batch without cloning it.
    /// Returns the resulting array, or an error if evaluation fails.
    fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef>;
}

pub mod projection;
34 changes: 29 additions & 5 deletions rust/datafusion/src/execution/physical_plan/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::execution::physical_plan::{
BatchIterator, ExecutionPlan, Partition, PhysicalExpr,
};
use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;

/// Execution plan for a projection
pub struct ProjectionExec {
Expand All @@ -53,6 +54,7 @@ impl ExecutionPlan for ProjectionExec {
.map(|p| {
let expr = self.expr.clone();
let projection: Arc<Partition> = Arc::new(ProjectionPartition {
schema: self.schema.clone(),
expr,
input: p.clone() as Arc<Partition>,
});
Expand All @@ -67,17 +69,39 @@ impl ExecutionPlan for ProjectionExec {

/// Represents a single partition of a projection execution plan
///
/// Holds everything needed to build the projecting iterator when the
/// partition is executed.
struct ProjectionPartition {
/// Schema that is cloned into the iterator created by `execute`
schema: Arc<Schema>,
/// Projection expressions that are cloned into the iterator created by `execute`
expr: Vec<Arc<dyn PhysicalExpr>>,
/// Input partition whose `execute` result supplies the batches to project
input: Arc<dyn Partition>,
}

impl Partition for ProjectionPartition {
    /// Execute the projection
    ///
    /// Executes the input partition once and wraps the resulting batch
    /// iterator in a `ProjectionIterator`, which evaluates the projection
    /// expressions against each batch it yields. Returns an error if
    /// executing the input partition fails.
    fn execute(&self) -> Result<Arc<dyn BatchIterator>> {
        Ok(Arc::new(ProjectionIterator {
            schema: self.schema.clone(),
            expr: self.expr.clone(),
            // The input is executed exactly once; its iterator is owned by
            // the projection iterator returned to the caller.
            input: self.input.execute()?,
        }))
    }
}

/// Projection iterator
///
/// Wraps an input batch iterator and, for every batch it yields, produces a
/// new batch built from the results of evaluating the projection expressions.
struct ProjectionIterator {
/// Schema passed to `RecordBatch::try_new` when building each output batch
schema: Arc<Schema>,
/// Expressions evaluated against each input batch; each yields one output array
expr: Vec<Arc<dyn PhysicalExpr>>,
/// Source of the batches to be projected
input: Arc<dyn BatchIterator>,
}

impl BatchIterator for ProjectionIterator {
    /// Fetch the next input batch and return its projection.
    ///
    /// Yields `Ok(None)` once the input is exhausted. Any error from the
    /// input iterator, from evaluating an expression, or from assembling the
    /// output batch is propagated to the caller.
    fn next(&self) -> Result<Option<RecordBatch>> {
        // Nothing left upstream means nothing left to project.
        let batch = match self.input.next()? {
            Some(batch) => batch,
            None => return Ok(None),
        };

        // Evaluate the expressions in order, stopping at the first failure.
        let mut columns = Vec::with_capacity(self.expr.len());
        for expr in &self.expr {
            columns.push(expr.evaluate(&batch)?);
        }

        let projected = RecordBatch::try_new(self.schema.clone(), columns)?;
        Ok(Some(projected))
    }
}

0 comments on commit d1ede3c

Please sign in to comment.