diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs index 57525b387f07..9880c7cff6fb 100644 --- a/datafusion/core/src/physical_plan/aggregates/mod.rs +++ b/datafusion/core/src/physical_plan/aggregates/mod.rs @@ -18,39 +18,40 @@ //! Aggregates functionalities use crate::execution::context::TaskContext; -use crate::physical_plan::aggregates::no_grouping::AggregateStream; +use crate::physical_plan::aggregates::{ + no_grouping::AggregateStream, row_hash::GroupedHashAggregateStream, +}; use crate::physical_plan::metrics::{ BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, }; use crate::physical_plan::{ - DisplayFormatType, Distribution, ExecutionPlan, Partitioning, + DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }; use arrow::array::ArrayRef; +use arrow::compute::DEFAULT_CAST_OPTIONS; use arrow::datatypes::{Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; +use datafusion_common::utils::longest_consecutive_prefix; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::Accumulator; -use datafusion_physical_expr::expressions::{Avg, CastExpr, Column, Sum}; use datafusion_physical_expr::{ - expressions, AggregateExpr, PhysicalExpr, PhysicalSortExpr, + aggregate::row_accumulator::RowAccumulator, + equivalence::project_equivalence_properties, + expressions::{Avg, CastExpr, Column, Sum}, + normalize_out_expr_with_columns_map, + utils::{convert_to_expr, get_indices_of_matching_exprs}, + AggregateExpr, PhysicalExpr, PhysicalSortExpr, }; use std::any::Any; use std::collections::HashMap; - -use arrow::compute::DEFAULT_CAST_OPTIONS; use std::sync::Arc; mod no_grouping; mod row_hash; -use crate::physical_plan::aggregates::row_hash::GroupedHashAggregateStream; -use crate::physical_plan::EquivalenceProperties; pub use datafusion_expr::AggregateFunction; -use 
datafusion_physical_expr::aggregate::row_accumulator::RowAccumulator; -use datafusion_physical_expr::equivalence::project_equivalence_properties; pub use datafusion_physical_expr::expressions::create_aggregate_expr; -use datafusion_physical_expr::normalize_out_expr_with_columns_map; /// Hash aggregate modes #[derive(Debug, Copy, Clone, PartialEq, Eq)] @@ -72,6 +73,15 @@ pub enum AggregateMode { Single, } +/// Group By expression modes +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum GroupByOrderMode { + /// Some of the expressions in the GROUP BY clause have an ordering. + PartiallyOrdered, + /// All the expressions in the GROUP BY clause have orderings. + Ordered, +} + /// Represents `GROUP BY` clause in the plan (including the more general GROUPING SET) /// In the case of a simple `GROUP BY a, b` clause, this will contain the expression [a, b] /// and a single group [false, false]. @@ -184,6 +194,15 @@ impl From for SendableRecordBatchStream { } } +#[derive(Debug, Clone)] +pub(crate) struct AggregationOrdering { + mode: GroupByOrderMode, + /// Stores indices such that when we iterate with these indices, GROUP BY + /// expressions match input ordering. + order_indices: Vec, + ordering: Vec, +} + /// Hash aggregate execution plan #[derive(Debug)] pub struct AggregateExec { @@ -208,6 +227,99 @@ pub struct AggregateExec { columns_map: HashMap>, /// Execution Metrics metrics: ExecutionPlanMetricsSet, + /// Stores mode, and output ordering information for the `AggregateExec`. + aggregation_ordering: Option, +} + +/// Calculates the working mode for `GROUP BY` queries. +/// - If no GROUP BY expression has an ordering, returns `None`. +/// - If some GROUP BY expressions have an ordering, returns `Some(GroupByOrderMode::PartiallyOrdered)`. +/// - If all GROUP BY expressions have orderings, returns `Some(GroupByOrderMode::Ordered)`. 
+fn get_working_mode( + input: &Arc<dyn ExecutionPlan>, + group_by: &PhysicalGroupBy, +) -> Option<(GroupByOrderMode, Vec<usize>)> { + if group_by.groups.len() > 1 { + // We do not currently support streaming execution if we have more + // than one group (e.g. we have grouping sets). + return None; + }; + + let output_ordering = input.output_ordering().unwrap_or(&[]); + // Since direction of the ordering is not important for GROUP BY columns, + // we convert PhysicalSortExpr to PhysicalExpr in the existing ordering. + let ordering_exprs = convert_to_expr(output_ordering); + let groupby_exprs = group_by + .expr + .iter() + .map(|(item, _)| item.clone()) + .collect::<Vec<_>>(); + // Find where each expression of the GROUP BY clause occurs in the existing + // ordering (if it occurs): + let mut ordered_indices = + get_indices_of_matching_exprs(&groupby_exprs, &ordering_exprs, || { + input.equivalence_properties() + }); + ordered_indices.sort(); + // Find out how many expressions of the existing ordering define ordering + // for expressions in the GROUP BY clause. For example, if the input is + // ordered by a, b, c, d and we group by b, a, d; the result below would be + // 2, meaning 2 elements (a, b) among the GROUP BY columns define ordering. + let first_n = longest_consecutive_prefix(ordered_indices); + if first_n == 0 { + // No GROUP BY columns are ordered, we cannot do streaming execution. + return None; + } + let ordered_exprs = ordering_exprs[0..first_n].to_vec(); + // Find indices for the GROUP BY expressions such that when we iterate with + // these indices, we would match existing ordering. For the example above, + // this would produce 1, 0; meaning 1st and 0th entries (a, b) among the + // GROUP BY expressions b, a, d match input ordering.
+ let ordered_group_by_indices = + get_indices_of_matching_exprs(&ordered_exprs, &groupby_exprs, || { + input.equivalence_properties() + }); + Some(if first_n == group_by.expr.len() { + (GroupByOrderMode::Ordered, ordered_group_by_indices) + } else { + (GroupByOrderMode::PartiallyOrdered, ordered_group_by_indices) + }) +} + +fn calc_aggregation_ordering( + input: &Arc, + group_by: &PhysicalGroupBy, +) -> Option { + get_working_mode(input, group_by).map(|(mode, order_indices)| { + let existing_ordering = input.output_ordering().unwrap_or(&[]); + let out_group_expr = output_group_expr_helper(group_by); + // Calculate output ordering information for the operator: + let out_ordering = order_indices + .iter() + .zip(existing_ordering) + .map(|(idx, input_col)| PhysicalSortExpr { + expr: out_group_expr[*idx].clone(), + options: input_col.options, + }) + .collect::>(); + AggregationOrdering { + mode, + order_indices, + ordering: out_ordering, + } + }) +} + +/// Grouping expressions as they occur in the output schema +fn output_group_expr_helper(group_by: &PhysicalGroupBy) -> Vec> { + // Update column indices. Since the group by columns come first in the output schema, their + // indices are simply 0..self.group_expr(len). + group_by + .expr() + .iter() + .enumerate() + .map(|(index, (_, name))| Arc::new(Column::new(name, index)) as _) + .collect() } impl AggregateExec { @@ -240,6 +352,8 @@ impl AggregateExec { }; } + let aggregation_ordering = calc_aggregation_ordering(&input, &group_by); + Ok(AggregateExec { mode, group_by, @@ -250,6 +364,7 @@ impl AggregateExec { input_schema, columns_map, metrics: ExecutionPlanMetricsSet::new(), + aggregation_ordering, }) } @@ -265,16 +380,7 @@ impl AggregateExec { /// Grouping expressions as they occur in the output schema pub fn output_group_expr(&self) -> Vec> { - // Update column indices. Since the group by columns come first in the output schema, their - // indices are simply 0..self.group_expr(len). 
- self.group_by - .expr() - .iter() - .enumerate() - .map(|(index, (_col, name))| { - Arc::new(expressions::Column::new(name, index)) as Arc - }) - .collect() + output_group_expr_helper(&self.group_by) } /// Aggregate expressions @@ -305,6 +411,7 @@ impl AggregateExec { let batch_size = context.session_config().batch_size(); let input = self.input.execute(partition, Arc::clone(&context))?; let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); + if self.group_by.expr.is_empty() { Ok(StreamType::AggregateStream(AggregateStream::new( self.mode, @@ -329,6 +436,7 @@ impl AggregateExec { batch_size, context, partition, + self.aggregation_ordering.clone(), )?, )) } @@ -373,20 +481,27 @@ impl ExecutionPlan for AggregateExec { } /// Specifies whether this plan generates an infinite stream of records. - /// If the plan does not support pipelining, but it its input(s) are + /// If the plan does not support pipelining, but its input(s) are /// infinite, returns an error to indicate this. fn unbounded_output(&self, children: &[bool]) -> Result { if children[0] { - Err(DataFusionError::Plan( - "Aggregate Error: `GROUP BY` clause (including the more general GROUPING SET) is not supported for unbounded inputs.".to_string(), - )) + if self.aggregation_ordering.is_none() { + // Cannot run without breaking pipeline. 
+ Err(DataFusionError::Plan( + "Aggregate Error: `GROUP BY` clauses with columns without ordering and GROUPING SETS are not supported for unbounded inputs.".to_string(), + )) + } else { + Ok(true) + } } else { Ok(false) } } fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { - None + self.aggregation_ordering + .as_ref() + .map(|item: &AggregationOrdering| item.ordering.as_slice()) } fn required_input_distribution(&self) -> Vec { @@ -801,13 +916,14 @@ mod tests { use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use crate::from_slice::FromSlice; use crate::physical_plan::aggregates::{ - AggregateExec, AggregateMode, PhysicalGroupBy, + get_working_mode, AggregateExec, AggregateMode, PhysicalGroupBy, }; use crate::physical_plan::expressions::{col, Avg}; - use crate::test::assert_is_pending; use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec}; + use crate::test::{assert_is_pending, csv_exec_sorted}; use crate::{assert_batches_sorted_eq, physical_plan::common}; use arrow::array::{Float64Array, UInt32Array}; + use arrow::compute::{concat_batches, SortOptions}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion_common::{DataFusionError, Result, ScalarValue}; @@ -819,6 +935,7 @@ mod tests { use std::task::{Context, Poll}; use super::StreamType; + use crate::physical_plan::aggregates::GroupByOrderMode::{Ordered, PartiallyOrdered}; use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; use crate::physical_plan::{ ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, @@ -826,6 +943,92 @@ mod tests { }; use crate::prelude::SessionContext; + // Generate a schema which consists of 5 columns (a, b, c, d, e) + fn create_test_schema() -> Result { + let a = Field::new("a", DataType::Int32, true); + let b = Field::new("b", DataType::Int32, true); + let c = Field::new("c", DataType::Int32, true); + let d = Field::new("d", DataType::Int32, 
true); + let e = Field::new("e", DataType::Int32, true); + let schema = Arc::new(Schema::new(vec![a, b, c, d, e])); + + Ok(schema) + } + + /// make PhysicalSortExpr with default options + fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr { + sort_expr_options(name, schema, SortOptions::default()) + } + + /// PhysicalSortExpr with specified options + fn sort_expr_options( + name: &str, + schema: &Schema, + options: SortOptions, + ) -> PhysicalSortExpr { + PhysicalSortExpr { + expr: col(name, schema).unwrap(), + options, + } + } + + #[tokio::test] + async fn test_get_working_mode() -> Result<()> { + let test_schema = create_test_schema()?; + // Source is sorted by a ASC NULLS FIRST, b ASC NULLS FIRST, c ASC NULLS FIRST + // Column d, e is not ordered. + let sort_exprs = vec![ + sort_expr("a", &test_schema), + sort_expr("b", &test_schema), + sort_expr("c", &test_schema), + ]; + let input = csv_exec_sorted(&test_schema, sort_exprs, true); + + // test cases consists of vector of tuples. Where each tuple represents a single test case. + // First field in the tuple is Vec where each element in the vector represents GROUP BY columns + // For instance `vec!["a", "b"]` corresponds to GROUP BY a, b + // Second field in the tuple is Option, which corresponds to expected algorithm mode. + // None represents that existing ordering is not sufficient to run executor with any one of the algorithms + // (We need to add SortExec to be able to run it). + // Some(GroupByOrderMode) represents, we can run algorithm with existing ordering; and algorithm should work in + // GroupByOrderMode. 
+ let test_cases = vec![ + (vec!["a"], Some((Ordered, vec![0]))), + (vec!["b"], None), + (vec!["c"], None), + (vec!["b", "a"], Some((Ordered, vec![1, 0]))), + (vec!["c", "b"], None), + (vec!["c", "a"], Some((PartiallyOrdered, vec![1]))), + (vec!["c", "b", "a"], Some((Ordered, vec![2, 1, 0]))), + (vec!["d", "a"], Some((PartiallyOrdered, vec![1]))), + (vec!["d", "b"], None), + (vec!["d", "c"], None), + (vec!["d", "b", "a"], Some((PartiallyOrdered, vec![2, 1]))), + (vec!["d", "c", "b"], None), + (vec!["d", "c", "a"], Some((PartiallyOrdered, vec![2]))), + ( + vec!["d", "c", "b", "a"], + Some((PartiallyOrdered, vec![3, 2, 1])), + ), + ]; + for (case_idx, test_case) in test_cases.iter().enumerate() { + let (group_by_columns, expected) = &test_case; + let mut group_by_exprs = vec![]; + for col_name in group_by_columns { + group_by_exprs.push((col(col_name, &test_schema)?, col_name.to_string())); + } + let group_bys = PhysicalGroupBy::new_single(group_by_exprs); + let res = get_working_mode(&input, &group_bys); + assert_eq!( + res, *expected, + "Unexpected result for in unbounded test case#: {:?}, case: {:?}", + case_idx, test_case + ); + } + + Ok(()) + } + /// some mock data to aggregates fn some_data() -> (Arc, Vec) { // define a schema. 
@@ -940,9 +1143,7 @@ mod tests { let result = common::collect(merged_aggregate.execute(0, task_ctx.clone())?).await?; - assert_eq!(result.len(), 1); - - let batch = &result[0]; + let batch = concat_batches(&result[0].schema(), &result)?; assert_eq!(batch.num_columns(), 3); assert_eq!(batch.num_rows(), 12); @@ -1037,9 +1238,7 @@ mod tests { let result = common::collect(merged_aggregate.execute(0, task_ctx.clone())?).await?; - assert_eq!(result.len(), 1); - - let batch = &result[0]; + let batch = concat_batches(&result[0].schema(), &result)?; assert_eq!(batch.num_columns(), 2); assert_eq!(batch.num_rows(), 3); diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs index bf1846ae983d..1fe7ccd61af6 100644 --- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs +++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs @@ -24,35 +24,41 @@ use std::task::{Context, Poll}; use std::vec; use ahash::RandomState; -use arrow::row::{OwnedRow, RowConverter, SortField}; -use datafusion_physical_expr::hash_utils::create_hashes; use futures::ready; use futures::stream::{Stream, StreamExt}; +use hashbrown::raw::RawTable; +use itertools::izip; use crate::execution::context::TaskContext; use crate::execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt}; use crate::execution::memory_pool::{MemoryConsumer, MemoryReservation}; use crate::physical_plan::aggregates::{ evaluate_group_by, evaluate_many, evaluate_optional, group_schema, AccumulatorItem, - AggregateMode, PhysicalGroupBy, RowAccumulatorItem, + AggregateMode, AggregationOrdering, GroupByOrderMode, PhysicalGroupBy, + RowAccumulatorItem, }; use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput}; use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr}; use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream}; -use arrow::array::*; -use arrow::compute::{cast, filter}; + +use arrow::array::{ + 
new_null_array, Array, ArrayRef, BooleanArray, PrimitiveArray, UInt32Builder, +}; +use arrow::compute::{cast, filter, SortColumn}; use arrow::datatypes::{DataType, Schema, UInt32Type}; +use arrow::row::{OwnedRow, RowConverter, SortField}; use arrow::{compute, datatypes::SchemaRef, record_batch::RecordBatch}; use datafusion_common::cast::as_boolean_array; -use datafusion_common::utils::get_arrayref_at_indices; +use datafusion_common::utils::{ + evaluate_partition_ranges, get_arrayref_at_indices, get_row_at_idx, +}; use datafusion_common::{Result, ScalarValue}; use datafusion_expr::Accumulator; +use datafusion_physical_expr::hash_utils::create_hashes; use datafusion_row::accessor::RowAccessor; use datafusion_row::layout::RowLayout; use datafusion_row::reader::{read_row, RowReader}; use datafusion_row::MutableRecordBatch; -use hashbrown::raw::RawTable; -use itertools::izip; /// Grouping aggregate with row-format aggregation states inside. /// @@ -105,6 +111,8 @@ pub(crate) struct GroupedHashAggregateStream { /// first element in the array corresponds to normal accumulators /// second element in the array corresponds to row accumulators indices: [Vec>; 2], + aggregation_ordering: Option, + is_end: bool, } #[derive(Debug)] @@ -137,6 +145,8 @@ impl GroupedHashAggregateStream { batch_size: usize, context: Arc, partition: usize, + // Stores algorithm mode and output ordering + aggregation_ordering: Option, ) -> Result { let timer = baseline_metrics.elapsed_compute().timer(); @@ -237,6 +247,8 @@ impl GroupedHashAggregateStream { batch_size, row_group_skip_position: 0, indices: [normal_agg_indices, row_agg_indices], + is_end: false, + aggregation_ordering, }) } } @@ -275,6 +287,9 @@ impl Stream for GroupedHashAggregateStream { Some(Err(e)) => return Poll::Ready(Some(Err(e))), // inner is done, producing output None => { + for element in self.aggr_state.group_states.iter_mut() { + element.status = GroupStatus::CanEmit; + } self.exec_state = ExecutionState::ProducingOutput; } 
} @@ -285,12 +300,16 @@ impl Stream for GroupedHashAggregateStream { let result = self.create_batch_from_map(); timer.done(); - self.row_group_skip_position += self.batch_size; match result { // made output Ok(Some(result)) => { let batch = result.record_output(&self.baseline_metrics); + self.row_group_skip_position += batch.num_rows(); + if self.aggregation_ordering.is_some() { + self.exec_state = ExecutionState::ReadingInput; + self.prune(); + } return Poll::Ready(Some(Ok(batch))); } // end of output @@ -313,15 +332,136 @@ impl RecordBatchStream for GroupedHashAggregateStream { } } +/// This utility object encapsulates the row object, the hash and the group +/// indices for a group. This information is used when executing streaming +/// GROUP BY calculations. +struct GroupOrderInfo { + owned_row: OwnedRow, + hash: u64, + range: Range, +} + impl GroupedHashAggregateStream { - // Update the row_aggr_state according to groub_by values (result of group_by_expressions) + // Update the aggr_state according to group_by values (result of group_by_expressions) when group by + // expressions are fully ordered. + fn update_ordered_group_state( + &mut self, + group_values: &[ArrayRef], + per_group_indices: Vec, + allocated: &mut usize, + ) -> Result> { + // 1.1 construct the key from the group values + // 1.2 construct the mapping key if it does not exist + // 1.3 add the row' index to `indices` + + // track which entries in `aggr_state` have rows in this batch to aggregate + let mut groups_with_rows = vec![]; + + let AggregationState { + map: row_map, + group_states: row_group_states, + .. 
+ } = &mut self.aggr_state; + + for GroupOrderInfo { + owned_row, + hash, + range, + } in per_group_indices + { + let entry = row_map.get_mut(hash, |(_hash, group_idx)| { + // verify that a group that we are inserting with hash is + // actually the same key value as the group in + // existing_idx (aka group_values @ row) + let group_state = &row_group_states[*group_idx]; + owned_row.row() == group_state.group_by_values.row() + }); + + match entry { + // Existing entry for this group value + Some((_hash, group_idx)) => { + let group_state = &mut row_group_states[*group_idx]; + + // 1.3 + if group_state.indices.is_empty() { + groups_with_rows.push(*group_idx); + }; + for row in range.start..range.end { + // remember this row + group_state.indices.push_accounted(row as u32, allocated); + } + } + // 1.2 Need to create new entry + None => { + let accumulator_set = + aggregates::create_accumulators(&self.normal_aggr_expr)?; + let ordered_columns = if let Some(state) = &self.aggregation_ordering + { + let row = get_row_at_idx(group_values, range.start)?; + let result = state + .order_indices + .iter() + .map(|idx| row[*idx].clone()) + .collect::>(); + Some(result) + } else { + None + }; + // Add new entry to group_states and save newly created index + let group_state = GroupState { + group_by_values: owned_row, + ordered_columns, + status: GroupStatus::GroupProgress, + hash, + aggregation_buffer: vec![ + 0; + self.row_aggr_layout.fixed_part_width() + ], + accumulator_set, + indices: (range.start as u32..range.end as u32) + .collect::>(), // 1.3 + }; + let group_idx = row_group_states.len(); + + // NOTE: do NOT include the `RowGroupState` struct size in here because this is captured by + // `group_states` (see allocation down below) + *allocated += (std::mem::size_of::() + * group_state.group_by_values.as_ref().len()) + + (std::mem::size_of::() + * group_state.aggregation_buffer.capacity()) + + (std::mem::size_of::() * group_state.indices.capacity()); + + // Allocation 
done by normal accumulators + *allocated += (std::mem::size_of::>() + * group_state.accumulator_set.capacity()) + + group_state + .accumulator_set + .iter() + .map(|accu| accu.size()) + .sum::(); + + // for hasher function, use precomputed hash value + row_map.insert_accounted( + (hash, group_idx), + |(hash, _group_index)| *hash, + allocated, + ); + + row_group_states.push_accounted(group_state, allocated); + + groups_with_rows.push(group_idx); + } + }; + } + Ok(groups_with_rows) + } + + // Update the aggr_state according to group_by values (result of group_by_expressions) fn update_group_state( &mut self, group_values: &[ArrayRef], allocated: &mut usize, ) -> Result> { - let group_rows = self.row_converter.convert_columns(group_values)?; - let n_rows = group_rows.num_rows(); // 1.1 construct the key from the group values // 1.2 construct the mapping key if it does not exist // 1.3 add the row' index to `indices` @@ -329,6 +469,8 @@ impl GroupedHashAggregateStream { // track which entries in `aggr_state` have rows in this batch to aggregate let mut groups_with_rows = vec![]; + let group_rows = self.row_converter.convert_columns(group_values)?; + let n_rows = group_rows.num_rows(); // 1.1 Calculate the group keys for the group values let mut batch_hashes = vec![0; n_rows]; create_hashes(group_values, &self.random_state, &mut batch_hashes)?; @@ -362,9 +504,24 @@ impl GroupedHashAggregateStream { None => { let accumulator_set = aggregates::create_accumulators(&self.normal_aggr_expr)?; + let ordered_columns = if let Some(state) = &self.aggregation_ordering + { + let row = get_row_at_idx(group_values, row)?; + let result = state + .order_indices + .iter() + .map(|idx| row[*idx].clone()) + .collect::>(); + Some(result) + } else { + None + }; // Add new entry to group_states and save newly created index let group_state = GroupState { group_by_values: group_rows.row(row).owned(), + ordered_columns, + status: GroupStatus::GroupProgress, + hash, aggregation_buffer: vec![ 0; 
self.row_aggr_layout.fixed_part_width() @@ -407,7 +564,7 @@ impl GroupedHashAggregateStream { Ok(groups_with_rows) } - // Update the accumulator results, according to row_aggr_state. + // Update the accumulator results, according to aggr_state. #[allow(clippy::too_many_arguments)] fn update_accumulators_using_batch( &mut self, @@ -490,7 +647,7 @@ impl GroupedHashAggregateStream { Ok(()) } - // Update the accumulator results, according to row_aggr_state. + // Update the accumulator results, according to aggr_state. fn update_accumulators_using_scalar( &mut self, groups_with_rows: &[usize], @@ -562,8 +719,48 @@ impl GroupedHashAggregateStream { let row_converter_size_pre = self.row_converter.size(); for group_values in &group_by_values { - let groups_with_rows = - self.update_group_state(group_values, &mut allocated)?; + let groups_with_rows = if let Some(AggregationOrdering { + mode: GroupByOrderMode::Ordered, + order_indices, + ordering, + }) = &self.aggregation_ordering + { + let group_rows = self.row_converter.convert_columns(group_values)?; + let n_rows = group_rows.num_rows(); + // 1.1 Calculate the group keys for the group values + let mut batch_hashes = vec![0; n_rows]; + create_hashes(group_values, &self.random_state, &mut batch_hashes)?; + let sort_column = order_indices + .iter() + .enumerate() + .map(|(idx, cur_idx)| SortColumn { + values: group_values[*cur_idx].clone(), + options: Some(ordering[idx].options), + }) + .collect::>(); + let n_rows = group_rows.num_rows(); + let ranges = evaluate_partition_ranges(n_rows, &sort_column)?; + let per_group_indices = ranges + .into_iter() + .map(|range| { + let row = group_rows.row(range.start).owned(); + // (row, batch_hashes[range.start], range) + GroupOrderInfo { + owned_row: row, + hash: batch_hashes[range.start], + range, + } + }) + .collect::>(); + self.update_ordered_group_state( + group_values, + per_group_indices, + &mut allocated, + )? + } else { + self.update_group_state(group_values, &mut allocated)? 
+ }; + // Decide the accumulators update mode, use scalar value to update the accumulators when all of the conditions are meet: // 1) The aggregation mode is Partial or Single // 2) There is not normal aggregation expressions @@ -580,7 +777,7 @@ impl GroupedHashAggregateStream { )?; } else { // Collect all indices + offsets based on keys in this vec - let mut batch_indices: UInt32Builder = UInt32Builder::with_capacity(0); + let mut batch_indices = UInt32Builder::with_capacity(0); let mut offsets = vec![0]; let mut offset_so_far = 0; for &group_idx in groups_with_rows.iter() { @@ -591,38 +788,93 @@ impl GroupedHashAggregateStream { } let batch_indices = batch_indices.finish(); - let row_values = get_at_indices(&row_aggr_input_values, &batch_indices)?; - let normal_values = - get_at_indices(&normal_aggr_input_values, &batch_indices)?; let row_filter_values = get_optional_filters(&row_filter_values, &batch_indices); let normal_filter_values = get_optional_filters(&normal_filter_values, &batch_indices); - self.update_accumulators_using_batch( - &groups_with_rows, - &offsets, - &row_values, - &normal_values, - &row_filter_values, - &normal_filter_values, - &mut allocated, - )?; + if self + .aggregation_ordering + .as_ref() + .map_or(false, |s| s.mode == GroupByOrderMode::Ordered) + { + self.update_accumulators_using_batch( + &groups_with_rows, + &offsets, + &row_aggr_input_values, + &normal_aggr_input_values, + &row_filter_values, + &normal_filter_values, + &mut allocated, + )?; + } else { + let row_values = + get_at_indices(&row_aggr_input_values, &batch_indices)?; + let normal_values = + get_at_indices(&normal_aggr_input_values, &batch_indices)?; + self.update_accumulators_using_batch( + &groups_with_rows, + &offsets, + &row_values, + &normal_values, + &row_filter_values, + &normal_filter_values, + &mut allocated, + )?; + }; } } allocated += self .row_converter .size() .saturating_sub(row_converter_size_pre); + + if self.aggregation_ordering.is_some() { + let mut 
new_result = false; + let last_ordered_columns = self + .aggr_state + .group_states + .last() + .map(|item| item.ordered_columns.clone()); + + if let Some(last_ordered_columns) = last_ordered_columns { + for cur_group in &mut self.aggr_state.group_states { + if cur_group.ordered_columns != last_ordered_columns { + // We will no longer receive values for this group. Set its status to + // GroupStatus::CanEmit, meaning we can generate its result. + cur_group.status = GroupStatus::CanEmit; + new_result = true; + } + } + } + if new_result { + self.exec_state = ExecutionState::ProducingOutput; + } + } + + Ok(allocated) + } +} + +#[derive(Debug, PartialEq)] +enum GroupStatus { + // `GroupProgress` means data for current group is not complete. New data may arrive. + GroupProgress, + // `CanEmit` means data for current group is complete and its result can be emitted. + CanEmit, + // `Emitted` means that the result for the group has been outputted. The group can be pruned from the state. + Emitted, +} + /// The state that is built for each output group. #[derive(Debug)] pub struct GroupState { /// The actual group by values, stored sequentially group_by_values: OwnedRow, + ordered_columns: Option<Vec<ScalarValue>>, + status: GroupStatus, + hash: u64, + // Accumulator state, stored sequentially pub aggregation_buffer: Vec<u8>, @@ -663,12 +915,32 @@ impl std::fmt::Debug for AggregationState { } impl GroupedHashAggregateStream { + /// Prune the groups from `self.aggr_state.group_states` that are in + /// `GroupStatus::Emitted` status (this means that the result of the group has already been + /// emitted, and we are sure that the group cannot receive new rows).
+ fn prune(&mut self) { + let n_partition = self.aggr_state.group_states.len(); + self.aggr_state + .group_states + .retain(|elem| elem.status != GroupStatus::Emitted); + let n_partition_new = self.aggr_state.group_states.len(); + let n_pruned = n_partition - n_partition_new; + self.aggr_state.map.clear(); + for (idx, item) in self.aggr_state.group_states.iter().enumerate() { + self.aggr_state + .map + .insert(item.hash, (item.hash, idx), |(hash, _)| *hash); + } + self.row_group_skip_position -= n_pruned; + } + /// Create a RecordBatch with all group keys and accumulator' states or values. fn create_batch_from_map(&mut self) -> Result> { let skip_items = self.row_group_skip_position; - if skip_items > self.aggr_state.group_states.len() { + if skip_items > self.aggr_state.group_states.len() || self.is_end { return Ok(None); } + self.is_end |= skip_items == self.aggr_state.group_states.len(); if self.aggr_state.group_states.is_empty() { let schema = self.schema.clone(); return Ok(Some(RecordBatch::new_empty(schema))); @@ -679,6 +951,11 @@ impl GroupedHashAggregateStream { self.aggr_state.group_states.len(), ); let group_state_chunk = &self.aggr_state.group_states[skip_items..end_idx]; + // Consider only the groups that can be emitted. (The ones we are sure that will not receive new entry.) + let group_state_chunk = group_state_chunk + .iter() + .filter(|item| item.status == GroupStatus::CanEmit) + .collect::>(); if group_state_chunk.is_empty() { let schema = self.schema.clone(); @@ -785,6 +1062,14 @@ impl GroupedHashAggregateStream { } } } + + // Set status of the emitted groups to GroupStatus::Emitted mode. 
+ for gs in self.aggr_state.group_states[skip_items..end_idx].iter_mut() { + if gs.status == GroupStatus::CanEmit { + gs.status = GroupStatus::Emitted; + } + } + Ok(Some(RecordBatch::try_new(self.schema.clone(), output)?)) } } diff --git a/datafusion/core/src/physical_plan/analyze.rs b/datafusion/core/src/physical_plan/analyze.rs index 2e12251c2f9b..08b5bb34ed87 100644 --- a/datafusion/core/src/physical_plan/analyze.rs +++ b/datafusion/core/src/physical_plan/analyze.rs @@ -77,7 +77,7 @@ impl ExecutionPlan for AnalyzeExec { } /// Specifies whether this plan generates an infinite stream of records. - /// If the plan does not support pipelining, but it its input(s) are + /// If the plan does not support pipelining, but its input(s) are /// infinite, returns an error to indicate this. fn unbounded_output(&self, children: &[bool]) -> Result { if children[0] { diff --git a/datafusion/core/src/physical_plan/coalesce_batches.rs b/datafusion/core/src/physical_plan/coalesce_batches.rs index ec7dd7b4d63a..82f37ea39684 100644 --- a/datafusion/core/src/physical_plan/coalesce_batches.rs +++ b/datafusion/core/src/physical_plan/coalesce_batches.rs @@ -96,7 +96,7 @@ impl ExecutionPlan for CoalesceBatchesExec { } /// Specifies whether this plan generates an infinite stream of records. - /// If the plan does not support pipelining, but it its input(s) are + /// If the plan does not support pipelining, but its input(s) are /// infinite, returns an error to indicate this. fn unbounded_output(&self, children: &[bool]) -> Result { Ok(children[0]) diff --git a/datafusion/core/src/physical_plan/coalesce_partitions.rs b/datafusion/core/src/physical_plan/coalesce_partitions.rs index 8ff8a37aa36a..fe667d1e6e2a 100644 --- a/datafusion/core/src/physical_plan/coalesce_partitions.rs +++ b/datafusion/core/src/physical_plan/coalesce_partitions.rs @@ -81,7 +81,7 @@ impl ExecutionPlan for CoalescePartitionsExec { } /// Specifies whether this plan generates an infinite stream of records. 
- /// If the plan does not support pipelining, but it its input(s) are + /// If the plan does not support pipelining, but its input(s) are /// infinite, returns an error to indicate this. fn unbounded_output(&self, children: &[bool]) -> Result { Ok(children[0]) diff --git a/datafusion/core/src/physical_plan/filter.rs b/datafusion/core/src/physical_plan/filter.rs index 494d3fc869ea..3786568cf35c 100644 --- a/datafusion/core/src/physical_plan/filter.rs +++ b/datafusion/core/src/physical_plan/filter.rs @@ -107,7 +107,7 @@ impl ExecutionPlan for FilterExec { } /// Specifies whether this plan generates an infinite stream of records. - /// If the plan does not support pipelining, but it its input(s) are + /// If the plan does not support pipelining, but its input(s) are /// infinite, returns an error to indicate this. fn unbounded_output(&self, children: &[bool]) -> Result { Ok(children[0]) diff --git a/datafusion/core/src/physical_plan/joins/cross_join.rs b/datafusion/core/src/physical_plan/joins/cross_join.rs index 44a91865e27c..67b7fd971891 100644 --- a/datafusion/core/src/physical_plan/joins/cross_join.rs +++ b/datafusion/core/src/physical_plan/joins/cross_join.rs @@ -160,7 +160,7 @@ impl ExecutionPlan for CrossJoinExec { } /// Specifies whether this plan generates an infinite stream of records. - /// If the plan does not support pipelining, but it its input(s) are + /// If the plan does not support pipelining, but its input(s) are /// infinite, returns an error to indicate this. 
fn unbounded_output(&self, children: &[bool]) -> Result { if children[0] || children[1] { diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index c624c5c17d82..05a3c252436a 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -247,7 +247,7 @@ impl ExecutionPlan for HashJoinExec { } /// Specifies whether this plan generates an infinite stream of records. - /// If the plan does not support pipelining, but it its input(s) are + /// If the plan does not support pipelining, but its input(s) are /// infinite, returns an error to indicate this. fn unbounded_output(&self, children: &[bool]) -> Result { let (left, right) = (children[0], children[1]); diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs index 23d3d70848c2..4356d4aa85e0 100644 --- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs @@ -514,13 +514,12 @@ impl ExecutionPlan for SymmetricHashJoinExec { for (join_side, child) in join_sides.iter().zip(children.iter()) { let sorted_expr = child .output_ordering() - .and_then(|orders| orders.first()) - .and_then(|order| { + .and_then(|orders| { build_filter_input_order( *join_side, filter, &child.schema(), - order, + &orders[0], ) .transpose() }) diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs index 50c145095bbe..01ece80aca43 100644 --- a/datafusion/core/src/physical_plan/mod.rs +++ b/datafusion/core/src/physical_plan/mod.rs @@ -111,7 +111,7 @@ pub trait ExecutionPlan: Debug + Send + Sync { fn output_partitioning(&self) -> Partitioning; /// Specifies whether this plan generates an infinite stream of records. 
- /// If the plan does not support pipelining, but it its input(s) are + /// If the plan does not support pipelining, but its input(s) are /// infinite, returns an error to indicate this. fn unbounded_output(&self, _children: &[bool]) -> Result { Ok(false) diff --git a/datafusion/core/src/physical_plan/projection.rs b/datafusion/core/src/physical_plan/projection.rs index 799de0c1914c..49c429f94d9c 100644 --- a/datafusion/core/src/physical_plan/projection.rs +++ b/datafusion/core/src/physical_plan/projection.rs @@ -156,7 +156,7 @@ impl ExecutionPlan for ProjectionExec { } /// Specifies whether this plan generates an infinite stream of records. - /// If the plan does not support pipelining, but it its input(s) are + /// If the plan does not support pipelining, but its input(s) are /// infinite, returns an error to indicate this. fn unbounded_output(&self, children: &[bool]) -> Result { Ok(children[0]) diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs b/datafusion/core/src/physical_plan/repartition/mod.rs index 7f13418d26f7..8db230a1222a 100644 --- a/datafusion/core/src/physical_plan/repartition/mod.rs +++ b/datafusion/core/src/physical_plan/repartition/mod.rs @@ -318,7 +318,7 @@ impl ExecutionPlan for RepartitionExec { } /// Specifies whether this plan generates an infinite stream of records. - /// If the plan does not support pipelining, but it its input(s) are + /// If the plan does not support pipelining, but its input(s) are /// infinite, returns an error to indicate this. fn unbounded_output(&self, children: &[bool]) -> Result { Ok(children[0]) diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 1651ed1c6c9d..db0baffb88bf 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -739,7 +739,7 @@ impl ExecutionPlan for SortExec { } /// Specifies whether this plan generates an infinite stream of records. 
- /// If the plan does not support pipelining, but it its input(s) are + /// If the plan does not support pipelining, but its input(s) are /// infinite, returns an error to indicate this. fn unbounded_output(&self, children: &[bool]) -> Result { if children[0] { diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs index 044881324089..cace842aafbd 100644 --- a/datafusion/core/src/physical_plan/union.rs +++ b/datafusion/core/src/physical_plan/union.rs @@ -157,7 +157,7 @@ impl ExecutionPlan for UnionExec { } /// Specifies whether this plan generates an infinite stream of records. - /// If the plan does not support pipelining, but it its input(s) are + /// If the plan does not support pipelining, but its input(s) are /// infinite, returns an error to indicate this. fn unbounded_output(&self, children: &[bool]) -> Result { Ok(children.iter().any(|x| *x)) @@ -355,7 +355,7 @@ impl ExecutionPlan for InterleaveExec { } /// Specifies whether this plan generates an infinite stream of records. - /// If the plan does not support pipelining, but it its input(s) are + /// If the plan does not support pipelining, but its input(s) are /// infinite, returns an error to indicate this. fn unbounded_output(&self, children: &[bool]) -> Result { Ok(children.iter().any(|x| *x)) diff --git a/datafusion/core/src/physical_plan/unnest.rs b/datafusion/core/src/physical_plan/unnest.rs index 0d60273c3010..9a9408035d1c 100644 --- a/datafusion/core/src/physical_plan/unnest.rs +++ b/datafusion/core/src/physical_plan/unnest.rs @@ -77,7 +77,7 @@ impl ExecutionPlan for UnnestExec { } /// Specifies whether this plan generates an infinite stream of records. - /// If the plan does not support pipelining, but it its input(s) are + /// If the plan does not support pipelining, but its input(s) are /// infinite, returns an error to indicate this. 
fn unbounded_output(&self, children: &[bool]) -> Result { Ok(children[0]) diff --git a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs index 5dbafab76f71..ac84a6a39511 100644 --- a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs +++ b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs @@ -171,8 +171,7 @@ impl BoundedWindowAggExec { // to calculate partition separation points pub fn partition_by_sort_keys(&self) -> Result> { // Partition by sort keys indices are stored in self.ordered_partition_by_indices. - let sort_keys = self.input.output_ordering(); - let sort_keys = sort_keys.unwrap_or(&[]); + let sort_keys = self.input.output_ordering().unwrap_or(&[]); get_at_indices(sort_keys, &self.ordered_partition_by_indices) } diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs index bda1b52ff889..7ca95954ce41 100644 --- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs +++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs @@ -144,7 +144,7 @@ impl ExecutionPlan for WindowAggExec { } /// Specifies whether this plan generates an infinite stream of records. - /// If the plan does not support pipelining, but it its input(s) are + /// If the plan does not support pipelining, but its input(s) are /// infinite, returns an error to indicate this. fn unbounded_output(&self, children: &[bool]) -> Result { if children[0] { diff --git a/datafusion/core/tests/aggregate_fuzz.rs b/datafusion/core/tests/aggregate_fuzz.rs new file mode 100644 index 000000000000..4491ad45df31 --- /dev/null +++ b/datafusion/core/tests/aggregate_fuzz.rs @@ -0,0 +1,221 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use arrow::array::{ArrayRef, Int64Array}; +use arrow::compute::{concat_batches, SortOptions}; +use arrow::datatypes::DataType; +use arrow::record_batch::RecordBatch; +use arrow::util::pretty::pretty_format_batches; +use datafusion::physical_plan::aggregates::{ + AggregateExec, AggregateMode, PhysicalGroupBy, +}; +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng}; + +use datafusion::physical_plan::collect; +use datafusion::physical_plan::memory::MemoryExec; +use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion_physical_expr::expressions::{col, Sum}; +use datafusion_physical_expr::{AggregateExpr, PhysicalSortExpr}; +use test_utils::add_empty_batches; + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test(flavor = "multi_thread", worker_threads = 8)] + async fn aggregate_test() { + let test_cases = vec![ + vec!["a"], + vec!["b", "a"], + vec!["c", "a"], + vec!["c", "b", "a"], + vec!["d", "a"], + vec!["d", "b", "a"], + vec!["d", "c", "a"], + vec!["d", "c", "b", "a"], + ]; + let n = 300; + let distincts = vec![10, 20]; + for distinct in distincts { + let mut handles = Vec::new(); + for i in 0..n { + let test_idx = i % test_cases.len(); + let group_by_columns = test_cases[test_idx].clone(); + let job = 
tokio::spawn(run_aggregate_test( + make_staggered_batches::(1000, distinct, i as u64), + group_by_columns, + )); + handles.push(job); + } + for job in handles { + job.await.unwrap(); + } + } + } +} + +/// Run the same aggregation over the same input both as one concatenated batch and as a sorted stream of batches, +/// and verify that the outputs of the usual and running `AggregateExec` are equal +async fn run_aggregate_test(input1: Vec, group_by_columns: Vec<&str>) { + let schema = input1[0].schema(); + let session_config = SessionConfig::new().with_batch_size(50); + let ctx = SessionContext::with_config(session_config); + let mut sort_keys = vec![]; + for ordering_col in ["a", "b", "c"] { + sort_keys.push(PhysicalSortExpr { + expr: col(ordering_col, &schema).unwrap(), + options: SortOptions::default(), + }) + } + + let concat_input_record = concat_batches(&schema, &input1).unwrap(); + let usual_source = Arc::new( + MemoryExec::try_new(&[vec![concat_input_record]], schema.clone(), None).unwrap(), + ); + + let running_source = Arc::new( + MemoryExec::try_new(&[input1.clone()], schema.clone(), None) + .unwrap() + .with_sort_information(sort_keys), + ); + + let aggregate_expr = vec![Arc::new(Sum::new( + col("d", &schema).unwrap(), + "sum1", + DataType::Int64, + )) as Arc]; + let expr = group_by_columns + .iter() + .map(|elem| (col(elem, &schema).unwrap(), elem.to_string())) + .collect::>(); + let group_by = PhysicalGroupBy::new_single(expr); + let aggregate_exec_running = Arc::new( + AggregateExec::try_new( + AggregateMode::Partial, + group_by.clone(), + aggregate_expr.clone(), + vec![None], + running_source, + schema.clone(), + ) + .unwrap(), + ) as _; + + let aggregate_exec_usual = Arc::new( + AggregateExec::try_new( + AggregateMode::Partial, + group_by.clone(), + aggregate_expr.clone(), + vec![None], + usual_source, + schema.clone(), + ) + .unwrap(), + ) as _; + + let task_ctx = ctx.task_ctx(); + let collected_usual = collect(aggregate_exec_usual, task_ctx.clone()) + .await + .unwrap(); + + let collected_running =
collect(aggregate_exec_running, task_ctx.clone()) + .await + .unwrap(); + assert!(collected_running.len() > 2); + // Running should produce more chunks than the usual AggregateExec. + // Otherwise it means that we cannot generate results in running mode. + assert!(collected_running.len() > collected_usual.len()); + // compare + let usual_formatted = pretty_format_batches(&collected_usual).unwrap().to_string(); + let running_formatted = pretty_format_batches(&collected_running) + .unwrap() + .to_string(); + + let mut usual_formatted_sorted: Vec<&str> = usual_formatted.trim().lines().collect(); + usual_formatted_sorted.sort_unstable(); + + let mut running_formatted_sorted: Vec<&str> = + running_formatted.trim().lines().collect(); + running_formatted_sorted.sort_unstable(); + for (i, (usual_line, running_line)) in usual_formatted_sorted + .iter() + .zip(&running_formatted_sorted) + .enumerate() + { + assert_eq!((i, usual_line), (i, running_line), "Inconsistent result"); + } +} + +/// Return randomly sized record batches with: +/// three sorted int64 columns 'a', 'b', 'c' ranged from 0..'n_distinct' as columns +/// one random int64 column 'd' as other columns +pub(crate) fn make_staggered_batches( + len: usize, + n_distinct: usize, + random_seed: u64, +) -> Vec { + // use a random number generator to pick a random sized output + let mut rng = StdRng::seed_from_u64(random_seed); + let mut input123: Vec<(i64, i64, i64)> = vec![(0, 0, 0); len]; + let mut input4: Vec = vec![0; len]; + input123.iter_mut().for_each(|v| { + *v = ( + rng.gen_range(0..n_distinct) as i64, + rng.gen_range(0..n_distinct) as i64, + rng.gen_range(0..n_distinct) as i64, + ) + }); + input4.iter_mut().for_each(|v| { + *v = rng.gen_range(0..n_distinct) as i64; + }); + input123.sort(); + let input1 = Int64Array::from_iter_values(input123.clone().into_iter().map(|k| k.0)); + let input2 = Int64Array::from_iter_values(input123.clone().into_iter().map(|k| k.1)); + let input3 =
Int64Array::from_iter_values(input123.clone().into_iter().map(|k| k.2)); + let input4 = Int64Array::from_iter_values(input4.into_iter()); + + // split into several record batches + let mut remainder = RecordBatch::try_from_iter(vec![ + ("a", Arc::new(input1) as ArrayRef), + ("b", Arc::new(input2) as ArrayRef), + ("c", Arc::new(input3) as ArrayRef), + ("d", Arc::new(input4) as ArrayRef), + ]) + .unwrap(); + + let mut batches = vec![]; + if STREAM { + while remainder.num_rows() > 0 { + let batch_size = rng.gen_range(0..50); + if remainder.num_rows() < batch_size { + break; + } + batches.push(remainder.slice(0, batch_size)); + remainder = remainder.slice(batch_size, remainder.num_rows() - batch_size); + } + } else { + while remainder.num_rows() > 0 { + let batch_size = rng.gen_range(0..remainder.num_rows() + 1); + batches.push(remainder.slice(0, batch_size)); + remainder = remainder.slice(batch_size, remainder.num_rows() - batch_size); + } + } + add_empty_batches(batches, &mut rng) +} diff --git a/datafusion/core/tests/sql/group_by.rs b/datafusion/core/tests/sql/group_by.rs index b4a92db3fc37..299c49028b5a 100644 --- a/datafusion/core/tests/sql/group_by.rs +++ b/datafusion/core/tests/sql/group_by.rs @@ -16,6 +16,7 @@ // under the License. 
use super::*; +use datafusion::test_util::get_test_context2; #[tokio::test] async fn group_by_date_trunc() -> Result<()> { @@ -160,3 +161,99 @@ async fn group_by_dictionary() { run_test_case::().await; run_test_case::().await; } + +#[tokio::test] +async fn test_source_sorted_groupby() -> Result<()> { + let tmpdir = TempDir::new().unwrap(); + let session_config = SessionConfig::new().with_target_partitions(1); + let ctx = get_test_context2(&tmpdir, true, session_config).await?; + + let sql = "SELECT a, b, + SUM(c) as summation1 + FROM annotated_data + GROUP BY b, a"; + + let msg = format!("Creating logical plan for '{sql}'"); + let dataframe = ctx.sql(sql).await.expect(&msg); + let physical_plan = dataframe.create_physical_plan().await?; + let formatted = displayable(physical_plan.as_ref()).indent().to_string(); + let expected = { + vec![ + "ProjectionExec: expr=[a@1 as a, b@0 as b, SUM(annotated_data.c)@2 as summation1]", + " AggregateExec: mode=Single, gby=[b@1 as b, a@0 as a], aggr=[SUM(annotated_data.c)]", + ] + }; + + let actual: Vec<&str> = formatted.trim().lines().collect(); + let actual_len = actual.len(); + let actual_trim_last = &actual[..actual_len - 1]; + assert_eq!( + expected, actual_trim_last, + "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" + ); + + let actual = execute_to_batches(&ctx, sql).await; + let expected = vec![ + "+---+---+------------+", + "| a | b | summation1 |", + "+---+---+------------+", + "| 0 | 0 | 300 |", + "| 0 | 1 | 925 |", + "| 1 | 2 | 1550 |", + "| 1 | 3 | 2175 |", + "+---+---+------------+", + ]; + assert_batches_eq!(expected, &actual); + Ok(()) +} + +#[tokio::test] +async fn test_source_sorted_groupby2() -> Result<()> { + let tmpdir = TempDir::new().unwrap(); + let session_config = SessionConfig::new().with_target_partitions(1); + let ctx = get_test_context2(&tmpdir, true, session_config).await?; + + let sql = "SELECT a, d, + SUM(c) as summation1 + FROM annotated_data + GROUP BY d, a"; + + let msg = 
format!("Creating logical plan for '{sql}'"); + let dataframe = ctx.sql(sql).await.expect(&msg); + let physical_plan = dataframe.create_physical_plan().await?; + let formatted = displayable(physical_plan.as_ref()).indent().to_string(); + let expected = { + vec![ + "ProjectionExec: expr=[a@1 as a, d@0 as d, SUM(annotated_data.c)@2 as summation1]", + " AggregateExec: mode=Single, gby=[d@2 as d, a@0 as a], aggr=[SUM(annotated_data.c)]", + ] + }; + + let actual: Vec<&str> = formatted.trim().lines().collect(); + let actual_len = actual.len(); + let actual_trim_last = &actual[..actual_len - 1]; + assert_eq!( + expected, actual_trim_last, + "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" + ); + + let actual = execute_to_batches(&ctx, sql).await; + let expected = vec![ + "+---+---+------------+", + "| a | d | summation1 |", + "+---+---+------------+", + "| 0 | 0 | 292 |", + "| 0 | 2 | 196 |", + "| 0 | 1 | 315 |", + "| 0 | 4 | 164 |", + "| 0 | 3 | 258 |", + "| 1 | 0 | 622 |", + "| 1 | 3 | 299 |", + "| 1 | 1 | 1043 |", + "| 1 | 4 | 913 |", + "| 1 | 2 | 848 |", + "+---+---+------------+", + ]; + assert_batches_eq!(expected, &actual); + Ok(()) +} diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt b/datafusion/core/tests/sqllogictests/test_files/window.slt index 8b95005e1bcb..60339955b418 100644 --- a/datafusion/core/tests/sqllogictests/test_files/window.slt +++ b/datafusion/core/tests/sqllogictests/test_files/window.slt @@ -356,23 +356,22 @@ Sort: d.b ASC NULLS LAST EmptyRelation physical_plan SortPreservingMergeExec: [b@0 ASC NULLS LAST] - SortExec: expr=[b@0 ASC NULLS LAST] - ProjectionExec: expr=[b@0 as b, MAX(d.a)@1 as max_a, MAX(d.seq)@2 as MAX(d.seq)] - AggregateExec: mode=Single, gby=[b@2 as b], aggr=[MAX(d.a), MAX(d.seq)] - ProjectionExec: expr=[ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as seq, a@0 as a, b@1 as b] - BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field 
{ name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }], mode=[Sorted] - SortExec: expr=[b@1 ASC NULLS LAST,a@0 ASC NULLS LAST] - CoalesceBatchesExec: target_batch_size=8192 - RepartitionExec: partitioning=Hash([Column { name: "b", index: 1 }], 4), input_partitions=4 - UnionExec - ProjectionExec: expr=[1 as a, aa as b] - EmptyExec: produce_one_row=true - ProjectionExec: expr=[3 as a, aa as b] - EmptyExec: produce_one_row=true - ProjectionExec: expr=[5 as a, bb as b] - EmptyExec: produce_one_row=true - ProjectionExec: expr=[7 as a, bb as b] - EmptyExec: produce_one_row=true + ProjectionExec: expr=[b@0 as b, MAX(d.a)@1 as max_a, MAX(d.seq)@2 as MAX(d.seq)] + AggregateExec: mode=Single, gby=[b@2 as b], aggr=[MAX(d.a), MAX(d.seq)] + ProjectionExec: expr=[ROW_NUMBER() PARTITION BY [s.b] ORDER BY [s.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as seq, a@0 as a, b@1 as b] + BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: "ROW_NUMBER()", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }], mode=[Sorted] + SortExec: expr=[b@1 ASC NULLS LAST,a@0 ASC NULLS LAST] + CoalesceBatchesExec: target_batch_size=8192 + RepartitionExec: partitioning=Hash([Column { name: "b", index: 1 }], 4), input_partitions=4 + UnionExec + ProjectionExec: expr=[1 as a, aa as b] + EmptyExec: produce_one_row=true + ProjectionExec: expr=[3 as a, aa as b] + EmptyExec: produce_one_row=true + ProjectionExec: expr=[5 as a, bb as b] + EmptyExec: produce_one_row=true + ProjectionExec: expr=[7 as a, bb as b] + EmptyExec: produce_one_row=true # check actual result diff --git a/datafusion/core/tests/window_fuzz.rs b/datafusion/core/tests/window_fuzz.rs index e8524304a309..77b6e0a5d11b 100644 --- 
a/datafusion/core/tests/window_fuzz.rs +++ b/datafusion/core/tests/window_fuzz.rs @@ -480,6 +480,10 @@ async fn run_window_test( let collected_running = collect(running_window_exec, task_ctx.clone()) .await .unwrap(); + + // BoundedWindowAggExec should produce more chunks than the usual WindowAggExec. + // Otherwise it means that we cannot generate results in running mode. + assert!(collected_running.len() > collected_usual.len()); // compare let usual_formatted = pretty_format_batches(&collected_usual).unwrap().to_string(); let running_formatted = pretty_format_batches(&collected_running)