Skip to content

Commit

Permalink
refactor: parallelize parquet_exec test case single_file (#4735)
Browse files: browse the repository at this point in the history
* refactor: parallelize test case

Signed-off-by: Ruihang Xia <[email protected]>

* Apply suggestions from code review

Co-authored-by: Andrew Lamb <[email protected]>

* format code

Signed-off-by: Ruihang Xia <[email protected]>

* change row limit

Signed-off-by: Ruihang Xia <[email protected]>

Signed-off-by: Ruihang Xia <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
waynexia and alamb authored Dec 27, 2022
1 parent 734c211 commit 38a24c0
Showing 1 changed file with 66 additions and 63 deletions.
129 changes: 66 additions & 63 deletions datafusion/core/tests/parquet/filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
//! select * from data limit 10;
//! ```
use std::sync::Arc;
use std::time::Instant;

use arrow::compute::concat_batches;
Expand All @@ -42,6 +43,7 @@ use test_utils::AccessLogGenerator;

/// how many rows of generated data to write to our parquet file (arbitrary)
const NUM_ROWS: usize = 53819;
const ROW_LIMIT: usize = 4096;

#[cfg(test)]
#[ctor::ctor]
Expand All @@ -51,45 +53,49 @@ fn init() {
}

#[cfg(not(target_family = "windows"))]
#[tokio::test]
// Use multi-threaded executor as this test consumes CPU
#[tokio::test(flavor = "multi_thread")]
async fn single_file() {
// Only create the parquet file once as it is fairly large

let tempdir = TempDir::new().unwrap();

let generator = AccessLogGenerator::new().with_row_limit(NUM_ROWS);
let generator = AccessLogGenerator::new()
.with_row_limit(NUM_ROWS)
.with_max_batch_size(ROW_LIMIT);

// default properties
let props = WriterProperties::builder().build();
let file = tempdir.path().join("data.parquet");

let start = Instant::now();
println!("Writing test data to {:?}", file);
let test_parquet_file = TestParquetFile::try_new(file, props, generator).unwrap();
let test_parquet_file =
Arc::new(TestParquetFile::try_new(file, props, generator).unwrap());
println!(
"Completed generating test data in {:?}",
Instant::now() - start
);

TestCase::new(&test_parquet_file)
let mut set = tokio::task::JoinSet::new();

let case = TestCase::new(test_parquet_file.clone())
.with_name("selective")
// request_method = 'GET'
.with_filter(col("request_method").eq(lit("GET")))
.with_pushdown_expected(PushdownExpected::Some)
.with_expected_rows(8886)
.run()
.await;
.with_expected_rows(8875);
set.spawn(async move { case.run().await });

TestCase::new(&test_parquet_file)
let case = TestCase::new(test_parquet_file.clone())
.with_name("non_selective")
// request_method != 'GET'
.with_filter(col("request_method").not_eq(lit("GET")))
.with_pushdown_expected(PushdownExpected::Some)
.with_expected_rows(44933)
.run()
.await;
.with_expected_rows(44944);
set.spawn(async move { case.run().await });

TestCase::new(&test_parquet_file)
let case = TestCase::new(test_parquet_file.clone())
.with_name("basic_conjunction")
// request_method = 'POST' AND
// response_status = 503
Expand All @@ -101,49 +107,44 @@ async fn single_file() {
.unwrap(),
)
.with_pushdown_expected(PushdownExpected::Some)
.with_expected_rows(1729)
.run()
.await;
.with_expected_rows(1731);
set.spawn(async move { case.run().await });

TestCase::new(&test_parquet_file)
let case = TestCase::new(test_parquet_file.clone())
.with_name("everything")
// the filter removes every row (no row has this status)
// response_status = 429
.with_filter(col("response_status").eq(lit(429_u16)))
.with_pushdown_expected(PushdownExpected::Some)
.with_expected_rows(0)
.run()
.await;
.with_expected_rows(0);
set.spawn(async move { case.run().await });

TestCase::new(&test_parquet_file)
let case = TestCase::new(test_parquet_file.clone())
.with_name("nothing")
// No rows are filtered out -- all are returned
// response_status > 0
.with_filter(col("response_status").gt(lit(0_u16)))
.with_pushdown_expected(PushdownExpected::None)
.with_expected_rows(NUM_ROWS)
.run()
.await;
.with_expected_rows(NUM_ROWS);
set.spawn(async move { case.run().await });

TestCase::new(&test_parquet_file)
let case = TestCase::new(test_parquet_file.clone())
.with_name("dict_selective")
// container = 'backend_container_0'
.with_filter(col("container").eq(lit("backend_container_0")))
.with_pushdown_expected(PushdownExpected::Some)
.with_expected_rows(37856)
.run()
.await;
.with_expected_rows(15911);
set.spawn(async move { case.run().await });

TestCase::new(&test_parquet_file)
let case = TestCase::new(test_parquet_file.clone())
.with_name("not eq")
// container != 'backend_container_0'
.with_filter(col("container").not_eq(lit("backend_container_0")))
.with_pushdown_expected(PushdownExpected::Some)
.with_expected_rows(15963)
.run()
.await;
.with_expected_rows(37908);
set.spawn(async move { case.run().await });

TestCase::new(&test_parquet_file)
let case = TestCase::new(test_parquet_file.clone())
.with_name("dict_conjunction")
// container == 'backend_container_0' AND
// pod = 'aqcathnxqsphdhgjtgvxsfyiwbmhlmg'
Expand All @@ -155,11 +156,10 @@ async fn single_file() {
.unwrap(),
)
.with_pushdown_expected(PushdownExpected::Some)
.with_expected_rows(3052)
.run()
.await;
.with_expected_rows(3052);
set.spawn(async move { case.run().await });

TestCase::new(&test_parquet_file)
let case = TestCase::new(test_parquet_file.clone())
.with_name("dict_very_selective")
// request_bytes > 2B AND
// container == 'backend_container_0' AND
Expand All @@ -173,11 +173,10 @@ async fn single_file() {
.unwrap(),
)
.with_pushdown_expected(PushdownExpected::Some)
.with_expected_rows(88)
.run()
.await;
.with_expected_rows(88);
set.spawn(async move { case.run().await });

TestCase::new(&test_parquet_file)
let case = TestCase::new(test_parquet_file.clone())
.with_name("dict_very_selective2")
// picks only 2 rows
// client_addr = '204.47.29.82' AND
Expand All @@ -192,11 +191,10 @@ async fn single_file() {
.unwrap(),
)
.with_pushdown_expected(PushdownExpected::Some)
.with_expected_rows(88)
.run()
.await;
.with_expected_rows(88);
set.spawn(async move { case.run().await });

TestCase::new(&test_parquet_file)
let case = TestCase::new(test_parquet_file.clone())
.with_name("dict_disjunction")
// container = 'backend_container_0' OR
// pod = 'aqcathnxqsphdhgjtgvxsfyiwbmhlmg'
Expand All @@ -207,12 +205,11 @@ async fn single_file() {
])
.unwrap(),
)
.with_pushdown_expected(PushdownExpected::Some)
.with_expected_rows(39982)
.run()
.await;
.with_pushdown_expected(PushdownExpected::None)
.with_expected_rows(16955);
set.spawn(async move { case.run().await });

TestCase::new(&test_parquet_file)
let case = TestCase::new(test_parquet_file.clone())
.with_name("dict_disjunction3")
// request_method != 'GET' OR
// response_status = 400 OR
Expand All @@ -225,10 +222,14 @@ async fn single_file() {
])
.unwrap(),
)
.with_pushdown_expected(PushdownExpected::None)
.with_expected_rows(NUM_ROWS)
.run()
.await;
.with_pushdown_expected(PushdownExpected::Some)
.with_expected_rows(48919);
set.spawn(async move { case.run().await });

// Join all the cases.
while let Some(result) = set.join_next().await {
result.unwrap()
}
}

#[cfg(not(target_family = "windows"))]
Expand All @@ -247,7 +248,8 @@ async fn single_file_small_data_pages() {

let start = Instant::now();
println!("Writing test data to {:?}", file);
let test_parquet_file = TestParquetFile::try_new(file, props, generator).unwrap();
let test_parquet_file =
Arc::new(TestParquetFile::try_new(file, props, generator).unwrap());
println!(
"Completed generating test data in {:?}",
Instant::now() - start
Expand All @@ -267,7 +269,7 @@ async fn single_file_small_data_pages() {
// page 4: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: fktdcgtmzvoedpwhfevcvvrtaurzgex, max: fwtdpgtxwqkkgtgvthhwycrvjiizdifyp, num_nulls not defined] CRC:[none] SZ:7 VC:9216
// page 5: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: fwtdpgtxwqkkgtgvthhwycrvjiizdifyp, max: iadnalqpdzthpifrvewossmpqibgtsuin, num_nulls not defined] CRC:[none] SZ:7 VC:7739

TestCase::new(&test_parquet_file)
TestCase::new(test_parquet_file.clone())
.with_name("selective")
// predicate is chosen carefully to prune pages 0, 1, 2, 3, 4
// pod = 'iadnalqpdzthpifrvewossmpqibgtsuin'
Expand All @@ -286,7 +288,7 @@ async fn single_file_small_data_pages() {
// page 3: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.004269056, num_nulls not defined] CRC:[none] SZ:14996 VC:9216
// page 4: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.007261184, num_nulls not defined] CRC:[none] SZ:14996 VC:9216
// page 5: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.005330944, num_nulls not defined] CRC:[none] SZ:12601 VC:7739
TestCase::new(&test_parquet_file)
TestCase::new(test_parquet_file.clone())
.with_name("selective")
// predicate is chosen carefully to prune pages 1, 2, 4, and 5
// time > 1970-01-01T00:00:00.004300000
Expand Down Expand Up @@ -314,7 +316,7 @@ async fn single_file_small_data_pages() {
// offset compressed size first row index
// page-0 5581636 147517 0
// page-1 5729153 147517 9216
TestCase::new(&test_parquet_file)
TestCase::new(test_parquet_file.clone())
.with_name("selective_on_decimal")
// predicate is chosen carefully to prune pages 1, 2, 3, 4, and 5
// decimal_price < 9200
Expand Down Expand Up @@ -345,8 +347,8 @@ enum PageIndexFilteringExpected {
}

/// parameters for running a test
struct TestCase<'a> {
test_parquet_file: &'a TestParquetFile,
struct TestCase {
test_parquet_file: Arc<TestParquetFile>,
/// Human readable name to help debug failures
name: String,
/// The filter to apply
Expand All @@ -361,8 +363,8 @@ struct TestCase<'a> {
expected_rows: usize,
}

impl<'a> TestCase<'a> {
fn new(test_parquet_file: &'a TestParquetFile) -> Self {
impl TestCase {
fn new(test_parquet_file: Arc<TestParquetFile>) -> Self {
Self {
test_parquet_file,
name: "<NOT SPECIFIED>".into(),
Expand Down Expand Up @@ -533,12 +535,13 @@ impl<'a> TestCase<'a> {

match pushdown_expected {
PushdownExpected::None => {
assert_eq!(pushdown_rows_filtered, 0);
assert_eq!(pushdown_rows_filtered, 0, "{}", self.name);
}
PushdownExpected::Some => {
assert!(
pushdown_rows_filtered > 0,
"Expected to filter rows via pushdown, but none were"
"{}: Expected to filter rows via pushdown, but none were",
self.name
);
}
};
Expand Down

0 comments on commit 38a24c0

Please sign in to comment.