Skip to content

Commit

Permalink
refactor: add datafusion default optimizer rules (apache#1147)
Browse files Browse the repository at this point in the history
  • Loading branch information
jiacai2050 authored Aug 11, 2023
1 parent 50b8f77 commit 965327a
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 50 deletions.
94 changes: 94 additions & 0 deletions integration_tests/cases/common/dml/issue-1087.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
CREATE TABLE `issue_1087` (
`name` string TAG NULL,
`value` double NOT NULL,
`t` timestamp NOT NULL,
timestamp KEY (t))
ENGINE=Analytic with (enable_ttl='false');

affected_rows: 0

explain verbose select * from issue_1087;

plan_type,plan,
String("initial_logical_plan"),String("Projection: issue_1087.tsid, issue_1087.t, issue_1087.name, issue_1087.value\n TableScan: issue_1087"),
String("logical_plan after ceresdb_type_conversion"),String("SAME TEXT AS ABOVE"),
String("logical_plan after inline_table_scan"),String("SAME TEXT AS ABOVE"),
String("logical_plan after type_coercion"),String("SAME TEXT AS ABOVE"),
String("logical_plan after count_wildcard_rule"),String("SAME TEXT AS ABOVE"),
String("analyzed_logical_plan"),String("SAME TEXT AS ABOVE"),
String("logical_plan after simplify_expressions"),String("SAME TEXT AS ABOVE"),
String("logical_plan after unwrap_cast_in_comparison"),String("SAME TEXT AS ABOVE"),
String("logical_plan after replace_distinct_aggregate"),String("SAME TEXT AS ABOVE"),
String("logical_plan after eliminate_join"),String("SAME TEXT AS ABOVE"),
String("logical_plan after decorrelate_predicate_subquery"),String("SAME TEXT AS ABOVE"),
String("logical_plan after scalar_subquery_to_join"),String("SAME TEXT AS ABOVE"),
String("logical_plan after extract_equijoin_predicate"),String("SAME TEXT AS ABOVE"),
String("logical_plan after simplify_expressions"),String("SAME TEXT AS ABOVE"),
String("logical_plan after merge_projection"),String("SAME TEXT AS ABOVE"),
String("logical_plan after rewrite_disjunctive_predicate"),String("SAME TEXT AS ABOVE"),
String("logical_plan after eliminate_duplicated_expr"),String("SAME TEXT AS ABOVE"),
String("logical_plan after eliminate_filter"),String("SAME TEXT AS ABOVE"),
String("logical_plan after eliminate_cross_join"),String("SAME TEXT AS ABOVE"),
String("logical_plan after common_sub_expression_eliminate"),String("SAME TEXT AS ABOVE"),
String("logical_plan after eliminate_limit"),String("SAME TEXT AS ABOVE"),
String("logical_plan after propagate_empty_relation"),String("SAME TEXT AS ABOVE"),
String("logical_plan after filter_null_join_keys"),String("SAME TEXT AS ABOVE"),
String("logical_plan after eliminate_outer_join"),String("SAME TEXT AS ABOVE"),
String("logical_plan after push_down_limit"),String("SAME TEXT AS ABOVE"),
String("logical_plan after push_down_filter"),String("SAME TEXT AS ABOVE"),
String("logical_plan after single_distinct_aggregation_to_group_by"),String("SAME TEXT AS ABOVE"),
String("logical_plan after simplify_expressions"),String("SAME TEXT AS ABOVE"),
String("logical_plan after unwrap_cast_in_comparison"),String("SAME TEXT AS ABOVE"),
String("logical_plan after common_sub_expression_eliminate"),String("SAME TEXT AS ABOVE"),
String("logical_plan after push_down_projection"),String("Projection: issue_1087.tsid, issue_1087.t, issue_1087.name, issue_1087.value\n TableScan: issue_1087 projection=[tsid, t, name, value]"),
String("logical_plan after eliminate_projection"),String("TableScan: issue_1087 projection=[tsid, t, name, value]"),
String("logical_plan after push_down_limit"),String("SAME TEXT AS ABOVE"),
String("logical_plan after influx_regex_to_datafusion_regex"),String("SAME TEXT AS ABOVE"),
String("logical_plan after handle_gap_fill"),String("SAME TEXT AS ABOVE"),
String("logical_plan after simplify_expressions"),String("SAME TEXT AS ABOVE"),
String("logical_plan after unwrap_cast_in_comparison"),String("SAME TEXT AS ABOVE"),
String("logical_plan after replace_distinct_aggregate"),String("SAME TEXT AS ABOVE"),
String("logical_plan after eliminate_join"),String("SAME TEXT AS ABOVE"),
String("logical_plan after decorrelate_predicate_subquery"),String("SAME TEXT AS ABOVE"),
String("logical_plan after scalar_subquery_to_join"),String("SAME TEXT AS ABOVE"),
String("logical_plan after extract_equijoin_predicate"),String("SAME TEXT AS ABOVE"),
String("logical_plan after simplify_expressions"),String("SAME TEXT AS ABOVE"),
String("logical_plan after merge_projection"),String("SAME TEXT AS ABOVE"),
String("logical_plan after rewrite_disjunctive_predicate"),String("SAME TEXT AS ABOVE"),
String("logical_plan after eliminate_duplicated_expr"),String("SAME TEXT AS ABOVE"),
String("logical_plan after eliminate_filter"),String("SAME TEXT AS ABOVE"),
String("logical_plan after eliminate_cross_join"),String("SAME TEXT AS ABOVE"),
String("logical_plan after common_sub_expression_eliminate"),String("SAME TEXT AS ABOVE"),
String("logical_plan after eliminate_limit"),String("SAME TEXT AS ABOVE"),
String("logical_plan after propagate_empty_relation"),String("SAME TEXT AS ABOVE"),
String("logical_plan after filter_null_join_keys"),String("SAME TEXT AS ABOVE"),
String("logical_plan after eliminate_outer_join"),String("SAME TEXT AS ABOVE"),
String("logical_plan after push_down_limit"),String("SAME TEXT AS ABOVE"),
String("logical_plan after push_down_filter"),String("SAME TEXT AS ABOVE"),
String("logical_plan after single_distinct_aggregation_to_group_by"),String("SAME TEXT AS ABOVE"),
String("logical_plan after simplify_expressions"),String("SAME TEXT AS ABOVE"),
String("logical_plan after unwrap_cast_in_comparison"),String("SAME TEXT AS ABOVE"),
String("logical_plan after common_sub_expression_eliminate"),String("SAME TEXT AS ABOVE"),
String("logical_plan after push_down_projection"),String("SAME TEXT AS ABOVE"),
String("logical_plan after eliminate_projection"),String("SAME TEXT AS ABOVE"),
String("logical_plan after push_down_limit"),String("SAME TEXT AS ABOVE"),
String("logical_plan after influx_regex_to_datafusion_regex"),String("SAME TEXT AS ABOVE"),
String("logical_plan after handle_gap_fill"),String("SAME TEXT AS ABOVE"),
String("logical_plan"),String("TableScan: issue_1087 projection=[tsid, t, name, value]"),
String("initial_physical_plan"),String("ScanTable: table=issue_1087, parallelism=8\n"),
String("physical_plan after aggregate_statistics"),String("SAME TEXT AS ABOVE"),
String("physical_plan after join_selection"),String("SAME TEXT AS ABOVE"),
String("physical_plan after PipelineFixer"),String("SAME TEXT AS ABOVE"),
String("physical_plan after repartition"),String("SAME TEXT AS ABOVE"),
String("physical_plan after EnforceDistribution"),String("SAME TEXT AS ABOVE"),
String("physical_plan after CombinePartialFinalAggregate"),String("SAME TEXT AS ABOVE"),
String("physical_plan after EnforceSorting"),String("SAME TEXT AS ABOVE"),
String("physical_plan after coalesce_batches"),String("SAME TEXT AS ABOVE"),
String("physical_plan after PipelineChecker"),String("SAME TEXT AS ABOVE"),
String("physical_plan"),String("ScanTable: table=issue_1087, parallelism=8\n"),


DROP TABLE `issue_1087`;

affected_rows: 0

12 changes: 12 additions & 0 deletions integration_tests/cases/common/dml/issue-1087.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
CREATE TABLE `issue_1087` (
`name` string TAG NULL,
`value` double NOT NULL,
`t` timestamp NOT NULL,
timestamp KEY (t))
ENGINE=Analytic with (enable_ttl='false');


-- Check which optimizer rules we are using now
explain verbose select * from issue_1087;

DROP TABLE `issue_1087`;
8 changes: 4 additions & 4 deletions integration_tests/cases/common/dml/issue-341.result
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ WHERE
`value` = 3;

plan_type,plan,
String("logical_plan"),String("Projection: issue341_t1.timestamp, issue341_t1.value\n TableScan: issue341_t1 projection=[timestamp, value], full_filters=[issue341_t1.value = Int32(3)]"),
String("physical_plan"),String("ProjectionExec: expr=[timestamp@0 as timestamp, value@1 as value]\n ScanTable: table=issue341_t1, parallelism=8\n"),
String("logical_plan"),String("TableScan: issue341_t1 projection=[timestamp, value], full_filters=[issue341_t1.value = Int32(3)]"),
String("physical_plan"),String("ScanTable: table=issue341_t1, parallelism=8\n"),


EXPLAIN SELECT
Expand Down Expand Up @@ -111,8 +111,8 @@ WHERE
`value` = 3;

plan_type,plan,
String("logical_plan"),String("Projection: issue341_t2.timestamp, issue341_t2.value\n Filter: issue341_t2.value = Float64(3)\n TableScan: issue341_t2 projection=[timestamp, value], partial_filters=[issue341_t2.value = Float64(3)]"),
String("physical_plan"),String("ProjectionExec: expr=[timestamp@0 as timestamp, value@1 as value]\n CoalesceBatchesExec: target_batch_size=8192\n FilterExec: value@1 = 3\n ScanTable: table=issue341_t2, parallelism=8\n"),
String("logical_plan"),String("Filter: issue341_t2.value = Float64(3)\n TableScan: issue341_t2 projection=[timestamp, value], partial_filters=[issue341_t2.value = Float64(3)]"),
String("physical_plan"),String("CoalesceBatchesExec: target_batch_size=8192\n FilterExec: value@1 = 3\n ScanTable: table=issue341_t2, parallelism=8\n"),


EXPLAIN SELECT
Expand Down
4 changes: 2 additions & 2 deletions integration_tests/cases/common/dml/issue-59.result
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ FROM issue59
GROUP BY id+1;

plan_type,plan,
String("logical_plan"),String("Projection: group_alias_0 AS issue59.id + Int64(1), COUNT(alias1) AS COUNT(DISTINCT issue59.account)\n Aggregate: groupBy=[[group_alias_0]], aggr=[[COUNT(alias1)]]\n Projection: group_alias_0, alias1\n Aggregate: groupBy=[[CAST(issue59.id AS Int64) + Int64(1) AS group_alias_0, issue59.account AS alias1]], aggr=[[]]\n Projection: issue59.id, issue59.account\n TableScan: issue59 projection=[id, account]"),
String("physical_plan"),String("ProjectionExec: expr=[group_alias_0@0 as issue59.id + Int64(1), COUNT(alias1)@1 as COUNT(DISTINCT issue59.account)]\n AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([group_alias_0@0], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n ProjectionExec: expr=[group_alias_0@0 as group_alias_0, alias1@1 as alias1]\n AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0, alias1@1 as alias1], aggr=[]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([group_alias_0@0, alias1@1], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[CAST(id@0 AS Int64) + 1 as group_alias_0, account@1 as alias1], aggr=[]\n ProjectionExec: expr=[id@0 as id, account@1 as account]\n ScanTable: table=issue59, parallelism=8\n"),
String("logical_plan"),String("Projection: group_alias_0 AS issue59.id + Int64(1), COUNT(alias1) AS COUNT(DISTINCT issue59.account)\n Aggregate: groupBy=[[group_alias_0]], aggr=[[COUNT(alias1)]]\n Projection: group_alias_0, alias1\n Aggregate: groupBy=[[CAST(issue59.id AS Int64) + Int64(1) AS group_alias_0, issue59.account AS alias1]], aggr=[[]]\n TableScan: issue59 projection=[id, account]"),
String("physical_plan"),String("ProjectionExec: expr=[group_alias_0@0 as issue59.id + Int64(1), COUNT(alias1)@1 as COUNT(DISTINCT issue59.account)]\n AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([group_alias_0@0], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n ProjectionExec: expr=[group_alias_0@0 as group_alias_0, alias1@1 as alias1]\n AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0, alias1@1 as alias1], aggr=[]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([group_alias_0@0, alias1@1], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[CAST(id@0 AS Int64) + 1 as group_alias_0, account@1 as alias1], aggr=[]\n ScanTable: table=issue59, parallelism=8\n"),


DROP TABLE IF EXISTS issue59;
Expand Down
4 changes: 2 additions & 2 deletions integration_tests/cases/common/explain/explain.result
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ affected_rows: 0
EXPLAIN SELECT t FROM `04_explain_t`;

plan_type,plan,
String("logical_plan"),String("Projection: 04_explain_t.t\n TableScan: 04_explain_t projection=[t]"),
String("physical_plan"),String("ProjectionExec: expr=[t@0 as t]\n ScanTable: table=04_explain_t, parallelism=8\n"),
String("logical_plan"),String("TableScan: 04_explain_t projection=[t]"),
String("physical_plan"),String("ScanTable: table=04_explain_t, parallelism=8\n"),


DROP TABLE `04_explain_t`;
Expand Down
4 changes: 2 additions & 2 deletions integration_tests/cases/common/optimizer/optimizer.result
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ affected_rows: 0
EXPLAIN SELECT max(value) AS c1, avg(value) AS c2 FROM `07_optimizer_t` GROUP BY name;

plan_type,plan,
String("logical_plan"),String("Projection: MAX(07_optimizer_t.value) AS c1, AVG(07_optimizer_t.value) AS c2\n Aggregate: groupBy=[[07_optimizer_t.name]], aggr=[[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]]\n Projection: 07_optimizer_t.name, 07_optimizer_t.value\n TableScan: 07_optimizer_t projection=[name, value]"),
String("physical_plan"),String("ProjectionExec: expr=[MAX(07_optimizer_t.value)@1 as c1, AVG(07_optimizer_t.value)@2 as c2]\n AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([name@0], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n ProjectionExec: expr=[name@0 as name, value@1 as value]\n ScanTable: table=07_optimizer_t, parallelism=8\n"),
String("logical_plan"),String("Projection: MAX(07_optimizer_t.value) AS c1, AVG(07_optimizer_t.value) AS c2\n Aggregate: groupBy=[[07_optimizer_t.name]], aggr=[[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]]\n TableScan: 07_optimizer_t projection=[name, value]"),
String("physical_plan"),String("ProjectionExec: expr=[MAX(07_optimizer_t.value)@1 as c1, AVG(07_optimizer_t.value)@2 as c2]\n AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([name@0], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n ScanTable: table=07_optimizer_t, parallelism=8\n"),


DROP TABLE `07_optimizer_t`;
Expand Down
62 changes: 22 additions & 40 deletions query_engine/src/datafusion_impl/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,7 @@ use std::{sync::Arc, time::Instant};
use async_trait::async_trait;
use datafusion::{
execution::{context::SessionState, runtime_env::RuntimeEnv},
optimizer::{
analyzer::{
count_wildcard_rule::CountWildcardRule, inline_table_scan::InlineTableScan,
AnalyzerRule,
},
common_subexpr_eliminate::CommonSubexprEliminate,
eliminate_limit::EliminateLimit,
push_down_filter::PushDownFilter,
push_down_limit::PushDownLimit,
push_down_projection::PushDownProjection,
simplify_expressions::SimplifyExpressions,
single_distinct_to_groupby::SingleDistinctToGroupBy,
OptimizerRule,
},
optimizer::analyzer::Analyzer,
physical_optimizer::PhysicalOptimizerRule,
prelude::{SessionConfig, SessionContext},
};
Expand Down Expand Up @@ -81,18 +68,24 @@ impl DatafusionPhysicalPlannerImpl {
.extensions
.insert(ceresdb_options);

let logical_optimize_rules = Self::logical_optimize_rules();
// Using the default logical optimizer; to add more custom rules, use
// `add_optimizer_rule`.
let state =
SessionState::with_config_rt(df_session_config, Arc::new(RuntimeEnv::default()))
.with_query_planner(Arc::new(QueryPlannerAdapter))
.with_analyzer_rules(Self::analyzer_rules())
.with_optimizer_rules(logical_optimize_rules);
.with_query_planner(Arc::new(QueryPlannerAdapter));

// Register analyzer rules
let state = Self::register_analyzer_rules(state);

// Register iox optimizers, used by influxql.
let state = influxql_query::logical_optimizer::register_iox_logical_optimizers(state);
let physical_optimizer =
Self::apply_adapters_for_physical_optimize_rules(state.physical_optimizers());
SessionContext::with_state(state.with_physical_optimizer_rules(physical_optimizer))

SessionContext::with_state(state)
}

// TODO: this is not used now; the RepartitionAdapter bug has already been
// fixed in datafusion itself. Remove this code in the future.
#[allow(dead_code)]
fn apply_adapters_for_physical_optimize_rules(
default_rules: &[Arc<dyn PhysicalOptimizerRule + Send + Sync>],
) -> Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> {
Expand All @@ -104,26 +97,15 @@ impl DatafusionPhysicalPlannerImpl {
new_rules
}

fn logical_optimize_rules() -> Vec<Arc<dyn OptimizerRule + Send + Sync>> {
vec![
// These rules are the default settings of the datafusion.
Arc::new(SimplifyExpressions::new()),
Arc::new(CommonSubexprEliminate::new()),
Arc::new(EliminateLimit::new()),
Arc::new(PushDownProjection::new()),
Arc::new(PushDownFilter::new()),
Arc::new(PushDownLimit::new()),
Arc::new(SingleDistinctToGroupBy::new()),
]
}
fn register_analyzer_rules(mut state: SessionState) -> SessionState {
// Our analyzer has high priority, so we first add our custom rules, then the
// default ones.
state = state.with_analyzer_rules(vec![Arc::new(TypeConversion)]);
for rule in Analyzer::new().rules {
state = state.add_analyzer_rule(rule);
}

fn analyzer_rules() -> Vec<Arc<dyn AnalyzerRule + Send + Sync>> {
vec![
Arc::new(InlineTableScan::new()),
Arc::new(TypeConversion),
Arc::new(datafusion::optimizer::analyzer::type_coercion::TypeCoercion::new()),
Arc::new(CountWildcardRule::new()),
]
state
}
}

Expand Down

0 comments on commit 965327a

Please sign in to comment.