Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: fix bugs and make it more effective in outer join elimination rule. (#11160) #11377

Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions cmd/explaintest/r/explain_complex.result
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,62 @@ HashAgg_34 72000.00 root group by:col_1, funcs:sum(col_0)
│ └─TableScan_58 10000.00 cop table:tbl_008, range:[-inf,+inf], keep order:false, stats:pseudo
└─TableReader_62 10000.00 root data:TableScan_61
└─TableScan_61 10000.00 cop table:tbl_009, range:[-inf,+inf], keep order:false, stats:pseudo
CREATE TABLE org_department (
id int(11) NOT NULL AUTO_INCREMENT,
ctx int(11) DEFAULT '0' COMMENT 'organization id',
name varchar(128) DEFAULT NULL,
left_value int(11) DEFAULT NULL,
right_value int(11) DEFAULT NULL,
depth int(11) DEFAULT NULL,
leader_id bigint(20) DEFAULT NULL,
status int(11) DEFAULT '1000',
created_on datetime DEFAULT NULL,
updated_on datetime DEFAULT NULL,
PRIMARY KEY (id),
UNIQUE KEY org_department_id_uindex (id),
KEY org_department_leader_id_index (leader_id),
KEY org_department_ctx_index (ctx)
);
CREATE TABLE org_position (
id int(11) NOT NULL AUTO_INCREMENT,
ctx int(11) DEFAULT NULL,
name varchar(128) DEFAULT NULL,
left_value int(11) DEFAULT NULL,
right_value int(11) DEFAULT NULL,
depth int(11) DEFAULT NULL,
department_id int(11) DEFAULT NULL,
status int(2) DEFAULT NULL,
created_on datetime DEFAULT NULL,
updated_on datetime DEFAULT NULL,
PRIMARY KEY (id),
UNIQUE KEY org_position_id_uindex (id),
KEY org_position_department_id_index (department_id)
) ENGINE=InnoDB AUTO_INCREMENT=22 DEFAULT CHARSET=utf8;
CREATE TABLE org_employee_position (
hotel_id int(11) DEFAULT NULL,
user_id bigint(20) DEFAULT NULL,
position_id int(11) DEFAULT NULL,
status int(11) DEFAULT NULL,
created_on datetime DEFAULT NULL,
updated_on datetime DEFAULT NULL,
UNIQUE KEY org_employee_position_pk (hotel_id,user_id,position_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
explain SELECT d.id, d.ctx, d.name, d.left_value, d.right_value, d.depth, d.leader_id, d.status, d.created_on, d.updated_on FROM org_department AS d LEFT JOIN org_position AS p ON p.department_id = d.id AND p.status = 1000 LEFT JOIN org_employee_position AS ep ON ep.position_id = p.id AND ep.status = 1000 WHERE (d.ctx = 1 AND (ep.user_id = 62 OR d.id = 20 OR d.id = 20) AND d.status = 1000) GROUP BY d.id ORDER BY d.left_value;
id count task operator info
Sort_10 1.00 root test.d.left_value:asc
└─HashAgg_15 1.00 root group by:test.d.id, funcs:firstrow(test.d.id), firstrow(test.d.ctx), firstrow(test.d.name), firstrow(test.d.left_value), firstrow(test.d.right_value), firstrow(test.d.depth), firstrow(test.d.leader_id), firstrow(test.d.status), firstrow(test.d.created_on), firstrow(test.d.updated_on)
└─Selection_20 0.01 root or(eq(test.ep.user_id, 62), or(eq(test.d.id, 20), eq(test.d.id, 20)))
└─HashLeftJoin_21 0.02 root left outer join, inner:TableReader_55, equal:[eq(test.p.id, test.ep.position_id)]
├─IndexJoin_29 0.01 root left outer join, inner:IndexLookUp_28, outer key:test.d.id, inner key:test.p.department_id
│ ├─IndexLookUp_45 0.01 root
│ │ ├─IndexScan_42 10.00 cop table:d, index:ctx, range:[1,1], keep order:false, stats:pseudo
│ │ └─Selection_44 0.01 cop eq(test.d.status, 1000)
│ │ └─TableScan_43 10.00 cop table:org_department, keep order:false, stats:pseudo
│ └─IndexLookUp_28 0.01 root
│ ├─Selection_26 9.99 cop not(isnull(test.p.department_id))
│ │ └─IndexScan_24 10.00 cop table:p, index:department_id, range: decided by [eq(test.p.department_id, test.d.id)], keep order:false, stats:pseudo
│ └─Selection_27 0.01 cop eq(test.p.status, 1000)
│ └─TableScan_25 9.99 cop table:org_position, keep order:false, stats:pseudo
└─TableReader_55 9.99 root data:Selection_54
└─Selection_54 9.99 cop eq(test.ep.status, 1000), not(isnull(test.ep.position_id))
└─TableScan_53 10000.00 cop table:ep, range:[-inf,+inf], keep order:false, stats:pseudo
43 changes: 43 additions & 0 deletions cmd/explaintest/t/explain_complex.test
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,46 @@ CREATE TABLE `tbl_008` (`a` int, `b` int);
CREATE TABLE `tbl_009` (`a` int, `b` int);

explain select sum(a) from (select * from tbl_001 union all select * from tbl_002 union all select * from tbl_003 union all select * from tbl_004 union all select * from tbl_005 union all select * from tbl_006 union all select * from tbl_007 union all select * from tbl_008 union all select * from tbl_009) x group by b;

CREATE TABLE org_department (
id int(11) NOT NULL AUTO_INCREMENT,
ctx int(11) DEFAULT '0' COMMENT 'organization id',
name varchar(128) DEFAULT NULL,
left_value int(11) DEFAULT NULL,
right_value int(11) DEFAULT NULL,
depth int(11) DEFAULT NULL,
leader_id bigint(20) DEFAULT NULL,
status int(11) DEFAULT '1000',
created_on datetime DEFAULT NULL,
updated_on datetime DEFAULT NULL,
PRIMARY KEY (id),
UNIQUE KEY org_department_id_uindex (id),
KEY org_department_leader_id_index (leader_id),
KEY org_department_ctx_index (ctx)
);
CREATE TABLE org_position (
id int(11) NOT NULL AUTO_INCREMENT,
ctx int(11) DEFAULT NULL,
name varchar(128) DEFAULT NULL,
left_value int(11) DEFAULT NULL,
right_value int(11) DEFAULT NULL,
depth int(11) DEFAULT NULL,
department_id int(11) DEFAULT NULL,
status int(2) DEFAULT NULL,
created_on datetime DEFAULT NULL,
updated_on datetime DEFAULT NULL,
PRIMARY KEY (id),
UNIQUE KEY org_position_id_uindex (id),
KEY org_position_department_id_index (department_id)
) ENGINE=InnoDB AUTO_INCREMENT=22 DEFAULT CHARSET=utf8;
CREATE TABLE org_employee_position (
hotel_id int(11) DEFAULT NULL,
user_id bigint(20) DEFAULT NULL,
position_id int(11) DEFAULT NULL,
status int(11) DEFAULT NULL,
created_on datetime DEFAULT NULL,
updated_on datetime DEFAULT NULL,
UNIQUE KEY org_employee_position_pk (hotel_id,user_id,position_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

explain SELECT d.id, d.ctx, d.name, d.left_value, d.right_value, d.depth, d.leader_id, d.status, d.created_on, d.updated_on FROM org_department AS d LEFT JOIN org_position AS p ON p.department_id = d.id AND p.status = 1000 LEFT JOIN org_employee_position AS ep ON ep.position_id = p.id AND ep.status = 1000 WHERE (d.ctx = 1 AND (ep.user_id = 62 OR d.id = 20 OR d.id = 20) AND d.status = 1000) GROUP BY d.id ORDER BY d.left_value;
16 changes: 15 additions & 1 deletion planner/core/logical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2066,7 +2066,21 @@ func (s *testPlanSuite) TestOuterJoinEliminator(c *C) {
// For complex join query
{
sql: "select max(t3.b) from (t t1 left join t t2 on t1.a = t2.a) right join t t3 on t1.b = t3.b",
best: "DataScan(t3)->TopN([test.t3.b true],0,1)->Aggr(max(test.t3.b))->Projection",
best: "Join{Join{DataScan(t1)->DataScan(t2)}(test.t1.a,test.t2.a)->DataScan(t3)->TopN([test.t3.b true],0,1)}(test.t1.b,test.t3.b)->TopN([test.t3.b true],0,1)->Aggr(max(test.t3.b))->Projection",
},
{
sql: "select t1.a ta, t1.b tb from t t1 left join t t2 on t1.a = t2.a",
best: "DataScan(t1)->Projection",
},
{
// Because the `order by` uses t2.a, the `join` can't be eliminated.
sql: "select t1.a, t1.b from t t1 left join t t2 on t1.a = t2.a order by t2.a",
best: "Join{DataScan(t1)->DataScan(t2)}(test.t1.a,test.t2.a)->Sort->Projection",
},
// For issue 11167
{
sql: "select a.a from t a natural left join t b natural left join t c",
best: "DataScan(a)->Projection",
},
}

Expand Down
13 changes: 13 additions & 0 deletions planner/core/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ type LogicalPlan interface {

// SetChildren sets the children for the plan.
SetChildren(...LogicalPlan)

// SetChild sets the ith child for the plan.
SetChild(i int, child LogicalPlan)
}

// PhysicalPlan is a tree of the physical operators.
Expand Down Expand Up @@ -296,6 +299,16 @@ func (p *basePhysicalPlan) SetChildren(children ...PhysicalPlan) {
p.children = children
}

// SetChild implements LogicalPlan SetChild interface.
func (p *baseLogicalPlan) SetChild(i int, child LogicalPlan) {
p.children[i] = child
}

// SetChild implements PhysicalPlan SetChild interface.
func (p *basePhysicalPlan) SetChild(i int, child PhysicalPlan) {
lzmhhh123 marked this conversation as resolved.
Show resolved Hide resolved
p.children[i] = child
}

func (p *basePlan) context() sessionctx.Context {
return p.ctx
}
Expand Down
153 changes: 90 additions & 63 deletions planner/core/rule_join_elimination.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package core
import (
"github.com/pingcap/parser/ast"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/util/set"
)

type outerJoinEliminator struct {
Expand All @@ -28,40 +29,50 @@ type outerJoinEliminator struct {
// 2. outer join elimination with duplicate agnostic aggregate functions: For example left outer join.
// If the parent only use the columns from left table with 'distinct' label. The left outer join can
// be eliminated.
func (o *outerJoinEliminator) tryToEliminateOuterJoin(p *LogicalJoin, aggCols []*expression.Column, parentSchema *expression.Schema) (LogicalPlan, error) {
func (o *outerJoinEliminator) tryToEliminateOuterJoin(p *LogicalJoin, aggCols []*expression.Column, parentCols []*expression.Column) (LogicalPlan, bool, error) {
var innerChildIdx int
switch p.JoinType {
case LeftOuterJoin:
innerChildIdx = 1
case RightOuterJoin:
innerChildIdx = 0
default:
return p, nil
return p, false, nil
}

outerPlan := p.children[1^innerChildIdx]
innerPlan := p.children[innerChildIdx]
outerUniqueIDs := set.NewInt64Set()
for _, outerCol := range outerPlan.Schema().Columns {
outerUniqueIDs.Insert(outerCol.UniqueID)
}
matched := o.isColsAllFromOuterTable(parentCols, outerUniqueIDs)
if !matched {
return p, false, nil
}
// outer join elimination with duplicate agnostic aggregate functions
matched, err := o.isAggColsAllFromOuterTable(outerPlan, aggCols)
if err != nil || matched {
return outerPlan, err
matched = o.isColsAllFromOuterTable(aggCols, outerUniqueIDs)
if matched {
return outerPlan, true, nil
}
// outer join elimination without duplicate agnostic aggregate functions
matched, err = o.isParentColsAllFromOuterTable(outerPlan, parentSchema)
if err != nil || !matched {
return p, err
}
innerJoinKeys := o.extractInnerJoinKeys(p, innerChildIdx)
contain, err := o.isInnerJoinKeysContainUniqueKey(innerPlan, innerJoinKeys)
if err != nil || contain {
return outerPlan, err
if err != nil {
return p, false, err
}
if contain {
return outerPlan, true, nil
}
contain, err = o.isInnerJoinKeysContainIndex(innerPlan, innerJoinKeys)
if err != nil || contain {
return outerPlan, err
if err != nil {
return p, false, err
}
if contain {
return outerPlan, true, nil
}

return p, nil
return p, false, nil
}

// extract join keys as a schema for inner child of a outer join
Expand All @@ -73,33 +84,20 @@ func (o *outerJoinEliminator) extractInnerJoinKeys(join *LogicalJoin, innerChild
return expression.NewSchema(joinKeys...)
}

func (o *outerJoinEliminator) isAggColsAllFromOuterTable(outerPlan LogicalPlan, aggCols []*expression.Column) (bool, error) {
if len(aggCols) == 0 {
return false, nil
}
for _, col := range aggCols {
columnName := &ast.ColumnName{Schema: col.DBName, Table: col.TblName, Name: col.ColName}
c, err := outerPlan.Schema().FindColumn(columnName)
if err != nil || c == nil {
return false, err
}
}
return true, nil
}

// check whether schema cols of join's parent plan are all from outer join table
func (o *outerJoinEliminator) isParentColsAllFromOuterTable(outerPlan LogicalPlan, parentSchema *expression.Schema) (bool, error) {
if parentSchema == nil {
return false, nil
}
for _, col := range parentSchema.Columns {
columnName := &ast.ColumnName{Schema: col.DBName, Table: col.TblName, Name: col.ColName}
c, err := outerPlan.Schema().FindColumn(columnName)
if err != nil || c == nil {
return false, err
// check whether the cols all from outer plan
func (o *outerJoinEliminator) isColsAllFromOuterTable(cols []*expression.Column, outerUniqueIDs set.Int64Set) bool {
// There are two cases "return false" here:
// 1. If cols represents aggCols, then "len(cols) == 0" means not all aggregate functions are duplicate agnostic before.
// 2. If cols represents parentCols, then "len(cols) == 0" means no parent logical plan of this join plan.
if len(cols) == 0 {
return false
}
for _, col := range cols {
if !outerUniqueIDs.Exist(col.UniqueID) {
return false
}
}
return true, nil
return true
}

// check whether one of unique keys sets is contained by inner join keys
Expand Down Expand Up @@ -157,52 +155,81 @@ func (o *outerJoinEliminator) isInnerJoinKeysContainIndex(innerPlan LogicalPlan,
return false, nil
}

// Check whether a LogicalPlan is a LogicalAggregation and its all aggregate functions is duplicate agnostic.
// Also, check all the args are expression.Column.
func (o *outerJoinEliminator) isDuplicateAgnosticAgg(p LogicalPlan) (_ bool, cols []*expression.Column) {
// getDupAgnosticAggCols checks whether a LogicalPlan is LogicalAggregation.
// It extracts all the columns from the duplicate agnostic aggregate functions.
// The returned column set is nil if not all the aggregate functions are duplicate agnostic.
// Only the following functions are considered to be duplicate agnostic:
// 1. MAX(arg)
// 2. MIN(arg)
// 3. FIRST_ROW(arg)
// 4. Other agg functions with DISTINCT flag, like SUM(DISTINCT arg)
func (o *outerJoinEliminator) getDupAgnosticAggCols(
p LogicalPlan,
oldAggCols []*expression.Column, // Reuse the original buffer.
) (isAgg bool, newAggCols []*expression.Column) {
agg, ok := p.(*LogicalAggregation)
if !ok {
return false, nil
}
cols = agg.groupByCols
newAggCols = oldAggCols[:0]
for _, aggDesc := range agg.AggFuncs {
if !aggDesc.HasDistinct &&
aggDesc.Name != ast.AggFuncFirstRow &&
aggDesc.Name != ast.AggFuncMax &&
aggDesc.Name != ast.AggFuncMin {
return false, nil
// If not all aggregate functions are duplicate agnostic,
// we should clean the aggCols, so `return true, newAggCols[:0]`.
return true, newAggCols[:0]
}
for _, expr := range aggDesc.Args {
if col, ok := expr.(*expression.Column); ok {
cols = append(cols, col)
} else {
return false, nil
}
newAggCols = append(newAggCols, expression.ExtractColumns(expr)...)
}
}
return true, cols
return true, newAggCols
}

func (o *outerJoinEliminator) doOptimize(p LogicalPlan, aggCols []*expression.Column, parentSchema *expression.Schema) (LogicalPlan, error) {
// check the duplicate agnostic aggregate functions
if ok, newCols := o.isDuplicateAgnosticAgg(p); ok {
func (o *outerJoinEliminator) doOptimize(p LogicalPlan, aggCols []*expression.Column, parentCols []*expression.Column) (LogicalPlan, error) {
var err error
var isEliminated bool
for join, isJoin := p.(*LogicalJoin); isJoin; join, isJoin = p.(*LogicalJoin) {
p, isEliminated, err = o.tryToEliminateOuterJoin(join, aggCols, parentCols)
if err != nil {
return p, err
}
if !isEliminated {
break
}
}

switch x := p.(type) {
case *LogicalProjection:
parentCols = parentCols[:0]
for _, expr := range x.Exprs {
parentCols = append(parentCols, expression.ExtractColumns(expr)...)
}
case *LogicalAggregation:
parentCols = append(parentCols[:0], x.groupByCols...)
for _, aggDesc := range x.AggFuncs {
for _, expr := range aggDesc.Args {
parentCols = append(parentCols, expression.ExtractColumns(expr)...)
}
}
default:
parentCols = append(parentCols[:0], p.Schema().Columns...)
}

if ok, newCols := o.getDupAgnosticAggCols(p, aggCols); ok {
aggCols = newCols
}

newChildren := make([]LogicalPlan, 0, len(p.Children()))
for _, child := range p.Children() {
newChild, err := o.doOptimize(child, aggCols, p.Schema())
for i, child := range p.Children() {
newChild, err := o.doOptimize(child, aggCols, parentCols)
if err != nil {
return nil, err
}
newChildren = append(newChildren, newChild)
}
p.SetChildren(newChildren...)
join, isJoin := p.(*LogicalJoin)
if !isJoin {
return p, nil
p.SetChild(i, newChild)
}
return o.tryToEliminateOuterJoin(join, aggCols, parentSchema)
return p, nil
}

func (o *outerJoinEliminator) optimize(p LogicalPlan) (LogicalPlan, error) {
Expand Down