From 38a24c0fe0b8d998616ed7ae0021fd5ac5c20464 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 27 Dec 2022 18:47:58 +0800 Subject: [PATCH] refactor: parallelize `parquet_exec` test case `single_file` (#4735) * refactor: parallelize test case Signed-off-by: Ruihang Xia * Apply suggestions from code review Co-authored-by: Andrew Lamb * format code Signed-off-by: Ruihang Xia * change row limit Signed-off-by: Ruihang Xia Signed-off-by: Ruihang Xia Co-authored-by: Andrew Lamb --- .../core/tests/parquet/filter_pushdown.rs | 129 +++++++++--------- 1 file changed, 66 insertions(+), 63 deletions(-) diff --git a/datafusion/core/tests/parquet/filter_pushdown.rs b/datafusion/core/tests/parquet/filter_pushdown.rs index ac3744278c14..59350113c943 100644 --- a/datafusion/core/tests/parquet/filter_pushdown.rs +++ b/datafusion/core/tests/parquet/filter_pushdown.rs @@ -26,6 +26,7 @@ //! select * from data limit 10; //! ``` +use std::sync::Arc; use std::time::Instant; use arrow::compute::concat_batches; @@ -42,6 +43,7 @@ use test_utils::AccessLogGenerator; /// how many rows of generated data to write to our parquet file (arbitrary) const NUM_ROWS: usize = 53819; +const ROW_LIMIT: usize = 4096; #[cfg(test)] #[ctor::ctor] @@ -51,13 +53,16 @@ fn init() { } #[cfg(not(target_family = "windows"))] -#[tokio::test] +// Use multi-threaded executor as this test consumes CPU +#[tokio::test(flavor = "multi_thread")] async fn single_file() { // Only create the parquet file once as it is fairly large let tempdir = TempDir::new().unwrap(); - let generator = AccessLogGenerator::new().with_row_limit(NUM_ROWS); + let generator = AccessLogGenerator::new() + .with_row_limit(NUM_ROWS) + .with_max_batch_size(ROW_LIMIT); // default properties let props = WriterProperties::builder().build(); @@ -65,31 +70,32 @@ async fn single_file() { let start = Instant::now(); println!("Writing test data to {:?}", file); - let test_parquet_file = TestParquetFile::try_new(file, props, generator).unwrap(); + let test_parquet_file = + Arc::new(TestParquetFile::try_new(file, props, generator).unwrap()); println!( "Completed generating test data in {:?}", Instant::now() - start ); - TestCase::new(&test_parquet_file) + let mut set = tokio::task::JoinSet::new(); + + let case = TestCase::new(test_parquet_file.clone()) .with_name("selective") // request_method = 'GET' .with_filter(col("request_method").eq(lit("GET"))) .with_pushdown_expected(PushdownExpected::Some) - .with_expected_rows(8886) - .run() - .await; + .with_expected_rows(8875); + set.spawn(async move { case.run().await }); - TestCase::new(&test_parquet_file) + let case = TestCase::new(test_parquet_file.clone()) .with_name("non_selective") // request_method != 'GET' .with_filter(col("request_method").not_eq(lit("GET"))) .with_pushdown_expected(PushdownExpected::Some) - .with_expected_rows(44933) - .run() - .await; + .with_expected_rows(44944); + set.spawn(async move { case.run().await }); - TestCase::new(&test_parquet_file) + let case = TestCase::new(test_parquet_file.clone()) .with_name("basic_conjunction") // request_method = 'POST' AND // response_status = 503 @@ -101,49 +107,44 @@ async fn single_file() { .unwrap(), ) .with_pushdown_expected(PushdownExpected::Some) - .with_expected_rows(1729) - .run() - .await; + .with_expected_rows(1731); + set.spawn(async move { case.run().await }); - TestCase::new(&test_parquet_file) + let case = TestCase::new(test_parquet_file.clone()) .with_name("everything") // filter filters everything (no row has this status) // response_status = 429 .with_filter(col("response_status").eq(lit(429_u16))) .with_pushdown_expected(PushdownExpected::Some) - .with_expected_rows(0) - .run() - .await; + .with_expected_rows(0); + set.spawn(async move { case.run().await }); - TestCase::new(&test_parquet_file) + let case = TestCase::new(test_parquet_file.clone()) .with_name("nothing") // No rows are filtered out -- all are returned // response_status > 0 .with_filter(col("response_status").gt(lit(0_u16))) .with_pushdown_expected(PushdownExpected::None) - .with_expected_rows(NUM_ROWS) - .run() - .await; + .with_expected_rows(NUM_ROWS); + set.spawn(async move { case.run().await }); - TestCase::new(&test_parquet_file) + let case = TestCase::new(test_parquet_file.clone()) .with_name("dict_selective") // container = 'backend_container_0' .with_filter(col("container").eq(lit("backend_container_0"))) .with_pushdown_expected(PushdownExpected::Some) - .with_expected_rows(37856) - .run() - .await; + .with_expected_rows(15911); + set.spawn(async move { case.run().await }); - TestCase::new(&test_parquet_file) + let case = TestCase::new(test_parquet_file.clone()) .with_name("not eq") // container != 'backend_container_0' .with_filter(col("container").not_eq(lit("backend_container_0"))) .with_pushdown_expected(PushdownExpected::Some) - .with_expected_rows(15963) - .run() - .await; + .with_expected_rows(37908); + set.spawn(async move { case.run().await }); - TestCase::new(&test_parquet_file) + let case = TestCase::new(test_parquet_file.clone()) .with_name("dict_conjunction") // container == 'backend_container_0' AND // pod = 'aqcathnxqsphdhgjtgvxsfyiwbmhlmg' @@ -155,11 +156,10 @@ async fn single_file() { .unwrap(), ) .with_pushdown_expected(PushdownExpected::Some) - .with_expected_rows(3052) - .run() - .await; + .with_expected_rows(3052); + set.spawn(async move { case.run().await }); - TestCase::new(&test_parquet_file) + let case = TestCase::new(test_parquet_file.clone()) .with_name("dict_very_selective") // request_bytes > 2B AND // container == 'backend_container_0' AND @@ -173,11 +173,10 @@ async fn single_file() { .unwrap(), ) .with_pushdown_expected(PushdownExpected::Some) - .with_expected_rows(88) - .run() - .await; + .with_expected_rows(88); + set.spawn(async move { case.run().await }); - TestCase::new(&test_parquet_file) + let case = TestCase::new(test_parquet_file.clone()) .with_name("dict_very_selective2") // picks only 2 rows // client_addr = '204.47.29.82' AND @@ -192,11 +191,10 @@ async fn single_file() { .unwrap(), ) .with_pushdown_expected(PushdownExpected::Some) - .with_expected_rows(88) - .run() - .await; + .with_expected_rows(88); + set.spawn(async move { case.run().await }); - TestCase::new(&test_parquet_file) + let case = TestCase::new(test_parquet_file.clone()) .with_name("dict_disjunction") // container = 'backend_container_0' OR // pod = 'aqcathnxqsphdhgjtgvxsfyiwbmhlmg' @@ -207,12 +205,11 @@ async fn single_file() { ]) .unwrap(), ) - .with_pushdown_expected(PushdownExpected::Some) - .with_expected_rows(39982) - .run() - .await; + .with_pushdown_expected(PushdownExpected::None) + .with_expected_rows(16955); + set.spawn(async move { case.run().await }); - TestCase::new(&test_parquet_file) + let case = TestCase::new(test_parquet_file.clone()) .with_name("dict_disjunction3") // request_method != 'GET' OR // response_status = 400 OR @@ -225,10 +222,14 @@ async fn single_file() { ]) .unwrap(), ) - .with_pushdown_expected(PushdownExpected::None) - .with_expected_rows(NUM_ROWS) - .run() - .await; + .with_pushdown_expected(PushdownExpected::Some) + .with_expected_rows(48919); + set.spawn(async move { case.run().await }); + + // Join all the cases. + while let Some(result) = set.join_next().await { + result.unwrap() + } } #[cfg(not(target_family = "windows"))] @@ -247,7 +248,8 @@ async fn single_file_small_data_pages() { let start = Instant::now(); println!("Writing test data to {:?}", file); - let test_parquet_file = TestParquetFile::try_new(file, props, generator).unwrap(); + let test_parquet_file = + Arc::new(TestParquetFile::try_new(file, props, generator).unwrap()); println!( "Completed generating test data in {:?}", Instant::now() - start @@ -267,7 +269,7 @@ async fn single_file_small_data_pages() { // page 4: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: fktdcgtmzvoedpwhfevcvvrtaurzgex, max: fwtdpgtxwqkkgtgvthhwycrvjiizdifyp, num_nulls not defined] CRC:[none] SZ:7 VC:9216 // page 5: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: fwtdpgtxwqkkgtgvthhwycrvjiizdifyp, max: iadnalqpdzthpifrvewossmpqibgtsuin, num_nulls not defined] CRC:[none] SZ:7 VC:7739 - TestCase::new(&test_parquet_file) + TestCase::new(test_parquet_file.clone()) .with_name("selective") // predicate is chosen carefully to prune pages 0, 1, 2, 3, 4 // pod = 'iadnalqpdzthpifrvewossmpqibgtsuin' @@ -286,7 +288,7 @@ async fn single_file_small_data_pages() { // page 3: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.004269056, num_nulls not defined] CRC:[none] SZ:14996 VC:9216 // page 4: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.007261184, num_nulls not defined] CRC:[none] SZ:14996 VC:9216 // page 5: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.005330944, num_nulls not defined] CRC:[none] SZ:12601 VC:7739 - TestCase::new(&test_parquet_file) + TestCase::new(test_parquet_file.clone()) .with_name("selective") // predicate is chosen carefully to prune pages 1, 2, 4, and 5 // time > 1970-01-01T00:00:00.004300000 @@ -314,7 +316,7 @@ async fn single_file_small_data_pages() { // offset compressed size first row index // page-0 5581636 147517 0 // page-1 5729153 147517 9216 - TestCase::new(&test_parquet_file) + TestCase::new(test_parquet_file.clone()) .with_name("selective_on_decimal") // predicate is chosen carefully to prune pages 1, 2, 3, 4, and 5 // decimal_price < 9200 @@ -345,8 +347,8 @@ enum PageIndexFilteringExpected { } /// parameters for running a test -struct TestCase<'a> { - test_parquet_file: &'a TestParquetFile, +struct TestCase { + test_parquet_file: Arc, /// Human readable name to help debug failures name: String, /// The filter to apply @@ -361,8 +363,8 @@ struct TestCase<'a> { expected_rows: usize, } -impl<'a> TestCase<'a> { - fn new(test_parquet_file: &'a TestParquetFile) -> Self { +impl TestCase { + fn new(test_parquet_file: Arc) -> Self { Self { test_parquet_file, name: "".into(), @@ -533,12 +535,13 @@ impl<'a> TestCase<'a> { match pushdown_expected { PushdownExpected::None => { - assert_eq!(pushdown_rows_filtered, 0); + assert_eq!(pushdown_rows_filtered, 0, "{}", self.name); } PushdownExpected::Some => { assert!( pushdown_rows_filtered > 0, - "Expected to filter rows via pushdown, but none were" + "{}: Expected to filter rows via pushdown, but none were", + self.name ); } };