Skip to content

Commit

Permalink
planner: fix bugs and make it more effective in outer join eliminatio…
Browse files Browse the repository at this point in the history
…n rule. (#11160) (#11377)

All tests passed, auto merged by Bot
  • Loading branch information
lzmhhh123 authored and sre-bot committed Jul 29, 2019
1 parent 58aae12 commit aede000
Show file tree
Hide file tree
Showing 5 changed files with 215 additions and 64 deletions.
59 changes: 59 additions & 0 deletions cmd/explaintest/r/explain_complex.result
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,62 @@ HashAgg_34 72000.00 root group by:col_1, funcs:sum(col_0)
│ └─TableScan_58 10000.00 cop table:tbl_008, range:[-inf,+inf], keep order:false, stats:pseudo
└─TableReader_62 10000.00 root data:TableScan_61
└─TableScan_61 10000.00 cop table:tbl_009, range:[-inf,+inf], keep order:false, stats:pseudo
CREATE TABLE org_department (
id int(11) NOT NULL AUTO_INCREMENT,
ctx int(11) DEFAULT '0' COMMENT 'organization id',
name varchar(128) DEFAULT NULL,
left_value int(11) DEFAULT NULL,
right_value int(11) DEFAULT NULL,
depth int(11) DEFAULT NULL,
leader_id bigint(20) DEFAULT NULL,
status int(11) DEFAULT '1000',
created_on datetime DEFAULT NULL,
updated_on datetime DEFAULT NULL,
PRIMARY KEY (id),
UNIQUE KEY org_department_id_uindex (id),
KEY org_department_leader_id_index (leader_id),
KEY org_department_ctx_index (ctx)
);
CREATE TABLE org_position (
id int(11) NOT NULL AUTO_INCREMENT,
ctx int(11) DEFAULT NULL,
name varchar(128) DEFAULT NULL,
left_value int(11) DEFAULT NULL,
right_value int(11) DEFAULT NULL,
depth int(11) DEFAULT NULL,
department_id int(11) DEFAULT NULL,
status int(2) DEFAULT NULL,
created_on datetime DEFAULT NULL,
updated_on datetime DEFAULT NULL,
PRIMARY KEY (id),
UNIQUE KEY org_position_id_uindex (id),
KEY org_position_department_id_index (department_id)
) ENGINE=InnoDB AUTO_INCREMENT=22 DEFAULT CHARSET=utf8;
CREATE TABLE org_employee_position (
hotel_id int(11) DEFAULT NULL,
user_id bigint(20) DEFAULT NULL,
position_id int(11) DEFAULT NULL,
status int(11) DEFAULT NULL,
created_on datetime DEFAULT NULL,
updated_on datetime DEFAULT NULL,
UNIQUE KEY org_employee_position_pk (hotel_id,user_id,position_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
explain SELECT d.id, d.ctx, d.name, d.left_value, d.right_value, d.depth, d.leader_id, d.status, d.created_on, d.updated_on FROM org_department AS d LEFT JOIN org_position AS p ON p.department_id = d.id AND p.status = 1000 LEFT JOIN org_employee_position AS ep ON ep.position_id = p.id AND ep.status = 1000 WHERE (d.ctx = 1 AND (ep.user_id = 62 OR d.id = 20 OR d.id = 20) AND d.status = 1000) GROUP BY d.id ORDER BY d.left_value;
id count task operator info
Sort_10 1.00 root test.d.left_value:asc
└─HashAgg_15 1.00 root group by:test.d.id, funcs:firstrow(test.d.id), firstrow(test.d.ctx), firstrow(test.d.name), firstrow(test.d.left_value), firstrow(test.d.right_value), firstrow(test.d.depth), firstrow(test.d.leader_id), firstrow(test.d.status), firstrow(test.d.created_on), firstrow(test.d.updated_on)
└─Selection_20 0.01 root or(eq(test.ep.user_id, 62), or(eq(test.d.id, 20), eq(test.d.id, 20)))
└─HashLeftJoin_21 0.02 root left outer join, inner:TableReader_55, equal:[eq(test.p.id, test.ep.position_id)]
├─IndexJoin_29 0.01 root left outer join, inner:IndexLookUp_28, outer key:test.d.id, inner key:test.p.department_id
│ ├─IndexLookUp_45 0.01 root
│ │ ├─IndexScan_42 10.00 cop table:d, index:ctx, range:[1,1], keep order:false, stats:pseudo
│ │ └─Selection_44 0.01 cop eq(test.d.status, 1000)
│ │ └─TableScan_43 10.00 cop table:org_department, keep order:false, stats:pseudo
│ └─IndexLookUp_28 0.01 root
│ ├─Selection_26 9.99 cop not(isnull(test.p.department_id))
│ │ └─IndexScan_24 10.00 cop table:p, index:department_id, range: decided by [eq(test.p.department_id, test.d.id)], keep order:false, stats:pseudo
│ └─Selection_27 0.01 cop eq(test.p.status, 1000)
│ └─TableScan_25 9.99 cop table:org_position, keep order:false, stats:pseudo
└─TableReader_55 9.99 root data:Selection_54
└─Selection_54 9.99 cop eq(test.ep.status, 1000), not(isnull(test.ep.position_id))
└─TableScan_53 10000.00 cop table:ep, range:[-inf,+inf], keep order:false, stats:pseudo
43 changes: 43 additions & 0 deletions cmd/explaintest/t/explain_complex.test
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,46 @@ CREATE TABLE `tbl_008` (`a` int, `b` int);
CREATE TABLE `tbl_009` (`a` int, `b` int);

explain select sum(a) from (select * from tbl_001 union all select * from tbl_002 union all select * from tbl_003 union all select * from tbl_004 union all select * from tbl_005 union all select * from tbl_006 union all select * from tbl_007 union all select * from tbl_008 union all select * from tbl_009) x group by b;

CREATE TABLE org_department (
id int(11) NOT NULL AUTO_INCREMENT,
ctx int(11) DEFAULT '0' COMMENT 'organization id',
name varchar(128) DEFAULT NULL,
left_value int(11) DEFAULT NULL,
right_value int(11) DEFAULT NULL,
depth int(11) DEFAULT NULL,
leader_id bigint(20) DEFAULT NULL,
status int(11) DEFAULT '1000',
created_on datetime DEFAULT NULL,
updated_on datetime DEFAULT NULL,
PRIMARY KEY (id),
UNIQUE KEY org_department_id_uindex (id),
KEY org_department_leader_id_index (leader_id),
KEY org_department_ctx_index (ctx)
);
CREATE TABLE org_position (
id int(11) NOT NULL AUTO_INCREMENT,
ctx int(11) DEFAULT NULL,
name varchar(128) DEFAULT NULL,
left_value int(11) DEFAULT NULL,
right_value int(11) DEFAULT NULL,
depth int(11) DEFAULT NULL,
department_id int(11) DEFAULT NULL,
status int(2) DEFAULT NULL,
created_on datetime DEFAULT NULL,
updated_on datetime DEFAULT NULL,
PRIMARY KEY (id),
UNIQUE KEY org_position_id_uindex (id),
KEY org_position_department_id_index (department_id)
) ENGINE=InnoDB AUTO_INCREMENT=22 DEFAULT CHARSET=utf8;
CREATE TABLE org_employee_position (
hotel_id int(11) DEFAULT NULL,
user_id bigint(20) DEFAULT NULL,
position_id int(11) DEFAULT NULL,
status int(11) DEFAULT NULL,
created_on datetime DEFAULT NULL,
updated_on datetime DEFAULT NULL,
UNIQUE KEY org_employee_position_pk (hotel_id,user_id,position_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

explain SELECT d.id, d.ctx, d.name, d.left_value, d.right_value, d.depth, d.leader_id, d.status, d.created_on, d.updated_on FROM org_department AS d LEFT JOIN org_position AS p ON p.department_id = d.id AND p.status = 1000 LEFT JOIN org_employee_position AS ep ON ep.position_id = p.id AND ep.status = 1000 WHERE (d.ctx = 1 AND (ep.user_id = 62 OR d.id = 20 OR d.id = 20) AND d.status = 1000) GROUP BY d.id ORDER BY d.left_value;
16 changes: 15 additions & 1 deletion planner/core/logical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2066,7 +2066,21 @@ func (s *testPlanSuite) TestOuterJoinEliminator(c *C) {
// For complex join query
{
sql: "select max(t3.b) from (t t1 left join t t2 on t1.a = t2.a) right join t t3 on t1.b = t3.b",
best: "DataScan(t3)->TopN([test.t3.b true],0,1)->Aggr(max(test.t3.b))->Projection",
best: "Join{Join{DataScan(t1)->DataScan(t2)}(test.t1.a,test.t2.a)->DataScan(t3)->TopN([test.t3.b true],0,1)}(test.t1.b,test.t3.b)->TopN([test.t3.b true],0,1)->Aggr(max(test.t3.b))->Projection",
},
{
sql: "select t1.a ta, t1.b tb from t t1 left join t t2 on t1.a = t2.a",
best: "DataScan(t1)->Projection",
},
{
// Because the `order by` uses t2.a, the `join` can't be eliminated.
sql: "select t1.a, t1.b from t t1 left join t t2 on t1.a = t2.a order by t2.a",
best: "Join{DataScan(t1)->DataScan(t2)}(test.t1.a,test.t2.a)->Sort->Projection",
},
// For issue 11167
{
sql: "select a.a from t a natural left join t b natural left join t c",
best: "DataScan(a)->Projection",
},
}

Expand Down
8 changes: 8 additions & 0 deletions planner/core/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ type LogicalPlan interface {

// SetChildren sets the children for the plan.
SetChildren(...LogicalPlan)

// SetChild sets the ith child for the plan.
SetChild(i int, child LogicalPlan)
}

// PhysicalPlan is a tree of the physical operators.
Expand Down Expand Up @@ -299,6 +302,11 @@ func (p *basePhysicalPlan) SetChildren(children ...PhysicalPlan) {
p.children = children
}

// SetChild implements LogicalPlan SetChild interface.
func (p *baseLogicalPlan) SetChild(i int, child LogicalPlan) {
p.children[i] = child
}

// SetChild implements PhysicalPlan SetChild interface.
func (p *basePhysicalPlan) SetChild(i int, child PhysicalPlan) {
p.children[i] = child
Expand Down
153 changes: 90 additions & 63 deletions planner/core/rule_join_elimination.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package core
import (
"github.com/pingcap/parser/ast"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/util/set"
)

type outerJoinEliminator struct {
Expand All @@ -28,40 +29,50 @@ type outerJoinEliminator struct {
// 2. outer join elimination with duplicate agnostic aggregate functions: For example left outer join.
// If the parent only use the columns from left table with 'distinct' label. The left outer join can
// be eliminated.
func (o *outerJoinEliminator) tryToEliminateOuterJoin(p *LogicalJoin, aggCols []*expression.Column, parentSchema *expression.Schema) (LogicalPlan, error) {
func (o *outerJoinEliminator) tryToEliminateOuterJoin(p *LogicalJoin, aggCols []*expression.Column, parentCols []*expression.Column) (LogicalPlan, bool, error) {
var innerChildIdx int
switch p.JoinType {
case LeftOuterJoin:
innerChildIdx = 1
case RightOuterJoin:
innerChildIdx = 0
default:
return p, nil
return p, false, nil
}

outerPlan := p.children[1^innerChildIdx]
innerPlan := p.children[innerChildIdx]
outerUniqueIDs := set.NewInt64Set()
for _, outerCol := range outerPlan.Schema().Columns {
outerUniqueIDs.Insert(outerCol.UniqueID)
}
matched := o.isColsAllFromOuterTable(parentCols, outerUniqueIDs)
if !matched {
return p, false, nil
}
// outer join elimination with duplicate agnostic aggregate functions
matched, err := o.isAggColsAllFromOuterTable(outerPlan, aggCols)
if err != nil || matched {
return outerPlan, err
matched = o.isColsAllFromOuterTable(aggCols, outerUniqueIDs)
if matched {
return outerPlan, true, nil
}
// outer join elimination without duplicate agnostic aggregate functions
matched, err = o.isParentColsAllFromOuterTable(outerPlan, parentSchema)
if err != nil || !matched {
return p, err
}
innerJoinKeys := o.extractInnerJoinKeys(p, innerChildIdx)
contain, err := o.isInnerJoinKeysContainUniqueKey(innerPlan, innerJoinKeys)
if err != nil || contain {
return outerPlan, err
if err != nil {
return p, false, err
}
if contain {
return outerPlan, true, nil
}
contain, err = o.isInnerJoinKeysContainIndex(innerPlan, innerJoinKeys)
if err != nil || contain {
return outerPlan, err
if err != nil {
return p, false, err
}
if contain {
return outerPlan, true, nil
}

return p, nil
return p, false, nil
}

// extract join keys as a schema for inner child of a outer join
Expand All @@ -73,33 +84,20 @@ func (o *outerJoinEliminator) extractInnerJoinKeys(join *LogicalJoin, innerChild
return expression.NewSchema(joinKeys...)
}

func (o *outerJoinEliminator) isAggColsAllFromOuterTable(outerPlan LogicalPlan, aggCols []*expression.Column) (bool, error) {
if len(aggCols) == 0 {
return false, nil
}
for _, col := range aggCols {
columnName := &ast.ColumnName{Schema: col.DBName, Table: col.TblName, Name: col.ColName}
c, err := outerPlan.Schema().FindColumn(columnName)
if err != nil || c == nil {
return false, err
}
}
return true, nil
}

// check whether schema cols of join's parent plan are all from outer join table
func (o *outerJoinEliminator) isParentColsAllFromOuterTable(outerPlan LogicalPlan, parentSchema *expression.Schema) (bool, error) {
if parentSchema == nil {
return false, nil
}
for _, col := range parentSchema.Columns {
columnName := &ast.ColumnName{Schema: col.DBName, Table: col.TblName, Name: col.ColName}
c, err := outerPlan.Schema().FindColumn(columnName)
if err != nil || c == nil {
return false, err
// check whether the cols all from outer plan
func (o *outerJoinEliminator) isColsAllFromOuterTable(cols []*expression.Column, outerUniqueIDs set.Int64Set) bool {
// There are two cases "return false" here:
// 1. If cols represents aggCols, then "len(cols) == 0" means not all aggregate functions are duplicate agnostic before.
// 2. If cols represents parentCols, then "len(cols) == 0" means no parent logical plan of this join plan.
if len(cols) == 0 {
return false
}
for _, col := range cols {
if !outerUniqueIDs.Exist(col.UniqueID) {
return false
}
}
return true, nil
return true
}

// check whether one of unique keys sets is contained by inner join keys
Expand Down Expand Up @@ -157,52 +155,81 @@ func (o *outerJoinEliminator) isInnerJoinKeysContainIndex(innerPlan LogicalPlan,
return false, nil
}

// Check whether a LogicalPlan is a LogicalAggregation and its all aggregate functions is duplicate agnostic.
// Also, check all the args are expression.Column.
func (o *outerJoinEliminator) isDuplicateAgnosticAgg(p LogicalPlan) (_ bool, cols []*expression.Column) {
// getDupAgnosticAggCols checks whether a LogicalPlan is LogicalAggregation.
// It extracts all the columns from the duplicate agnostic aggregate functions.
// The returned column set is nil if not all the aggregate functions are duplicate agnostic.
// Only the following functions are considered to be duplicate agnostic:
// 1. MAX(arg)
// 2. MIN(arg)
// 3. FIRST_ROW(arg)
// 4. Other agg functions with DISTINCT flag, like SUM(DISTINCT arg)
func (o *outerJoinEliminator) getDupAgnosticAggCols(
p LogicalPlan,
oldAggCols []*expression.Column, // Reuse the original buffer.
) (isAgg bool, newAggCols []*expression.Column) {
agg, ok := p.(*LogicalAggregation)
if !ok {
return false, nil
}
cols = agg.groupByCols
newAggCols = oldAggCols[:0]
for _, aggDesc := range agg.AggFuncs {
if !aggDesc.HasDistinct &&
aggDesc.Name != ast.AggFuncFirstRow &&
aggDesc.Name != ast.AggFuncMax &&
aggDesc.Name != ast.AggFuncMin {
return false, nil
// If not all aggregate functions are duplicate agnostic,
// we should clean the aggCols, so `return true, newAggCols[:0]`.
return true, newAggCols[:0]
}
for _, expr := range aggDesc.Args {
if col, ok := expr.(*expression.Column); ok {
cols = append(cols, col)
} else {
return false, nil
}
newAggCols = append(newAggCols, expression.ExtractColumns(expr)...)
}
}
return true, cols
return true, newAggCols
}

func (o *outerJoinEliminator) doOptimize(p LogicalPlan, aggCols []*expression.Column, parentSchema *expression.Schema) (LogicalPlan, error) {
// check the duplicate agnostic aggregate functions
if ok, newCols := o.isDuplicateAgnosticAgg(p); ok {
func (o *outerJoinEliminator) doOptimize(p LogicalPlan, aggCols []*expression.Column, parentCols []*expression.Column) (LogicalPlan, error) {
var err error
var isEliminated bool
for join, isJoin := p.(*LogicalJoin); isJoin; join, isJoin = p.(*LogicalJoin) {
p, isEliminated, err = o.tryToEliminateOuterJoin(join, aggCols, parentCols)
if err != nil {
return p, err
}
if !isEliminated {
break
}
}

switch x := p.(type) {
case *LogicalProjection:
parentCols = parentCols[:0]
for _, expr := range x.Exprs {
parentCols = append(parentCols, expression.ExtractColumns(expr)...)
}
case *LogicalAggregation:
parentCols = append(parentCols[:0], x.groupByCols...)
for _, aggDesc := range x.AggFuncs {
for _, expr := range aggDesc.Args {
parentCols = append(parentCols, expression.ExtractColumns(expr)...)
}
}
default:
parentCols = append(parentCols[:0], p.Schema().Columns...)
}

if ok, newCols := o.getDupAgnosticAggCols(p, aggCols); ok {
aggCols = newCols
}

newChildren := make([]LogicalPlan, 0, len(p.Children()))
for _, child := range p.Children() {
newChild, err := o.doOptimize(child, aggCols, p.Schema())
for i, child := range p.Children() {
newChild, err := o.doOptimize(child, aggCols, parentCols)
if err != nil {
return nil, err
}
newChildren = append(newChildren, newChild)
}
p.SetChildren(newChildren...)
join, isJoin := p.(*LogicalJoin)
if !isJoin {
return p, nil
p.SetChild(i, newChild)
}
return o.tryToEliminateOuterJoin(join, aggCols, parentSchema)
return p, nil
}

func (o *outerJoinEliminator) optimize(p LogicalPlan) (LogicalPlan, error) {
Expand Down

0 comments on commit aede000

Please sign in to comment.