From 4ad1ec45f463e514f0a84d36d394429894212f6e Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 10 Oct 2024 07:21:41 -0400 Subject: [PATCH 1/4] Add random queries into aggregate fuzz tester --- .../core/tests/fuzz_cases/aggregate_fuzz.rs | 382 ++++++------------ .../aggregation_fuzzer/data_generator.rs | 37 +- .../fuzz_cases/aggregation_fuzzer/fuzzer.rs | 225 ++++++++++- test-utils/src/string_gen.rs | 2 +- 4 files changed, 357 insertions(+), 289 deletions(-) diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs index 34061a64d783..21196339a517 100644 --- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs @@ -45,299 +45,149 @@ use rand::{Rng, SeedableRng}; use tokio::task::JoinSet; use crate::fuzz_cases::aggregation_fuzzer::{ - AggregationFuzzerBuilder, ColumnDescr, DatasetGeneratorConfig, + AggregationFuzzerBuilder, ColumnDescr, DatasetGeneratorConfig, QueryBuilder, }; // ======================================================================== // The new aggregation fuzz tests based on [`AggregationFuzzer`] // ======================================================================== +// +// Notes on tests: +// +// Since the supported types differ for each aggregation function, the tests +// below are structured so they enumerate each different aggregate function. 
+// +// The test framework handles varying combinations of arguments (data types), +// sortedness, and grouping parameters +// +// TODO: Test on floating point values (where output needs to be compared with some +// acceptable range due to floating point rounding) +// +// TODO: test other aggregate functions +// - AVG (unstable given the wide range of inputs) +// +// TODO: specific test for ordering (ensure all group by columns are ordered) -// TODO: write more test case to cover more `group by`s and `aggregation function`s -// TODO: maybe we can use macro to simply the case creating - -/// Fuzz test for `basic prim aggr(sum/sum distinct/max/min/count/avg)` + `no group by` -#[tokio::test(flavor = "multi_thread")] -async fn test_basic_prim_aggr_no_group() { - let builder = AggregationFuzzerBuilder::default(); - - // Define data generator config - let columns = vec![ColumnDescr::new("a", DataType::Int32)]; - - let data_gen_config = DatasetGeneratorConfig { - columns, - rows_num_range: (512, 1024), - sort_keys_set: Vec::new(), - }; - - // Build fuzzer - let fuzzer = builder - .data_gen_config(data_gen_config) - .data_gen_rounds(16) - .add_sql("SELECT sum(a) FROM fuzz_table") - .add_sql("SELECT sum(distinct a) FROM fuzz_table") - .add_sql("SELECT max(a) FROM fuzz_table") - .add_sql("SELECT min(a) FROM fuzz_table") - .add_sql("SELECT count(a) FROM fuzz_table") - .add_sql("SELECT count(distinct a) FROM fuzz_table") - .add_sql("SELECT avg(a) FROM fuzz_table") - .table_name("fuzz_table") - .build(); - - fuzzer.run().await -} - -/// Fuzz test for `basic prim aggr(sum/sum distinct/max/min/count/avg)` + `group by single int64` -#[tokio::test(flavor = "multi_thread")] -async fn test_basic_prim_aggr_group_by_single_int64() { - let builder = AggregationFuzzerBuilder::default(); - - // Define data generator config - let columns = vec![ - ColumnDescr::new("a", DataType::Int32), - ColumnDescr::new("b", DataType::Int64), - ColumnDescr::new("c", DataType::Int64), - ]; - let 
sort_keys_set = vec![ - vec!["b".to_string()], - vec!["c".to_string(), "b".to_string()], - ]; - let data_gen_config = DatasetGeneratorConfig { - columns, - rows_num_range: (512, 1024), - sort_keys_set, - }; - - // Build fuzzer - let fuzzer = builder - .data_gen_config(data_gen_config) - .data_gen_rounds(16) - .add_sql("SELECT b, sum(a) FROM fuzz_table GROUP BY b") - .add_sql("SELECT b, sum(distinct a) FROM fuzz_table GROUP BY b") - .add_sql("SELECT b, max(a) FROM fuzz_table GROUP BY b") - .add_sql("SELECT b, min(a) FROM fuzz_table GROUP BY b") - .add_sql("SELECT b, count(a) FROM fuzz_table GROUP BY b") - .add_sql("SELECT b, count(distinct a) FROM fuzz_table GROUP BY b") - .add_sql("SELECT b, avg(a) FROM fuzz_table GROUP BY b") - .table_name("fuzz_table") - .build(); - - fuzzer.run().await; -} - -/// Fuzz test for `basic prim aggr(sum/sum distinct/max/min/count/avg)` + `group by single string` #[tokio::test(flavor = "multi_thread")] -async fn test_basic_prim_aggr_group_by_single_string() { - let builder = AggregationFuzzerBuilder::default(); - - // Define data generator config - let columns = vec![ - ColumnDescr::new("a", DataType::Int32), - ColumnDescr::new("b", DataType::Utf8), - ColumnDescr::new("c", DataType::Int64), - ]; - let sort_keys_set = vec![ - vec!["b".to_string()], - vec!["c".to_string(), "b".to_string()], - ]; - let data_gen_config = DatasetGeneratorConfig { - columns, - rows_num_range: (512, 1024), - sort_keys_set, - }; - - // Build fuzzer - let fuzzer = builder - .data_gen_config(data_gen_config) - .data_gen_rounds(16) - .add_sql("SELECT b, sum(a) FROM fuzz_table GROUP BY b") - .add_sql("SELECT b, sum(distinct a) FROM fuzz_table GROUP BY b") - .add_sql("SELECT b, max(a) FROM fuzz_table GROUP BY b") - .add_sql("SELECT b, min(a) FROM fuzz_table GROUP BY b") - .add_sql("SELECT b, count(a) FROM fuzz_table GROUP BY b") - .add_sql("SELECT b, count(distinct a) FROM fuzz_table GROUP BY b") - .add_sql("SELECT b, avg(a) FROM fuzz_table GROUP BY b") - 
.table_name("fuzz_table") - .build(); - - fuzzer.run().await; +async fn test_min() { + let data_gen_config = baseline_config(); + + // Queries like SELECT min(a) FROM fuzz_table GROUP BY b + let query_builder = QueryBuilder::new() + .with_table_name("fuzz_table") + .with_aggregate_function("min") + // min works on all column types + .with_aggregate_arguments(data_gen_config.all_columns()) + .with_group_by_columns(data_gen_config.all_columns()); + + AggregationFuzzerBuilder::from(data_gen_config) + .add_query_builder(query_builder) + .build() + .run() + .await; } -/// Fuzz test for `basic prim aggr(sum/sum distinct/max/min/count/avg)` + `group by string + int64` #[tokio::test(flavor = "multi_thread")] -async fn test_basic_prim_aggr_group_by_mixed_string_int64() { - let builder = AggregationFuzzerBuilder::default(); - - // Define data generator config - let columns = vec![ - ColumnDescr::new("a", DataType::Int32), - ColumnDescr::new("b", DataType::Utf8), - ColumnDescr::new("c", DataType::Int64), - ColumnDescr::new("d", DataType::Int32), - ]; - let sort_keys_set = vec![ - vec!["b".to_string(), "c".to_string()], - vec!["d".to_string(), "b".to_string(), "c".to_string()], - ]; - let data_gen_config = DatasetGeneratorConfig { - columns, - rows_num_range: (512, 1024), - sort_keys_set, - }; - - // Build fuzzer - let fuzzer = builder - .data_gen_config(data_gen_config) - .data_gen_rounds(16) - .add_sql("SELECT b, c, sum(a) FROM fuzz_table GROUP BY b, c") - .add_sql("SELECT b, c, sum(distinct a) FROM fuzz_table GROUP BY b,c") - .add_sql("SELECT b, c, max(a) FROM fuzz_table GROUP BY b, c") - .add_sql("SELECT b, c, min(a) FROM fuzz_table GROUP BY b, c") - .add_sql("SELECT b, c, count(a) FROM fuzz_table GROUP BY b, c") - .add_sql("SELECT b, c, count(distinct a) FROM fuzz_table GROUP BY b, c") - .add_sql("SELECT b, c, avg(a) FROM fuzz_table GROUP BY b, c") - .table_name("fuzz_table") - .build(); - - fuzzer.run().await; +async fn test_max() { + let data_gen_config = 
baseline_config(); + + // Queries like SELECT max(a) FROM fuzz_table GROUP BY b + let query_builder = QueryBuilder::new() + .with_table_name("fuzz_table") + .with_aggregate_function("max") + // max works on all column types + .with_aggregate_arguments(data_gen_config.all_columns()) + .with_group_by_columns(data_gen_config.all_columns()); + + AggregationFuzzerBuilder::from(data_gen_config) + .add_query_builder(query_builder) + .build() + .run() + .await; } -/// Fuzz test for `basic string aggr(count/count distinct/min/max)` + `no group by` #[tokio::test(flavor = "multi_thread")] -async fn test_basic_string_aggr_no_group() { - let builder = AggregationFuzzerBuilder::default(); - - // Define data generator config - let columns = vec![ColumnDescr::new("a", DataType::Utf8)]; - - let data_gen_config = DatasetGeneratorConfig { - columns, - rows_num_range: (512, 1024), - sort_keys_set: Vec::new(), - }; - - // Build fuzzer - let fuzzer = builder - .data_gen_config(data_gen_config) - .data_gen_rounds(8) - .add_sql("SELECT max(a) FROM fuzz_table") - .add_sql("SELECT min(a) FROM fuzz_table") - .add_sql("SELECT count(a) FROM fuzz_table") - .add_sql("SELECT count(distinct a) FROM fuzz_table") - .table_name("fuzz_table") - .build(); - - fuzzer.run().await; +async fn test_sum() { + let data_gen_config = baseline_config(); + + // Queries like SELECT sum(a), sum(distinct) FROM fuzz_table GROUP BY b + let query_builder = QueryBuilder::new() + .with_table_name("fuzz_table") + .with_aggregate_function("sum") + .with_distinct_aggregate_function("sum") + // sum only works on numeric columns + .with_aggregate_arguments(data_gen_config.numeric_columns()) + .with_group_by_columns(data_gen_config.all_columns()); + + AggregationFuzzerBuilder::from(data_gen_config) + .add_query_builder(query_builder) + .build() + .run() + .await; } -/// Fuzz test for `basic string aggr(count/count distinct/min/max)` + `group by single int64` #[tokio::test(flavor = "multi_thread")] -async fn 
test_basic_string_aggr_group_by_single_int64() { - let builder = AggregationFuzzerBuilder::default(); - - // Define data generator config - let columns = vec![ - ColumnDescr::new("a", DataType::Utf8), - ColumnDescr::new("b", DataType::Int64), - ColumnDescr::new("c", DataType::Int64), - ]; - let sort_keys_set = vec![ - vec!["b".to_string()], - vec!["c".to_string(), "b".to_string()], - ]; - let data_gen_config = DatasetGeneratorConfig { - columns, - rows_num_range: (512, 1024), - sort_keys_set, - }; - - // Build fuzzer - let fuzzer = builder - .data_gen_config(data_gen_config) - .data_gen_rounds(8) - .add_sql("SELECT b, max(a) FROM fuzz_table GROUP BY b") - .add_sql("SELECT b, min(a) FROM fuzz_table GROUP BY b") - .add_sql("SELECT b, count(a) FROM fuzz_table GROUP BY b") - .add_sql("SELECT b, count(distinct a) FROM fuzz_table GROUP BY b") - .table_name("fuzz_table") - .build(); - - fuzzer.run().await; +async fn test_count() { + let data_gen_config = baseline_config(); + + // Queries like SELECT count(a), count(distinct) FROM fuzz_table GROUP BY b + let query_builder = QueryBuilder::new() + .with_table_name("fuzz_table") + .with_aggregate_function("count") + .with_distinct_aggregate_function("count") + // count work for all arguments + .with_aggregate_arguments(data_gen_config.all_columns()) + .with_group_by_columns(data_gen_config.all_columns()); + + AggregationFuzzerBuilder::from(data_gen_config) + .add_query_builder(query_builder) + .build() + .run() + .await; } -/// Fuzz test for `basic string aggr(count/count distinct/min/max)` + `group by single string` -#[tokio::test(flavor = "multi_thread")] -async fn test_basic_string_aggr_group_by_single_string() { - let builder = AggregationFuzzerBuilder::default(); - - // Define data generator config +/// Return a standard set of columns for testing data generation +/// +/// Includes numeric and string types +/// +/// Does not include: +/// 1. Floating point numbers +/// 1. 
structured types +fn baseline_config() -> DatasetGeneratorConfig { let columns = vec![ - ColumnDescr::new("a", DataType::Utf8), - ColumnDescr::new("b", DataType::Utf8), - ColumnDescr::new("c", DataType::Int64), - ]; - let sort_keys_set = vec![ - vec!["b".to_string()], - vec!["c".to_string(), "b".to_string()], + ColumnDescr::new("i8", DataType::Int8), + ColumnDescr::new("i16", DataType::Int16), + ColumnDescr::new("i32", DataType::Int32), + ColumnDescr::new("i64", DataType::Int64), + ColumnDescr::new("u8", DataType::UInt8), + ColumnDescr::new("u16", DataType::UInt16), + ColumnDescr::new("u32", DataType::UInt32), + ColumnDescr::new("u64", DataType::UInt64), + // TODO: date/time columns + // todo decimal columns + // begin string columns + ColumnDescr::new("utf8", DataType::Utf8), + ColumnDescr::new("largeutf8", DataType::LargeUtf8), + // TODO add support for utf8view in data generator + // ColumnDescr::new("utf8view", DataType::Utf8View), + // todo binary ]; - let data_gen_config = DatasetGeneratorConfig { - columns, - rows_num_range: (512, 1024), - sort_keys_set, - }; - - // Build fuzzer - let fuzzer = builder - .data_gen_config(data_gen_config) - .data_gen_rounds(16) - .add_sql("SELECT b, max(a) FROM fuzz_table GROUP BY b") - .add_sql("SELECT b, min(a) FROM fuzz_table GROUP BY b") - .add_sql("SELECT b, count(a) FROM fuzz_table GROUP BY b") - .add_sql("SELECT b, count(distinct a) FROM fuzz_table GROUP BY b") - .table_name("fuzz_table") - .build(); - - fuzzer.run().await; -} -/// Fuzz test for `basic string aggr(count/count distinct/min/max)` + `group by string + int64` -#[tokio::test(flavor = "multi_thread")] -async fn test_basic_string_aggr_group_by_mixed_string_int64() { - let builder = AggregationFuzzerBuilder::default(); - - // Define data generator config - let columns = vec![ - ColumnDescr::new("a", DataType::Utf8), - ColumnDescr::new("b", DataType::Utf8), - ColumnDescr::new("c", DataType::Int64), - ColumnDescr::new("d", DataType::Int32), - ]; - let 
sort_keys_set = vec![ - vec!["b".to_string(), "c".to_string()], - vec!["d".to_string(), "b".to_string(), "c".to_string()], - ]; - let data_gen_config = DatasetGeneratorConfig { + DatasetGeneratorConfig { columns, rows_num_range: (512, 1024), - sort_keys_set, - }; - - // Build fuzzer - let fuzzer = builder - .data_gen_config(data_gen_config) - .data_gen_rounds(16) - .add_sql("SELECT b, c, max(a) FROM fuzz_table GROUP BY b, c") - .add_sql("SELECT b, c, min(a) FROM fuzz_table GROUP BY b, c") - .add_sql("SELECT b, c, count(a) FROM fuzz_table GROUP BY b, c") - .add_sql("SELECT b, c, count(distinct a) FROM fuzz_table GROUP BY b, c") - .table_name("fuzz_table") - .build(); - - fuzzer.run().await; + sort_keys_set: vec![ + // low cardinality to try and get many repeated runs + vec![String::from("u8")], + vec![String::from("utf8"), String::from("u8")], + ], + } } // ======================================================================== // The old aggregation fuzz tests // ======================================================================== + /// Tracks if this stream is generating input or output /// Tests that streaming aggregate and batch (non streaming) aggregate produce /// same results @@ -353,7 +203,7 @@ async fn streaming_aggregate_test() { vec!["d", "c", "a"], vec!["d", "c", "b", "a"], ]; - let n = 300; + let n = 10; let distincts = vec![10, 20]; for distinct in distincts { let mut join_set = JoinSet::new(); diff --git a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs index 9d45779295e7..44f96d5a1a07 100644 --- a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs +++ b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs @@ -48,16 +48,41 @@ use test_utils::{ /// #[derive(Debug, Clone)] pub struct DatasetGeneratorConfig { - // Descriptions of columns in datasets, it's `required` + /// Descriptions of columns in datasets, it's 
`required` pub columns: Vec, - // Rows num range of the generated datasets, it's `required` + /// Rows num range of the generated datasets, it's `required` pub rows_num_range: (usize, usize), - // Sort keys used to generate the sorted data set, it's optional + /// Additional optional sort keys + /// + /// The generated datasets always include a non-sorted copy. For each + /// element in `sort_keys_set`, an additional datasets is created that + /// is sorted by these values as well. pub sort_keys_set: Vec>, } +impl DatasetGeneratorConfig { + /// return a list of all column names + pub fn all_columns(&self) -> Vec<&str> { + self.columns.iter().map(|d| d.name.as_str()).collect() + } + + /// return a list of column names that are "numeric" + pub fn numeric_columns(&self) -> Vec<&str> { + self.columns + .iter() + .filter_map(|d| { + if d.column_type.is_numeric() { + Some(d.name.as_str()) + } else { + None + } + }) + .collect() + } +} + /// Dataset generator /// /// It will generate one random [`Dataset`]s when `generate` function is called. 
@@ -96,7 +121,7 @@ impl DatasetGenerator { pub fn generate(&self) -> Result> { let mut datasets = Vec::with_capacity(self.sort_keys_set.len() + 1); - // Generate the base batch + // Generate the base batch (unsorted) let base_batch = self.batch_generator.generate()?; let batches = stagger_batch(base_batch.clone()); let dataset = Dataset::new(batches, Vec::new()); @@ -362,7 +387,9 @@ impl RecordBatchGenerator { DataType::LargeUtf8 => { generate_string_array!(self, num_rows, batch_gen_rng, array_gen_rng, i64) } - _ => unreachable!(), + _ => { + panic!("Unsupported data generator type: {data_type}") + } } } } diff --git a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs index 6daebc894272..a56233e38955 100644 --- a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs +++ b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. 
+use std::collections::HashSet; use std::sync::Arc; use arrow::util::pretty::pretty_format_batches; @@ -61,6 +62,15 @@ impl AggregationFuzzerBuilder { } } + /// Adds n random SQL queries to the fuzzer along with the table name + pub fn add_query_builder(mut self, query_builder: QueryBuilder) -> Self { + const NUM_QUERIES: usize = 10; + for _ in 0..NUM_QUERIES { + self = self.add_sql(&query_builder.generate_query()); + } + self.table_name(query_builder.table_name()) + } + pub fn add_sql(mut self, sql: &str) -> Self { self.candidate_sqls.push(Arc::from(sql)); self @@ -76,11 +86,6 @@ impl AggregationFuzzerBuilder { self } - pub fn data_gen_rounds(mut self, data_gen_rounds: usize) -> Self { - self.data_gen_rounds = data_gen_rounds; - self - } - pub fn build(self) -> AggregationFuzzer { assert!(!self.candidate_sqls.is_empty()); let candidate_sqls = self.candidate_sqls; @@ -99,12 +104,18 @@ impl AggregationFuzzerBuilder { } } -impl Default for AggregationFuzzerBuilder { +impl std::default::Default for AggregationFuzzerBuilder { fn default() -> Self { Self::new() } } +impl From for AggregationFuzzerBuilder { + fn from(value: DatasetGeneratorConfig) -> Self { + Self::default().data_gen_config(value) + } +} + /// AggregationFuzzer randomly generating multiple [`AggregationFuzzTestTask`], /// and running them to check the correctness of the optimizations /// (e.g. sorted, partial skipping, spilling...) 
@@ -169,6 +180,10 @@ impl AggregationFuzzer { }) .collect::>(); + for q in &query_groups { + println!(" Testing with query {}", q.sql); + } + let tasks = self.generate_fuzz_tasks(query_groups).await; for task in tasks { join_set.spawn(async move { task.run().await }); @@ -270,20 +285,27 @@ impl AggregationFuzzTestTask { check_equality_of_batches(task_result, expected_result).map_err(|e| { // If we found inconsistent result, we print the test details for reproducing at first let message = format!( - "{}\n\ - ### Inconsistent row:\n\ - - row_idx:{}\n\ - - task_row:{}\n\ - - expected_row:{}\n\ - ### Task total result:\n{}\n\ - ### Expected total result:\n{}\n\ - ", - self.context_error_report(), + "##### AggregationFuzzer error report ##### + ### Sql:\n{}\n\ + ### Schema:\n{}\n\ + ### Session context params:\n{:?}\n\ + ### Inconsistent row:\n\ + - row_idx:{}\n\ + - task_row:{}\n\ + - expected_row:{}\n\ + ### Task total result:\n{}\n\ + ### Expected total result:\n{}\n\ + ### Input:\n{}\n\ + ", + self.sql, + self.dataset_ref.batches[0].schema_ref(), + self.ctx_with_params.params, e.row_idx, e.lhs_row, e.rhs_row, - pretty_format_batches(task_result).unwrap(), - pretty_format_batches(expected_result).unwrap(), + format_batches_with_limit(task_result), + format_batches_with_limit(expected_result), + format_batches_with_limit(&self.dataset_ref.batches), ); DataFusionError::Internal(message) }) @@ -305,3 +327,172 @@ impl AggregationFuzzTestTask { ) } } + +/// Pretty prints the `RecordBatch`es, limited to the first 100 rows +fn format_batches_with_limit(batches: &[RecordBatch]) -> impl std::fmt::Display { + const MAX_ROWS: usize = 100; + let mut row_count = 0; + let to_print = batches + .iter() + .filter_map(|b| { + if row_count >= MAX_ROWS { + None + } else if row_count + b.num_rows() > MAX_ROWS { + // output last rows before limit + let slice_len = MAX_ROWS - row_count; + let b = b.slice(0, slice_len); + row_count += slice_len; + Some(b) + } else { + row_count += 
b.num_rows(); + Some(b.clone()) + } + }) + .collect::>(); + + pretty_format_batches(&to_print).unwrap() +} + +/// Random aggregate query builder +/// +/// Creates queries like +/// ```sql +/// SELECT AGG(..) FROM table_name GROUP BY +///``` +#[derive(Debug, Default)] +pub struct QueryBuilder { + /// The name of the table to query + table_name: String, + /// Aggregate functions to be used in the query + /// (function_name, is_distinct) + aggregate_functions: Vec<(String, bool)>, + /// Columns to be used in group by + group_by_columns: Vec, + /// Possible columns for arguments in the aggregate functions + /// + /// Assumes each + arguments: Vec, +} +impl QueryBuilder { + pub fn new() -> Self { + std::default::Default::default() + } + + /// return the table name if any + pub fn table_name(&self) -> &str { + &self.table_name + } + + /// Set the table name for the query builder + pub fn with_table_name(mut self, table_name: impl Into) -> Self { + self.table_name = table_name.into(); + self + } + + /// Add a new possible aggregate function to the query builder + pub fn with_aggregate_function( + mut self, + aggregate_function: impl Into, + ) -> Self { + self.aggregate_functions + .push((aggregate_function.into(), false)); + self + } + + /// Add a new possible `DISTINCT` aggregate function to the query + /// + /// This is different than `with_aggregate_function` because only certain + /// aggregates support `DISTINCT` + pub fn with_distinct_aggregate_function( + mut self, + aggregate_function: impl Into, + ) -> Self { + self.aggregate_functions + .push((aggregate_function.into(), true)); + self + } + + /// Add a column to be used in the group bys + pub fn with_group_by_columns<'a>( + mut self, + group_by: impl IntoIterator, + ) -> Self { + let group_by = group_by.into_iter().map(String::from); + self.group_by_columns.extend(group_by); + self + } + + /// Add a column to be used as an argument in the aggregate functions + pub fn with_aggregate_arguments<'a>( + mut self, + 
arguments: impl IntoIterator, + ) -> Self { + let arguments = arguments.into_iter().map(String::from); + self.arguments.extend(arguments); + self + } + + pub fn generate_query(&self) -> String { + let group_by = self.random_group_by(); + let mut query = String::from("SELECT "); + query.push_str(&self.random_aggregate_functions().join(", ")); + query.push_str(" FROM "); + query.push_str(&self.table_name); + if !group_by.is_empty() { + query.push_str(" GROUP BY "); + query.push_str(&group_by.join(", ")); + } + query + } + + /// Generate a random number of aggregate functions (potentially repeating). + /// + fn random_aggregate_functions(&self) -> Vec { + const MAX_NUM_FUNCTIONS: usize = 5; + let mut rng = thread_rng(); + let num_aggregate_functions = rng.gen_range(1..MAX_NUM_FUNCTIONS); + + let mut alias_gen = 1; + + let mut aggregate_functions = vec![]; + while aggregate_functions.len() < num_aggregate_functions { + let idx = rng.gen_range(0..self.aggregate_functions.len()); + let (function_name, is_distinct) = &self.aggregate_functions[idx]; + let argument = self.random_argument(); + let alias = format!("col{}", alias_gen); + let distinct = if *is_distinct { "DISTINCT " } else { "" }; + alias_gen += 1; + let function = format!("{function_name}({distinct}{argument}) as {alias}"); + aggregate_functions.push(function); + } + aggregate_functions + } + + /// Pick a random aggregate function argument + fn random_argument(&self) -> String { + let mut rng = thread_rng(); + let idx = rng.gen_range(0..self.arguments.len()); + self.arguments[idx].clone() + } + + /// Pick a random number of fields to group by (non repeating) + /// + /// Limited to 3 group by columns to ensure coverage for large groups. With + /// larger numbers of columns, each group has many fewer values. 
+ fn random_group_by(&self) -> Vec { + let mut rng = thread_rng(); + const MAX_GROUPS: usize = 3; + let max_groups = self.group_by_columns.len().min(MAX_GROUPS); + let num_group_by = rng.gen_range(1..=max_groups); + + let mut already_used = HashSet::new(); + let mut group_by = vec![]; + while group_by.len() < num_group_by { + let idx = rng.gen_range(0..self.group_by_columns.len()); + if already_used.insert(idx) { + group_by.push(self.group_by_columns[idx].clone()); + } + } + group_by + } +} diff --git a/test-utils/src/string_gen.rs b/test-utils/src/string_gen.rs index 725eb22b85af..b598241db1e9 100644 --- a/test-utils/src/string_gen.rs +++ b/test-utils/src/string_gen.rs @@ -62,7 +62,7 @@ impl StringBatchGenerator { let mut cases = vec![]; let mut rng = thread_rng(); for null_pct in [0.0, 0.01, 0.1, 0.5] { - for _ in 0..100 { + for _ in 0..10 { // max length of generated strings let max_len = rng.gen_range(1..50); let num_strings = rng.gen_range(1..100); From e61792eec9b76aa2af693828307d9801d1172639 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 17 Oct 2024 12:12:40 -0400 Subject: [PATCH 2/4] Address review comments --- .../core/tests/fuzz_cases/aggregate_fuzz.rs | 5 +++-- .../fuzz_cases/aggregation_fuzzer/fuzzer.rs | 16 +++++++++++++--- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs index 21196339a517..fe50dd6b2250 100644 --- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs @@ -60,14 +60,15 @@ use crate::fuzz_cases::aggregation_fuzzer::{ // The test framework handles varying combinations of arguments (data types), // sortedness, and grouping parameters // -// TODO: Test on floating point values (where output needs to be compared with some +// TODO: Test floating point values (where output needs to be compared with some // acceptable range due to floating point rounding) // // TODO:
test other aggregate functions // - AVG (unstable given the wide range of inputs) // // TODO: specific test for ordering (ensure all group by columns are ordered) - +// Currently the data is sorted by random columns, so there are almost no +// repeated runs. To improve coverage we should also sort by lower cardinality columns #[tokio::test(flavor = "multi_thread")] async fn test_min() { let data_gen_config = baseline_config(); diff --git a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs index a56233e38955..e7abc058e612 100644 --- a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs +++ b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs @@ -62,7 +62,7 @@ impl AggregationFuzzerBuilder { } } - /// Adds n random SQL queries to the fuzzer along with the table name + /// Adds random SQL queries to the fuzzer along with the table name pub fn add_query_builder(mut self, query_builder: QueryBuilder) -> Self { const NUM_QUERIES: usize = 10; for _ in 0..NUM_QUERIES { @@ -71,7 +71,7 @@ impl AggregationFuzzerBuilder { self.table_name(query_builder.table_name()) } - pub fn add_sql(mut self, sql: &str) -> Self { + fn add_sql(mut self, sql: &str) -> Self { self.candidate_sqls.push(Arc::from(sql)); self } @@ -445,8 +445,18 @@ impl QueryBuilder { query } - /// Generate a random number of aggregate functions (potentially repeating). + /// Generate a some random aggregate function invocations (potentially repeating). 
/// + /// Each aggregate function invocation is of the form + /// + /// ```sql + /// function_name( argument) as alias + /// ``` + /// + /// where + /// * `function_names` are randomly selected from [`Self::aggregate_functions`] + /// * ` argument` is randomly selected from [`Self::arguments`] + /// * `alias` is a unique alias `colN` for the column (to avoid duplicate column names) fn random_aggregate_functions(&self) -> Vec { const MAX_NUM_FUNCTIONS: usize = 5; let mut rng = thread_rng(); From d428d532d8b1efa5162ff5576ace1d859ba88a7f Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 21 Oct 2024 16:32:24 -0400 Subject: [PATCH 3/4] Update datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs Co-authored-by: Jax Liu --- datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs index e7abc058e612..dd6c71f8efde 100644 --- a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs +++ b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs @@ -485,7 +485,7 @@ impl QueryBuilder { self.arguments[idx].clone() } - /// Pick a random number of fields to group by (non repeating) + /// Pick a random number of fields to group by (non-repeating) /// /// Limited to 3 group by columns to ensure coverage for large groups. With /// larger numbers of columns, each group has many fewer values. 
From 0b2e9a2337d2a4f4a9cb14f1cc80e8d8cd71b868 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 21 Oct 2024 16:32:30 -0400 Subject: [PATCH 4/4] Update datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs Co-authored-by: Jax Liu --- datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs index dd6c71f8efde..898d1081ff13 100644 --- a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs +++ b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs @@ -285,7 +285,7 @@ impl AggregationFuzzTestTask { check_equality_of_batches(task_result, expected_result).map_err(|e| { // If we found inconsistent result, we print the test details for reproducing at first let message = format!( - "##### AggregationFuzzer error report ##### + "##### AggregationFuzzer error report #####\n\ ### Sql:\n{}\n\ ### Schema:\n{}\n\ ### Session context params:\n{:?}\n\