
Commit c19a373
fix cr
baojinri committed Dec 23, 2024
1 parent 2a9b93a commit c19a373
Showing 5 changed files with 10 additions and 9 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

3 changes: 1 addition & 2 deletions Cargo.toml
@@ -28,7 +28,7 @@ members = [
"src/common",
"src/metric_engine",
"src/pb_types",
"src/server"
"src/server",
]

[workspace.dependencies]
@@ -39,7 +39,6 @@ thiserror = "1"
bytes = "1"
byteorder = "1"
datafusion = "43"
datafusion-expr = "43"
parquet = { version = "53" }
object_store = { version = "0.11" }
pb_types = { path = "src/pb_types" }
1 change: 0 additions & 1 deletion src/metric_engine/Cargo.toml
@@ -41,7 +41,6 @@ bytes = { workspace = true }
 bytesize = { workspace = true }
 common = { workspace = true }
 datafusion = { workspace = true }
-datafusion-expr = { workspace = true }
 futures = { workspace = true }
 itertools = { workspace = true }
 lazy_static = { workspace = true }
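Note on the dependency change above: the standalone `datafusion-expr` crate is dropped from both manifests because the same expression helpers are re-exported by the main `datafusion` crate under `datafusion::logical_expr`, which is what the updated test imports below use. A minimal sketch (not part of this commit; the function name is illustrative) of building the kind of predicate the tests use through that re-export:

// Illustrative only: `col` and `lit` come through the `datafusion::logical_expr`
// re-export, so no separate `datafusion-expr` dependency entry is needed.
use datafusion::logical_expr::{col, lit, Expr};

/// Builds a `pk1 = 0` predicate, the same shape of expression the read.rs test below uses.
fn example_predicate() -> Expr {
    col("pk1").eq(lit(0_u8))
}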
12 changes: 8 additions & 4 deletions src/metric_engine/src/read.rs
@@ -431,7 +431,7 @@ impl ParquetReader {
         let mut builder = ParquetExec::builder(scan_config).with_parquet_file_reader_factory(
             Arc::new(DefaultParquetFileReaderFactory::new(self.store.clone())),
         );
-        let plan: Arc<dyn ExecutionPlan> = match conjunction(predicates) {
+        let base_plan: Arc<dyn ExecutionPlan> = match conjunction(predicates) {
             Some(expr) => {
                 let filters = create_physical_expr(&expr, &df_schema, &ExecutionProps::new())
                     .context("create physical expr")?;
@@ -452,7 +452,7 @@ impl ParquetReader {
         // TODO: fetch using multiple threads since read from parquet will incur CPU
         // when convert between arrow and parquet.
         let sort_exec =
-            SortPreservingMergeExec::new(sort_exprs, plan).with_round_robin_repartition(true);
+            SortPreservingMergeExec::new(sort_exprs, base_plan).with_round_robin_repartition(true);
 
         let merge_exec = MergeExec::new(
             Arc::new(sort_exec),
@@ -471,6 +471,7 @@

 #[cfg(test)]
 mod tests {
+    use datafusion::logical_expr::{col, lit};
     use object_store::local::LocalFileSystem;
     use test_log::test;

@@ -554,6 +555,8 @@ mod tests {
             },
             Arc::new(SstPathGenerator::new("mock".to_string())),
         );
+
+        let expr = col("pk1").eq(lit(0_u8));
         let plan = reader
             .build_df_plan(
                 (100..103)
@@ -570,7 +573,7 @@ mod tests {
                     })
                     .collect(),
                 None,
-                vec![],
+                vec![expr],
             )
             .unwrap();
         let display_plan =
@@ -579,7 +582,8 @@
         assert_eq!(
             r#"MergeExec: [primary_keys: 1, seq_idx: 2]
 SortPreservingMergeExec: [pk1@0 ASC, __seq__@2 ASC]
-ParquetExec: file_groups={3 groups: [[mock/data/100.sst], [mock/data/101.sst], [mock/data/102.sst]]}, projection=[pk1, value, __seq__], output_orderings=[[pk1@0 ASC, __seq__@2 ASC], [pk1@0 ASC, __seq__@2 ASC], [pk1@0 ASC, __seq__@2 ASC]]
+FilterExec: pk1@0 = 0
+ParquetExec: file_groups={3 groups: [[mock/data/100.sst], [mock/data/101.sst], [mock/data/102.sst]]}, projection=[pk1, value, __seq__], output_orderings=[[pk1@0 ASC, __seq__@2 ASC], [pk1@0 ASC, __seq__@2 ASC], [pk1@0 ASC, __seq__@2 ASC]], predicate=pk1@0 = 0, pruning_predicate=CASE WHEN pk1_null_count@2 = pk1_row_count@3 THEN false ELSE pk1_min@0 <= 0 AND 0 <= pk1_max@1 END, required_guarantees=[pk1 in (0)]
 "#,
             format!("{display_plan}")
         );
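For context on the read.rs change: the test now passes a predicate into `build_df_plan`, and the asserted plan gains a `FilterExec` plus a parquet `predicate`/`pruning_predicate`, because the reader folds the predicate list with `conjunction` and lowers it with `create_physical_expr` (visible in the hunk at line 431). A rough, self-contained sketch of that general pattern follows; the function name `apply_predicates` and the exact import paths are assumptions for DataFusion 43, not code taken from this repository.

// Sketch of the conjunction -> create_physical_expr -> FilterExec pattern;
// names and import paths here are best-effort assumptions, not read.rs itself.
use std::sync::Arc;

use datafusion::{
    common::{DFSchema, Result},
    logical_expr::{execution_props::ExecutionProps, utils::conjunction, Expr},
    physical_expr::create_physical_expr,
    physical_plan::{filter::FilterExec, ExecutionPlan},
};

fn apply_predicates(
    base_plan: Arc<dyn ExecutionPlan>,
    predicates: Vec<Expr>,
    df_schema: &DFSchema,
) -> Result<Arc<dyn ExecutionPlan>> {
    match conjunction(predicates) {
        // At least one predicate: lower it to a physical expression and filter the scan,
        // which presumably surfaces as `FilterExec: pk1@0 = 0` in the asserted plan above.
        Some(expr) => {
            let physical = create_physical_expr(&expr, df_schema, &ExecutionProps::new())?;
            Ok(Arc::new(FilterExec::try_new(physical, base_plan)?))
        }
        // No predicates (the old `vec![]` case): return the scan plan unchanged.
        None => Ok(base_plan),
    }
}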
2 changes: 1 addition & 1 deletion src/metric_engine/src/storage.rs
@@ -404,7 +404,7 @@ impl TimeMergeStorage for CloudObjectStorage {

 #[cfg(test)]
 mod tests {
-    use datafusion_expr::{col, lit};
+    use datafusion::logical_expr::{col, lit};
     use object_store::local::LocalFileSystem;
     use test_log::test;

