Skip to content

Commit

Permalink
ARROW-6089: [Rust] [DataFusion] Implement physical plan for "selectio…
Browse files Browse the repository at this point in the history
…n" operator

This PR implements the physical execution plan for the selection operator (the WHERE clause in a SQL query).

In order to have working tests, I also had to implement a subset of expressions (column references, literal values, comparison expressions, and CAST). However, the goal of this PR is not to add complete support for all expressions but to implement the Selection operator. I will create separate JIRAs/PRs to add support for other expressions and data types in the physical query plan.

Closes #5320 from andygrove/ARROW-6089 and squashes the following commits:

6cad327 <Andy Grove> Implement selection operator

Authored-by: Andy Grove <[email protected]>
Signed-off-by: Andy Grove <[email protected]>
  • Loading branch information
andygrove committed Sep 25, 2019
1 parent 4fe330a commit 7f2d637
Show file tree
Hide file tree
Showing 3 changed files with 234 additions and 5 deletions.
54 changes: 49 additions & 5 deletions rust/datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,13 @@ use crate::execution::filter::FilterRelation;
use crate::execution::limit::LimitRelation;
use crate::execution::physical_plan::common;
use crate::execution::physical_plan::datasource::DatasourceExec;
use crate::execution::physical_plan::expressions::{Column, Sum};
use crate::execution::physical_plan::expressions::{
BinaryExpr, CastExpr, Column, Literal, Sum,
};
use crate::execution::physical_plan::hash_aggregate::HashAggregateExec;
use crate::execution::physical_plan::merge::MergeExec;
use crate::execution::physical_plan::projection::ProjectionExec;
use crate::execution::physical_plan::selection::SelectionExec;
use crate::execution::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr};
use crate::execution::projection::ProjectRelation;
use crate::execution::relation::{DataSourceRelation, Relation};
Expand Down Expand Up @@ -280,6 +283,12 @@ impl ExecutionContext {
schema.clone(),
)?))
}
LogicalPlan::Selection { input, expr, .. } => {
let input = self.create_physical_plan(input, batch_size)?;
let input_schema = input.as_ref().schema().clone();
let runtime_expr = self.create_physical_expr(expr, &input_schema)?;
Ok(Arc::new(SelectionExec::try_new(runtime_expr, input)?))
}
_ => Err(ExecutionError::General(
"Unsupported logical plan variant".to_string(),
)),
Expand All @@ -290,13 +299,25 @@ impl ExecutionContext {
/// Create a physical expression from a logical expression.
///
/// Column references, literal values, binary expressions and casts are
/// supported. The operands of binary and cast expressions are translated
/// recursively against the same `input_schema`.
///
/// # Errors
///
/// Returns `ExecutionError::NotImplemented`, naming the offending
/// expression, for any other logical expression, and propagates any error
/// produced while translating a nested operand or building the cast.
pub fn create_physical_expr(
    &self,
    e: &Expr,
    input_schema: &Schema,
) -> Result<Arc<dyn PhysicalExpr>> {
    match e {
        Expr::Column(i) => Ok(Arc::new(Column::new(*i))),
        Expr::Literal(value) => Ok(Arc::new(Literal::new(value.clone()))),
        Expr::BinaryExpr { left, op, right } => Ok(Arc::new(BinaryExpr::new(
            self.create_physical_expr(left, input_schema)?,
            op.clone(),
            self.create_physical_expr(right, input_schema)?,
        ))),
        Expr::Cast { expr, data_type } => Ok(Arc::new(CastExpr::try_new(
            self.create_physical_expr(expr, input_schema)?,
            input_schema,
            data_type.clone(),
        )?)),
        // Must stay last: a catch-all placed earlier would shadow the
        // supported variants above.
        other => Err(ExecutionError::NotImplemented(format!(
            "Physical plan does not support logical expression {:?}",
            other
        ))),
    }
}

Expand Down Expand Up @@ -569,6 +590,29 @@ mod tests {
Ok(())
}

/// Runs a filtered query end to end across several partitions and checks
/// both the number of result batches and the total number of surviving rows.
#[test]
fn parallel_selection() -> Result<()> {
    let partition_count = 4;
    let tmp_dir = TempDir::new("parallel_selection")?;
    let mut ctx = create_ctx(&tmp_dir, partition_count)?;

    // SQL -> logical plan -> optimized logical plan -> physical plan
    let sql = "SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3";
    let plan = ctx.create_logical_plan(sql)?;
    let plan = ctx.optimize(&plan)?;
    let physical_plan = ctx.create_physical_plan(&plan, 1024)?;

    let batches = ctx.collect(physical_plan.as_ref())?;

    // there should be one batch per partition
    assert_eq!(batches.len(), partition_count);

    let mut row_count: usize = 0;
    for batch in batches.iter() {
        row_count += batch.num_rows();
    }
    assert_eq!(row_count, 20);

    Ok(())
}

#[test]
fn aggregate() -> Result<()> {
let results = execute("SELECT SUM(c1), SUM(c2) FROM test", 4)?;
Expand Down
1 change: 1 addition & 0 deletions rust/datafusion/src/execution/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,4 @@ pub mod expressions;
pub mod hash_aggregate;
pub mod merge;
pub mod projection;
pub mod selection;
184 changes: 184 additions & 0 deletions rust/datafusion/src/execution/physical_plan/selection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Defines the selection execution plan. A selection filters rows based on a predicate.
use std::sync::{Arc, Mutex};

use crate::error::{ExecutionError, Result};
use crate::execution::physical_plan::{
BatchIterator, ExecutionPlan, Partition, PhysicalExpr,
};
use arrow::array::BooleanArray;
use arrow::compute::filter;
use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;

/// Execution plan for a Selection.
///
/// Pairs an input plan with a predicate expression that is evaluated against
/// the batches produced by that input.
pub struct SelectionExec {
    /// The selection predicate expression
    expr: Arc<dyn PhysicalExpr>,
    /// The input plan
    input: Arc<dyn ExecutionPlan>,
}

impl SelectionExec {
    /// Create a selection on an input.
    ///
    /// Both arguments are taken by value and moved straight into the new
    /// plan, so no additional reference-count traffic is incurred.
    ///
    /// This constructor currently always returns `Ok`; the `Result` return
    /// type is kept so existing callers using `?` continue to compile.
    pub fn try_new(
        expr: Arc<dyn PhysicalExpr>,
        input: Arc<dyn ExecutionPlan>,
    ) -> Result<Self> {
        Ok(Self { expr, input })
    }
}

impl ExecutionPlan for SelectionExec {
    /// Get the schema for this execution plan
    fn schema(&self) -> Arc<Schema> {
        // The input schema is reported as-is: this operator does not alter it
        self.input.schema()
    }

    /// Get the partitions for this execution plan
    ///
    /// Every partition of the input is wrapped in a `SelectionPartition`
    /// that carries the input schema and a handle to the predicate.
    fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
        let input_partitions = self.input.partitions()?;

        let mut wrapped: Vec<Arc<dyn Partition>> = Vec::new();
        for input_partition in input_partitions.iter() {
            wrapped.push(Arc::new(SelectionPartition {
                schema: self.input.schema(),
                expr: self.expr.clone(),
                input: input_partition.clone(),
            }));
        }

        Ok(wrapped)
    }
}

/// Represents a single partition of a Selection execution plan
struct SelectionPartition {
    /// Schema handed to the iterators created by `execute`
    schema: Arc<Schema>,
    /// Predicate handed to the iterators created by `execute`
    expr: Arc<dyn PhysicalExpr>,
    /// Input partition that is executed to obtain the batches to filter
    input: Arc<dyn Partition>,
}

impl Partition for SelectionPartition {
    /// Execute the Selection
    ///
    /// Executes the input partition and wraps the resulting iterator in a
    /// `SelectionIterator`. Any error from the input is propagated.
    fn execute(&self) -> Result<Arc<Mutex<dyn BatchIterator>>> {
        let input = self.input.execute()?;
        let iterator = SelectionIterator {
            schema: self.schema.clone(),
            expr: self.expr.clone(),
            input,
        };
        Ok(Arc::new(Mutex::new(iterator)))
    }
}

/// Selection iterator
///
/// Pulls batches from an input iterator and yields them with the predicate
/// applied to every column.
struct SelectionIterator {
    /// Schema returned by `schema()`
    schema: Arc<Schema>,
    /// Predicate evaluated against each input batch
    expr: Arc<dyn PhysicalExpr>,
    /// Input iterator supplying the batches to filter
    input: Arc<Mutex<dyn BatchIterator>>,
}

impl BatchIterator for SelectionIterator {
    /// Get the schema
    fn schema(&self) -> Arc<Schema> {
        Arc::clone(&self.schema)
    }

    /// Get the next batch
    ///
    /// Returns `Ok(None)` once the input is exhausted. Otherwise the
    /// predicate is evaluated against the input batch and every column is
    /// filtered by the resulting boolean array. A predicate that does not
    /// evaluate to a `BooleanArray` yields `ExecutionError::InternalError`.
    fn next(&mut self) -> Result<Option<RecordBatch>> {
        // The guard is bound to a local so the input stays locked for the
        // whole call.
        let mut upstream = self.input.lock().unwrap();

        let batch = match upstream.next()? {
            Some(batch) => batch,
            None => return Ok(None),
        };

        // evaluate the selection predicate to get a boolean array
        let predicate_result = self.expr.evaluate(&batch)?;
        let mask = match predicate_result.as_any().downcast_ref::<BooleanArray>() {
            Some(mask) => mask,
            None => {
                return Err(ExecutionError::InternalError(
                    "Predicate evaluated to non-boolean value".to_string(),
                ))
            }
        };

        // filter each array, stopping at the first kernel error
        let filtered_arrays = (0..batch.num_columns())
            .map(|i| filter(batch.column(i).as_ref(), mask))
            .collect::<std::result::Result<Vec<_>, _>>()?;

        let filtered_batch = RecordBatch::try_new(batch.schema().clone(), filtered_arrays)?;
        Ok(Some(filtered_batch))
    }
}

#[cfg(test)]
mod tests {

    use super::*;
    use crate::execution::physical_plan::csv::CsvExec;
    use crate::execution::physical_plan::expressions::*;
    use crate::execution::physical_plan::ExecutionPlan;
    use crate::logicalplan::{Operator, ScalarValue};
    use crate::test;
    use std::iter::Iterator;

    /// Filters a partitioned CSV scan with `col(1) > 1 AND col(1) < 4` and
    /// checks the column count of every batch and the total row count.
    #[test]
    fn simple_predicate() -> Result<()> {
        let schema = test::aggr_test_schema();

        let partitions = 4;
        let path = test::create_partitioned_csv("aggregate_test_100.csv", partitions)?;
        let csv = CsvExec::try_new(&path, schema, true, None, 1024)?;

        // build the two comparisons separately, then AND them together
        let lower_bound = binary(col(1), Operator::Gt, lit(ScalarValue::UInt32(1)));
        let upper_bound = binary(col(1), Operator::Lt, lit(ScalarValue::UInt32(4)));
        let predicate: Arc<dyn PhysicalExpr> =
            binary(lower_bound, Operator::And, upper_bound);

        let selection: Arc<dyn ExecutionPlan> =
            Arc::new(SelectionExec::try_new(predicate, Arc::new(csv))?);

        let batches = test::execute(selection.as_ref())?;

        // every batch must keep all of the input columns
        for batch in batches.iter() {
            assert_eq!(13, batch.num_columns());
        }

        let mut row_count: usize = 0;
        for batch in batches.iter() {
            row_count += batch.num_rows();
        }
        assert_eq!(41, row_count);

        Ok(())
    }

}

0 comments on commit 7f2d637

Please sign in to comment.