diff --git a/integration_tests/cases/env/cluster/ddl/partition_table.result b/integration_tests/cases/env/cluster/ddl/partition_table.result
index 980a7bc1e1..d576d93bd7 100644
--- a/integration_tests/cases/env/cluster/ddl/partition_table.result
+++ b/integration_tests/cases/env/cluster/ddl/partition_table.result
@@ -99,24 +99,24 @@ UInt64(16367588166920223437),Timestamp(1651737067000),String("horaedb9"),Int32(0
 -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
 -- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx
 -- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx
--- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
+-- SQLNESS REPLACE elapsed_compute=\d+.?\d*(µ|m|n) elapsed_compute=xx
 -- SQLNESS REPLACE scan_memtable_\d+ scan_memtable_n
 EXPLAIN ANALYZE SELECT * from partition_table_t where name = "ceresdb0";

 plan_type,plan,
-String("Plan with Metrics"),String("ResolvedPartitionedScan: pushdown_continue:false, partition_count:1, metrics=xx\n ScanTable: table=__partition_table_t_1, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[name = Utf8(\"ceresdb0\")], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=1\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=0\n total_rows_fetch_from_one=0\n scan_memtable_n, fetched_columns:[tsid,t,name,id,value]:\n=0]\n=0]\n"),
+String("Plan with Metrics"),String("ResolvedPartitionedScan: pushdown_continue:false, partition_count:1, metrics=[\npartition_table_t:\n __partition_table_t_1:\n poll_duration=xxs\n total_duration=xxs\n wait_duration=xxs\n\n__partition_table_t_1:\nCoalescePartitionsExec, metrics=[output_rows=0, elapsed_compute=xxs]\n ScanTable: table=__partition_table_t_1, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[name = Utf8(\"ceresdb0\")], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=1\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=0\n total_rows_fetch_from_one=0\n scan_memtable_n, fetched_columns:[tsid,t,name,id,value]:\n=0]\n=0]\n"),

 -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
 -- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx
 -- SQLNESS REPLACE __partition_table_t_\d __partition_table_t_x
 -- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx
--- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
+-- SQLNESS REPLACE elapsed_compute=\d+.?\d*(µ|m|n) elapsed_compute=xx
 -- SQLNESS REPLACE scan_memtable_\d+ scan_memtable_n
 EXPLAIN ANALYZE SELECT * from partition_table_t where name in ("ceresdb0", "ceresdb1", "ceresdb2", "ceresdb3", "ceresdb4");

 plan_type,plan,
-String("Plan with Metrics"),String("ResolvedPartitionedScan: pushdown_continue:false, partition_count:3, metrics=xx\n ScanTable: table=__partition_table_t_x, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=xx\n ScanTable: table=__partition_table_t_x, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=xx\n ScanTable: table=__partition_table_t_x, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[name IN ([Utf8(\"ceresdb0\"), Utf8(\"ceresdb1\"), Utf8(\"ceresdb2\"), Utf8(\"ceresdb3\"), Utf8(\"ceresdb4\")])], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=1\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=0\n total_rows_fetch_from_one=0\n scan_memtable_n, fetched_columns:[tsid,t,name,id,value]:\n=0]\n=0]\n"),
+String("Plan with Metrics"),String("ResolvedPartitionedScan: pushdown_continue:false, partition_count:3, metrics=[\npartition_table_t:\n __partition_table_t_x:\n poll_duration=xxs\n total_duration=xxs\n wait_duration=xxs\n __partition_table_t_x:\n poll_duration=xxs\n total_duration=xxs\n wait_duration=xxs\n __partition_table_t_x:\n poll_duration=xxs\n total_duration=xxs\n wait_duration=xxs\n\n__partition_table_t_x:\nCoalescePartitionsExec, metrics=[output_rows=0, elapsed_compute=xxs]\n ScanTable: table=__partition_table_t_x, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[name IN ([Utf8(\"ceresdb2\")])], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=1\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=0\n total_rows_fetch_from_one=0\n scan_memtable_n, fetched_columns:[tsid,t,name,id,value]:\n=0]\n\n__partition_table_t_x:\nCoalescePartitionsExec, metrics=[output_rows=0, elapsed_compute=xxs]\n ScanTable: table=__partition_table_t_x, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[name IN ([Utf8(\"ceresdb0\"), Utf8(\"ceresdb4\")])], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=1\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=0\n total_rows_fetch_from_one=0\n scan_memtable_n, fetched_columns:[tsid,t,name,id,value]:\n=0]\n\n__partition_table_t_x:\nCoalescePartitionsExec, metrics=[output_rows=0, elapsed_compute=xxs]\n ScanTable: table=__partition_table_t_x, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[name IN ([Utf8(\"ceresdb1\"), Utf8(\"ceresdb3\")])], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=1\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=0\n total_rows_fetch_from_one=0\n scan_memtable_n, fetched_columns:[tsid,t,name,id,value]:\n=0]\n=0]\n"),

 ALTER TABLE partition_table_t ADD COLUMN (b string);

diff --git a/integration_tests/cases/env/cluster/ddl/partition_table.sql b/integration_tests/cases/env/cluster/ddl/partition_table.sql
index e1f32de515..f06dee2ea8 100644
--- a/integration_tests/cases/env/cluster/ddl/partition_table.sql
+++ b/integration_tests/cases/env/cluster/ddl/partition_table.sql
@@ -57,7 +57,7 @@ SELECT * from partition_table_t where name in ("horaedb5", "horaedb6", "horaedb7
 -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
 -- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx
 -- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx
--- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
+-- SQLNESS REPLACE elapsed_compute=\d+.?\d*(µ|m|n) elapsed_compute=xx
 -- SQLNESS REPLACE scan_memtable_\d+ scan_memtable_n
 EXPLAIN ANALYZE SELECT * from partition_table_t where name = "ceresdb0";

@@ -65,7 +65,7 @@ EXPLAIN ANALYZE SELECT * from partition_table_t where name = "ceresdb0";
 -- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx
 -- SQLNESS REPLACE __partition_table_t_\d __partition_table_t_x
 -- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx
--- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
+-- SQLNESS REPLACE elapsed_compute=\d+.?\d*(µ|m|n) elapsed_compute=xx
 -- SQLNESS REPLACE scan_memtable_\d+ scan_memtable_n
 EXPLAIN ANALYZE SELECT * from partition_table_t where name in ("ceresdb0", "ceresdb1", "ceresdb2", "ceresdb3", "ceresdb4");
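The reworked `SQLNESS REPLACE` directives normalize only the new `elapsed_compute` timings instead of collapsing whole `metrics=[...]` blocks, so the per-sub-table metrics stay visible in the expected output. A minimal sketch of what such a directive does — this is plain `regex`-crate usage to illustrate the pattern, not the sqlness implementation:

```rust
// Sketch: normalize a nondeterministic timing so EXPLAIN ANALYZE output is
// stable across runs. Assumes the `regex` crate as a dependency.
use regex::Regex;

fn main() {
    let re = Regex::new(r"elapsed_compute=\d+.?\d*(µ|m|n)").unwrap();
    let line = "CoalescePartitionsExec, metrics=[output_rows=0, elapsed_compute=1.5µs]";
    // "elapsed_compute=1.5µ" becomes "elapsed_compute=xx"; the trailing "s"
    // survives, matching the "elapsed_compute=xxs" in the expected result.
    assert_eq!(
        re.replace_all(line, "elapsed_compute=xx"),
        "CoalescePartitionsExec, metrics=[output_rows=0, elapsed_compute=xxs]"
    );
}
```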
diff --git a/src/df_engine_extensions/src/dist_sql_query/physical_plan.rs b/src/df_engine_extensions/src/dist_sql_query/physical_plan.rs
index dd430f520d..55692f258b 100644
--- a/src/df_engine_extensions/src/dist_sql_query/physical_plan.rs
+++ b/src/df_engine_extensions/src/dist_sql_query/physical_plan.rs
@@ -47,7 +47,7 @@ use datafusion::{
 };
 use futures::{future::BoxFuture, FutureExt, Stream, StreamExt};
 use runtime::Priority;
-use table_engine::{remote::model::TableIdentifier, table::ReadRequest};
+use table_engine::{predicate::Predicate, remote::model::TableIdentifier, table::ReadRequest};
 use trace_metric::{collector::FormatCollectorVisitor, MetricsCollector, TraceMetricWhenDrop};

 use crate::dist_sql_query::{RemotePhysicalPlanExecutor, RemoteTaskContext, TableScanContext};
@@ -62,6 +62,7 @@ pub struct UnresolvedPartitionedScan {
     pub table_scan_ctx: TableScanContext,
     pub metrics_collector: MetricsCollector,
     pub priority: Priority,
+    pub predicates: Option<Vec<Predicate>>,
 }

 impl UnresolvedPartitionedScan {
@@ -69,6 +70,7 @@ impl UnresolvedPartitionedScan {
         table_name: &str,
         sub_tables: Vec<TableIdentifier>,
         read_request: ReadRequest,
+        predicates: Option<Vec<Predicate>>,
     ) -> Self {
         let metrics_collector = MetricsCollector::new(table_name.to_string());
         let table_scan_ctx = TableScanContext {
@@ -83,6 +85,7 @@ impl UnresolvedPartitionedScan {
             table_scan_ctx,
             metrics_collector,
             priority: read_request.priority,
+            predicates,
         }
     }
 }

diff --git a/src/df_engine_extensions/src/dist_sql_query/resolver.rs b/src/df_engine_extensions/src/dist_sql_query/resolver.rs
index c3724b9d95..5fd3430a88 100644
--- a/src/df_engine_extensions/src/dist_sql_query/resolver.rs
+++ b/src/df_engine_extensions/src/dist_sql_query/resolver.rs
@@ -135,10 +135,21 @@ impl Resolver {
         let sub_tables = unresolved.sub_tables.clone();
         let remote_plans = sub_tables
             .into_iter()
-            .map(|table| {
+            .enumerate()
+            .map(|(idx, table)| {
                 let plan = Arc::new(UnresolvedSubTableScan {
                     table: table.clone(),
-                    table_scan_ctx: unresolved.table_scan_ctx.clone(),
+                    table_scan_ctx: if let Some(ref predicates) = unresolved.predicates {
+                        // Since each partition can have a different predicate,
+                        // we build a separate ctx for each partition.
+                        let mut ctx = unresolved.table_scan_ctx.clone();
+                        // Overwrite the old predicate (the one computed before
+                        // partition pruning) with the optimized predicate.
+                        ctx.predicate = Arc::new(predicates[idx].clone());
+                        ctx
+                    } else {
+                        unresolved.table_scan_ctx.clone()
+                    },
                 });
                 let sub_metrics_collect = metrics_collector.span(table.table.clone());
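The resolver change above is the consumer side of the new `predicates` field: sub-table scan `i` receives `predicates[i]` instead of the shared pre-pruning predicate, falling back to the shared one when no per-partition predicates exist. A self-contained sketch of that dispatch, where `Ctx` and `String` are stand-ins for the real `TableScanContext` and `Arc<Predicate>` (names illustrative only):

```rust
// Sketch of the per-sub-table predicate dispatch done in resolver.rs.
#[derive(Clone, Debug)]
struct Ctx {
    predicate: String, // stands in for Arc<Predicate>
}

fn build_sub_ctxs(base: &Ctx, predicates: Option<&[String]>, n: usize) -> Vec<Ctx> {
    (0..n)
        .map(|idx| match predicates {
            // Each partition gets its own pruned predicate.
            Some(preds) => Ctx { predicate: preds[idx].clone() },
            // Fall back to the shared, pre-pruning predicate.
            None => base.clone(),
        })
        .collect()
}

fn main() {
    let base = Ctx { predicate: "name IN ('a','b','c','d')".to_string() };
    let preds: Vec<String> = vec!["name IN ('a','d')".into(), "name IN ('b','c')".into()];
    for ctx in build_sub_ctxs(&base, Some(&preds[..]), 2) {
        println!("{ctx:?}");
    }
}
```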
diff --git a/src/df_engine_extensions/src/dist_sql_query/test_util.rs b/src/df_engine_extensions/src/dist_sql_query/test_util.rs
index c42f9e3862..873c7a2214 100644
--- a/src/df_engine_extensions/src/dist_sql_query/test_util.rs
+++ b/src/df_engine_extensions/src/dist_sql_query/test_util.rs
@@ -313,6 +313,7 @@ impl TestContext {
             "test",
             sub_tables,
             self.request.clone(),
+            None,
         ));

         let filter: Arc<dyn ExecutionPlan> =
@@ -364,6 +365,7 @@ impl TestContext {
             "test",
             self.sub_table_groups[0].clone(),
             self.request.clone(),
+            None,
         ));

         self.build_aggr_plan_with_input(unresolved_scan)

diff --git a/src/partition_table_engine/src/lib.rs b/src/partition_table_engine/src/lib.rs
index 209049770a..ee0b460c30 100644
--- a/src/partition_table_engine/src/lib.rs
+++ b/src/partition_table_engine/src/lib.rs
@@ -27,14 +27,17 @@ use std::sync::Arc;

 use analytic_engine::TableOptions;
 use async_trait::async_trait;
+use datafusion::logical_expr::expr::{Expr, InList};
 use generic_error::BoxError;
-use snafu::{OptionExt, ResultExt};
+use snafu::{ensure, OptionExt, ResultExt};
 use table_engine::{
     engine::{
         CloseShardRequest, CloseTableRequest, CreateTableParams, CreateTableRequest,
-        DropTableRequest, OpenShardRequest, OpenShardResult, OpenTableRequest, Result, TableEngine,
-        Unexpected, UnexpectedNoCause,
+        DropTableRequest, InvalidPartitionContext, OpenShardRequest, OpenShardResult,
+        OpenTableRequest, Result, TableEngine, Unexpected, UnexpectedNoCause,
     },
+    partition::rule::df_adapter::PartitionedFilterKeyIndex,
+    predicate::Predicate,
     remote::RemoteEngineRef,
     table::TableRef,
     PARTITION_TABLE_ENGINE_TYPE,
@@ -110,3 +113,43 @@ impl TableEngine for PartitionTableEngine {
         vec![Ok("".to_string())]
     }
 }
+
+pub fn partitioned_predicates(
+    predicate: Arc<Predicate>,
+    partitions: &[usize],
+    partitioned_key_indices: &mut PartitionedFilterKeyIndex,
+) -> Result<Vec<Predicate>> {
+    ensure!(
+        partitions.len() == partitioned_key_indices.keys().len(),
+        InvalidPartitionContext {
+            msg: format!(
+                "partitions length:{}, partitioned_key_indices length: {}",
+                partitions.len(),
+                partitioned_key_indices.keys().len()
+            )
+        }
+    );
+    let mut predicates = vec![(*predicate).clone(); partitions.len()];
+    for (idx, predicate) in predicates.iter_mut().enumerate() {
+        let partition = partitions[idx];
+        if let Some(filter_indices) = partitioned_key_indices.get(&partition) {
+            let exprs = predicate.mut_exprs();
+            for (filter_idx, key_indices) in filter_indices {
+                if let Expr::InList(InList {
+                    list,
+                    negated: false,
+                    ..
+                }) = &mut exprs[*filter_idx]
+                {
+                    let mut key_idx = 0;
+                    list.retain(|_| {
+                        let should_keep = key_indices.contains(&key_idx);
+                        key_idx += 1;
+                        should_keep
+                    });
+                }
+            }
+        }
+    }
+    Ok(predicates)
+}
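`partitioned_predicates` prunes each partition's `IN` lists by index: it keeps only the list entries whose positions were recorded for that partition, leaving non-`InList` (and negated) exprs untouched. A minimal sketch of the same retain-by-index trick in isolation, with hypothetical values:

```rust
// Sketch of the index-based in-list pruning used by `partitioned_predicates`.
use std::collections::BTreeSet;

fn prune_in_list(list: &mut Vec<&'static str>, key_indices: &BTreeSet<usize>) {
    let mut idx = 0;
    list.retain(|_| {
        // Keep the value only if its position was recorded for this partition.
        let keep = key_indices.contains(&idx);
        idx += 1;
        keep
    });
}

fn main() {
    let mut list = vec!["aa", "bb", "cc", "dd"];
    // Suppose keys 0..=2 hashed into this partition.
    let indices: BTreeSet<usize> = [0, 1, 2].into();
    prune_in_list(&mut list, &indices);
    assert_eq!(list, vec!["aa", "bb", "cc"]);
}
```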
diff --git a/src/partition_table_engine/src/partition.rs b/src/partition_table_engine/src/partition.rs
index 440722164a..2eb0a6a32f 100644
--- a/src/partition_table_engine/src/partition.rs
+++ b/src/partition_table_engine/src/partition.rs
@@ -33,8 +33,8 @@ use table_engine::{
     partition::{
         format_sub_partition_table_name,
         rule::{
-            df_adapter::DfPartitionRuleAdapter, PartitionedRow, PartitionedRows,
-            PartitionedRowsIter,
+            df_adapter::{DfPartitionRuleAdapter, PartitionedFilterKeyIndex},
+            PartitionedRow, PartitionedRows, PartitionedRowsIter,
         },
         PartitionInfo,
     },
@@ -289,14 +289,14 @@ impl Table for PartitionTableImpl {
                     .context(CreatePartitionRule)?
             }
         };
-
+        let mut partitioned_key_indices = PartitionedFilterKeyIndex::new();
         // Evaluate expr and locate partition.
         let partitions = {
             let _locate_timer = PARTITION_TABLE_PARTITIONED_READ_DURATION_HISTOGRAM
                 .with_label_values(&["locate"])
                 .start_timer();
             df_partition_rule
-                .locate_partitions_for_read(request.predicate.exprs())
+                .locate_partitions_for_read(request.predicate.exprs(), &mut partitioned_key_indices)
                 .box_err()
                 .context(LocatePartitions)?
         };
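Both read paths now thread a `PartitionedFilterKeyIndex` through `locate_partitions_for_read`. Its shape is a nested map: partition id → (filter index in `predicate.exprs()` → set of kept in-list key indices). A small sketch of filling one, assuming the alias expands to the `HashMap`/`BTreeSet` types introduced later in this diff:

```rust
// Sketch of the PartitionedFilterKeyIndex shape the locate call fills in.
use std::collections::{BTreeSet, HashMap};

fn main() {
    let mut index: HashMap<usize, HashMap<usize, BTreeSet<usize>>> = HashMap::new();
    // e.g. partition 0 keeps keys {0, 1, 2} of the in-list at exprs()[1],
    // while partition 1 keeps only key 3 of the same expr.
    index.entry(0).or_default().entry(1).or_default().extend([0, 1, 2]);
    index.entry(1).or_default().entry(1).or_default().insert(3);
    assert_eq!(index[&0][&1], BTreeSet::from([0, 1, 2]));
}
```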
diff --git a/src/partition_table_engine/src/scan_builder.rs b/src/partition_table_engine/src/scan_builder.rs
index 27281c0a52..25d080d3ff 100644
--- a/src/partition_table_engine/src/scan_builder.rs
+++ b/src/partition_table_engine/src/scan_builder.rs
@@ -27,13 +27,16 @@ use datafusion::{
 };
 use df_engine_extensions::dist_sql_query::physical_plan::UnresolvedPartitionedScan;
 use table_engine::{
     partition::{
-        format_sub_partition_table_name, rule::df_adapter::DfPartitionRuleAdapter, PartitionInfo,
+        format_sub_partition_table_name,
+        rule::df_adapter::{DfPartitionRuleAdapter, PartitionedFilterKeyIndex},
+        PartitionInfo,
     },
     provider::TableScanBuilder,
     remote::model::TableIdentifier,
     table::ReadRequest,
 };

+use crate::partitioned_predicates;
 #[derive(Debug)]
 pub struct PartitionedTableScanBuilder {
     table_name: String,
@@ -61,13 +64,13 @@ impl PartitionedTableScanBuilder {
         &self,
         table_name: &str,
         partition_info: &PartitionInfo,
-        partitions: Vec<usize>,
+        partitions: &[usize],
     ) -> Vec<TableIdentifier> {
         let definitions = partition_info.get_definitions();
         partitions
-            .into_iter()
+            .iter()
             .map(|p| {
-                let partition_name = &definitions[p].name;
+                let partition_name = &definitions[*p].name;
                 TableIdentifier {
                     catalog: self.catalog_name.clone(),
                     schema: self.schema_name.clone(),
@@ -89,18 +92,226 @@ impl TableScanBuilder for PartitionedTableScanBuilder {
             DataFusionError::Internal(format!("failed to build partition rule, err:{e}"))
         })?;

+        let mut partitioned_key_indices = PartitionedFilterKeyIndex::new();
         // Evaluate expr and locate partition.
         let partitions = df_partition_rule
-            .locate_partitions_for_read(request.predicate.exprs())
+            .locate_partitions_for_read(request.predicate.exprs(), &mut partitioned_key_indices)
             .map_err(|e| {
                 DataFusionError::Internal(format!("failed to locate partition for read, err:{e}"))
             })?;
+
         let sub_tables =
-            self.get_sub_table_idents(&self.table_name, &self.partition_info, partitions);
+            self.get_sub_table_idents(&self.table_name, &self.partition_info, &partitions);
+
+        let predicates = if partitioned_key_indices.len() == partitions.len() {
+            Some(
+                partitioned_predicates(
+                    request.predicate.clone(),
+                    &partitions,
+                    &mut partitioned_key_indices,
+                )
+                .map_err(|e| {
+                    DataFusionError::Internal(format!("partition predicates failed, err:{e}"))
+                })?,
+            )
+        } else {
+            // Since FilterExtractor::extract only covers some specific expr
+            // cases, partitioned_key_indices.len() could be 0. In that case,
+            // all partition requests share the same predicate.
+            None
+        };

         // Build plan.
-        let plan = UnresolvedPartitionedScan::new(&self.table_name, sub_tables, request);
+        let plan =
+            UnresolvedPartitionedScan::new(&self.table_name, sub_tables, request, predicates);

         Ok(Arc::new(plan))
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use common_types::{column_schema::Builder as ColBuilder, datum::DatumKind, schema::Builder};
+    use datafusion::logical_expr::{binary_expr, in_list, Expr, Operator};
+    use table_engine::{
+        partition::{
+            rule::df_adapter::{DfPartitionRuleAdapter, PartitionedFilterKeyIndex},
+            KeyPartitionInfo, PartitionDefinition, PartitionInfo,
+        },
+        predicate::PredicateBuilder,
+    };
+
+    use crate::partitioned_predicates;
+
+    #[test]
+    fn test_partitioned_predicate() {
+        // conditions:
+        //   1) table schema: col_ts, col1, col2, col3, in which col1 and col2 are the
+        //      partition keys, and the table has two partitions
+        //   2) sql: select * from table where col1 = '33' and col2 in ("aa", "bb",
+        //      "cc", "dd") and col3 in ("1", "2", "3", "4")
+        // partition expectations:
+        //   1) the query hits both partitions
+        //   2) two predicates are yielded, p0: col1 = '33' and col2 in ("aa", "bb", "cc");
+        //      p1: col1 = '33' and col2 in ("dd"); col3 is untouched (not a partition key)
+        let definitions = vec![
+            PartitionDefinition {
+                name: "p1".to_string(),
+                origin_name: None,
+            },
+            PartitionDefinition {
+                name: "p2".to_string(),
+                origin_name: None,
+            },
+        ];
+
+        let partition_info = PartitionInfo::Key(KeyPartitionInfo {
+            version: 0,
+            definitions,
+            partition_key: vec!["col1".to_string(), "col2".to_string()],
+            linear: false,
+        });
+
+        let schema = {
+            let builder = Builder::new();
+            let col_ts = ColBuilder::new("col_ts".to_string(), DatumKind::Timestamp)
+                .build()
+                .expect("ts");
+            let col1 = ColBuilder::new("col1".to_string(), DatumKind::String)
+                .build()
+                .expect("should succeed to build column schema");
+            let col2 = ColBuilder::new("col2".to_string(), DatumKind::String)
+                .build()
+                .expect("should succeed to build column schema");
+            let col3 = ColBuilder::new("col3".to_string(), DatumKind::String)
+                .build()
+                .expect("should succeed to build column schema");
+            builder
+                .auto_increment_column_id(true)
+                .add_key_column(col_ts)
+                .unwrap()
+                .add_key_column(col1)
+                .unwrap()
+                .add_key_column(col2)
+                .unwrap()
+                .add_key_column(col3)
+                .unwrap()
+                .primary_key_indexes(vec![1, 2])
+                .build()
+                .unwrap()
+        };
+
+        let df_partition_rule = DfPartitionRuleAdapter::new(partition_info, &schema).unwrap();
+
+        let exprs = vec![
+            binary_expr(
+                Expr::Column("col1".into()),
+                Operator::Eq,
+                Expr::Literal("33".into()),
+            ),
+            in_list(
+                Expr::Column("col2".into()),
+                vec![
+                    Expr::Literal("aa".into()),
+                    Expr::Literal("bb".into()),
+                    Expr::Literal("cc".into()),
+                    Expr::Literal("dd".into()),
+                ],
+                false,
+            ),
+            in_list(
+                Expr::Column("col3".into()),
+                vec![
+                    Expr::Literal("1".into()),
+                    Expr::Literal("2".into()),
+                    Expr::Literal("3".into()),
+                    Expr::Literal("4".into()),
+                ],
+                false,
+            ),
+        ];
+        let mut partitioned_key_indices = PartitionedFilterKeyIndex::new();
+        let partitions = df_partition_rule
+            .locate_partitions_for_read(&exprs, &mut partitioned_key_indices)
+            .unwrap();
+        assert!(partitions.len() == 2);
+        assert!(partitioned_key_indices.len() == 2);
+
+        let predicate = PredicateBuilder::default()
+            .add_pushdown_exprs(exprs.as_slice())
+            .build();
+
+        let predicates = partitioned_predicates(
+            predicate,
+            partitions.as_slice(),
+            &mut partitioned_key_indices,
+        );
+        assert!(predicates.is_ok());
+        let predicates = predicates.unwrap();
+        assert!(predicates.len() == 2);
+
+        assert!(predicates[0].exprs().len() == 3);
+        assert!(
+            predicates[0].exprs()[0]
+                == binary_expr(
+                    Expr::Column("col1".into()),
+                    Operator::Eq,
+                    Expr::Literal("33".into())
+                )
+        );
+        assert!(
+            predicates[0].exprs()[1]
+                == in_list(
+                    Expr::Column("col2".into()),
+                    vec![
+                        Expr::Literal("aa".into()),
+                        Expr::Literal("bb".into()),
+                        Expr::Literal("cc".into()),
+                    ],
+                    false,
+                )
+        );
+        assert!(
+            predicates[0].exprs()[2]
+                == in_list(
+                    Expr::Column("col3".into()),
+                    vec![
+                        Expr::Literal("1".into()),
+                        Expr::Literal("2".into()),
+                        Expr::Literal("3".into()),
+                        Expr::Literal("4".into()),
+                    ],
+                    false,
+                )
+        );
+        assert!(
+            predicates[1].exprs()[0]
+                == binary_expr(
+                    Expr::Column("col1".into()),
+                    Operator::Eq,
+                    Expr::Literal("33".into())
+                )
+        );
+        assert!(
+            predicates[1].exprs()[1]
+                == in_list(
+                    Expr::Column("col2".into()),
+                    vec![Expr::Literal("dd".into())],
+                    false,
+                )
+        );
+        assert!(
+            predicates[1].exprs()[2]
+                == in_list(
+                    Expr::Column("col3".into()),
+                    vec![
+                        Expr::Literal("1".into()),
+                        Expr::Literal("2".into()),
+                        Expr::Literal("3".into()),
+                        Expr::Literal("4".into()),
+                    ],
+                    false,
+                )
+        );
+    }
+}

diff --git a/src/table_engine/src/engine.rs b/src/table_engine/src/engine.rs
index a9ea133708..0f81e0c279 100644
--- a/src/table_engine/src/engine.rs
+++ b/src/table_engine/src/engine.rs
@@ -97,6 +97,9 @@ pub enum Error {
         msg: Option<String>,
         source: GenericError,
     },
+
+    #[snafu(display("Invalid partition context, err:{}", msg))]
+    InvalidPartitionContext { msg: String },
 }

 define_result!(Error);

diff --git a/src/table_engine/src/partition/rule/df_adapter/extractor.rs b/src/table_engine/src/partition/rule/df_adapter/extractor.rs
index ff6c393d40..78007500c7 100644
--- a/src/table_engine/src/partition/rule/df_adapter/extractor.rs
+++ b/src/table_engine/src/partition/rule/df_adapter/extractor.rs
@@ -16,14 +16,16 @@
 // under the License.

 //! Partition filter extractor
-
 use std::collections::HashSet;

 use common_types::datum::Datum;
 use datafusion::logical_expr::{expr::InList, Expr, Operator};
 use df_operator::visitor::find_columns_by_expr;

-use crate::partition::rule::filter::{PartitionCondition, PartitionFilter};
+use crate::partition::rule::{
+    df_adapter::IndexedPartitionFilter,
+    filter::{PartitionCondition, PartitionFilter},
+};

 /// The datafusion filter exprs extractor
 ///
@@ -36,13 +38,13 @@ use crate::partition::rule::filter::{PartitionCondition, PartitionFilter};
 /// For example: [KeyRule] and [KeyExtractor].
 /// If they are not related, [PartitionRule] may not take effect.
 pub trait FilterExtractor: Send + Sync + 'static {
-    fn extract(&self, filters: &[Expr], columns: &[String]) -> Vec<PartitionFilter>;
+    fn extract(&self, filters: &[Expr], columns: &[String]) -> IndexedPartitionFilter;
 }

 pub struct NoopExtractor;

 impl FilterExtractor for NoopExtractor {
-    fn extract(&self, _filters: &[Expr], _columns: &[String]) -> Vec<PartitionFilter> {
+    fn extract(&self, _filters: &[Expr], _columns: &[String]) -> IndexedPartitionFilter {
         vec![]
     }
 }
@@ -50,13 +52,14 @@ impl FilterExtractor for NoopExtractor {
 pub struct KeyExtractor;

 impl FilterExtractor for KeyExtractor {
-    fn extract(&self, filters: &[Expr], columns: &[String]) -> Vec<PartitionFilter> {
+    fn extract(&self, filters: &[Expr], columns: &[String]) -> IndexedPartitionFilter {
+        // PartitionFilter indices may not match the input filter indices.
         if filters.is_empty() {
             return Vec::default();
         }

         let mut target = Vec::with_capacity(filters.len());
-        for filter in filters {
+        for (index, filter) in filters.iter().enumerate() {
             // If no target columns included in `filter`, ignore this `filter`.
             let columns_in_filter = find_columns_by_expr(filter)
                 .into_iter()
@@ -78,7 +81,6 @@ impl FilterExtractor for KeyExtractor {

             // Finally, we try to convert `filter` to `PartitionFilter`.
             // We just support the simple situation: "colum = value" now.
-            // TODO: support "colum in [value list]".
             // TODO: we need to compare and check the datatype of column and value.
             // (Actually, there is type conversion on high-level, but when converted data
             // is overflow, it may take no effect).
@@ -126,7 +128,7 @@ impl FilterExtractor for KeyExtractor {
             };

             if let Some(pf) = partition_filter {
-                target.push(pf);
+                target.push((index, pf));
             }
         }

@@ -157,7 +159,7 @@ mod tests {
             column: "col1".to_string(),
             condition: PartitionCondition::Eq(Datum::Int32(42)),
         };
-        assert_eq!(partition_filter.first().unwrap(), &expected);
+        assert_eq!(partition_filter.first().unwrap().1, expected);

         // Other expr will be rejected now.
         let rejected_expr = col("col1").gt(Expr::Literal(ScalarValue::Int32(Some(42))));
@@ -182,7 +184,7 @@ mod tests {
             column: "col1".to_string(),
             condition: PartitionCondition::In(vec![Datum::Int32(42), Datum::Int32(38)]),
         };
-        assert_eq!(partition_filter.first().unwrap(), &expected);
+        assert_eq!(partition_filter.first().unwrap().1, expected);
     }

     #[test]
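`KeyExtractor::extract` now returns `(expr index, PartitionFilter)` pairs because filters on non-key columns are skipped, so positions in the output would otherwise drift out of sync with `predicate.exprs()`. A simplified sketch of that indexing — the `PartitionFilter` here is a stand-in, not the real struct:

```rust
// Sketch of the IndexedPartitionFilter shape returned by FilterExtractor::extract.
#[derive(Debug, PartialEq)]
struct PartitionFilter {
    column: String,
}

fn extract(filters: &[(&str, bool)]) -> Vec<(usize, PartitionFilter)> {
    filters
        .iter()
        .enumerate()
        .filter(|(_, (_, is_key))| *is_key) // drop exprs on non-key columns
        .map(|(index, (col, _))| (index, PartitionFilter { column: col.to_string() }))
        .collect()
}

fn main() {
    // exprs: [col3 > 1 (non-key), col1 = '33' (key), col2 IN (...) (key)]
    let out = extract(&[("col3", false), ("col1", true), ("col2", true)]);
    assert_eq!(out.len(), 2);
    assert_eq!(out[0].0, 1); // the col1 filter keeps its original expr index 1
}
```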
diff --git a/src/table_engine/src/partition/rule/df_adapter/mod.rs b/src/table_engine/src/partition/rule/df_adapter/mod.rs
index 8e95f85674..1b5df6ca08 100644
--- a/src/table_engine/src/partition/rule/df_adapter/mod.rs
+++ b/src/table_engine/src/partition/rule/df_adapter/mod.rs
@@ -16,6 +16,7 @@
 // under the License.

 //! Partition rule datafusion adapter
+use std::collections::{BTreeSet, HashMap};

 use common_types::{row::RowGroup, schema::Schema};
 use datafusion::logical_expr::Expr;
@@ -23,14 +24,22 @@ use datafusion::logical_expr::Expr;
 use self::extractor::{KeyExtractor, NoopExtractor};
 use crate::partition::{
     rule::{
-        df_adapter::extractor::FilterExtractorRef, factory::PartitionRuleFactory, PartitionRulePtr,
-        PartitionedRows,
+        df_adapter::extractor::FilterExtractorRef, factory::PartitionRuleFactory,
+        filter::PartitionFilter, PartitionRulePtr, PartitionedRows,
     },
     BuildPartitionRule, PartitionInfo, Result,
 };

 mod extractor;

+pub type PartitionId = usize; // partition number (id)
+pub type FilterIndex = usize; // filter (expr) index within predicate.exprs()
+pub type KeyIndex = usize; // key index within an InList expr
+pub type FilterKeyIndex = HashMap<FilterIndex, BTreeSet<KeyIndex>>;
+pub type PartitionedFilterKeyIndex = HashMap<PartitionId, FilterKeyIndex>;
+pub type IndexedPartitionFilter = Vec<(usize, PartitionFilter)>;
+pub type IndexedPartitionFilterRef<'a> = &'a [(usize, PartitionFilter)];
+
 /// Partition rule's adapter for datafusion
 pub struct DfPartitionRuleAdapter {
     /// Partition rule
@@ -56,12 +65,17 @@ impl DfPartitionRuleAdapter {
         self.rule.location_partitions_for_write(row_group)
     }

-    pub fn locate_partitions_for_read(&self, filters: &[Expr]) -> Result<Vec<usize>> {
+    pub fn locate_partitions_for_read(
+        &self,
+        filters: &[Expr],
+        partitioned_key_indices: &mut PartitionedFilterKeyIndex,
+    ) -> Result<Vec<usize>> {
         // Extract partition filters from datafusion filters.
         let partition_filters = self.extractor.extract(filters, self.columns());

         // Locate partitions from filters.
-        self.rule.locate_partitions_for_read(&partition_filters)
+        self.rule
+            .locate_partitions_for_read(&partition_filters, partitioned_key_indices)
     }

     fn create_extractor(partition_info: &PartitionInfo) -> Result<FilterExtractorRef> {
@@ -116,8 +130,9 @@ mod tests {
         // Basic flow
         let key_rule_adapter =
             DfPartitionRuleAdapter::new(PartitionInfo::Key(ket_partition), &schema).unwrap();
+        let mut partitioned_key_indices = PartitionedFilterKeyIndex::new();
         let partitions = key_rule_adapter
-            .locate_partitions_for_read(&valid_filters_1)
+            .locate_partitions_for_read(&valid_filters_1, &mut partitioned_key_indices)
             .unwrap();

         let partition_keys = [
@@ -132,7 +147,7 @@ mod tests {

         // Conflict filter and empty partitions
         let partitions = key_rule_adapter
-            .locate_partitions_for_read(&valid_filters_2)
+            .locate_partitions_for_read(&valid_filters_2, &mut partitioned_key_indices)
             .unwrap();
         assert!(partitions.is_empty());

@@ -161,12 +176,13 @@ mod tests {

         let key_rule_adapter =
             DfPartitionRuleAdapter::new(PartitionInfo::Key(ket_partition), &schema).unwrap();
+        let mut partitioned_key_indices = PartitionedFilterKeyIndex::new();

         // Partitions located from invalid filters.
         let partitions_1 = key_rule_adapter
-            .locate_partitions_for_read(&invalid_filters_1)
+            .locate_partitions_for_read(&invalid_filters_1, &mut partitioned_key_indices)
             .unwrap();
         let partitions_2 = key_rule_adapter
-            .locate_partitions_for_read(&invalid_filters_2)
+            .locate_partitions_for_read(&invalid_filters_2, &mut partitioned_key_indices)
             .unwrap();

         // Expected

diff --git a/src/table_engine/src/partition/rule/key.rs b/src/table_engine/src/partition/rule/key.rs
index 70a3038f6e..7692c80020 100644
--- a/src/table_engine/src/partition/rule/key.rs
+++ b/src/table_engine/src/partition/rule/key.rs
@@ -30,7 +30,9 @@ use snafu::OptionExt;

 use crate::partition::{
     rule::{
-        filter::PartitionCondition, PartitionFilter, PartitionRule, PartitionedRow, PartitionedRows,
+        df_adapter::{IndexedPartitionFilterRef, PartitionedFilterKeyIndex},
+        filter::PartitionCondition,
+        PartitionFilter, PartitionRule, PartitionedRow, PartitionedRows,
     },
     Internal, LocateWritePartition, Result,
 };
@@ -139,15 +141,29 @@ impl KeyRule {
         &self,
         group: &[usize],
         filters: &[PartitionFilter],
-    ) -> Result<BTreeSet<usize>> {
-        let mut partitions = BTreeSet::new();
+    ) -> Result<PartitionedFilterKeyIndex> {
+        // Retrieve all the key DatumView instances, each tagged with its
+        // position inside the originating in-list of the predicate.
         let expanded_group = expand_partition_keys_group(group, filters)?;
-        for partition_keys in expanded_group {
-            let partition = compute_partition(partition_keys.into_iter(), self.partition_num);
-            partitions.insert(partition);
+
+        let mut partitioned_key_indices = PartitionedFilterKeyIndex::new();
+        for indexed_partition_keys in expanded_group {
+            // Batch all the keys for hash computation.
+            let partition_keys = indexed_partition_keys.iter().map(|item| item.1.clone());
+            let partition = compute_partition(partition_keys, self.partition_num);
+
+            // Collect the key indices of every predicate expr that maps into
+            // the target partition.
+            let filter_inlist_indices = partitioned_key_indices.entry(partition).or_default();
+            for (index, item) in indexed_partition_keys.iter().enumerate() {
+                filter_inlist_indices
+                    .entry(group[index])
+                    .or_default()
+                    .insert(item.0);
+            }
         }
-        Ok(partitions)
+
+        Ok(partitioned_key_indices)
     }

     #[inline]
@@ -189,16 +205,25 @@ impl PartitionRule for KeyRule {
         Ok(PartitionedRows::Multiple(Box::new(iter)))
     }

-    fn locate_partitions_for_read(&self, filters: &[PartitionFilter]) -> Result<Vec<usize>> {
+    fn locate_partitions_for_read(
+        &self,
+        indexed_filters: IndexedPartitionFilterRef,
+        partitioned_key_indices: &mut PartitionedFilterKeyIndex,
+    ) -> Result<Vec<usize>> {
         // Filters are empty.
-        if filters.is_empty() {
+        if indexed_filters.is_empty() {
             return Ok(self.all_partitions());
         }

+        let filters = indexed_filters
+            .iter()
+            .map(|(_idx, filter)| filter.clone())
+            .collect::<Vec<_>>();
+
         // Group the filters by their columns.
         // If found invalid filter, return all partitions.
         let candidate_partition_keys_groups = self
-            .get_candidate_partition_keys_groups(filters)
+            .get_candidate_partition_keys_groups(&filters)
             .map_err(|e| {
                 error!("KeyRule locate partition for read, err:{}", e);
             })
@@ -208,11 +233,31 @@ impl PartitionRule for KeyRule {
         }

         let (first_group, rest_groups) = candidate_partition_keys_groups.split_first().unwrap();
-        let mut target_partitions = self.compute_partition_for_keys_group(first_group, filters)?;
+        let mut partitioned_key_indices_all =
+            self.compute_partition_for_keys_group(first_group, filters.as_slice())?;
+        let mut target_partitions: BTreeSet<usize> =
+            partitioned_key_indices_all.keys().copied().collect();
         for group in rest_groups {
             // Same as above, if found invalid, return all partitions.
-            let partitions = match self.compute_partition_for_keys_group(group, filters) {
-                Ok(partitions) => partitions,
+            let partitions = match self.compute_partition_for_keys_group(group, filters.as_slice())
+            {
+                Ok(partitioned_filter_key_index_rest) => {
+                    for (partition_rest, filter_key_index_rest) in
+                        &partitioned_filter_key_index_rest
+                    {
+                        // Merge the key indices from the remaining groups.
+                        let filter_key_index = partitioned_key_indices_all
+                            .entry(*partition_rest)
+                            .or_default();
+                        for item in filter_key_index_rest {
+                            filter_key_index
+                                .entry(*item.0)
+                                .or_default()
+                                .extend(item.1.iter());
+                        }
+                    }
+                    partitioned_filter_key_index_rest.keys().copied().collect()
+                }
                 Err(e) => {
                     error!("KeyRule locate partition for read, err:{}", e);
                     return Ok(self.all_partitions());
@@ -225,6 +270,8 @@ impl PartitionRule for KeyRule {
                 .collect::<BTreeSet<_>>();
         }

+        partitioned_key_indices.extend(partitioned_key_indices_all);
+
         Ok(target_partitions.into_iter().collect())
     }
 }
@@ -232,7 +279,7 @@ impl PartitionRule for KeyRule {
 fn expand_partition_keys_group<'a>(
     group: &[usize],
     filters: &'a [PartitionFilter],
-) -> Result<impl Iterator<Item = Vec<DatumView<'a>>>> {
+) -> Result<impl Iterator<Item = Vec<(usize, DatumView<'a>)>>> {
     let mut datum_by_columns = Vec::with_capacity(group.len());
     for filter_idx in group {
         let filter = &filters[*filter_idx];
@@ -252,7 +299,7 @@ fn expand_partition_keys_group<'a>(

     Ok(datum_by_columns
         .into_iter()
-        .map(|filters| filters.into_iter())
+        .map(|filters| filters.into_iter().enumerate())
         .multi_cartesian_product())
 }
@@ -546,7 +593,7 @@ mod tests {
         // Expanded group
         let expanded_group = expand_partition_keys_group(&group, &filters)
             .unwrap()
-            .map(|v| v.iter().map(|view| view.to_datum()).collect_vec())
+            .map(|v| v.iter().map(|view| view.1.to_datum()).collect_vec())
             .collect_vec();
         let expected = vec![
             vec![Datum::UInt32(1), Datum::UInt32(2), Datum::UInt32(3)],
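`expand_partition_keys_group` tags every candidate key with `enumerate()` before taking the cartesian product, so each expanded key tuple remembers its position in the originating in-list. A small sketch of that pattern using the same `itertools` call, with hypothetical values:

```rust
// Sketch of the indexed expansion in `expand_partition_keys_group`.
// Assumes the `itertools` crate, which key.rs already depends on.
use itertools::Itertools;

fn main() {
    let col1 = vec!["33"];                   // col1 = '33'
    let col2 = vec!["aa", "bb", "cc", "dd"]; // col2 IN (...)
    let expanded: Vec<Vec<(usize, &str)>> = vec![col1, col2]
        .into_iter()
        // Tag each value with its index before the cartesian product.
        .map(|vals| vals.into_iter().enumerate())
        .multi_cartesian_product()
        .collect();
    // 1 x 4 = 4 expanded key tuples, each value keeping its list index.
    assert_eq!(expanded.len(), 4);
    assert_eq!(expanded[0], vec![(0, "33"), (0, "aa")]);
}
```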
diff --git a/src/table_engine/src/partition/rule/mod.rs b/src/table_engine/src/partition/rule/mod.rs
index a1c138f3b8..b17ec85f80 100644
--- a/src/table_engine/src/partition/rule/mod.rs
+++ b/src/table_engine/src/partition/rule/mod.rs
@@ -22,10 +22,12 @@ mod factory;
 mod filter;
 mod key;
 mod random;
-
 use common_types::row::{Row, RowGroup};

-use self::filter::PartitionFilter;
+use self::{
+    df_adapter::{IndexedPartitionFilterRef, PartitionedFilterKeyIndex},
+    filter::PartitionFilter,
+};
 use crate::partition::Result;

 /// The partitioned rows of the written requests.
@@ -67,7 +69,11 @@ pub trait PartitionRule: Send + Sync + 'static {
     /// passed here.
     ///
     /// If unexpected filters still found, all partitions will be returned.
-    fn locate_partitions_for_read(&self, filters: &[PartitionFilter]) -> Result<Vec<usize>>;
+    fn locate_partitions_for_read(
+        &self,
+        indexed_filters: IndexedPartitionFilterRef,
+        partitioned_key_indices: &mut PartitionedFilterKeyIndex,
+    ) -> Result<Vec<usize>>;
 }

 pub type PartitionRulePtr = Box<dyn PartitionRule>;

diff --git a/src/table_engine/src/partition/rule/random.rs b/src/table_engine/src/partition/rule/random.rs
index 0be8804f6f..d1e84a03f2 100644
--- a/src/table_engine/src/partition/rule/random.rs
+++ b/src/table_engine/src/partition/rule/random.rs
@@ -21,7 +21,10 @@ use common_types::row::RowGroup;
 use itertools::Itertools;

 use crate::partition::{
-    rule::{filter::PartitionFilter, PartitionRule, PartitionedRows},
+    rule::{
+        df_adapter::{IndexedPartitionFilterRef, PartitionedFilterKeyIndex},
+        PartitionRule, PartitionedRows,
+    },
     Result,
 };
@@ -47,7 +50,11 @@ impl PartitionRule for RandomRule {
         })
     }

-    fn locate_partitions_for_read(&self, _filters: &[PartitionFilter]) -> Result<Vec<usize>> {
+    fn locate_partitions_for_read(
+        &self,
+        _indexed_filters: IndexedPartitionFilterRef,
+        _partitioned_key_indices: &mut PartitionedFilterKeyIndex,
+    ) -> Result<Vec<usize>> {
         Ok((0..self.partition_num).collect_vec())
     }
 }

diff --git a/src/table_engine/src/predicate.rs b/src/table_engine/src/predicate.rs
index 3a3294fcd9..37c09e9de2 100644
--- a/src/table_engine/src/predicate.rs
+++ b/src/table_engine/src/predicate.rs
@@ -104,6 +104,10 @@ impl Predicate {
         &self.exprs
     }

+    pub fn mut_exprs(&mut self) -> &mut [Expr] {
+        &mut self.exprs
+    }
+
     pub fn time_range(&self) -> TimeRange {
         self.time_range
     }