From 22ed9aa697c500f1b24a71017017388e40f57d30 Mon Sep 17 00:00:00 2001 From: wjHuang Date: Thu, 17 Mar 2022 10:43:53 +0800 Subject: [PATCH] cherry pick #33168 to release-5.3 Signed-off-by: ti-srebot --- executor/aggregate_test.go | 8 + planner/core/logical_plan_builder.go | 2 +- planner/core/logical_plans.go | 3 - planner/core/rule_predicate_push_down.go | 201 ++++++++++++++++++ .../ordered_result_mode_suite_out.json | 12 +- .../testdata/plan_suite_unexported_out.json | 16 +- statistics/testdata/stats_suite_out.json | 10 +- 7 files changed, 237 insertions(+), 15 deletions(-) diff --git a/executor/aggregate_test.go b/executor/aggregate_test.go index b3ed40178f65b..f7f464b9a6a54 100644 --- a/executor/aggregate_test.go +++ b/executor/aggregate_test.go @@ -920,6 +920,7 @@ func (s *testSuiteAgg) TestHaving(c *C) { tk.MustQuery("select 1 from t group by c1 having sum(abs(c2 + c3)) = c1").Check(testkit.Rows("1")) } +<<<<<<< HEAD func (s *testSuiteAgg) TestIssue26496(c *C) { tk := testkit.NewTestKitWithInit(c, s.store) @@ -931,6 +932,13 @@ func (s *testSuiteAgg) TestIssue26496(c *C) { func (s *testSuiteAgg) TestAggEliminator(c *C) { tk := testkit.NewTestKitWithInit(c, s.store) +======= +func TestAggEliminator(t *testing.T) { + store, clean := testkit.CreateMockStore(t) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") +>>>>>>> 47e4b5bf3... *: revert #27021 to fix a bug that selection can not be pushed down when having clause above aggregation (#33168) tk.MustExec("create table t(a int primary key, b int)") tk.MustQuery("select min(a), min(a) from t").Check(testkit.Rows(" ")) diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index 693089dde6d17..7487c3ec4c1ed 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -980,7 +980,7 @@ func (b *PlanBuilder) buildSelection(ctx context.Context, p LogicalPlan, where a conditions := splitWhere(where) expressions := make([]expression.Expression, 0, len(conditions)) - selection := LogicalSelection{buildByHaving: aggMapper != nil}.Init(b.ctx, b.getSelectOffset()) + selection := LogicalSelection{}.Init(b.ctx, b.getSelectOffset()) for _, cond := range conditions { expr, np, err := b.rewrite(ctx, cond, p, aggMapper, false) if err != nil { diff --git a/planner/core/logical_plans.go b/planner/core/logical_plans.go index 78a41ee2bf826..9d4496f009e4b 100644 --- a/planner/core/logical_plans.go +++ b/planner/core/logical_plans.go @@ -451,9 +451,6 @@ type LogicalSelection struct { // but after we converted to CNF(Conjunctive normal form), it can be // split into a list of AND conditions. Conditions []expression.Expression - - // having selection can't be pushed down, because it must above the aggregation. - buildByHaving bool } // ExtractCorrelatedCols implements LogicalPlan interface. diff --git a/planner/core/rule_predicate_push_down.go b/planner/core/rule_predicate_push_down.go index 3b7be4fbee6be..284ace5183191 100644 --- a/planner/core/rule_predicate_push_down.go +++ b/planner/core/rule_predicate_push_down.go @@ -85,6 +85,7 @@ func (p *LogicalSelection) PredicatePushDown(predicates []expression.Expression) p.Conditions = DeleteTrueExprs(p, p.Conditions) var child LogicalPlan var retConditions []expression.Expression +<<<<<<< HEAD if p.buildByHaving { retConditions, child = p.children[0].PredicatePushDown(predicates) retConditions = append(retConditions, p.Conditions...) @@ -93,6 +94,13 @@ func (p *LogicalSelection) PredicatePushDown(predicates []expression.Expression) retConditions, child = p.children[0].PredicatePushDown(append(canBePushDown, predicates...)) retConditions = append(retConditions, canNotBePushDown...) } +======= + var originConditions []expression.Expression + canBePushDown, canNotBePushDown := splitSetGetVarFunc(p.Conditions) + originConditions = canBePushDown + retConditions, child = p.children[0].PredicatePushDown(append(canBePushDown, predicates...), opt) + retConditions = append(retConditions, canNotBePushDown...) +>>>>>>> 47e4b5bf3... *: revert #27021 to fix a bug that selection can not be pushed down when having clause above aggregation (#33168) if len(retConditions) > 0 { p.Conditions = expression.PropagateConstant(p.ctx, retConditions) // Return table dual when filter is constant false or null. @@ -634,3 +642,196 @@ func (p *LogicalMemTable) PredicatePushDown(predicates []expression.Expression) func (*ppdSolver) name() string { return "predicate_push_down" } +<<<<<<< HEAD +======= + +func appendTableDualTraceStep(replaced LogicalPlan, dual LogicalPlan, conditions []expression.Expression, opt *logicalOptimizeOp) { + action := func() string { + return fmt.Sprintf("%v_%v is replaced by %v_%v", replaced.TP(), replaced.ID(), dual.TP(), dual.ID()) + } + reason := func() string { + buffer := bytes.NewBufferString("The conditions[") + for i, cond := range conditions { + if i > 0 { + buffer.WriteString(",") + } + buffer.WriteString(cond.String()) + } + buffer.WriteString("] are constant false or null") + return buffer.String() + } + opt.appendStepToCurrent(dual.ID(), dual.TP(), reason, action) +} + +func appendSelectionPredicatePushDownTraceStep(p *LogicalSelection, conditions []expression.Expression, opt *logicalOptimizeOp) { + action := func() string { + return fmt.Sprintf("%v_%v is removed", p.TP(), p.ID()) + } + reason := func() string { + return "" + } + if len(conditions) > 0 { + reason = func() string { + buffer := bytes.NewBufferString("The conditions[") + for i, cond := range conditions { + if i > 0 { + buffer.WriteString(",") + } + buffer.WriteString(cond.String()) + } + buffer.WriteString(fmt.Sprintf("] in %v_%v are pushed down", p.TP(), p.ID())) + return buffer.String() + } + } + opt.appendStepToCurrent(p.ID(), p.TP(), reason, action) +} + +func appendDataSourcePredicatePushDownTraceStep(ds *DataSource, opt *logicalOptimizeOp) { + if len(ds.pushedDownConds) < 1 { + return + } + reason := func() string { + return "" + } + action := func() string { + buffer := bytes.NewBufferString("The conditions[") + for i, cond := range ds.pushedDownConds { + if i > 0 { + buffer.WriteString(",") + } + buffer.WriteString(cond.String()) + } + buffer.WriteString(fmt.Sprintf("] are pushed down across %v_%v", ds.TP(), ds.ID())) + return buffer.String() + } + opt.appendStepToCurrent(ds.ID(), ds.TP(), reason, action) +} + +func appendAddSelectionTraceStep(p LogicalPlan, child LogicalPlan, sel *LogicalSelection, opt *logicalOptimizeOp) { + reason := func() string { + return "" + } + action := func() string { + return fmt.Sprintf("add %v_%v to connect %v_%v and %v_%v", sel.TP(), sel.ID(), p.TP(), p.ID(), child.TP(), child.ID()) + } + opt.appendStepToCurrent(sel.ID(), sel.TP(), reason, action) +} + +// AddPrefix4ShardIndexes add expression prefix for shard index. e.g. an index is test.uk(tidb_shard(a), a). +// It transforms the sql "SELECT * FROM test WHERE a = 10" to +// "SELECT * FROM test WHERE tidb_shard(a) = val AND a = 10", val is the value of tidb_shard(10). +// It also transforms the sql "SELECT * FROM test WHERE a IN (10, 20, 30)" to +// "SELECT * FROM test WHERE tidb_shard(a) = val1 AND a = 10 OR tidb_shard(a) = val2 AND a = 20" +// @param[in] conds the original condtion of this datasource +// @retval - the new condition after adding expression prefix +func (ds *DataSource) AddPrefix4ShardIndexes(sc sessionctx.Context, conds []expression.Expression) []expression.Expression { + if !ds.containExprPrefixUk { + return conds + } + + var err error + newConds := conds + + for _, path := range ds.possibleAccessPaths { + if !path.IsUkShardIndexPath { + continue + } + newConds, err = ds.addExprPrefixCond(sc, path, newConds) + if err != nil { + logutil.BgLogger().Error("Add tidb_shard expression failed", + zap.Error(err), + zap.Uint64("connection id", sc.GetSessionVars().ConnectionID), + zap.String("database name", ds.DBName.L), + zap.String("table name", ds.tableInfo.Name.L), + zap.String("index name", path.Index.Name.L)) + return conds + } + } + + return newConds +} + +func (ds *DataSource) addExprPrefixCond(sc sessionctx.Context, path *util.AccessPath, + conds []expression.Expression) ([]expression.Expression, error) { + IdxCols, IdxColLens := + expression.IndexInfo2PrefixCols(ds.Columns, ds.schema.Columns, path.Index) + if len(IdxCols) == 0 { + return conds, nil + } + + adder := &exprPrefixAdder{ + sctx: sc, + OrigConds: conds, + cols: IdxCols, + lengths: IdxColLens, + } + + return adder.addExprPrefix4ShardIndex() +} + +// AddExprPrefix4ShardIndex +// if original condition is a LogicOr expression, such as `WHERE a = 1 OR a = 10`, +// call the function AddExprPrefix4DNFCond to add prefix expression tidb_shard(a) = xxx for shard index. +// Otherwise, if the condition is `WHERE a = 1`, `WHERE a = 1 AND b = 10`, `WHERE a IN (1, 2, 3)`......, +// call the function AddExprPrefix4CNFCond to add prefix expression for shard index. +func (adder *exprPrefixAdder) addExprPrefix4ShardIndex() ([]expression.Expression, error) { + if len(adder.OrigConds) == 1 { + if sf, ok := adder.OrigConds[0].(*expression.ScalarFunction); ok && sf.FuncName.L == ast.LogicOr { + return adder.addExprPrefix4DNFCond(sf) + } + } + return adder.addExprPrefix4CNFCond(adder.OrigConds) +} + +// AddExprPrefix4CNFCond +// add the prefix expression for CNF condition, e.g. `WHERE a = 1`, `WHERE a = 1 AND b = 10`, ...... +// @param[in] conds the original condtion of the datasoure. e.g. `WHERE t1.a = 1 AND t1.b = 10 AND t2.a = 20`. +// if current datasource is `t1`, conds is {t1.a = 1, t1.b = 10}. if current datasource is +// `t2`, conds is {t2.a = 20} +// @return - the new condition after adding expression prefix +func (adder *exprPrefixAdder) addExprPrefix4CNFCond(conds []expression.Expression) ([]expression.Expression, error) { + newCondtionds, err := ranger.AddExpr4EqAndInCondition(adder.sctx, + conds, adder.cols) + + return newCondtionds, err +} + +// AddExprPrefix4DNFCond +// add the prefix expression for DNF condition, e.g. `WHERE a = 1 OR a = 10`, ...... +// The condition returned is `WHERE (tidb_shard(a) = 214 AND a = 1) OR (tidb_shard(a) = 142 AND a = 10)` +// @param[in] condition the original condtion of the datasoure. e.g. `WHERE a = 1 OR a = 10`. +// condtion is `a = 1 OR a = 10` +// @return - the new condition after adding expression prefix. It's still a LogicOr expression. +func (adder *exprPrefixAdder) addExprPrefix4DNFCond(condition *expression.ScalarFunction) ([]expression.Expression, error) { + var err error + dnfItems := expression.FlattenDNFConditions(condition) + newAccessItems := make([]expression.Expression, 0, len(dnfItems)) + + for _, item := range dnfItems { + if sf, ok := item.(*expression.ScalarFunction); ok { + var accesses []expression.Expression + if sf.FuncName.L == ast.LogicAnd { + cnfItems := expression.FlattenCNFConditions(sf) + accesses, err = adder.addExprPrefix4CNFCond(cnfItems) + if err != nil { + return []expression.Expression{condition}, err + } + newAccessItems = append(newAccessItems, expression.ComposeCNFCondition(adder.sctx, accesses...)) + } else if sf.FuncName.L == ast.EQ || sf.FuncName.L == ast.In { + // only add prefix expression for EQ or IN function + accesses, err = adder.addExprPrefix4CNFCond([]expression.Expression{sf}) + if err != nil { + return []expression.Expression{condition}, err + } + newAccessItems = append(newAccessItems, expression.ComposeCNFCondition(adder.sctx, accesses...)) + } else { + newAccessItems = append(newAccessItems, item) + } + } else { + newAccessItems = append(newAccessItems, item) + } + } + + return []expression.Expression{expression.ComposeDNFCondition(adder.sctx, newAccessItems...)}, nil +} +>>>>>>> 47e4b5bf3... *: revert #27021 to fix a bug that selection can not be pushed down when having clause above aggregation (#33168) diff --git a/planner/core/testdata/ordered_result_mode_suite_out.json b/planner/core/testdata/ordered_result_mode_suite_out.json index 80d8b06a86fd6..9fffea09ded93 100644 --- a/planner/core/testdata/ordered_result_mode_suite_out.json +++ b/planner/core/testdata/ordered_result_mode_suite_out.json @@ -417,12 +417,12 @@ }, { "Plan": [ - "Selection_8 6400.00 root lt(Column#6, 20)", - "└─Sort_9 8000.00 root Column#5, Column#6, Column#7", - " └─HashAgg_15 8000.00 root group by:test.t1.d, funcs:min(Column#11)->Column#5, funcs:max(Column#12)->Column#6, funcs:sum(Column#13)->Column#7", - " └─TableReader_16 8000.00 root data:HashAgg_11", - " └─HashAgg_11 8000.00 cop[tikv] group by:test.t1.d, funcs:min(test.t1.a)->Column#11, funcs:max(test.t1.b)->Column#12, funcs:sum(test.t1.c)->Column#13", - " └─TableFullScan_14 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo" + "Sort_9 6400.00 root Column#5, Column#6, Column#7", + "└─Selection_11 6400.00 root lt(Column#6, 20)", + " └─HashAgg_16 8000.00 root group by:test.t1.d, funcs:min(Column#11)->Column#5, funcs:max(Column#12)->Column#6, funcs:sum(Column#13)->Column#7", + " └─TableReader_17 8000.00 root data:HashAgg_12", + " └─HashAgg_12 8000.00 cop[tikv] group by:test.t1.d, funcs:min(test.t1.a)->Column#11, funcs:max(test.t1.b)->Column#12, funcs:sum(test.t1.c)->Column#13", + " └─TableFullScan_15 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo" ] }, { diff --git a/planner/core/testdata/plan_suite_unexported_out.json b/planner/core/testdata/plan_suite_unexported_out.json index 6401df6aa9264..6718fb0071465 100644 --- a/planner/core/testdata/plan_suite_unexported_out.json +++ b/planner/core/testdata/plan_suite_unexported_out.json @@ -3,7 +3,7 @@ "Name": "TestEagerAggregation", "Cases": [ "DataScan(t)->Aggr(sum(test.t.a),sum(plus(test.t.a, 1)),count(test.t.a))->Projection", - "DataScan(t)->Aggr(sum(plus(test.t.a, test.t.b)),sum(plus(test.t.a, test.t.c)),count(test.t.a))->Projection->Sel([gt(Column#13, 0)])->Sort->Projection", + "DataScan(t)->Aggr(sum(plus(test.t.a, test.t.b)),sum(plus(test.t.a, test.t.c)),count(test.t.a))->Sel([gt(Column#13, 0)])->Projection->Sort->Projection", "Join{DataScan(a)->Aggr(sum(test.t.a),firstrow(test.t.c))->DataScan(b)}(test.t.c,test.t.c)->Aggr(sum(Column#26))->Projection", "Join{DataScan(a)->DataScan(b)->Aggr(sum(test.t.a),firstrow(test.t.c))}(test.t.c,test.t.c)->Aggr(sum(Column#26))->Projection", "Join{DataScan(a)->DataScan(b)->Aggr(sum(test.t.a),firstrow(test.t.c))}(test.t.c,test.t.c)->Aggr(sum(Column#26),firstrow(test.t.a))->Projection", @@ -89,7 +89,7 @@ "DataScan(t)->Aggr(sum(test.t.b),firstrow(test.t.a))->Sel([gt(cast(test.t.a, decimal(20,0) BINARY), Column#13)])->Projection->Projection", "DataScan(t)->Aggr(sum(test.t.b),firstrow(test.t.a))->Sel([gt(test.t.a, 1)])->Projection->Projection", "Dual->Sel([gt(test.t.a, 1)])->Projection", - "DataScan(t)->Aggr(count(test.t.a),firstrow(test.t.a))->Projection->Sel([lt(Column#13, 1)])", + "DataScan(t)->Aggr(count(test.t.a),firstrow(test.t.a))->Sel([lt(Column#13, 1)])->Projection", "Join{DataScan(t1)->DataScan(t2)}(test.t.a,test.t.a)->Projection", "Dual->Projection", "DataScan(t)->Projection->Projection->Window(min(test.t.a)->Column#14)->Sel([lt(test.t.a, 10) eq(test.t.b, Column#14)])->Projection->Projection", @@ -194,7 +194,11 @@ "TableReader(Table(t))->Window(sum(cast(test.t.a, decimal(65,0) BINARY))->Column#14 over())->Sort->Projection", "TableReader(Table(t))->Window(sum(cast(test.t.a, decimal(65,0) BINARY))->Column#14 over(partition by test.t.a))->Sort->Projection", "TableReader(Table(t)->StreamAgg)->StreamAgg->Window(sum(Column#13)->Column#15 over())->Sort->Projection", +<<<<<<< HEAD "Apply{IndexReader(Index(t.f)[[NULL,+inf]])->IndexReader(Index(t.f)[[NULL,+inf]]->Sel([gt(test.t.a, test.t.a)]))->Window(sum(cast(test.t.a, decimal(65,0) BINARY))->Column#38 over())->MaxOneRow}->Sel([Column#38])->Projection", +======= + "Apply{IndexReader(Index(t.f)[[NULL,+inf]])->IndexReader(Index(t.f)[[NULL,+inf]]->Sel([gt(test.t.a, test.t.a)]))->Window(sum(cast(test.t.a, decimal(10,0) BINARY))->Column#38 over())->MaxOneRow->Sel([Column#38])}->Projection", +>>>>>>> 47e4b5bf3... *: revert #27021 to fix a bug that selection can not be pushed down when having clause above aggregation (#33168) "[planner:3594]You cannot use the alias 'w' of an expression containing a window function in this context.'", "[planner:1247]Reference 'sum_a' not supported (reference to window function)", "[planner:3579]Window name 'w2' is not defined.", @@ -267,7 +271,11 @@ "TableReader(Table(t))->Window(sum(cast(test.t.a, decimal(65,0) BINARY))->Column#14 over())->Sort->Projection", "TableReader(Table(t))->Window(sum(cast(test.t.a, decimal(65,0) BINARY))->Column#14 over(partition by test.t.a))->Sort->Projection", "TableReader(Table(t)->StreamAgg)->StreamAgg->Window(sum(Column#13)->Column#15 over())->Sort->Projection", +<<<<<<< HEAD "Apply{IndexReader(Index(t.f)[[NULL,+inf]])->IndexReader(Index(t.f)[[NULL,+inf]]->Sel([gt(test.t.a, test.t.a)]))->Window(sum(cast(test.t.a, decimal(65,0) BINARY))->Column#38 over())->MaxOneRow}->Sel([Column#38])->Projection", +======= + "Apply{IndexReader(Index(t.f)[[NULL,+inf]])->IndexReader(Index(t.f)[[NULL,+inf]]->Sel([gt(test.t.a, test.t.a)]))->Window(sum(cast(test.t.a, decimal(10,0) BINARY))->Column#38 over())->MaxOneRow->Sel([Column#38])}->Projection", +>>>>>>> 47e4b5bf3... *: revert #27021 to fix a bug that selection can not be pushed down when having clause above aggregation (#33168) "[planner:3594]You cannot use the alias 'w' of an expression containing a window function in this context.'", "[planner:1247]Reference 'sum_a' not supported (reference to window function)", "[planner:3579]Window name 'w2' is not defined.", @@ -482,12 +490,12 @@ "test.t.f" ] ], - "4": [ + "5": [ [ "test.t.f" ] ], - "5": [ + "6": [ [ "test.t.f" ] diff --git a/statistics/testdata/stats_suite_out.json b/statistics/testdata/stats_suite_out.json index 2897c02fb3df2..ce489d7ecbe33 100644 --- a/statistics/testdata/stats_suite_out.json +++ b/statistics/testdata/stats_suite_out.json @@ -706,12 +706,16 @@ { "Start": 300, "End": 899, - "Count": 4500 + "Count": 4498.5 }, { "Start": 800, "End": 1000, +<<<<<<< HEAD "Count": 1219.196869573942 +======= + "Count": 1201.196869573942 +>>>>>>> 47e4b5bf3... *: revert #27021 to fix a bug that selection can not be pushed down when having clause above aggregation (#33168) }, { "Start": 900, @@ -736,7 +740,11 @@ { "Start": 200, "End": 400, +<<<<<<< HEAD "Count": 1186.5288209899081 +======= + "Count": 1211.5288209899081 +>>>>>>> 47e4b5bf3... *: revert #27021 to fix a bug that selection can not be pushed down when having clause above aggregation (#33168) }, { "Start": 200,