Skip to content

Commit

Permalink
fix rebase conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Sep 20, 2019
1 parent fbd2278 commit 27c2280
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 32 deletions.
49 changes: 26 additions & 23 deletions rust/datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ use crate::execution::filter::FilterRelation;
use crate::execution::limit::LimitRelation;
use crate::execution::physical_plan::common;
use crate::execution::physical_plan::datasource::DatasourceExec;
use crate::execution::physical_plan::expressions::{BinaryExpr, CastExpr, Column, Literal,};
use crate::execution::physical_plan::expressions::{
BinaryExpr, CastExpr, Column, Literal, Sum,
};
use crate::execution::physical_plan::hash_aggregate::HashAggregateExec;
use crate::execution::physical_plan::merge::MergeExec;
use crate::execution::physical_plan::projection::ProjectionExec;
Expand Down Expand Up @@ -587,6 +589,29 @@ mod tests {
Ok(())
}

#[test]
fn parallel_selection() -> Result<()> {
let tmp_dir = TempDir::new("parallel_selection")?;
let partition_count = 4;
let mut ctx = create_ctx(&tmp_dir, partition_count)?;

let logical_plan =
ctx.create_logical_plan("SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3")?;
let logical_plan = ctx.optimize(&logical_plan)?;

let physical_plan = ctx.create_physical_plan(&logical_plan, 1024)?;

let results = ctx.collect(physical_plan.as_ref())?;

// there should be one batch per partition
assert_eq!(results.len(), partition_count);

let row_count: usize = results.iter().map(|batch| batch.num_rows()).sum();
assert_eq!(row_count, 20);

Ok(())
}

#[test]
fn aggregate() -> Result<()> {
let results = execute("SELECT SUM(c1), SUM(c2) FROM test", 4)?;
Expand Down Expand Up @@ -628,28 +653,6 @@ mod tests {
ctx.collect(physical_plan.as_ref())
}

fn parallel_selection() -> Result<()> {
let tmp_dir = TempDir::new("parallel_selection")?;
let partition_count = 4;
let mut ctx = create_ctx(&tmp_dir, partition_count)?;

let logical_plan =
ctx.create_logical_plan("SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3")?;
let logical_plan = ctx.optimize(&logical_plan)?;

let physical_plan = ctx.create_physical_plan(&logical_plan, 1024)?;

let results = ctx.collect(physical_plan.as_ref())?;

// there should be one batch per partition
assert_eq!(results.len(), partition_count);

let row_count: usize = results.iter().map(|batch| batch.num_rows()).sum();
assert_eq!(row_count, 20);

Ok(())
}

/// Generate a partitioned CSV file and register it with an execution context
fn create_ctx(tmp_dir: &TempDir, partition_count: usize) -> Result<ExecutionContext> {
let mut ctx = ExecutionContext::new();
Expand Down
24 changes: 15 additions & 9 deletions rust/datafusion/src/execution/physical_plan/expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,21 @@ use std::rc::Rc;
use std::sync::Arc;

use crate::error::{ExecutionError, Result};
use crate::execution::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
use crate::logicalplan::{ScalarValue, Operator};
use arrow::array::{ArrayRef, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, UInt16Array, UInt32Array, UInt64Array, UInt8Array, UInt32Builder};
use arrow::compute::kernels::comparison::{lt, lt_eq, gt, gt_eq, eq, neq};
use crate::execution::physical_plan::{
Accumulator, AggregateExpr, PhysicalExpr, PhysicalExprRef,
};
use crate::logicalplan::{Operator, ScalarValue};
use arrow::array::{
ArrayRef, BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array,
Int64Array, Int8Array, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
};
use arrow::array::{
Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder,
Int8Builder, UInt16Builder, UInt32Builder, UInt64Builder, UInt8Builder,
};
use arrow::compute::kernels::boolean::{and, or};
use arrow::compute::kernels::cast::cast;
use arrow::compute::kernels::comparison::{eq, gt, gt_eq, lt, lt_eq, neq};
use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;

Expand Down Expand Up @@ -478,11 +489,6 @@ impl PhysicalExpr for CastExpr {
}
}

/// Create a column expression
pub fn col(i: usize) -> PhysicalExprRef {
Arc::new(Column::new(i))
}

/// Create a literal expression
pub fn lit(value: ScalarValue) -> PhysicalExprRef {
Arc::new(Literal::new(value))
Expand Down

0 comments on commit 27c2280

Please sign in to comment.