diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 8af71d5abbb36..30293d02e7a3f 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -324,6 +324,15 @@ config_namespace! {
         /// Should DataFusion keep the columns used for partition_by in the output RecordBatches
         pub keep_partition_by_columns: bool, default = false
+
+        /// Aggregation ratio (number of distinct groups / number of input rows)
+        /// threshold for skipping partial aggregation. If the actual ratio exceeds
+        /// this threshold, partial aggregation will skip aggregation for further input
+        pub skip_partial_aggregation_probe_ratio_threshold: f64, default = 0.8
+
+        /// Number of input rows a partial aggregation partition should process before
+        /// checking the aggregation ratio and attempting to switch to skipping aggregation mode
+        pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000
     }
 }
diff --git a/datafusion/expr/src/groups_accumulator.rs b/datafusion/expr/src/groups_accumulator.rs
index 0d57c403bbe0b..c96fce4e0cecc 100644
--- a/datafusion/expr/src/groups_accumulator.rs
+++ b/datafusion/expr/src/groups_accumulator.rs
@@ -18,7 +18,7 @@
 //! Vectorized [`GroupsAccumulator`]
 
 use arrow_array::{ArrayRef, BooleanArray};
-use datafusion_common::Result;
+use datafusion_common::{not_impl_err, Result};
 
 /// Describes how many rows should be emitted during grouping.
 #[derive(Debug, Clone, Copy)]
@@ -158,6 +158,24 @@ pub trait GroupsAccumulator: Send {
         total_num_groups: usize,
     ) -> Result<()>;
 
+    /// Converts an input batch directly to intermediate aggregate state,
+    /// without grouping (each input row is considered a separate
+    /// group).
+    fn convert_to_state(
+        &self,
+        _values: &[ArrayRef],
+        _opt_filter: Option<&BooleanArray>,
+    ) -> Result<Vec<ArrayRef>> {
+        not_impl_err!("Input batch conversion to state not implemented")
+    }
+
+    /// Returns `true` if the groups accumulator supports converting an
+    /// input batch to intermediate aggregate state (i.e. the
+    /// `convert_to_state` method is implemented).
+    fn convert_to_state_supported(&self) -> bool {
+        false
+    }
+
     /// Amount of memory used to store the state of this accumulator,
     /// in bytes. This function is called once per batch, so it should
     /// be `O(n)` to compute, not `O(num_groups)`
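The contract of the new trait method can be summarized with a small reference sketch (illustrative only, not part of this patch): converting a batch to state must be equivalent to updating a fresh accumulator with every row assigned to its own singleton group, then emitting all groups. Implementations are expected to produce the same answer directly, without the hash-table detour.

use arrow_array::{Array, ArrayRef, BooleanArray};
use datafusion_common::Result;
use datafusion_expr::{EmitTo, GroupsAccumulator};

/// Hypothetical reference semantics for `convert_to_state`: assign row i to
/// group i, update, then emit all groups, yielding exactly one intermediate
/// state row per input row (filtered-out rows keep the accumulator's
/// "empty" state, e.g. 0 for COUNT or null for SUM/MIN/MAX).
fn convert_to_state_reference(
    acc: &mut dyn GroupsAccumulator,
    values: &[ArrayRef],
    opt_filter: Option<&BooleanArray>,
) -> Result<Vec<ArrayRef>> {
    let num_rows = values[0].len();
    // Row i becomes group i, so no two rows are ever combined.
    let group_indices: Vec<usize> = (0..num_rows).collect();
    acc.update_batch(values, &group_indices, opt_filter, num_rows)?;
    acc.state(EmitTo::All)
}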
diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs
index 56850d0e02a16..ce86fa3301686 100644
--- a/datafusion/functions-aggregate/src/count.rs
+++ b/datafusion/functions-aggregate/src/count.rs
@@ -33,7 +33,7 @@ use arrow::{
 };
 
 use arrow::{
-    array::{Array, BooleanArray, Int64Array, PrimitiveArray},
+    array::{Array, BooleanArray, Int64Array, Int64Builder, PrimitiveArray},
     buffer::BooleanBuffer,
 };
 use datafusion_common::{
@@ -433,6 +433,49 @@ impl GroupsAccumulator for CountGroupsAccumulator {
         Ok(vec![Arc::new(counts) as ArrayRef])
     }
 
+    fn convert_to_state(
+        &self,
+        values: &[ArrayRef],
+        opt_filter: Option<&BooleanArray>,
+    ) -> Result<Vec<ArrayRef>> {
+        let values = &values[0];
+
+        let state_array = match (values.logical_nulls(), opt_filter) {
+            (Some(nulls), None) => {
+                let mut builder = Int64Builder::with_capacity(values.len());
+                nulls
+                    .into_iter()
+                    .for_each(|is_valid| builder.append_value(is_valid as i64));
+                builder.finish()
+            }
+            (Some(nulls), Some(filter)) => {
+                let mut builder = Int64Builder::with_capacity(values.len());
+                nulls.into_iter().zip(filter.iter()).for_each(
+                    |(is_valid, filter_value)| {
+                        builder.append_value(
+                            (is_valid && filter_value.is_some_and(|val| val)) as i64,
+                        )
+                    },
+                );
+                builder.finish()
+            }
+            (None, Some(filter)) => {
+                let mut builder = Int64Builder::with_capacity(values.len());
+                filter.into_iter().for_each(|filter_value| {
+                    builder.append_value(filter_value.is_some_and(|val| val) as i64)
+                });
+                builder.finish()
+            }
+            (None, None) => Int64Array::from_value(1, values.len()),
+        };
+
+        Ok(vec![Arc::new(state_array)])
+    }
+
+    fn convert_to_state_supported(&self) -> bool {
+        true
+    }
+
     fn size(&self) -> usize {
         self.counts.capacity() * std::mem::size_of::<usize>()
     }
diff --git a/datafusion/physical-expr-common/src/aggregate/groups_accumulator/prim_op.rs b/datafusion/physical-expr-common/src/aggregate/groups_accumulator/prim_op.rs
index debb36852b224..5aad5486dc30c 100644
--- a/datafusion/physical-expr-common/src/aggregate/groups_accumulator/prim_op.rs
+++ b/datafusion/physical-expr-common/src/aggregate/groups_accumulator/prim_op.rs
@@ -17,7 +17,7 @@
 
 use std::sync::Arc;
 
-use arrow::array::{ArrayRef, AsArray, BooleanArray, PrimitiveArray};
+use arrow::array::{ArrayRef, AsArray, BooleanArray, PrimitiveArray, PrimitiveBuilder};
 use arrow::datatypes::ArrowPrimitiveType;
 use arrow::datatypes::DataType;
 use datafusion_common::Result;
@@ -134,6 +134,46 @@ where
         self.update_batch(values, group_indices, opt_filter, total_num_groups)
     }
 
+    fn convert_to_state(
+        &self,
+        values: &[ArrayRef],
+        opt_filter: Option<&BooleanArray>,
+    ) -> Result<Vec<ArrayRef>> {
+        let values = values[0].as_primitive::<T>();
+        let mut state = PrimitiveBuilder::<T>::with_capacity(values.len())
+            .with_data_type(self.data_type.clone());
+
+        match opt_filter {
+            Some(filter) => {
+                values
+                    .iter()
+                    .zip(filter.iter())
+                    .for_each(|(val, filter_val)| match (val, filter_val) {
+                        (Some(val), Some(true)) => {
+                            let mut state_val = self.starting_value;
+                            (self.prim_fn)(&mut state_val, val);
+                            state.append_value(state_val);
+                        }
+                        (_, _) => state.append_null(),
+                    })
+            }
+            None => values.iter().for_each(|val| match val {
+                Some(val) => {
+                    let mut state_val = self.starting_value;
+                    (self.prim_fn)(&mut state_val, val);
+                    state.append_value(state_val);
+                }
+                None => state.append_null(),
+            }),
+        };
+
+        Ok(vec![Arc::new(state.finish())])
+    }
+
+    fn convert_to_state_supported(&self) -> bool {
+        true
+    }
+
     fn size(&self) -> usize {
         self.values.capacity() * std::mem::size_of::<T::Native>() + self.null_state.size()
     }
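To make the two implementations above concrete, here is a self-contained sketch (an assumed example using only public arrow-rs APIs, not code from this patch) of the state COUNT produces for a nullable column with a filter: one state row per input row, 1 where the row is non-null and passes the filter, 0 otherwise.

use arrow_array::{Array, BooleanArray, Int32Array, Int64Array};

fn main() {
    // Input column: [10, NULL, 30, 40]
    let values = Int32Array::from(vec![Some(10), None, Some(30), Some(40)]);
    // Filter: row 0 passes, row 1 passes (but is null), row 2 fails, row 3 unknown
    let filter = BooleanArray::from(vec![Some(true), Some(true), Some(false), None]);

    // Mirrors the (Some(nulls), Some(filter)) arm of
    // CountGroupsAccumulator::convert_to_state above.
    let state = Int64Array::from_iter_values(
        values
            .iter()
            .zip(filter.iter())
            .map(|(val, keep)| (val.is_some() && keep == Some(true)) as i64),
    );

    assert_eq!(state, Int64Array::from(vec![1, 0, 0, 0]));
}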
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs
index d1152038eb2a2..5290ba36265dc 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -2398,4 +2398,189 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_skip_aggregation_after_first_batch() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("key", DataType::Int32, true),
+            Field::new("val", DataType::Int32, true),
+        ]));
+        let df_schema = DFSchema::try_from(schema.clone())?;
+
+        let group_by =
+            PhysicalGroupBy::new_single(vec![(col("key", &schema)?, "key".to_string())]);
+
+        let aggr_expr: Vec<Arc<dyn AggregateExpr>> =
+            vec![create_aggregate_expr_with_dfschema(
+                &count_udaf(),
+                &[col("val", &schema)?],
+                &[datafusion_expr::col("val")],
+                &[],
+                &[],
+                &df_schema,
+                "COUNT(val)",
+                false,
+                false,
+                false,
+            )?];
+
+        let input_data = vec![
+            RecordBatch::try_new(
+                Arc::clone(&schema),
+                vec![
+                    Arc::new(Int32Array::from(vec![1, 2, 3])),
+                    Arc::new(Int32Array::from(vec![0, 0, 0])),
+                ],
+            )
+            .unwrap(),
+            RecordBatch::try_new(
+                Arc::clone(&schema),
+                vec![
+                    Arc::new(Int32Array::from(vec![2, 3, 4])),
+                    Arc::new(Int32Array::from(vec![0, 0, 0])),
+                ],
+            )
+            .unwrap(),
+        ];
+
+        let input = Arc::new(MemoryExec::try_new(
+            &[input_data],
+            Arc::clone(&schema),
+            None,
+        )?);
+        let aggregate_exec = Arc::new(AggregateExec::try_new(
+            AggregateMode::Partial,
+            group_by,
+            aggr_expr,
+            vec![None],
+            Arc::clone(&input) as Arc<dyn ExecutionPlan>,
+            schema,
+        )?);
+
+        let mut session_config = SessionConfig::default();
+        session_config = session_config.set(
+            "datafusion.execution.skip_partial_aggregation_probe_rows_threshold",
+            ScalarValue::Int64(Some(2)),
+        );
+        session_config = session_config.set(
+            "datafusion.execution.skip_partial_aggregation_probe_ratio_threshold",
+            ScalarValue::Float64(Some(0.1)),
+        );
+
+        let ctx = TaskContext::default().with_session_config(session_config);
+        let output = collect(aggregate_exec.execute(0, Arc::new(ctx))?).await?;
+
+        let expected = [
+            "+-----+-------------------+",
+            "| key | COUNT(val)[count] |",
+            "+-----+-------------------+",
+            "| 1   | 1                 |",
+            "| 2   | 1                 |",
+            "| 3   | 1                 |",
+            "| 2   | 1                 |",
+            "| 3   | 1                 |",
+            "| 4   | 1                 |",
+            "+-----+-------------------+",
+        ];
+        assert_batches_eq!(expected, &output);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_skip_aggregation_after_threshold() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("key", DataType::Int32, true),
+            Field::new("val", DataType::Int32, true),
+        ]));
+        let df_schema = DFSchema::try_from(schema.clone())?;
+
+        let group_by =
+            PhysicalGroupBy::new_single(vec![(col("key", &schema)?, "key".to_string())]);
+
+        let aggr_expr: Vec<Arc<dyn AggregateExpr>> =
+            vec![create_aggregate_expr_with_dfschema(
+                &count_udaf(),
+                &[col("val", &schema)?],
+                &[datafusion_expr::col("val")],
+                &[],
+                &[],
+                &df_schema,
+                "COUNT(val)",
+                false,
+                false,
+                false,
+            )?];
+
+        let input_data = vec![
+            RecordBatch::try_new(
+                Arc::clone(&schema),
+                vec![
+                    Arc::new(Int32Array::from(vec![1, 2, 3])),
+                    Arc::new(Int32Array::from(vec![0, 0, 0])),
+                ],
+            )
+            .unwrap(),
+            RecordBatch::try_new(
+                Arc::clone(&schema),
+                vec![
+                    Arc::new(Int32Array::from(vec![2, 3, 4])),
+                    Arc::new(Int32Array::from(vec![0, 0, 0])),
+                ],
+            )
+            .unwrap(),
+            RecordBatch::try_new(
+                Arc::clone(&schema),
+                vec![
+                    Arc::new(Int32Array::from(vec![2, 3, 4])),
+                    Arc::new(Int32Array::from(vec![0, 0, 0])),
+                ],
+            )
+            .unwrap(),
+        ];
+
+        let input = Arc::new(MemoryExec::try_new(
+            &[input_data],
+            Arc::clone(&schema),
+            None,
+        )?);
+        let aggregate_exec = Arc::new(AggregateExec::try_new(
+            AggregateMode::Partial,
+            group_by,
+            aggr_expr,
+            vec![None],
+            Arc::clone(&input) as Arc<dyn ExecutionPlan>,
+            schema,
+        )?);
+
+        let mut session_config = SessionConfig::default();
+        session_config = session_config.set(
+            "datafusion.execution.skip_partial_aggregation_probe_rows_threshold",
+            ScalarValue::Int64(Some(5)),
+        );
+        session_config = session_config.set(
+            "datafusion.execution.skip_partial_aggregation_probe_ratio_threshold",
+            ScalarValue::Float64(Some(0.1)),
+        );
+
+        let ctx = TaskContext::default().with_session_config(session_config);
+        let output = collect(aggregate_exec.execute(0, Arc::new(ctx))?).await?;
+
+        let expected = [
+            "+-----+-------------------+",
+            "| key | COUNT(val)[count] |",
+            "+-----+-------------------+",
+            "| 1   | 1                 |",
+            "| 2   | 2                 |",
+            "| 3   | 2                 |",
+            "| 4   | 1                 |",
+            "| 2   | 1                 |",
+            "| 3   | 1                 |",
+            "| 4   | 1                 |",
+            "+-----+-------------------+",
+        ];
+        assert_batches_eq!(expected, &output);
+
+        Ok(())
+    }
 }
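The thresholds exercised by these tests combine as shown in this standalone sketch (the helper name is illustrative, not part of the patch): the probe stays inconclusive until it has seen enough rows, then compares the groups-to-rows ratio against the threshold once.

/// Hypothetical restatement of the probe's decision rule: `None` means
/// "keep probing", `Some(true)` means "skip aggregation from now on".
fn should_skip_partial_aggregation(
    input_rows: usize,
    num_groups: usize,
    rows_threshold: usize,
    ratio_threshold: f64,
) -> Option<bool> {
    if input_rows < rows_threshold {
        return None;
    }
    // A high ratio means grouping is not reducing cardinality.
    Some(num_groups as f64 / input_rows as f64 >= ratio_threshold)
}

fn main() {
    // First test: rows_threshold = 2, so after one 3-row batch with
    // 3 distinct keys the ratio is 3/3 = 1.0 >= 0.1 and skipping starts.
    assert_eq!(should_skip_partial_aggregation(3, 3, 2, 0.1), Some(true));
    // Second test: rows_threshold = 5 keeps the probe inconclusive after
    // the first batch; after two batches (6 rows, 4 groups) the ratio
    // 4/6 ~= 0.67 >= 0.1 triggers skipping for the third batch.
    assert_eq!(should_skip_partial_aggregation(3, 3, 5, 0.1), None);
    assert_eq!(should_skip_partial_aggregation(6, 4, 5, 0.1), Some(true));
}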
diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs
index 167ca72407503..54cff2aabe86c 100644
--- a/datafusion/physical-plan/src/aggregates/row_hash.rs
+++ b/datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -39,7 +39,7 @@ use crate::{RecordBatchStream, SendableRecordBatchStream};
 use arrow::array::*;
 use arrow::datatypes::SchemaRef;
 use arrow_schema::SortOptions;
-use datafusion_common::{DataFusionError, Result};
+use datafusion_common::{internal_datafusion_err, DataFusionError, Result};
 use datafusion_execution::disk_manager::RefCountedTempFile;
 use datafusion_execution::memory_pool::proxy::VecAllocExt;
 use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
@@ -62,6 +62,10 @@ pub(crate) enum ExecutionState {
     /// When producing output, the remaining rows to output are stored
     /// here and are sliced off as needed in batch_size chunks
     ProducingOutput(RecordBatch),
+    /// Indicates that GroupedHashAggregateStream should produce
+    /// intermediate aggregate state for each input row without
+    /// aggregating it
+    SkippingAggregation,
     Done,
 }
 
@@ -90,6 +94,69 @@ struct SpillState {
     merging_group_by: PhysicalGroupBy,
 }
 
+struct SkipAggregationProbe {
+    /// Number of processed input rows
+    input_rows: usize,
+    /// Number of total group values for `input_rows`
+    num_groups: usize,
+
+    /// Aggregation ratio check should be performed only when the
+    /// number of input rows exceeds this threshold
+    probe_rows_threshold: usize,
+    /// Maximum allowed value of `num_groups` / `input_rows` to
+    /// continue aggregation
+    probe_ratio_threshold: f64,
+
+    /// Flag indicating that further data aggregation may be skipped
+    should_skip: bool,
+    /// Flag indicating that further updates of `SkipAggregationProbe`
+    /// state won't have any effect
+    is_locked: bool,
+}
+
+impl SkipAggregationProbe {
+    fn new(probe_rows_threshold: usize, probe_ratio_threshold: f64) -> Self {
+        Self {
+            input_rows: 0,
+            num_groups: 0,
+            probe_rows_threshold,
+            probe_ratio_threshold,
+            should_skip: false,
+            is_locked: false,
+        }
+    }
+
+    /// Updates `SkipAggregationProbe` state:
+    /// - increments the number of input rows
+    /// - replaces the number of groups with the new value
+    /// - once `probe_rows_threshold` is reached, calculates the
+    ///   aggregation ratio, sets the `should_skip` flag accordingly,
+    ///   and locks further state updates
+    fn update_state(&mut self, input_rows: usize, num_groups: usize) {
+        if self.is_locked {
+            return;
+        }
+        self.input_rows += input_rows;
+        self.num_groups = num_groups;
+        if self.input_rows >= self.probe_rows_threshold {
+            self.should_skip = self.num_groups as f64 / self.input_rows as f64
+                >= self.probe_ratio_threshold;
+            self.is_locked = true;
+        }
+    }
+
+    fn should_skip(&self) -> bool {
+        self.should_skip
+    }
+
+    /// Provides an ability to externally set the `should_skip` flag
+    /// to `false` and prohibit further state updates
+    fn forbid_skipping(&mut self) {
+        self.should_skip = false;
+        self.is_locked = true;
+    }
+}
+
 /// HashTable based Grouping Aggregator
 ///
 /// # Design Goals
@@ -275,6 +342,10 @@ pub(crate) struct GroupedHashAggregateStream {
     /// the `GroupedHashAggregateStream` operation immediately switches to
     /// output mode and emits all groups.
     group_values_soft_limit: Option<usize>,
+
+    /// Optional probe for skipping data aggregation, if supported by
+    /// the current stream
+    skip_aggregation_probe: Option<SkipAggregationProbe>,
 }
 
 impl GroupedHashAggregateStream {
@@ -365,6 +436,36 @@ impl GroupedHashAggregateStream {
             merging_group_by: PhysicalGroupBy::new_single(agg_group_by.expr.clone()),
         };
 
+        // Skip aggregation is supported if:
+        // - aggregation mode is Partial
+        // - input is not ordered by GROUP BY expressions,
+        //   since Final mode expects unique group values as its input
+        // - all accumulators support input batch to intermediate
+        //   aggregate state conversion
+        // - there is only a single set of GROUP BY expressions
+        let skip_aggregation_probe = if agg.mode == AggregateMode::Partial
+            && matches!(group_ordering, GroupOrdering::None)
+            && accumulators
+                .iter()
+                .all(|acc| acc.convert_to_state_supported())
+            && agg_group_by.is_single()
+        {
+            Some(SkipAggregationProbe::new(
+                context
+                    .session_config()
+                    .options()
+                    .execution
+                    .skip_partial_aggregation_probe_rows_threshold,
+                context
+                    .session_config()
+                    .options()
+                    .execution
+                    .skip_partial_aggregation_probe_ratio_threshold,
+            ))
+        } else {
+            None
+        };
+
         Ok(GroupedHashAggregateStream {
             schema: agg_schema,
             input,
@@ -384,6 +485,7 @@ impl GroupedHashAggregateStream {
             runtime: context.runtime_env(),
             spill_state,
             group_values_soft_limit: agg.limit,
+            skip_aggregation_probe,
         })
     }
 }
@@ -434,12 +536,16 @@ impl Stream for GroupedHashAggregateStream {
                         // new batch to aggregate
                         Some(Ok(batch)) => {
                             let timer = elapsed_compute.timer();
+                            let input_rows = batch.num_rows();
+
                             // Make sure we have enough capacity for `batch`, otherwise spill
                             extract_ok!(self.spill_previous_if_necessary(&batch));
 
                             // Do the grouping
                             extract_ok!(self.group_aggregate_batch(batch));
 
+                            self.update_skip_aggregation_probe(input_rows);
+
                             // If we can begin emitting rows, do so,
                             // otherwise keep consuming input
                             assert!(!self.input_done);
@@ -463,6 +569,8 @@ impl Stream for GroupedHashAggregateStream {
 
                             extract_ok!(self.emit_early_if_necessary());
 
+                            extract_ok!(self.switch_to_skip_aggregation());
+
                             timer.done();
                         }
                         Some(Err(e)) => {
@@ -476,6 +584,26 @@ impl Stream for GroupedHashAggregateStream {
                     }
                 }
 
+                ExecutionState::SkippingAggregation => {
+                    match ready!(self.input.poll_next_unpin(cx)) {
+                        Some(Ok(batch)) => {
+                            let _timer = elapsed_compute.timer();
+                            let states = self.transform_to_states(batch)?;
+                            return Poll::Ready(Some(Ok(
+                                states.record_output(&self.baseline_metrics)
+                            )));
+                        }
+                        Some(Err(e)) => {
+                            // inner had error, return to caller
+                            return Poll::Ready(Some(Err(e)));
+                        }
+                        None => {
+                            // inner is done, switching to `Done` state
+                            self.exec_state = ExecutionState::Done;
+                        }
+                    }
+                }
+
                 ExecutionState::ProducingOutput(batch) => {
                     // slice off a part of the batch, if needed
                     let output_batch;
                     (
                         if self.input_done {
                             ExecutionState::Done
+                        } else if self
+                            .skip_aggregation_probe
+                            .as_ref()
+                            .is_some_and(|probe| probe.should_skip())
+                        {
+                            ExecutionState::SkippingAggregation
                         } else {
                             ExecutionState::ReadingInput
                         },
@@ -797,4 +931,59 @@ impl GroupedHashAggregateStream {
         timer.done();
         Ok(())
     }
+
+    // Updates skip aggregation probe state.
+    // If the stream has any spills, the probe is forcefully set to
+    // forbid aggregation skipping, and locked, since spilling resets
+    // the total number of unique groups.
+    //
+    // Note: currently spilling is not supported for Partial aggregation
+    fn update_skip_aggregation_probe(&mut self, input_rows: usize) {
+        if let Some(probe) = self.skip_aggregation_probe.as_mut() {
+            if !self.spill_state.spills.is_empty() {
+                probe.forbid_skipping();
+            } else {
+                probe.update_state(input_rows, self.group_values.len());
+            }
+        };
+    }
+
+    // If the probe indicates that aggregation may be skipped, forces
+    // the stream to produce the currently accumulated output.
+    fn switch_to_skip_aggregation(&mut self) -> Result<()> {
+        if let Some(probe) = self.skip_aggregation_probe.as_mut() {
+            if probe.should_skip() {
+                let batch = self.emit(EmitTo::All, false)?;
+                self.exec_state = ExecutionState::ProducingOutput(batch);
+            }
+        }
+
+        Ok(())
+    }
+
+    // Transforms an input batch to intermediate aggregate state, without grouping it
+    fn transform_to_states(&self, batch: RecordBatch) -> Result<RecordBatch> {
+        let group_values = evaluate_group_by(&self.group_by, &batch)?;
+        let input_values = evaluate_many(&self.aggregate_arguments, &batch)?;
+        let filter_values = evaluate_optional(&self.filter_expressions, &batch)?;
+
+        let mut output = group_values.first().cloned().ok_or_else(|| {
+            internal_datafusion_err!("group_values expected to have at least one element")
+        })?;
+
+        let iter = self
+            .accumulators
+            .iter()
+            .zip(input_values.iter())
+            .zip(filter_values.iter());
+
+        for ((acc, values), opt_filter) in iter {
+            let opt_filter = opt_filter.as_ref().map(|filter| filter.as_boolean());
+            output.extend(acc.convert_to_state(values, opt_filter)?);
+        }
+
+        let states_batch = RecordBatch::try_new(self.schema(), output)?;
+
+        Ok(states_batch)
+    }
 }
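When the stream is in the `SkippingAggregation` state, the batches it emits have the regular partial-aggregation output schema, just without any grouping applied: the group-by column passes through unchanged and each aggregate contributes one state column with exactly one row per input row. A sketch of that shape for `SELECT key, COUNT(val) ... GROUP BY key` (an assumed example; the state column name mirrors the tests above):

use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, Int64Array, RecordBatch};
use arrow_schema::{ArrowError, DataType, Field, Schema};

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("key", DataType::Int32, true),
        Field::new("COUNT(val)[count]", DataType::Int64, true),
    ]));
    // Input rows (key, val): (2, 0), (3, 0), (4, 0) -- passed through ungrouped.
    let keys: ArrayRef = Arc::new(Int32Array::from(vec![2, 3, 4]));
    // COUNT state is 1 per row, since every `val` is non-null.
    let counts: ArrayRef = Arc::new(Int64Array::from(vec![1i64, 1, 1]));
    let states = RecordBatch::try_new(schema, vec![keys, counts])?;
    // One intermediate state row per input row; Final aggregation merges them.
    assert_eq!(states.num_rows(), 3);
    Ok(())
}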
diff --git a/datafusion/sqllogictest/test_files/aggregate_skip_partial.slt b/datafusion/sqllogictest/test_files/aggregate_skip_partial.slt
new file mode 100644
index 0000000000000..1d152bf477bc7
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/aggregate_skip_partial.slt
@@ -0,0 +1,177 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# The main goal of these tests is to verify the correctness of transforming
+# input values to state by accumulators supporting `convert_to_state`.
+
+
+# Setup test data table
+statement ok
+CREATE EXTERNAL TABLE aggregate_test_100 (
+  c1  VARCHAR NOT NULL,
+  c2  TINYINT NOT NULL,
+  c3  SMALLINT NOT NULL,
+  c4  SMALLINT,
+  c5  INT,
+  c6  BIGINT NOT NULL,
+  c7  SMALLINT NOT NULL,
+  c8  INT NOT NULL,
+  c9  INT UNSIGNED NOT NULL,
+  c10 BIGINT UNSIGNED NOT NULL,
+  c11 FLOAT NOT NULL,
+  c12 DOUBLE NOT NULL,
+  c13 VARCHAR NOT NULL
+)
+STORED AS CSV
+LOCATION '../../testing/data/csv/aggregate_test_100.csv'
+OPTIONS ('format.has_header' 'true');
+
+# Create table for nullable aggregations test
+statement ok
+CREATE TABLE aggregate_test_100_null (
+  c2  TINYINT NOT NULL,
+  c3  SMALLINT,
+  c11 FLOAT
+);
+
+# Prepare settings to always skip aggregation after a couple of batches
+statement ok
+set datafusion.execution.skip_partial_aggregation_probe_rows_threshold = 10;
+
+statement ok
+set datafusion.execution.skip_partial_aggregation_probe_ratio_threshold = 0.0;
+
+statement ok
+set datafusion.execution.target_partitions = 2;
+
+statement ok
+set datafusion.execution.batch_size = 4;
+
+# Inserting into the nullable table with the batch_size specified above
+# to prevent creation of a single in-memory batch
+statement ok
+INSERT INTO aggregate_test_100_null
+SELECT
+  c2,
+  CASE WHEN c1 = 'e' THEN NULL ELSE c3 END as c3,
+  CASE WHEN c1 = 'a' THEN NULL ELSE c11 END as c11
+FROM aggregate_test_100;
+
+# Test count varchar / int / float
+query IIII
+SELECT c2, count(c1), count(c5), count(c11) FROM aggregate_test_100 GROUP BY c2 ORDER BY c2;
+----
+1 22 22 22
+2 22 22 22
+3 19 19 19
+4 23 23 23
+5 14 14 14
+
+# Test min / max for int / float
+query IIIRR
+SELECT c2, min(c5), max(c5), min(c11), max(c11) FROM aggregate_test_100 GROUP BY c2 ORDER BY c2;
+----
+1 -1991133944 2143473091 0.064453244 0.89651865
+2 -2138770630 2053379412 0.055064857 0.8315913
+3 -2141999138 2030965207 0.034291923 0.9488028
+4 -1885422396 2064155045 0.028003037 0.7459874
+5 -2117946883 2025611582 0.12559289 0.87989986
+
+# Test sum for int / float
+query IIR
+SELECT c2, sum(c5), sum(c11) FROM aggregate_test_100 GROUP BY c2 ORDER BY c2;
+----
+1 -438598674 12.153253793716
+2 -8259865364 9.577824473381
+3 1956035476 9.590891361237
+4 16155718643 9.531112968922
+5 6449337880 7.074412226677
+
+# Enabling PG dialect for filtered aggregates tests
+statement ok
+set datafusion.sql_parser.dialect = 'Postgres';
+
+# Test count with filter
+query III
+SELECT
+  c2,
+  count(c3) FILTER (WHERE c3 > 0),
+  count(c3) FILTER (WHERE c11 > 10)
+FROM aggregate_test_100 GROUP BY c2 ORDER BY c2;
+----
+1 13 0
+2 13 0
+3 13 0
+4 13 0
+5 5 0
+
+# Test min / max with filter
+query III
+SELECT
+  c2,
+  min(c3) FILTER (WHERE c3 > 0),
+  max(c3) FILTER (WHERE c3 < 0)
+FROM aggregate_test_100 GROUP BY c2 ORDER BY c2;
+----
+1 12 -5
+2 1 -29
+3 13 -2
+4 3 -38
+5 36 -5
+
+# Test sum with filter
+query II
+SELECT
+  c2,
+  sum(c3) FILTER (WHERE c1 != 'e' AND c3 > 0)
+FROM aggregate_test_100 GROUP BY c2 ORDER BY c2;
+----
+1 612
+2 565
+3 466
+4 417
+5 284
+
+# Test count with nullable fields
+query III
+SELECT c2, count(c3), count(c11) FROM aggregate_test_100_null GROUP BY c2 ORDER BY c2;
+----
+1 19 17
+2 17 19
+3 15 13
+4 16 19
+5 12 11
+
+# Test min / max with nullable fields
+query IIIRR
+SELECT c2, min(c3), max(c3), min(c11), max(c11) FROM aggregate_test_100_null GROUP BY c2 ORDER BY c2;
+----
+1 -99 125 0.064453244 0.89651865
+2 -117 122 0.09683716 0.8315913
+3 -101 123 0.034291923 0.94669616
+4 -117 123 0.028003037 0.7085086
+5 -101 118 0.12559289 0.87989986
+
+# Test sum with nullable fields
+query IIR
+SELECT c2, sum(c3), sum(c11) FROM aggregate_test_100 GROUP BY c2 ORDER BY c2;
+----
+1 367 12.153253793716
+2 184 9.577824473381
+3 395 9.590891361237
+4 29 9.531112968922
+5 -194 7.074412226677
diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt
index e85159fd137a7..e581736ccfd6e 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -206,6 +206,8 @@ datafusion.execution.parquet.statistics_enabled page
 datafusion.execution.parquet.write_batch_size 1024
 datafusion.execution.parquet.writer_version 1.0
 datafusion.execution.planning_concurrency 13
+datafusion.execution.skip_partial_aggregation_probe_ratio_threshold 0.8
+datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000
 datafusion.execution.soft_max_rows_per_output_file 50000000
 datafusion.execution.sort_in_place_threshold_bytes 1048576
 datafusion.execution.sort_spill_reservation_bytes 10485760
@@ -292,6 +294,8 @@ datafusion.execution.parquet.statistics_enabled page (writing) Sets if statistic
 datafusion.execution.parquet.write_batch_size 1024 (writing) Sets write_batch_size in bytes
 datafusion.execution.parquet.writer_version 1.0 (writing) Sets parquet writer version valid values are "1.0" and "2.0"
 datafusion.execution.planning_concurrency 13 Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system
+datafusion.execution.skip_partial_aggregation_probe_ratio_threshold 0.8 Aggregation ratio (number of distinct groups / number of input rows) threshold for skipping partial aggregation. If the actual ratio exceeds this threshold, partial aggregation will skip aggregation for further input
+datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000 Number of input rows a partial aggregation partition should process before checking the aggregation ratio and attempting to switch to skipping aggregation mode
 datafusion.execution.soft_max_rows_per_output_file 50000000 Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max
 datafusion.execution.sort_in_place_threshold_bytes 1048576 When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged.
 datafusion.execution.sort_spill_reservation_bytes 10485760 Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured).
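For reference, the same settings can also be applied programmatically, mirroring the unit tests earlier in this change (a minimal sketch; the values shown are the defaults, not recommendations):

use datafusion_common::ScalarValue;
use datafusion_execution::config::SessionConfig;

fn main() {
    let mut config = SessionConfig::default();
    config = config.set(
        "datafusion.execution.skip_partial_aggregation_probe_rows_threshold",
        ScalarValue::Int64(Some(100_000)),
    );
    config = config.set(
        "datafusion.execution.skip_partial_aggregation_probe_ratio_threshold",
        ScalarValue::Float64(Some(0.8)),
    );
    // `config` can now be attached to a SessionContext / TaskContext.
    let _ = config;
}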
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 5814d88c7dd87..683fe3c2fe2fb 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -88,6 +88,8 @@ Environment variables are read during `SessionConfig` initialisation so they mus
 | datafusion.execution.enable_recursive_ctes | true | Should DataFusion support recursive CTEs |
 | datafusion.execution.split_file_groups_by_statistics | false | Attempt to eliminate sorts by packing & sorting files with non-overlapping statistics into the same file groups. Currently experimental |
 | datafusion.execution.keep_partition_by_columns | false | Should DataFusion keep the columns used for partition_by in the output RecordBatches |
+| datafusion.execution.skip_partial_aggregation_probe_ratio_threshold | 0.8 | Aggregation ratio (number of distinct groups / number of input rows) threshold for skipping partial aggregation. If the actual ratio exceeds this threshold, partial aggregation will skip aggregation for further input |
+| datafusion.execution.skip_partial_aggregation_probe_rows_threshold | 100000 | Number of input rows a partial aggregation partition should process before checking the aggregation ratio and attempting to switch to skipping aggregation mode |
 | datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. |
 | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores |
 | datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible |
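Finally, a hypothetical end-to-end snippet showing the two options being set through SQL and read back via the information schema (assumes the `datafusion` crate with the information schema enabled; threshold values are illustrative):

use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new_with_config(
        SessionConfig::new().with_information_schema(true),
    );
    // SET statements are executed eagerly by `SessionContext::sql`.
    ctx.sql("SET datafusion.execution.skip_partial_aggregation_probe_rows_threshold = 10000")
        .await?;
    ctx.sql("SET datafusion.execution.skip_partial_aggregation_probe_ratio_threshold = 0.5")
        .await?;
    // Inspect the effective values (column names per df_settings).
    ctx.sql(
        "SELECT name, value FROM information_schema.df_settings \
         WHERE name LIKE 'datafusion.execution.skip_partial_aggregation%'",
    )
    .await?
    .show()
    .await?;
    Ok(())
}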