fix: scan with predicate (#1617)
## Rationale
- Support scan with predicates: filters passed to `scan` are now pushed down into the Parquet read path.

## Detailed Changes
- In `ParquetReader::build_df_plan`, conjoin any supplied predicates into a single expression, push it into `ParquetExec` via `with_predicate` for row-group pruning, and wrap the exec in a `FilterExec` for exact row-level filtering.
- Drop an unused `trigger_tx` clone in the compaction scheduler.
- Exercise the predicate path in the `read.rs` plan test and add a predicate scan case to the `storage.rs` test.

## Test Plan
CI
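
For callers, the new behavior boils down to passing `Expr` filters in `ScanRequest::predicate`. A minimal usage sketch, modeled on the test added to `src/metric_engine/src/storage.rs` in this commit; the helper signature and the surrounding storage setup are assumptions, not part of the diff:

```rust
use datafusion::logical_expr::{col, lit};

// Hypothetical helper: `storage` is any TimeMergeStorage implementation
// (e.g. CloudObjectStorage); ScanRequest, TimeRange, and Timestamp come from
// the metric_engine crate, as used in the tests in this commit.
async fn scan_with_filter(storage: &impl TimeMergeStorage) {
    // Predicates are AND-ed together via `conjunction`, pushed into ParquetExec
    // for row-group pruning, then applied exactly by a FilterExec on top.
    let expr = col("pk1").eq(lit(11_u8));
    let result_stream = storage
        .scan(ScanRequest {
            range: TimeRange::new(Timestamp(0), Timestamp::MAX),
            predicate: vec![expr],
            projections: None,
        })
        .await
        .unwrap();
    // ... consume `result_stream` (a stream of Arrow RecordBatches) ...
}
```
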
baojinri authored Dec 23, 2024
1 parent f8bb726 commit 5d7c6a7
Showing 4 changed files with 57 additions and 15 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion src/metric_engine/src/compaction/scheduler.rs
@@ -64,7 +64,6 @@ impl Scheduler {
         let task_handle = {
             let store = store.clone();
             let manifest = manifest.clone();
-            let trigger_tx = trigger_tx.clone();
             let executor = Executor::new(
                 runtime.clone(),
                 store,
40 changes: 28 additions & 12 deletions src/metric_engine/src/read.rs
@@ -42,8 +42,9 @@ use datafusion::{
     parquet::arrow::async_reader::AsyncFileReader,
     physical_expr::{create_physical_expr, LexOrdering},
     physical_plan::{
-        metrics::ExecutionPlanMetricsSet, sorts::sort_preserving_merge::SortPreservingMergeExec,
-        DisplayAs, Distribution, ExecutionPlan, PlanProperties,
+        filter::FilterExec, metrics::ExecutionPlanMetricsSet,
+        sorts::sort_preserving_merge::SortPreservingMergeExec, DisplayAs, Distribution,
+        ExecutionPlan, PlanProperties,
     },
     physical_planner::create_physical_sort_exprs,
     prelude::{ident, Expr},
@@ -430,17 +431,28 @@ impl ParquetReader {
         let mut builder = ParquetExec::builder(scan_config).with_parquet_file_reader_factory(
             Arc::new(DefaultParquetFileReaderFactory::new(self.store.clone())),
         );
-        if let Some(expr) = conjunction(predicates) {
-            let filters = create_physical_expr(&expr, &df_schema, &ExecutionProps::new())
-                .context("create physical expr")?;
-            builder = builder.with_predicate(filters);
-        }
+        let base_plan: Arc<dyn ExecutionPlan> = match conjunction(predicates) {
+            Some(expr) => {
+                let filters = create_physical_expr(&expr, &df_schema, &ExecutionProps::new())
+                    .context("create physical expr")?;
+
+                builder = builder.with_predicate(filters.clone());
+                let parquet_exec = builder.build();
+
+                let filter_exec = FilterExec::try_new(filters, Arc::new(parquet_exec))
+                    .context("create filter exec")?;
+                Arc::new(filter_exec)
+            }
+            None => {
+                let parquet_exec = builder.build();
+                Arc::new(parquet_exec)
+            }
+        };
 
         // TODO: fetch using multiple threads, since reading from parquet incurs CPU
         // cost when converting between arrow and parquet.
-        let parquet_exec = builder.build();
-        let sort_exec = SortPreservingMergeExec::new(sort_exprs, Arc::new(parquet_exec))
-            .with_round_robin_repartition(true);
+        let sort_exec =
+            SortPreservingMergeExec::new(sort_exprs, base_plan).with_round_robin_repartition(true);
 
         let merge_exec = MergeExec::new(
             Arc::new(sort_exec),
@@ -459,6 +471,7 @@ impl ParquetReader {
 
 #[cfg(test)]
 mod tests {
+    use datafusion::logical_expr::{col, lit};
     use object_store::local::LocalFileSystem;
     use test_log::test;
 
@@ -542,6 +555,8 @@ mod tests {
             },
             Arc::new(SstPathGenerator::new("mock".to_string())),
         );
+
+        let expr = col("pk1").eq(lit(0_u8));
         let plan = reader
             .build_df_plan(
                 (100..103)
@@ -558,7 +573,7 @@
                 })
                 .collect(),
                 None,
-                vec![],
+                vec![expr],
             )
             .unwrap();
         let display_plan =
@@ -567,7 +582,8 @@
         assert_eq!(
             r#"MergeExec: [primary_keys: 1, seq_idx: 2]
   SortPreservingMergeExec: [pk1@0 ASC, __seq__@2 ASC]
-    ParquetExec: file_groups={3 groups: [[mock/data/100.sst], [mock/data/101.sst], [mock/data/102.sst]]}, projection=[pk1, value, __seq__], output_orderings=[[pk1@0 ASC, __seq__@2 ASC], [pk1@0 ASC, __seq__@2 ASC], [pk1@0 ASC, __seq__@2 ASC]]
+    FilterExec: pk1@0 = 0
+      ParquetExec: file_groups={3 groups: [[mock/data/100.sst], [mock/data/101.sst], [mock/data/102.sst]]}, projection=[pk1, value, __seq__], output_orderings=[[pk1@0 ASC, __seq__@2 ASC], [pk1@0 ASC, __seq__@2 ASC], [pk1@0 ASC, __seq__@2 ASC]], predicate=pk1@0 = 0, pruning_predicate=CASE WHEN pk1_null_count@2 = pk1_row_count@3 THEN false ELSE pk1_min@0 <= 0 AND 0 <= pk1_max@1 END, required_guarantees=[pk1 in (0)]
 "#,
             format!("{display_plan}")
         );
27 changes: 27 additions & 0 deletions src/metric_engine/src/storage.rs
@@ -404,6 +404,7 @@ impl TimeMergeStorage for CloudObjectStorage {
 
 #[cfg(test)]
 mod tests {
+    use datafusion::logical_expr::{col, lit};
     use object_store::local::LocalFileSystem;
     use test_log::test;
 
@@ -488,6 +489,32 @@ mod tests {
         ];
 
         check_stream(result_stream, expected_batch).await;
+
+        // test with predicate
+        let expr = col("pk1").eq(lit(11_u8));
+        let result_stream = storage
+            .scan(ScanRequest {
+                range: TimeRange::new(Timestamp(0), Timestamp::MAX),
+                predicate: vec![expr],
+                projections: None,
+            })
+            .await
+            .unwrap();
+        let expected_batch = [
+            record_batch!(
+                ("pk1", UInt8, vec![11]),
+                ("pk2", UInt8, vec![99]),
+                ("value", Int64, vec![77])
+            )
+            .unwrap(),
+            record_batch!(
+                ("pk1", UInt8, vec![11]),
+                ("pk2", UInt8, vec![100]),
+                ("value", Int64, vec![22])
+            )
+            .unwrap(),
+        ];
+        check_stream(result_stream, expected_batch).await;
     });
 }
 
