Skip to content

Commit

Permalink
ARROW-6656: [Rust][Datafusion] Add MAX, MIN expressions
Browse files Browse the repository at this point in the history
This is a naive implementation, just copy-paste, may need some refactor :)

Closes apache#5557 from alippai/ARROW-6656 and squashes the following commits:

3be218a <Andy Grove> Update to use new trait definition
85eed74 <Adam Lippai> ARROW-6656:  Add MIN expression
9f98de7 <Adam Lippai> ARROW-6656:  Add MAX expression

Lead-authored-by: Adam Lippai <[email protected]>
Co-authored-by: Andy Grove <[email protected]>
Signed-off-by: Andy Grove <[email protected]>
  • Loading branch information
alippai and andygrove committed Oct 4, 2019
1 parent 461ff53 commit bf68fa8
Show file tree
Hide file tree
Showing 2 changed files with 537 additions and 1 deletion.
64 changes: 63 additions & 1 deletion rust/datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use crate::execution::limit::LimitRelation;
use crate::execution::physical_plan::common;
use crate::execution::physical_plan::datasource::DatasourceExec;
use crate::execution::physical_plan::expressions::{
BinaryExpr, CastExpr, Column, Count, Literal, Sum,
BinaryExpr, CastExpr, Column, Count, Literal, Max, Min, Sum,
};
use crate::execution::physical_plan::hash_aggregate::HashAggregateExec;
use crate::execution::physical_plan::merge::MergeExec;
Expand Down Expand Up @@ -333,6 +333,12 @@ impl ExecutionContext {
"sum" => Ok(Arc::new(Sum::new(
self.create_physical_expr(&args[0], input_schema)?,
))),
"max" => Ok(Arc::new(Max::new(
self.create_physical_expr(&args[0], input_schema)?,
))),
"min" => Ok(Arc::new(Min::new(
self.create_physical_expr(&args[0], input_schema)?,
))),
"count" => Ok(Arc::new(Count::new(
self.create_physical_expr(&args[0], input_schema)?,
))),
Expand Down Expand Up @@ -630,6 +636,34 @@ mod tests {
Ok(())
}

#[test]
fn aggregate_max() -> Result<()> {
let results = execute("SELECT MAX(c1), MAX(c2) FROM test", 4)?;
assert_eq!(results.len(), 1);

let batch = &results[0];
let expected: Vec<&str> = vec!["3,10"];
let mut rows = test::format_batch(&batch);
rows.sort();
assert_eq!(rows, expected);

Ok(())
}

#[test]
fn aggregate_min() -> Result<()> {
let results = execute("SELECT MIN(c1), MIN(c2) FROM test", 4)?;
assert_eq!(results.len(), 1);

let batch = &results[0];
let expected: Vec<&str> = vec!["0,1"];
let mut rows = test::format_batch(&batch);
rows.sort();
assert_eq!(rows, expected);

Ok(())
}

#[test]
fn aggregate_grouped() -> Result<()> {
let results = execute("SELECT c1, SUM(c2) FROM test GROUP BY c1", 4)?;
Expand All @@ -644,6 +678,34 @@ mod tests {
Ok(())
}

#[test]
fn aggregate_grouped_max() -> Result<()> {
let results = execute("SELECT c1, MAX(c2) FROM test GROUP BY c1", 4)?;
assert_eq!(results.len(), 1);

let batch = &results[0];
let expected: Vec<&str> = vec!["0,10", "1,10", "2,10", "3,10"];
let mut rows = test::format_batch(&batch);
rows.sort();
assert_eq!(rows, expected);

Ok(())
}

#[test]
fn aggregate_grouped_min() -> Result<()> {
let results = execute("SELECT c1, MIN(c2) FROM test GROUP BY c1", 4)?;
assert_eq!(results.len(), 1);

let batch = &results[0];
let expected: Vec<&str> = vec!["0,1", "1,1", "2,1", "3,1"];
let mut rows = test::format_batch(&batch);
rows.sort();
assert_eq!(rows, expected);

Ok(())
}

#[test]
fn count_basic() -> Result<()> {
let results = execute("SELECT COUNT(c1), COUNT(c2) FROM test", 1)?;
Expand Down
Loading

0 comments on commit bf68fa8

Please sign in to comment.