Skip to content

Commit

Permalink
Implement selection operator
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Sep 24, 2019
1 parent b780c56 commit 6cad327
Show file tree
Hide file tree
Showing 3 changed files with 234 additions and 5 deletions.
54 changes: 49 additions & 5 deletions rust/datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,13 @@ use crate::execution::filter::FilterRelation;
use crate::execution::limit::LimitRelation;
use crate::execution::physical_plan::common;
use crate::execution::physical_plan::datasource::DatasourceExec;
use crate::execution::physical_plan::expressions::{Column, Sum};
use crate::execution::physical_plan::expressions::{
BinaryExpr, CastExpr, Column, Literal, Sum,
};
use crate::execution::physical_plan::hash_aggregate::HashAggregateExec;
use crate::execution::physical_plan::merge::MergeExec;
use crate::execution::physical_plan::projection::ProjectionExec;
use crate::execution::physical_plan::selection::SelectionExec;
use crate::execution::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr};
use crate::execution::projection::ProjectRelation;
use crate::execution::relation::{DataSourceRelation, Relation};
Expand Down Expand Up @@ -280,6 +283,12 @@ impl ExecutionContext {
schema.clone(),
)?))
}
LogicalPlan::Selection { input, expr, .. } => {
let input = self.create_physical_plan(input, batch_size)?;
let input_schema = input.as_ref().schema().clone();
let runtime_expr = self.create_physical_expr(expr, &input_schema)?;
Ok(Arc::new(SelectionExec::try_new(runtime_expr, input)?))
}
_ => Err(ExecutionError::General(
"Unsupported logical plan variant".to_string(),
)),
Expand All @@ -290,13 +299,25 @@ impl ExecutionContext {
/// Translate a logical expression into a physical expression.
///
/// Supported logical expressions are column references, literals, binary
/// expressions and casts. The operands of binary expressions and casts are
/// translated recursively against the same `input_schema`.
///
/// # Errors
///
/// Returns `ExecutionError::NotImplemented` for any other logical
/// expression, and propagates any error raised while translating a nested
/// expression or constructing a `CastExpr`.
pub fn create_physical_expr(
    &self,
    e: &Expr,
    input_schema: &Schema,
) -> Result<Arc<dyn PhysicalExpr>> {
    match e {
        Expr::Column(i) => Ok(Arc::new(Column::new(*i))),
        Expr::Literal(value) => Ok(Arc::new(Literal::new(value.clone()))),
        Expr::BinaryExpr { left, op, right } => Ok(Arc::new(BinaryExpr::new(
            self.create_physical_expr(left, input_schema)?,
            op.clone(),
            self.create_physical_expr(right, input_schema)?,
        ))),
        Expr::Cast { expr, data_type } => Ok(Arc::new(CastExpr::try_new(
            self.create_physical_expr(expr, input_schema)?,
            input_schema,
            data_type.clone(),
        )?)),
        // The catch-all must stay last: an earlier wildcard arm would shadow
        // every arm that follows it.
        other => Err(ExecutionError::NotImplemented(format!(
            "Physical plan does not support logical expression {:?}",
            other
        ))),
    }
}

Expand Down Expand Up @@ -569,6 +590,29 @@ mod tests {
Ok(())
}

#[test]
fn parallel_selection() -> Result<()> {
    // Build a context through the `create_ctx` helper, asking for 4 partitions
    let tmp_dir = TempDir::new("parallel_selection")?;
    let partition_count = 4;
    let mut ctx = create_ctx(&tmp_dir, partition_count)?;

    // SQL -> logical plan -> optimized logical plan -> physical plan
    let sql = "SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3";
    let unoptimized_plan = ctx.create_logical_plan(sql)?;
    let optimized_plan = ctx.optimize(&unoptimized_plan)?;
    let physical_plan = ctx.create_physical_plan(&optimized_plan, 1024)?;

    let batches = ctx.collect(physical_plan.as_ref())?;

    // there should be one batch per partition
    assert_eq!(batches.len(), partition_count);

    // total number of rows that passed the WHERE clause, across all batches
    let mut row_count: usize = 0;
    for batch in batches.iter() {
        row_count += batch.num_rows();
    }
    assert_eq!(row_count, 20);

    Ok(())
}

#[test]
fn aggregate() -> Result<()> {
let results = execute("SELECT SUM(c1), SUM(c2) FROM test", 4)?;
Expand Down
1 change: 1 addition & 0 deletions rust/datafusion/src/execution/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,4 @@ pub mod expressions;
pub mod hash_aggregate;
pub mod merge;
pub mod projection;
pub mod selection;
184 changes: 184 additions & 0 deletions rust/datafusion/src/execution/physical_plan/selection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Defines the selection execution plan. A selection filters rows based on a predicate
use std::sync::{Arc, Mutex};

use crate::error::{ExecutionError, Result};
use crate::execution::physical_plan::{
BatchIterator, ExecutionPlan, Partition, PhysicalExpr,
};
use arrow::array::BooleanArray;
use arrow::compute::filter;
use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;

/// Execution plan for a Selection
///
/// Pairs an input plan with a predicate expression. The plan reports the
/// same schema as its input and exposes one `SelectionPartition` per input
/// partition.
pub struct SelectionExec {
/// The selection predicate expression; a clone of this handle is given to
/// every partition created by `partitions()`
expr: Arc<dyn PhysicalExpr>,
/// The input plan whose partitions are wrapped and filtered
input: Arc<dyn ExecutionPlan>,
}

impl SelectionExec {
    /// Create a selection on an input
    ///
    /// * `expr` - the predicate expression to evaluate against each batch
    /// * `input` - the plan that produces the batches to filter
    ///
    /// This constructor performs no validation and currently always returns
    /// `Ok`.
    pub fn try_new(
        expr: Arc<dyn PhysicalExpr>,
        input: Arc<dyn ExecutionPlan>,
    ) -> Result<Self> {
        // Both handles are received by value, so move them into the struct
        // instead of cloning and then dropping the originals.
        Ok(Self { expr, input })
    }
}

impl ExecutionPlan for SelectionExec {
    /// Schema of this plan, which is exactly the schema of the input plan
    /// because a selection does not add, remove or alter columns
    fn schema(&self) -> Arc<Schema> {
        self.input.schema()
    }

    /// Wrap every partition of the input plan in a `SelectionPartition`
    /// that carries this plan's predicate
    fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
        let input_partitions = self.input.partitions()?;

        let mut wrapped: Vec<Arc<dyn Partition>> =
            Vec::with_capacity(input_partitions.len());
        for input_partition in input_partitions.iter() {
            let predicate = self.expr.clone();
            wrapped.push(Arc::new(SelectionPartition {
                schema: self.input.schema(),
                expr: predicate,
                input: input_partition.clone(),
            }));
        }

        Ok(wrapped)
    }
}

/// Represents a single partition of a Selection execution plan
///
/// Created by `SelectionExec::partitions`, one per partition of the input
/// plan.
struct SelectionPartition {
/// Schema handed to the iterator built by `execute`; populated from the
/// input plan's schema
schema: Arc<Schema>,
/// The selection predicate expression
expr: Arc<dyn PhysicalExpr>,
/// The input partition that is executed to obtain the batches to filter
input: Arc<dyn Partition>,
}

impl Partition for SelectionPartition {
    /// Execute the input partition and return an iterator that applies the
    /// selection predicate to the batches it yields
    fn execute(&self) -> Result<Arc<Mutex<dyn BatchIterator>>> {
        let input = self.input.execute()?;
        let schema = self.schema.clone();
        let expr = self.expr.clone();
        Ok(Arc::new(Mutex::new(SelectionIterator {
            schema,
            expr,
            input,
        })))
    }
}

/// Selection iterator
///
/// Pulls batches from an input iterator and, for each one, keeps only the
/// rows selected by the predicate expression.
struct SelectionIterator {
/// Schema returned by `schema()`
schema: Arc<Schema>,
/// Predicate evaluated against every input batch; it must produce a
/// `BooleanArray`, otherwise `next()` returns an error
expr: Arc<dyn PhysicalExpr>,
/// Input iterator; it is locked for the duration of each `next()` call
input: Arc<Mutex<dyn BatchIterator>>,
}

impl BatchIterator for SelectionIterator {
    /// Get the schema
    fn schema(&self) -> Arc<Schema> {
        self.schema.clone()
    }

    /// Get the next batch
    ///
    /// Pulls one batch from the input, evaluates the predicate against it
    /// and returns a new batch in which every column has been filtered by
    /// the resulting boolean array. Returns `Ok(None)` once the input is
    /// exhausted.
    ///
    /// # Errors
    ///
    /// * `ExecutionError::General` if the input iterator's mutex is poisoned
    /// * `ExecutionError::InternalError` if the predicate does not evaluate
    ///   to a `BooleanArray`
    /// * any error raised by the input iterator, by predicate evaluation,
    ///   by the filter kernel or by record batch construction
    fn next(&mut self) -> Result<Option<RecordBatch>> {
        // Report a poisoned lock as an error rather than panicking inside a
        // method that already returns `Result`. The poison error is turned
        // into a message immediately because it holds the lock guard.
        let mut input = self.input.lock().map_err(|e| {
            ExecutionError::General(format!(
                "Failed to lock selection input iterator: {}",
                e
            ))
        })?;

        let batch = match input.next()? {
            Some(batch) => batch,
            None => return Ok(None),
        };

        // evaluate the selection predicate to get a boolean array
        let predicate_result = self.expr.evaluate(&batch)?;
        let mask = predicate_result
            .as_any()
            .downcast_ref::<BooleanArray>()
            .ok_or_else(|| {
                ExecutionError::InternalError(
                    "Predicate evaluated to non-boolean value".to_string(),
                )
            })?;

        // filter each array
        let mut filtered_arrays = Vec::with_capacity(batch.num_columns());
        for i in 0..batch.num_columns() {
            let array = batch.column(i);
            filtered_arrays.push(filter(array.as_ref(), mask)?);
        }

        Ok(Some(RecordBatch::try_new(
            batch.schema().clone(),
            filtered_arrays,
        )?))
    }
}

#[cfg(test)]
mod tests {

use super::*;
use crate::execution::physical_plan::csv::CsvExec;
use crate::execution::physical_plan::expressions::*;
use crate::execution::physical_plan::ExecutionPlan;
use crate::logicalplan::{Operator, ScalarValue};
use crate::test;
use std::iter::Iterator;

#[test]
fn simple_predicate() -> Result<()> {
    let schema = test::aggr_test_schema();

    // CSV scan over the test file, requested as 4 partitions
    let partitions = 4;
    let path = test::create_partitioned_csv("aggregate_test_100.csv", partitions)?;
    let csv = CsvExec::try_new(&path, schema, true, None, 1024)?;

    // predicate: (column 1 > 1) AND (column 1 < 4)
    let above_lower = binary(col(1), Operator::Gt, lit(ScalarValue::UInt32(1)));
    let below_upper = binary(col(1), Operator::Lt, lit(ScalarValue::UInt32(4)));
    let predicate: Arc<dyn PhysicalExpr> =
        binary(above_lower, Operator::And, below_upper);

    let selection: Arc<dyn ExecutionPlan> =
        Arc::new(SelectionExec::try_new(predicate, Arc::new(csv))?);

    let results = test::execute(selection.as_ref())?;

    // every batch keeps all 13 columns; only the number of rows shrinks
    for batch in results.iter() {
        assert_eq!(13, batch.num_columns());
    }

    let mut row_count: usize = 0;
    for batch in results.iter() {
        row_count += batch.num_rows();
    }
    assert_eq!(41, row_count);

    Ok(())
}

}

0 comments on commit 6cad327

Please sign in to comment.