Skip to content

Commit

Permalink
cherry pick pingcap#33168 to release-5.3
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <[email protected]>
  • Loading branch information
wjhuang2016 authored and ti-srebot committed Mar 17, 2022
1 parent 459917c commit 22ed9aa
Show file tree
Hide file tree
Showing 7 changed files with 237 additions and 15 deletions.
8 changes: 8 additions & 0 deletions executor/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -920,6 +920,7 @@ func (s *testSuiteAgg) TestHaving(c *C) {
tk.MustQuery("select 1 from t group by c1 having sum(abs(c2 + c3)) = c1").Check(testkit.Rows("1"))
}

<<<<<<< HEAD
func (s *testSuiteAgg) TestIssue26496(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)

Expand All @@ -931,6 +932,13 @@ func (s *testSuiteAgg) TestIssue26496(c *C) {

func (s *testSuiteAgg) TestAggEliminator(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
=======
func TestAggEliminator(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
>>>>>>> 47e4b5bf3... *: revert #27021 to fix a bug that selection can not be pushed down when having clause above aggregation (#33168)

tk.MustExec("create table t(a int primary key, b int)")
tk.MustQuery("select min(a), min(a) from t").Check(testkit.Rows("<nil> <nil>"))
Expand Down
2 changes: 1 addition & 1 deletion planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -980,7 +980,7 @@ func (b *PlanBuilder) buildSelection(ctx context.Context, p LogicalPlan, where a

conditions := splitWhere(where)
expressions := make([]expression.Expression, 0, len(conditions))
selection := LogicalSelection{buildByHaving: aggMapper != nil}.Init(b.ctx, b.getSelectOffset())
selection := LogicalSelection{}.Init(b.ctx, b.getSelectOffset())
for _, cond := range conditions {
expr, np, err := b.rewrite(ctx, cond, p, aggMapper, false)
if err != nil {
Expand Down
3 changes: 0 additions & 3 deletions planner/core/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,9 +451,6 @@ type LogicalSelection struct {
// but after we converted to CNF(Conjunctive normal form), it can be
// split into a list of AND conditions.
Conditions []expression.Expression

// having selection can't be pushed down, because it must above the aggregation.
buildByHaving bool
}

// ExtractCorrelatedCols implements LogicalPlan interface.
Expand Down
201 changes: 201 additions & 0 deletions planner/core/rule_predicate_push_down.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func (p *LogicalSelection) PredicatePushDown(predicates []expression.Expression)
p.Conditions = DeleteTrueExprs(p, p.Conditions)
var child LogicalPlan
var retConditions []expression.Expression
<<<<<<< HEAD
if p.buildByHaving {
retConditions, child = p.children[0].PredicatePushDown(predicates)
retConditions = append(retConditions, p.Conditions...)
Expand All @@ -93,6 +94,13 @@ func (p *LogicalSelection) PredicatePushDown(predicates []expression.Expression)
retConditions, child = p.children[0].PredicatePushDown(append(canBePushDown, predicates...))
retConditions = append(retConditions, canNotBePushDown...)
}
=======
var originConditions []expression.Expression
canBePushDown, canNotBePushDown := splitSetGetVarFunc(p.Conditions)
originConditions = canBePushDown
retConditions, child = p.children[0].PredicatePushDown(append(canBePushDown, predicates...), opt)
retConditions = append(retConditions, canNotBePushDown...)
>>>>>>> 47e4b5bf3... *: revert #27021 to fix a bug that selection can not be pushed down when having clause above aggregation (#33168)
if len(retConditions) > 0 {
p.Conditions = expression.PropagateConstant(p.ctx, retConditions)
// Return table dual when filter is constant false or null.
Expand Down Expand Up @@ -634,3 +642,196 @@ func (p *LogicalMemTable) PredicatePushDown(predicates []expression.Expression)
func (*ppdSolver) name() string {
return "predicate_push_down"
}
<<<<<<< HEAD
=======

// appendTableDualTraceStep appends a step to opt stating that the plan
// `replaced` has been replaced by the plan `dual`. The step's reason lists the
// given conditions, comma separated, as being constant false or null. Both the
// reason and the action text are built lazily inside closures handed to
// opt.appendStepToCurrent.
func appendTableDualTraceStep(replaced LogicalPlan, dual LogicalPlan, conditions []expression.Expression, opt *logicalOptimizeOp) {
	describeReason := func() string {
		var msg bytes.Buffer
		msg.WriteString("The conditions[")
		for idx := range conditions {
			if idx != 0 {
				msg.WriteString(",")
			}
			msg.WriteString(conditions[idx].String())
		}
		msg.WriteString("] are constant false or null")
		return msg.String()
	}
	describeAction := func() string {
		return fmt.Sprintf("%v_%v is replaced by %v_%v", replaced.TP(), replaced.ID(), dual.TP(), dual.ID())
	}
	opt.appendStepToCurrent(dual.ID(), dual.TP(), describeReason, describeAction)
}

// appendSelectionPredicatePushDownTraceStep appends a step to opt stating that
// the selection p is removed. When conditions is non-empty the step's reason
// lists them, comma separated, as pushed down from p; otherwise the reason is
// the empty string.
func appendSelectionPredicatePushDownTraceStep(p *LogicalSelection, conditions []expression.Expression, opt *logicalOptimizeOp) {
	describeReason := func() string {
		if len(conditions) == 0 {
			return ""
		}
		var msg bytes.Buffer
		msg.WriteString("The conditions[")
		for idx := range conditions {
			if idx != 0 {
				msg.WriteString(",")
			}
			msg.WriteString(conditions[idx].String())
		}
		fmt.Fprintf(&msg, "] in %v_%v are pushed down", p.TP(), p.ID())
		return msg.String()
	}
	describeAction := func() string {
		return fmt.Sprintf("%v_%v is removed", p.TP(), p.ID())
	}
	opt.appendStepToCurrent(p.ID(), p.TP(), describeReason, describeAction)
}

// appendDataSourcePredicatePushDownTraceStep appends a step to opt whose
// action lists ds.pushedDownConds, comma separated, as pushed down across ds.
// Nothing is appended when ds has no pushed-down conditions. The step's
// reason is always the empty string.
func appendDataSourcePredicatePushDownTraceStep(ds *DataSource, opt *logicalOptimizeOp) {
	if len(ds.pushedDownConds) == 0 {
		return
	}
	describeAction := func() string {
		var msg bytes.Buffer
		msg.WriteString("The conditions[")
		for idx := range ds.pushedDownConds {
			if idx != 0 {
				msg.WriteString(",")
			}
			msg.WriteString(ds.pushedDownConds[idx].String())
		}
		fmt.Fprintf(&msg, "] are pushed down across %v_%v", ds.TP(), ds.ID())
		return msg.String()
	}
	noReason := func() string { return "" }
	opt.appendStepToCurrent(ds.ID(), ds.TP(), noReason, describeAction)
}

// appendAddSelectionTraceStep appends a step to opt stating that the selection
// sel was added to connect plan p and plan child. The step's reason is always
// the empty string.
func appendAddSelectionTraceStep(p LogicalPlan, child LogicalPlan, sel *LogicalSelection, opt *logicalOptimizeOp) {
	describeAction := func() string {
		return fmt.Sprintf(
			"add %v_%v to connect %v_%v and %v_%v",
			sel.TP(), sel.ID(),
			p.TP(), p.ID(),
			child.TP(), child.ID(),
		)
	}
	noReason := func() string { return "" }
	opt.appendStepToCurrent(sel.ID(), sel.TP(), noReason, describeAction)
}

// AddPrefix4ShardIndexes adds the expression prefix for shard indexes to conds.
// For example, given an index test.uk(tidb_shard(a), a), it transforms
// "SELECT * FROM test WHERE a = 10" into
// "SELECT * FROM test WHERE tidb_shard(a) = val AND a = 10", where val is the
// value of tidb_shard(10). It also transforms
// "SELECT * FROM test WHERE a IN (10, 20, 30)" into
// "SELECT * FROM test WHERE tidb_shard(a) = val1 AND a = 10 OR tidb_shard(a) = val2 AND a = 20".
//
// conds is the original condition list of this data source. The returned slice
// is the condition list after adding the expression prefix. conds itself is
// returned unchanged when the data source has no expression-prefix unique key,
// or when adding the prefix fails for any path (the failure is logged).
func (ds *DataSource) AddPrefix4ShardIndexes(sc sessionctx.Context, conds []expression.Expression) []expression.Expression {
	if !ds.containExprPrefixUk {
		return conds
	}

	rewritten := conds
	for _, path := range ds.possibleAccessPaths {
		if path.IsUkShardIndexPath {
			updated, err := ds.addExprPrefixCond(sc, path, rewritten)
			if err != nil {
				logutil.BgLogger().Error("Add tidb_shard expression failed",
					zap.Error(err),
					zap.Uint64("connection id", sc.GetSessionVars().ConnectionID),
					zap.String("database name", ds.DBName.L),
					zap.String("table name", ds.tableInfo.Name.L),
					zap.String("index name", path.Index.Name.L))
				// Fall back to the caller's original conditions on any failure.
				return conds
			}
			rewritten = updated
		}
	}

	return rewritten
}

// addExprPrefixCond adds the shard-index expression prefix to conds for the
// index of the given access path. The prefix columns and their lengths come
// from expression.IndexInfo2PrefixCols; when that yields no columns, conds is
// returned unchanged with a nil error.
func (ds *DataSource) addExprPrefixCond(sc sessionctx.Context, path *util.AccessPath,
	conds []expression.Expression) ([]expression.Expression, error) {
	prefixCols, prefixLens := expression.IndexInfo2PrefixCols(ds.Columns, ds.schema.Columns, path.Index)
	if len(prefixCols) == 0 {
		return conds, nil
	}

	prefixAdder := &exprPrefixAdder{
		sctx:      sc,
		OrigConds: conds,
		cols:      prefixCols,
		lengths:   prefixLens,
	}
	return prefixAdder.addExprPrefix4ShardIndex()
}

// addExprPrefix4ShardIndex adds the shard-index prefix expression to
// adder.OrigConds.
// If the original condition is a single LogicOr expression, such as
// `WHERE a = 1 OR a = 10`, it delegates to addExprPrefix4DNFCond to add the
// prefix expression tidb_shard(a) = xxx.
// Otherwise, e.g. for `WHERE a = 1`, `WHERE a = 1 AND b = 10` or
// `WHERE a IN (1, 2, 3)`, it delegates to addExprPrefix4CNFCond.
func (adder *exprPrefixAdder) addExprPrefix4ShardIndex() ([]expression.Expression, error) {
	if len(adder.OrigConds) != 1 {
		return adder.addExprPrefix4CNFCond(adder.OrigConds)
	}
	orExpr, isScalarFunc := adder.OrigConds[0].(*expression.ScalarFunction)
	if !isScalarFunc || orExpr.FuncName.L != ast.LogicOr {
		return adder.addExprPrefix4CNFCond(adder.OrigConds)
	}
	return adder.addExprPrefix4DNFCond(orExpr)
}

// addExprPrefix4CNFCond adds the prefix expression for a CNF condition, e.g.
// `WHERE a = 1`, `WHERE a = 1 AND b = 10`, ...
//
// conds is the original condition list of the data source. For example, with
// `WHERE t1.a = 1 AND t1.b = 10 AND t2.a = 20`, conds is {t1.a = 1, t1.b = 10}
// if the current data source is `t1`, and {t2.a = 20} if it is `t2`.
//
// It returns the new conditions after adding the expression prefix, as
// produced by ranger.AddExpr4EqAndInCondition.
func (adder *exprPrefixAdder) addExprPrefix4CNFCond(conds []expression.Expression) ([]expression.Expression, error) {
	return ranger.AddExpr4EqAndInCondition(adder.sctx, conds, adder.cols)
}

// addExprPrefix4DNFCond adds the prefix expression for a DNF condition, e.g.
// `WHERE a = 1 OR a = 10`, ...
// The condition returned for that example is
// `WHERE (tidb_shard(a) = 214 AND a = 1) OR (tidb_shard(a) = 142 AND a = 10)`.
//
// condition is the original LogicOr condition of the data source, e.g.
// `a = 1 OR a = 10`.
//
// It returns a one-element slice holding the new condition after adding the
// expression prefix; the result is still a LogicOr expression. If adding the
// prefix to any DNF item fails, the original condition is returned together
// with the error.
func (adder *exprPrefixAdder) addExprPrefix4DNFCond(condition *expression.ScalarFunction) ([]expression.Expression, error) {
	orItems := expression.FlattenDNFConditions(condition)
	rewritten := make([]expression.Expression, 0, len(orItems))

	for _, item := range orItems {
		sf, isScalarFunc := item.(*expression.ScalarFunction)
		if !isScalarFunc {
			rewritten = append(rewritten, item)
			continue
		}

		// Work out which CNF items (if any) should receive the prefix.
		var cnfInput []expression.Expression
		switch sf.FuncName.L {
		case ast.LogicAnd:
			cnfInput = expression.FlattenCNFConditions(sf)
		case ast.EQ, ast.In:
			// only add prefix expression for EQ or IN function
			cnfInput = []expression.Expression{sf}
		default:
			rewritten = append(rewritten, item)
			continue
		}

		accesses, err := adder.addExprPrefix4CNFCond(cnfInput)
		if err != nil {
			return []expression.Expression{condition}, err
		}
		rewritten = append(rewritten, expression.ComposeCNFCondition(adder.sctx, accesses...))
	}

	return []expression.Expression{expression.ComposeDNFCondition(adder.sctx, rewritten...)}, nil
}
>>>>>>> 47e4b5bf3... *: revert #27021 to fix a bug that selection can not be pushed down when having clause above aggregation (#33168)
12 changes: 6 additions & 6 deletions planner/core/testdata/ordered_result_mode_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -417,12 +417,12 @@
},
{
"Plan": [
"Selection_8 6400.00 root lt(Column#6, 20)",
"└─Sort_9 8000.00 root Column#5, Column#6, Column#7",
" └─HashAgg_15 8000.00 root group by:test.t1.d, funcs:min(Column#11)->Column#5, funcs:max(Column#12)->Column#6, funcs:sum(Column#13)->Column#7",
" └─TableReader_16 8000.00 root data:HashAgg_11",
" └─HashAgg_11 8000.00 cop[tikv] group by:test.t1.d, funcs:min(test.t1.a)->Column#11, funcs:max(test.t1.b)->Column#12, funcs:sum(test.t1.c)->Column#13",
" └─TableFullScan_14 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo"
"Sort_9 6400.00 root Column#5, Column#6, Column#7",
"└─Selection_11 6400.00 root lt(Column#6, 20)",
" └─HashAgg_16 8000.00 root group by:test.t1.d, funcs:min(Column#11)->Column#5, funcs:max(Column#12)->Column#6, funcs:sum(Column#13)->Column#7",
" └─TableReader_17 8000.00 root data:HashAgg_12",
" └─HashAgg_12 8000.00 cop[tikv] group by:test.t1.d, funcs:min(test.t1.a)->Column#11, funcs:max(test.t1.b)->Column#12, funcs:sum(test.t1.c)->Column#13",
" └─TableFullScan_15 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo"
]
},
{
Expand Down
16 changes: 12 additions & 4 deletions planner/core/testdata/plan_suite_unexported_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"Name": "TestEagerAggregation",
"Cases": [
"DataScan(t)->Aggr(sum(test.t.a),sum(plus(test.t.a, 1)),count(test.t.a))->Projection",
"DataScan(t)->Aggr(sum(plus(test.t.a, test.t.b)),sum(plus(test.t.a, test.t.c)),count(test.t.a))->Projection->Sel([gt(Column#13, 0)])->Sort->Projection",
"DataScan(t)->Aggr(sum(plus(test.t.a, test.t.b)),sum(plus(test.t.a, test.t.c)),count(test.t.a))->Sel([gt(Column#13, 0)])->Projection->Sort->Projection",
"Join{DataScan(a)->Aggr(sum(test.t.a),firstrow(test.t.c))->DataScan(b)}(test.t.c,test.t.c)->Aggr(sum(Column#26))->Projection",
"Join{DataScan(a)->DataScan(b)->Aggr(sum(test.t.a),firstrow(test.t.c))}(test.t.c,test.t.c)->Aggr(sum(Column#26))->Projection",
"Join{DataScan(a)->DataScan(b)->Aggr(sum(test.t.a),firstrow(test.t.c))}(test.t.c,test.t.c)->Aggr(sum(Column#26),firstrow(test.t.a))->Projection",
Expand Down Expand Up @@ -89,7 +89,7 @@
"DataScan(t)->Aggr(sum(test.t.b),firstrow(test.t.a))->Sel([gt(cast(test.t.a, decimal(20,0) BINARY), Column#13)])->Projection->Projection",
"DataScan(t)->Aggr(sum(test.t.b),firstrow(test.t.a))->Sel([gt(test.t.a, 1)])->Projection->Projection",
"Dual->Sel([gt(test.t.a, 1)])->Projection",
"DataScan(t)->Aggr(count(test.t.a),firstrow(test.t.a))->Projection->Sel([lt(Column#13, 1)])",
"DataScan(t)->Aggr(count(test.t.a),firstrow(test.t.a))->Sel([lt(Column#13, 1)])->Projection",
"Join{DataScan(t1)->DataScan(t2)}(test.t.a,test.t.a)->Projection",
"Dual->Projection",
"DataScan(t)->Projection->Projection->Window(min(test.t.a)->Column#14)->Sel([lt(test.t.a, 10) eq(test.t.b, Column#14)])->Projection->Projection",
Expand Down Expand Up @@ -194,7 +194,11 @@
"TableReader(Table(t))->Window(sum(cast(test.t.a, decimal(65,0) BINARY))->Column#14 over())->Sort->Projection",
"TableReader(Table(t))->Window(sum(cast(test.t.a, decimal(65,0) BINARY))->Column#14 over(partition by test.t.a))->Sort->Projection",
"TableReader(Table(t)->StreamAgg)->StreamAgg->Window(sum(Column#13)->Column#15 over())->Sort->Projection",
<<<<<<< HEAD
"Apply{IndexReader(Index(t.f)[[NULL,+inf]])->IndexReader(Index(t.f)[[NULL,+inf]]->Sel([gt(test.t.a, test.t.a)]))->Window(sum(cast(test.t.a, decimal(65,0) BINARY))->Column#38 over())->MaxOneRow}->Sel([Column#38])->Projection",
=======
"Apply{IndexReader(Index(t.f)[[NULL,+inf]])->IndexReader(Index(t.f)[[NULL,+inf]]->Sel([gt(test.t.a, test.t.a)]))->Window(sum(cast(test.t.a, decimal(10,0) BINARY))->Column#38 over())->MaxOneRow->Sel([Column#38])}->Projection",
>>>>>>> 47e4b5bf3... *: revert #27021 to fix a bug that selection can not be pushed down when having clause above aggregation (#33168)
"[planner:3594]You cannot use the alias 'w' of an expression containing a window function in this context.'",
"[planner:1247]Reference 'sum_a' not supported (reference to window function)",
"[planner:3579]Window name 'w2' is not defined.",
Expand Down Expand Up @@ -267,7 +271,11 @@
"TableReader(Table(t))->Window(sum(cast(test.t.a, decimal(65,0) BINARY))->Column#14 over())->Sort->Projection",
"TableReader(Table(t))->Window(sum(cast(test.t.a, decimal(65,0) BINARY))->Column#14 over(partition by test.t.a))->Sort->Projection",
"TableReader(Table(t)->StreamAgg)->StreamAgg->Window(sum(Column#13)->Column#15 over())->Sort->Projection",
<<<<<<< HEAD
"Apply{IndexReader(Index(t.f)[[NULL,+inf]])->IndexReader(Index(t.f)[[NULL,+inf]]->Sel([gt(test.t.a, test.t.a)]))->Window(sum(cast(test.t.a, decimal(65,0) BINARY))->Column#38 over())->MaxOneRow}->Sel([Column#38])->Projection",
=======
"Apply{IndexReader(Index(t.f)[[NULL,+inf]])->IndexReader(Index(t.f)[[NULL,+inf]]->Sel([gt(test.t.a, test.t.a)]))->Window(sum(cast(test.t.a, decimal(10,0) BINARY))->Column#38 over())->MaxOneRow->Sel([Column#38])}->Projection",
>>>>>>> 47e4b5bf3... *: revert #27021 to fix a bug that selection can not be pushed down when having clause above aggregation (#33168)
"[planner:3594]You cannot use the alias 'w' of an expression containing a window function in this context.'",
"[planner:1247]Reference 'sum_a' not supported (reference to window function)",
"[planner:3579]Window name 'w2' is not defined.",
Expand Down Expand Up @@ -482,12 +490,12 @@
"test.t.f"
]
],
"4": [
"5": [
[
"test.t.f"
]
],
"5": [
"6": [
[
"test.t.f"
]
Expand Down
10 changes: 9 additions & 1 deletion statistics/testdata/stats_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -706,12 +706,16 @@
{
"Start": 300,
"End": 899,
"Count": 4500
"Count": 4498.5
},
{
"Start": 800,
"End": 1000,
<<<<<<< HEAD
"Count": 1219.196869573942
=======
"Count": 1201.196869573942
>>>>>>> 47e4b5bf3... *: revert #27021 to fix a bug that selection can not be pushed down when having clause above aggregation (#33168)
},
{
"Start": 900,
Expand All @@ -736,7 +740,11 @@
{
"Start": 200,
"End": 400,
<<<<<<< HEAD
"Count": 1186.5288209899081
=======
"Count": 1211.5288209899081
>>>>>>> 47e4b5bf3... *: revert #27021 to fix a bug that selection can not be pushed down when having clause above aggregation (#33168)
},
{
"Start": 200,
Expand Down

0 comments on commit 22ed9aa

Please sign in to comment.