From 5d7c6a7ff651a84e08efdb70ce8df496c9b86dcb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=B2=8D=E9=87=91=E6=97=A5?= Date: Mon, 23 Dec 2024 17:23:57 +0800 Subject: [PATCH] fix: scan with predicate (#1617) ## Rationale - support scan with predicate ## Detailed Changes ## Test Plan CI --- Cargo.lock | 4 +- src/metric_engine/src/compaction/scheduler.rs | 1 - src/metric_engine/src/read.rs | 40 +++++++++++++------ src/metric_engine/src/storage.rs | 27 +++++++++++++ 4 files changed, 57 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8cace345bf..8500103657 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2637,7 +2637,7 @@ checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15" dependencies = [ "bytes", "heck", - "itertools 0.13.0", + "itertools 0.10.5", "log", "multimap", "once_cell", @@ -2657,7 +2657,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e9552f850d5f0964a4e4d0bf306459ac29323ddfbae05e35a7c0d35cb0803cc5" dependencies = [ "anyhow", - "itertools 0.13.0", + "itertools 0.10.5", "proc-macro2", "quote", "syn", diff --git a/src/metric_engine/src/compaction/scheduler.rs b/src/metric_engine/src/compaction/scheduler.rs index d947096e93..581d397b81 100644 --- a/src/metric_engine/src/compaction/scheduler.rs +++ b/src/metric_engine/src/compaction/scheduler.rs @@ -64,7 +64,6 @@ impl Scheduler { let task_handle = { let store = store.clone(); let manifest = manifest.clone(); - let trigger_tx = trigger_tx.clone(); let executor = Executor::new( runtime.clone(), store, diff --git a/src/metric_engine/src/read.rs b/src/metric_engine/src/read.rs index 1dea5c6ca4..0d27433760 100644 --- a/src/metric_engine/src/read.rs +++ b/src/metric_engine/src/read.rs @@ -42,8 +42,9 @@ use datafusion::{ parquet::arrow::async_reader::AsyncFileReader, physical_expr::{create_physical_expr, LexOrdering}, physical_plan::{ - metrics::ExecutionPlanMetricsSet, sorts::sort_preserving_merge::SortPreservingMergeExec, - DisplayAs, Distribution, ExecutionPlan, PlanProperties, + filter::FilterExec, metrics::ExecutionPlanMetricsSet, + sorts::sort_preserving_merge::SortPreservingMergeExec, DisplayAs, Distribution, + ExecutionPlan, PlanProperties, }, physical_planner::create_physical_sort_exprs, prelude::{ident, Expr}, @@ -430,17 +431,28 @@ impl ParquetReader { let mut builder = ParquetExec::builder(scan_config).with_parquet_file_reader_factory( Arc::new(DefaultParquetFileReaderFactory::new(self.store.clone())), ); - if let Some(expr) = conjunction(predicates) { - let filters = create_physical_expr(&expr, &df_schema, &ExecutionProps::new()) - .context("create physical expr")?; - builder = builder.with_predicate(filters); - } + let base_plan: Arc = match conjunction(predicates) { + Some(expr) => { + let filters = create_physical_expr(&expr, &df_schema, &ExecutionProps::new()) + .context("create physical expr")?; + + builder = builder.with_predicate(filters.clone()); + let parquet_exec = builder.build(); + + let filter_exec = FilterExec::try_new(filters, Arc::new(parquet_exec)) + .context("create filter exec")?; + Arc::new(filter_exec) + } + None => { + let parquet_exec = builder.build(); + Arc::new(parquet_exec) + } + }; // TODO: fetch using multiple threads since read from parquet will incur CPU // when convert between arrow and parquet. - let parquet_exec = builder.build(); - let sort_exec = SortPreservingMergeExec::new(sort_exprs, Arc::new(parquet_exec)) - .with_round_robin_repartition(true); + let sort_exec = + SortPreservingMergeExec::new(sort_exprs, base_plan).with_round_robin_repartition(true); let merge_exec = MergeExec::new( Arc::new(sort_exec), @@ -459,6 +471,7 @@ impl ParquetReader { #[cfg(test)] mod tests { + use datafusion::logical_expr::{col, lit}; use object_store::local::LocalFileSystem; use test_log::test; @@ -542,6 +555,8 @@ mod tests { }, Arc::new(SstPathGenerator::new("mock".to_string())), ); + + let expr = col("pk1").eq(lit(0_u8)); let plan = reader .build_df_plan( (100..103) @@ -558,7 +573,7 @@ mod tests { }) .collect(), None, - vec![], + vec![expr], ) .unwrap(); let display_plan = @@ -567,7 +582,8 @@ mod tests { assert_eq!( r#"MergeExec: [primary_keys: 1, seq_idx: 2] SortPreservingMergeExec: [pk1@0 ASC, __seq__@2 ASC] - ParquetExec: file_groups={3 groups: [[mock/data/100.sst], [mock/data/101.sst], [mock/data/102.sst]]}, projection=[pk1, value, __seq__], output_orderings=[[pk1@0 ASC, __seq__@2 ASC], [pk1@0 ASC, __seq__@2 ASC], [pk1@0 ASC, __seq__@2 ASC]] + FilterExec: pk1@0 = 0 + ParquetExec: file_groups={3 groups: [[mock/data/100.sst], [mock/data/101.sst], [mock/data/102.sst]]}, projection=[pk1, value, __seq__], output_orderings=[[pk1@0 ASC, __seq__@2 ASC], [pk1@0 ASC, __seq__@2 ASC], [pk1@0 ASC, __seq__@2 ASC]], predicate=pk1@0 = 0, pruning_predicate=CASE WHEN pk1_null_count@2 = pk1_row_count@3 THEN false ELSE pk1_min@0 <= 0 AND 0 <= pk1_max@1 END, required_guarantees=[pk1 in (0)] "#, format!("{display_plan}") ); diff --git a/src/metric_engine/src/storage.rs b/src/metric_engine/src/storage.rs index b86ad80382..2d5b3a6b50 100644 --- a/src/metric_engine/src/storage.rs +++ b/src/metric_engine/src/storage.rs @@ -404,6 +404,7 @@ impl TimeMergeStorage for CloudObjectStorage { #[cfg(test)] mod tests { + use datafusion::logical_expr::{col, lit}; use object_store::local::LocalFileSystem; use test_log::test; @@ -488,6 +489,32 @@ mod tests { ]; check_stream(result_stream, expected_batch).await; + + // test with predicate + let expr = col("pk1").eq(lit(11_u8)); + let result_stream = storage + .scan(ScanRequest { + range: TimeRange::new(Timestamp(0), Timestamp::MAX), + predicate: vec![expr], + projections: None, + }) + .await + .unwrap(); + let expected_batch = [ + record_batch!( + ("pk1", UInt8, vec![11]), + ("pk2", UInt8, vec![99]), + ("value", Int64, vec![77]) + ) + .unwrap(), + record_batch!( + ("pk1", UInt8, vec![11]), + ("pk2", UInt8, vec![100]), + ("value", Int64, vec![22]) + ) + .unwrap(), + ]; + check_stream(result_stream, expected_batch).await; }); }