From 965327aa228894ed62d8f049839f8c6f37a36e35 Mon Sep 17 00:00:00 2001
From: Jiacai Liu
Date: Fri, 11 Aug 2023 18:38:29 +0800
Subject: [PATCH] refactor: add datafusion default optimizer rules (#1147)

---
 .../cases/common/dml/issue-1087.result        | 94 +++++++++++++++++++
 .../cases/common/dml/issue-1087.sql           | 12 +++
 .../cases/common/dml/issue-341.result         |  8 +-
 .../cases/common/dml/issue-59.result          |  4 +-
 .../cases/common/explain/explain.result       |  4 +-
 .../cases/common/optimizer/optimizer.result   |  4 +-
 .../src/datafusion_impl/physical_planner.rs   | 62 +++++-------
 7 files changed, 138 insertions(+), 50 deletions(-)
 create mode 100644 integration_tests/cases/common/dml/issue-1087.result
 create mode 100644 integration_tests/cases/common/dml/issue-1087.sql

diff --git a/integration_tests/cases/common/dml/issue-1087.result b/integration_tests/cases/common/dml/issue-1087.result
new file mode 100644
index 0000000000..1ecf124015
--- /dev/null
+++ b/integration_tests/cases/common/dml/issue-1087.result
@@ -0,0 +1,94 @@
+CREATE TABLE `issue_1087` (
+    `name` string TAG NULL,
+    `value` double NOT NULL,
+    `t` timestamp NOT NULL,
+    timestamp KEY (t))
+    ENGINE=Analytic with (enable_ttl='false');
+
+affected_rows: 0
+
+explain verbose select * from issue_1087;
+
+plan_type,plan,
+String("initial_logical_plan"),String("Projection: issue_1087.tsid, issue_1087.t, issue_1087.name, issue_1087.value\n TableScan: issue_1087"),
+String("logical_plan after ceresdb_type_conversion"),String("SAME TEXT AS ABOVE"),
+String("logical_plan after inline_table_scan"),String("SAME TEXT AS ABOVE"),
+String("logical_plan after type_coercion"),String("SAME TEXT AS ABOVE"),
+String("logical_plan after count_wildcard_rule"),String("SAME TEXT AS ABOVE"),
+String("analyzed_logical_plan"),String("SAME TEXT AS ABOVE"),
+String("logical_plan after simplify_expressions"),String("SAME TEXT AS ABOVE"),
+String("logical_plan after unwrap_cast_in_comparison"),String("SAME TEXT AS ABOVE"),
+String("logical_plan after replace_distinct_aggregate"),String("SAME TEXT AS ABOVE"),
+String("logical_plan after eliminate_join"),String("SAME TEXT AS ABOVE"),
+String("logical_plan after decorrelate_predicate_subquery"),String("SAME TEXT AS ABOVE"),
+String("logical_plan after scalar_subquery_to_join"),String("SAME TEXT AS ABOVE"),
+String("logical_plan after extract_equijoin_predicate"),String("SAME TEXT AS ABOVE"),
+String("logical_plan after simplify_expressions"),String("SAME TEXT AS ABOVE"),
+String("logical_plan after merge_projection"),String("SAME TEXT AS ABOVE"),
+String("logical_plan after rewrite_disjunctive_predicate"),String("SAME TEXT AS ABOVE"),
+String("logical_plan after eliminate_duplicated_expr"),String("SAME TEXT AS ABOVE"),
+String("logical_plan after eliminate_filter"),String("SAME TEXT AS ABOVE"),
+String("logical_plan after eliminate_cross_join"),String("SAME TEXT AS ABOVE"),
+String("logical_plan after common_sub_expression_eliminate"),String("SAME TEXT AS ABOVE"),
+String("logical_plan after eliminate_limit"),String("SAME TEXT AS ABOVE"),
+String("logical_plan after propagate_empty_relation"),String("SAME TEXT AS ABOVE"),
+String("logical_plan after filter_null_join_keys"),String("SAME TEXT AS ABOVE"),
+String("logical_plan after eliminate_outer_join"),String("SAME TEXT AS ABOVE"),
+String("logical_plan after push_down_limit"),String("SAME TEXT AS ABOVE"),
+String("logical_plan after push_down_filter"),String("SAME TEXT AS ABOVE"),
+String("logical_plan after single_distinct_aggregation_to_group_by"),String("SAME TEXT AS ABOVE"),
+String("logical_plan after simplify_expressions"),String("SAME TEXT AS ABOVE"), +String("logical_plan after unwrap_cast_in_comparison"),String("SAME TEXT AS ABOVE"), +String("logical_plan after common_sub_expression_eliminate"),String("SAME TEXT AS ABOVE"), +String("logical_plan after push_down_projection"),String("Projection: issue_1087.tsid, issue_1087.t, issue_1087.name, issue_1087.value\n TableScan: issue_1087 projection=[tsid, t, name, value]"), +String("logical_plan after eliminate_projection"),String("TableScan: issue_1087 projection=[tsid, t, name, value]"), +String("logical_plan after push_down_limit"),String("SAME TEXT AS ABOVE"), +String("logical_plan after influx_regex_to_datafusion_regex"),String("SAME TEXT AS ABOVE"), +String("logical_plan after handle_gap_fill"),String("SAME TEXT AS ABOVE"), +String("logical_plan after simplify_expressions"),String("SAME TEXT AS ABOVE"), +String("logical_plan after unwrap_cast_in_comparison"),String("SAME TEXT AS ABOVE"), +String("logical_plan after replace_distinct_aggregate"),String("SAME TEXT AS ABOVE"), +String("logical_plan after eliminate_join"),String("SAME TEXT AS ABOVE"), +String("logical_plan after decorrelate_predicate_subquery"),String("SAME TEXT AS ABOVE"), +String("logical_plan after scalar_subquery_to_join"),String("SAME TEXT AS ABOVE"), +String("logical_plan after extract_equijoin_predicate"),String("SAME TEXT AS ABOVE"), +String("logical_plan after simplify_expressions"),String("SAME TEXT AS ABOVE"), +String("logical_plan after merge_projection"),String("SAME TEXT AS ABOVE"), +String("logical_plan after rewrite_disjunctive_predicate"),String("SAME TEXT AS ABOVE"), +String("logical_plan after eliminate_duplicated_expr"),String("SAME TEXT AS ABOVE"), +String("logical_plan after eliminate_filter"),String("SAME TEXT AS ABOVE"), +String("logical_plan after eliminate_cross_join"),String("SAME TEXT AS ABOVE"), +String("logical_plan after common_sub_expression_eliminate"),String("SAME TEXT AS ABOVE"), +String("logical_plan after eliminate_limit"),String("SAME TEXT AS ABOVE"), +String("logical_plan after propagate_empty_relation"),String("SAME TEXT AS ABOVE"), +String("logical_plan after filter_null_join_keys"),String("SAME TEXT AS ABOVE"), +String("logical_plan after eliminate_outer_join"),String("SAME TEXT AS ABOVE"), +String("logical_plan after push_down_limit"),String("SAME TEXT AS ABOVE"), +String("logical_plan after push_down_filter"),String("SAME TEXT AS ABOVE"), +String("logical_plan after single_distinct_aggregation_to_group_by"),String("SAME TEXT AS ABOVE"), +String("logical_plan after simplify_expressions"),String("SAME TEXT AS ABOVE"), +String("logical_plan after unwrap_cast_in_comparison"),String("SAME TEXT AS ABOVE"), +String("logical_plan after common_sub_expression_eliminate"),String("SAME TEXT AS ABOVE"), +String("logical_plan after push_down_projection"),String("SAME TEXT AS ABOVE"), +String("logical_plan after eliminate_projection"),String("SAME TEXT AS ABOVE"), +String("logical_plan after push_down_limit"),String("SAME TEXT AS ABOVE"), +String("logical_plan after influx_regex_to_datafusion_regex"),String("SAME TEXT AS ABOVE"), +String("logical_plan after handle_gap_fill"),String("SAME TEXT AS ABOVE"), +String("logical_plan"),String("TableScan: issue_1087 projection=[tsid, t, name, value]"), +String("initial_physical_plan"),String("ScanTable: table=issue_1087, parallelism=8\n"), +String("physical_plan after aggregate_statistics"),String("SAME TEXT AS ABOVE"), +String("physical_plan after 
join_selection"),String("SAME TEXT AS ABOVE"), +String("physical_plan after PipelineFixer"),String("SAME TEXT AS ABOVE"), +String("physical_plan after repartition"),String("SAME TEXT AS ABOVE"), +String("physical_plan after EnforceDistribution"),String("SAME TEXT AS ABOVE"), +String("physical_plan after CombinePartialFinalAggregate"),String("SAME TEXT AS ABOVE"), +String("physical_plan after EnforceSorting"),String("SAME TEXT AS ABOVE"), +String("physical_plan after coalesce_batches"),String("SAME TEXT AS ABOVE"), +String("physical_plan after PipelineChecker"),String("SAME TEXT AS ABOVE"), +String("physical_plan"),String("ScanTable: table=issue_1087, parallelism=8\n"), + + +DROP TABLE `issue_1087`; + +affected_rows: 0 + diff --git a/integration_tests/cases/common/dml/issue-1087.sql b/integration_tests/cases/common/dml/issue-1087.sql new file mode 100644 index 0000000000..8aef61cf2b --- /dev/null +++ b/integration_tests/cases/common/dml/issue-1087.sql @@ -0,0 +1,12 @@ +CREATE TABLE `issue_1087` ( + `name` string TAG NULL, + `value` double NOT NULL, + `t` timestamp NOT NULL, + timestamp KEY (t)) + ENGINE=Analytic with (enable_ttl='false'); + + +-- Check which optimizer rules we are using now +explain verbose select * from issue_1087; + +DROP TABLE `issue_1087`; diff --git a/integration_tests/cases/common/dml/issue-341.result b/integration_tests/cases/common/dml/issue-341.result index 1882431286..22f029b0dc 100644 --- a/integration_tests/cases/common/dml/issue-341.result +++ b/integration_tests/cases/common/dml/issue-341.result @@ -56,8 +56,8 @@ WHERE `value` = 3; plan_type,plan, -String("logical_plan"),String("Projection: issue341_t1.timestamp, issue341_t1.value\n TableScan: issue341_t1 projection=[timestamp, value], full_filters=[issue341_t1.value = Int32(3)]"), -String("physical_plan"),String("ProjectionExec: expr=[timestamp@0 as timestamp, value@1 as value]\n ScanTable: table=issue341_t1, parallelism=8\n"), +String("logical_plan"),String("TableScan: issue341_t1 projection=[timestamp, value], full_filters=[issue341_t1.value = Int32(3)]"), +String("physical_plan"),String("ScanTable: table=issue341_t1, parallelism=8\n"), EXPLAIN SELECT @@ -111,8 +111,8 @@ WHERE `value` = 3; plan_type,plan, -String("logical_plan"),String("Projection: issue341_t2.timestamp, issue341_t2.value\n Filter: issue341_t2.value = Float64(3)\n TableScan: issue341_t2 projection=[timestamp, value], partial_filters=[issue341_t2.value = Float64(3)]"), -String("physical_plan"),String("ProjectionExec: expr=[timestamp@0 as timestamp, value@1 as value]\n CoalesceBatchesExec: target_batch_size=8192\n FilterExec: value@1 = 3\n ScanTable: table=issue341_t2, parallelism=8\n"), +String("logical_plan"),String("Filter: issue341_t2.value = Float64(3)\n TableScan: issue341_t2 projection=[timestamp, value], partial_filters=[issue341_t2.value = Float64(3)]"), +String("physical_plan"),String("CoalesceBatchesExec: target_batch_size=8192\n FilterExec: value@1 = 3\n ScanTable: table=issue341_t2, parallelism=8\n"), EXPLAIN SELECT diff --git a/integration_tests/cases/common/dml/issue-59.result b/integration_tests/cases/common/dml/issue-59.result index 187a8de38b..d2bdb35f99 100644 --- a/integration_tests/cases/common/dml/issue-59.result +++ b/integration_tests/cases/common/dml/issue-59.result @@ -24,8 +24,8 @@ FROM issue59 GROUP BY id+1; plan_type,plan, -String("logical_plan"),String("Projection: group_alias_0 AS issue59.id + Int64(1), COUNT(alias1) AS COUNT(DISTINCT issue59.account)\n Aggregate: groupBy=[[group_alias_0]], 
aggr=[[COUNT(alias1)]]\n Projection: group_alias_0, alias1\n Aggregate: groupBy=[[CAST(issue59.id AS Int64) + Int64(1) AS group_alias_0, issue59.account AS alias1]], aggr=[[]]\n Projection: issue59.id, issue59.account\n TableScan: issue59 projection=[id, account]"), -String("physical_plan"),String("ProjectionExec: expr=[group_alias_0@0 as issue59.id + Int64(1), COUNT(alias1)@1 as COUNT(DISTINCT issue59.account)]\n AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([group_alias_0@0], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n ProjectionExec: expr=[group_alias_0@0 as group_alias_0, alias1@1 as alias1]\n AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0, alias1@1 as alias1], aggr=[]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([group_alias_0@0, alias1@1], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[CAST(id@0 AS Int64) + 1 as group_alias_0, account@1 as alias1], aggr=[]\n ProjectionExec: expr=[id@0 as id, account@1 as account]\n ScanTable: table=issue59, parallelism=8\n"), +String("logical_plan"),String("Projection: group_alias_0 AS issue59.id + Int64(1), COUNT(alias1) AS COUNT(DISTINCT issue59.account)\n Aggregate: groupBy=[[group_alias_0]], aggr=[[COUNT(alias1)]]\n Projection: group_alias_0, alias1\n Aggregate: groupBy=[[CAST(issue59.id AS Int64) + Int64(1) AS group_alias_0, issue59.account AS alias1]], aggr=[[]]\n TableScan: issue59 projection=[id, account]"), +String("physical_plan"),String("ProjectionExec: expr=[group_alias_0@0 as issue59.id + Int64(1), COUNT(alias1)@1 as COUNT(DISTINCT issue59.account)]\n AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([group_alias_0@0], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n ProjectionExec: expr=[group_alias_0@0 as group_alias_0, alias1@1 as alias1]\n AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0, alias1@1 as alias1], aggr=[]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([group_alias_0@0, alias1@1], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[CAST(id@0 AS Int64) + 1 as group_alias_0, account@1 as alias1], aggr=[]\n ScanTable: table=issue59, parallelism=8\n"), DROP TABLE IF EXISTS issue59; diff --git a/integration_tests/cases/common/explain/explain.result b/integration_tests/cases/common/explain/explain.result index 99a4e164c0..26270d4eb3 100644 --- a/integration_tests/cases/common/explain/explain.result +++ b/integration_tests/cases/common/explain/explain.result @@ -9,8 +9,8 @@ affected_rows: 0 EXPLAIN SELECT t FROM `04_explain_t`; plan_type,plan, -String("logical_plan"),String("Projection: 04_explain_t.t\n TableScan: 04_explain_t projection=[t]"), -String("physical_plan"),String("ProjectionExec: expr=[t@0 as t]\n ScanTable: table=04_explain_t, parallelism=8\n"), +String("logical_plan"),String("TableScan: 04_explain_t projection=[t]"), +String("physical_plan"),String("ScanTable: table=04_explain_t, parallelism=8\n"), DROP TABLE `04_explain_t`; diff --git a/integration_tests/cases/common/optimizer/optimizer.result b/integration_tests/cases/common/optimizer/optimizer.result index 
78274c4152..9fa67473e3 100644 --- a/integration_tests/cases/common/optimizer/optimizer.result +++ b/integration_tests/cases/common/optimizer/optimizer.result @@ -9,8 +9,8 @@ affected_rows: 0 EXPLAIN SELECT max(value) AS c1, avg(value) AS c2 FROM `07_optimizer_t` GROUP BY name; plan_type,plan, -String("logical_plan"),String("Projection: MAX(07_optimizer_t.value) AS c1, AVG(07_optimizer_t.value) AS c2\n Aggregate: groupBy=[[07_optimizer_t.name]], aggr=[[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]]\n Projection: 07_optimizer_t.name, 07_optimizer_t.value\n TableScan: 07_optimizer_t projection=[name, value]"), -String("physical_plan"),String("ProjectionExec: expr=[MAX(07_optimizer_t.value)@1 as c1, AVG(07_optimizer_t.value)@2 as c2]\n AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([name@0], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n ProjectionExec: expr=[name@0 as name, value@1 as value]\n ScanTable: table=07_optimizer_t, parallelism=8\n"), +String("logical_plan"),String("Projection: MAX(07_optimizer_t.value) AS c1, AVG(07_optimizer_t.value) AS c2\n Aggregate: groupBy=[[07_optimizer_t.name]], aggr=[[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]]\n TableScan: 07_optimizer_t projection=[name, value]"), +String("physical_plan"),String("ProjectionExec: expr=[MAX(07_optimizer_t.value)@1 as c1, AVG(07_optimizer_t.value)@2 as c2]\n AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([name@0], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n ScanTable: table=07_optimizer_t, parallelism=8\n"), DROP TABLE `07_optimizer_t`; diff --git a/query_engine/src/datafusion_impl/physical_planner.rs b/query_engine/src/datafusion_impl/physical_planner.rs index 886c3d755a..101ed1e56d 100644 --- a/query_engine/src/datafusion_impl/physical_planner.rs +++ b/query_engine/src/datafusion_impl/physical_planner.rs @@ -17,20 +17,7 @@ use std::{sync::Arc, time::Instant}; use async_trait::async_trait; use datafusion::{ execution::{context::SessionState, runtime_env::RuntimeEnv}, - optimizer::{ - analyzer::{ - count_wildcard_rule::CountWildcardRule, inline_table_scan::InlineTableScan, - AnalyzerRule, - }, - common_subexpr_eliminate::CommonSubexprEliminate, - eliminate_limit::EliminateLimit, - push_down_filter::PushDownFilter, - push_down_limit::PushDownLimit, - push_down_projection::PushDownProjection, - simplify_expressions::SimplifyExpressions, - single_distinct_to_groupby::SingleDistinctToGroupBy, - OptimizerRule, - }, + optimizer::analyzer::Analyzer, physical_optimizer::PhysicalOptimizerRule, prelude::{SessionConfig, SessionContext}, }; @@ -81,18 +68,24 @@ impl DatafusionPhysicalPlannerImpl { .extensions .insert(ceresdb_options); - let logical_optimize_rules = Self::logical_optimize_rules(); + // Using default logcial optimizer, if want to add more custom rule, using + // `add_optimizer_rule` to add. 
         let state =
             SessionState::with_config_rt(df_session_config, Arc::new(RuntimeEnv::default()))
-                .with_query_planner(Arc::new(QueryPlannerAdapter))
-                .with_analyzer_rules(Self::analyzer_rules())
-                .with_optimizer_rules(logical_optimize_rules);
+                .with_query_planner(Arc::new(QueryPlannerAdapter));
+
+        // Register analyzer rules.
+        let state = Self::register_analyzer_rules(state);
+
+        // Register iox optimizers, used by influxql.
         let state = influxql_query::logical_optimizer::register_iox_logical_optimizers(state);
-        let physical_optimizer =
-            Self::apply_adapters_for_physical_optimize_rules(state.physical_optimizers());
-        SessionContext::with_state(state.with_physical_optimizer_rules(physical_optimizer))
+
+        SessionContext::with_state(state)
     }
 
+    // TODO: this is unused now; the bug in RepartitionAdapter has been fixed in
+    // datafusion itself. Remove this code in the future.
+    #[allow(dead_code)]
     fn apply_adapters_for_physical_optimize_rules(
         default_rules: &[Arc<dyn PhysicalOptimizerRule + Send + Sync>],
     ) -> Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> {
@@ -104,26 +97,15 @@ impl DatafusionPhysicalPlannerImpl {
         new_rules
     }
 
-    fn logical_optimize_rules() -> Vec<Arc<dyn OptimizerRule + Send + Sync>> {
-        vec![
-            // These rules are the default settings of the datafusion.
-            Arc::new(SimplifyExpressions::new()),
-            Arc::new(CommonSubexprEliminate::new()),
-            Arc::new(EliminateLimit::new()),
-            Arc::new(PushDownProjection::new()),
-            Arc::new(PushDownFilter::new()),
-            Arc::new(PushDownLimit::new()),
-            Arc::new(SingleDistinctToGroupBy::new()),
-        ]
-    }
+    fn register_analyzer_rules(mut state: SessionState) -> SessionState {
+        // Our analyzer rules have higher priority, so add our custom rules first,
+        // then the default ones.
+        state = state.with_analyzer_rules(vec![Arc::new(TypeConversion)]);
+        for rule in Analyzer::new().rules {
+            state = state.add_analyzer_rule(rule);
+        }
 
-    fn analyzer_rules() -> Vec<Arc<dyn AnalyzerRule + Send + Sync>> {
-        vec![
-            Arc::new(InlineTableScan::new()),
-            Arc::new(TypeConversion),
-            Arc::new(datafusion::optimizer::analyzer::type_coercion::TypeCoercion::new()),
-            Arc::new(CountWildcardRule::new()),
-        ]
+        state
     }
 }
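
Note on the rule-registration pattern introduced above: `SessionState::with_analyzer_rules` replaces the whole analyzer list, so `register_analyzer_rules` seeds the list with the custom `TypeConversion` rule and then re-appends DataFusion's defaults from `Analyzer::new().rules`, which guarantees the custom rule runs before any default. Below is a minimal self-contained sketch of that pattern, assuming the DataFusion version this patch pins; `NoopAnalyzer` and `build_session` are hypothetical names standing in for `TypeConversion` and the planner setup, which live elsewhere in the ceresdb codebase:

```rust
use std::sync::Arc;

use datafusion::{
    config::ConfigOptions,
    error::Result,
    execution::{context::SessionState, runtime_env::RuntimeEnv},
    logical_expr::LogicalPlan,
    optimizer::analyzer::{Analyzer, AnalyzerRule},
    prelude::{SessionConfig, SessionContext},
};

/// Hypothetical stand-in for a custom analyzer rule such as `TypeConversion`.
/// A real rule would rewrite the plan; this one passes it through unchanged.
struct NoopAnalyzer;

impl AnalyzerRule for NoopAnalyzer {
    fn analyze(&self, plan: LogicalPlan, _config: &ConfigOptions) -> Result<LogicalPlan> {
        Ok(plan)
    }

    fn name(&self) -> &str {
        "noop_analyzer"
    }
}

fn build_session() -> SessionContext {
    let mut state =
        SessionState::with_config_rt(SessionConfig::new(), Arc::new(RuntimeEnv::default()));

    // `with_analyzer_rules` replaces the rule list, so the custom rule ends up
    // in front; re-appending `Analyzer::new().rules` restores the defaults
    // behind it, mirroring `register_analyzer_rules` in the patch.
    state = state.with_analyzer_rules(vec![Arc::new(NoopAnalyzer)]);
    for rule in Analyzer::new().rules {
        state = state.add_analyzer_rule(rule);
    }

    SessionContext::with_state(state)
}
```

Extra logical optimizer rules would be appended to the defaults in the same consuming-builder style via `SessionState::add_optimizer_rule`, which is what the new comment in the patch refers to.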