Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner/core: make join reorder by dp work #8816

Merged
merged 24 commits into from
Apr 28, 2019
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion planner/core/expression_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ func (er *expressionRewriter) handleInSubquery(v *ast.PatternInExpr) (ast.Node,
// We need to try to eliminate the agg and the projection produced by this operation.
er.b.optFlag |= flagEliminateAgg
er.b.optFlag |= flagEliminateProjection
er.b.optFlag |= flagJoinReOrderGreedy
er.b.optFlag |= flagJoinReOrder
// Build distinct for the inner query.
agg := er.b.buildDistinct(np, np.Schema().Len())
for _, col := range agg.schema.Columns {
Expand Down
2 changes: 1 addition & 1 deletion planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func (b *PlanBuilder) buildJoin(joinNode *ast.Join) (LogicalPlan, error) {
joinPlan.JoinType = RightOuterJoin
resetNotNullFlag(joinPlan.schema, 0, leftPlan.Schema().Len())
default:
b.optFlag = b.optFlag | flagJoinReOrderGreedy
b.optFlag = b.optFlag | flagJoinReOrder
joinPlan.JoinType = InnerJoin
}

Expand Down
2 changes: 1 addition & 1 deletion planner/core/logical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -916,7 +916,7 @@ func (s *testPlanSuite) TestJoinReOrder(c *C) {

p, err := BuildLogicalPlan(s.ctx, stmt, s.is)
c.Assert(err, IsNil)
p, err = logicalOptimize(flagPredicatePushDown|flagJoinReOrderGreedy, p.(LogicalPlan))
p, err = logicalOptimize(flagPredicatePushDown|flagJoinReOrder, p.(LogicalPlan))
c.Assert(err, IsNil)
c.Assert(ToString(p), Equals, tt.best, Commentf("for %s", tt.sql))
}
Expand Down
2 changes: 1 addition & 1 deletion planner/core/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ const (
flagPartitionProcessor
flagPushDownAgg
flagPushDownTopN
flagJoinReOrderGreedy
flagJoinReOrder
)

var optRuleList = []logicalOptRule{
Expand Down
29 changes: 15 additions & 14 deletions planner/core/rule_join_reorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,19 @@ func (s *joinReOrderSolver) optimizeRecursive(ctx sessionctx.Context, p LogicalP
ctx: ctx,
otherConds: otherConds,
}
groupSolver := &joinReorderGreedySingleGroupSolver{
baseSingleGroupJoinOrderSolver: baseGroupSolver,
eqEdges: eqEdges,
if len(curJoinGroup) > ctx.GetSessionVars().TiDBOptJoinOrderAlgoThreshold {
groupSolver := &joinReorderGreedySingleGroupSolver{
winoros marked this conversation as resolved.
Show resolved Hide resolved
baseSingleGroupJoinOrderSolver: baseGroupSolver,
eqEdges: eqEdges,
}
p, err = groupSolver.solve(curJoinGroup)
} else {
dpSolver := &joinReorderDPSolver{
baseSingleGroupJoinOrderSolver: baseGroupSolver,
}
dpSolver.newJoin = dpSolver.newJoinWithEdges
p, err = dpSolver.solve(curJoinGroup, expression.ScalarFuncs2Exprs(eqEdges))
}
p, err = groupSolver.solve(curJoinGroup)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -143,22 +151,15 @@ func (s *baseSingleGroupJoinOrderSolver) newCartesianJoin(lChild, rChild Logical
return join
}

func (s *baseSingleGroupJoinOrderSolver) newJoinWithEdges(eqEdges []*expression.ScalarFunction, remainedOtherConds []expression.Expression,
lChild, rChild LogicalPlan) (*LogicalJoin, []expression.Expression) {
func (s *baseSingleGroupJoinOrderSolver) newJoinWithEdges(lChild, rChild LogicalPlan, eqEdges []*expression.ScalarFunction, otherConds []expression.Expression) LogicalPlan {
newJoin := s.newCartesianJoin(lChild, rChild)
newJoin.EqualConditions = eqEdges
newJoin.OtherConditions = otherConds
for _, eqCond := range newJoin.EqualConditions {
newJoin.LeftJoinKeys = append(newJoin.LeftJoinKeys, eqCond.GetArgs()[0].(*expression.Column))
newJoin.RightJoinKeys = append(newJoin.RightJoinKeys, eqCond.GetArgs()[1].(*expression.Column))
}
for i := len(remainedOtherConds) - 1; i >= 0; i-- {
cols := expression.ExtractColumns(remainedOtherConds[i])
if newJoin.schema.ColumnsIndices(cols) != nil {
newJoin.OtherConditions = append(newJoin.OtherConditions, remainedOtherConds[i])
remainedOtherConds = append(remainedOtherConds[:i], remainedOtherConds[i+1:]...)
}
}
return newJoin, remainedOtherConds
return newJoin
}

// calcJoinCumCost calculates the cumulative cost of the join node.
Expand Down
168 changes: 133 additions & 35 deletions planner/core/rule_join_reorder_dp.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,47 @@ import (

"github.com/pingcap/parser/ast"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/sessionctx"
)

type joinReorderDPSolver struct {
ctx sessionctx.Context
newJoin func(lChild, rChild LogicalPlan, eqConds []*expression.ScalarFunction) LogicalPlan
*baseSingleGroupJoinOrderSolver
newJoin func(lChild, rChild LogicalPlan, eqConds []*expression.ScalarFunction, otherConds []expression.Expression) LogicalPlan
}

type joinGroupEdge struct {
type joinGroupEqEdge struct {
nodeIDs []int
edge *expression.ScalarFunction
}

func (s *joinReorderDPSolver) solve(joinGroup []LogicalPlan, conds []expression.Expression) (LogicalPlan, error) {
adjacents := make([][]int, len(joinGroup))
totalEdges := make([]joinGroupEdge, 0, len(conds))
addEdge := func(node1, node2 int, edgeContent *expression.ScalarFunction) {
totalEdges = append(totalEdges, joinGroupEdge{
type joinGroupNonEqEdge struct {
nodeIDs []int
idMask uint
expr expression.Expression
winoros marked this conversation as resolved.
Show resolved Hide resolved
}

func (s *joinReorderDPSolver) solve(joinGroup []LogicalPlan, eqConds []expression.Expression) (LogicalPlan, error) {
for _, node := range joinGroup {
_, err := node.recursiveDeriveStats()
if err != nil {
return nil, err
}
s.curJoinGroup = append(s.curJoinGroup, &jrNode{
p: node,
cumCost: s.baseNodeCumCost(node),
})
}
adjacents := make([][]int, len(s.curJoinGroup))
totalEqEdges := make([]joinGroupEqEdge, 0, len(eqConds))
addEqEdge := func(node1, node2 int, edgeContent *expression.ScalarFunction) {
totalEqEdges = append(totalEqEdges, joinGroupEqEdge{
nodeIDs: []int{node1, node2},
edge: edgeContent,
})
adjacents[node1] = append(adjacents[node1], node2)
adjacents[node2] = append(adjacents[node2], node1)
}
// Build Graph for join group
for _, cond := range conds {
for _, cond := range eqConds {
sf := cond.(*expression.ScalarFunction)
lCol := sf.GetArgs()[0].(*expression.Column)
rCol := sf.GetArgs()[1].(*expression.Column)
Expand All @@ -55,7 +70,26 @@ func (s *joinReorderDPSolver) solve(joinGroup []LogicalPlan, conds []expression.
if err != nil {
return nil, err
}
addEdge(lIdx, rIdx, sf)
addEqEdge(lIdx, rIdx, sf)
}
totalNonEqEdges := make([]joinGroupNonEqEdge, 0, len(s.otherConds))
for _, cond := range s.otherConds {
cols := expression.ExtractColumns(cond)
mask := uint(0)
ids := make([]int, 0, len(cols))
for _, col := range cols {
idx, err := findNodeIndexInGroup(joinGroup, col)
if err != nil {
return nil, err
}
ids = append(ids, idx)
mask |= 1 << uint(idx)
}
totalNonEqEdges = append(totalNonEqEdges, joinGroupNonEqEdge{
nodeIDs: ids,
idMask: mask,
expr: cond,
})
}
visited := make([]bool, len(joinGroup))
nodeID2VisitID := make([]int, len(joinGroup))
Expand All @@ -66,15 +100,37 @@ func (s *joinReorderDPSolver) solve(joinGroup []LogicalPlan, conds []expression.
continue
}
visitID2NodeID := s.bfsGraph(i, visited, adjacents, nodeID2VisitID)
nodeIDMask := uint(0)
for _, nodeID := range visitID2NodeID {
nodeIDMask |= 1 << uint(nodeID)
}
var subNonEqEdges []joinGroupNonEqEdge
for i := len(totalNonEqEdges) - 1; i >= 0; i-- {
// If this edge is not the subset of the current sub graph.
if totalNonEqEdges[i].idMask&nodeIDMask != totalNonEqEdges[i].idMask {
continue
}
newMask := uint(0)
for _, nodeID := range totalNonEqEdges[i].nodeIDs {
newMask |= 1 << uint(nodeID2VisitID[nodeID])
}
totalNonEqEdges[i].idMask = newMask
subNonEqEdges = append(subNonEqEdges, totalNonEqEdges[i])
totalNonEqEdges = append(totalNonEqEdges[:i], totalNonEqEdges[i+1:]...)
}
eurekaka marked this conversation as resolved.
Show resolved Hide resolved
// Do DP on each sub graph.
join, err := s.dpGraph(visitID2NodeID, nodeID2VisitID, joinGroup, totalEdges)
join, err := s.dpGraph(visitID2NodeID, nodeID2VisitID, joinGroup, totalEqEdges, subNonEqEdges)
if err != nil {
return nil, err
}
joins = append(joins, join)
}
remainedOtherConds := make([]expression.Expression, 0, len(totalNonEqEdges))
for _, edge := range totalNonEqEdges {
remainedOtherConds = append(remainedOtherConds, edge.expr)
}
// Build bushy tree for cartesian joins.
return s.makeBushyJoin(joins), nil
return s.makeBushyJoin(joins, remainedOtherConds), nil
}

// bfsGraph bfs a sub graph starting at startPos. And relabel its label for future use.
Expand All @@ -98,13 +154,13 @@ func (s *joinReorderDPSolver) bfsGraph(startNode int, visited []bool, adjacents
return visitID2NodeID
}

func (s *joinReorderDPSolver) dpGraph(newPos2OldPos, oldPos2NewPos []int, joinGroup []LogicalPlan, totalEdges []joinGroupEdge) (LogicalPlan, error) {
func (s *joinReorderDPSolver) dpGraph(newPos2OldPos, oldPos2NewPos []int, joinGroup []LogicalPlan,
winoros marked this conversation as resolved.
Show resolved Hide resolved
totalEqEdges []joinGroupEqEdge, totalNonEqEdges []joinGroupNonEqEdge) (LogicalPlan, error) {
nodeCnt := uint(len(newPos2OldPos))
bestPlan := make([]LogicalPlan, 1<<nodeCnt)
bestCost := make([]int64, 1<<nodeCnt)
bestPlan := make([]*jrNode, 1<<nodeCnt)
// bestPlan[s] is nil can be treated as bestCost[s] = +inf.
for i := uint(0); i < nodeCnt; i++ {
bestPlan[1<<i] = joinGroup[newPos2OldPos[i]]
bestPlan[1<<i] = s.curJoinGroup[newPos2OldPos[i]]
}
// Enumerate the nodeBitmap from small to big, make sure that S1 must be enumerated before S2 if S1 belongs to S2.
for nodeBitmap := uint(1); nodeBitmap < (1 << nodeCnt); nodeBitmap++ {
Expand All @@ -122,38 +178,58 @@ func (s *joinReorderDPSolver) dpGraph(newPos2OldPos, oldPos2NewPos []int, joinGr
continue
}
// Get the edge connecting the two parts.
usedEdges := s.nodesAreConnected(sub, remain, oldPos2NewPos, totalEdges)
usedEdges, otherConds := s.nodesAreConnected(sub, remain, oldPos2NewPos, totalEqEdges, totalNonEqEdges)
// Here we only check equal condition currently.
eurekaka marked this conversation as resolved.
Show resolved Hide resolved
if len(usedEdges) == 0 {
qw4990 marked this conversation as resolved.
Show resolved Hide resolved
continue
}
join, err := s.newJoinWithEdge(bestPlan[sub], bestPlan[remain], usedEdges)
join, err := s.newJoinWithEdge(bestPlan[sub].p, bestPlan[remain].p, usedEdges, otherConds)
if err != nil {
return nil, err
}
if bestPlan[nodeBitmap] == nil || bestCost[nodeBitmap] > join.statsInfo().Count()+bestCost[remain]+bestCost[sub] {
bestPlan[nodeBitmap] = join
bestCost[nodeBitmap] = join.statsInfo().Count() + bestCost[remain] + bestCost[sub]
curCost := s.calcJoinCumCost(join, bestPlan[sub], bestPlan[remain])
if bestPlan[nodeBitmap] == nil {
bestPlan[nodeBitmap] = &jrNode{
p: join,
cumCost: curCost,
}
} else if bestPlan[nodeBitmap].cumCost > curCost {
bestPlan[nodeBitmap].p = join
bestPlan[nodeBitmap].cumCost = curCost
}
}
}
return bestPlan[(1<<nodeCnt)-1], nil
return bestPlan[(1<<nodeCnt)-1].p, nil
}

func (s *joinReorderDPSolver) nodesAreConnected(leftMask, rightMask uint, oldPos2NewPos []int, totalEdges []joinGroupEdge) []joinGroupEdge {
var usedEdges []joinGroupEdge
for _, edge := range totalEdges {
func (s *joinReorderDPSolver) nodesAreConnected(leftMask, rightMask uint, oldPos2NewPos []int,
totalEqEdges []joinGroupEqEdge, totalNonEqEdges []joinGroupNonEqEdge) ([]joinGroupEqEdge, []expression.Expression) {
var (
usedEqEdges []joinGroupEqEdge
otherConds []expression.Expression
)
for _, edge := range totalEqEdges {
lIdx := uint(oldPos2NewPos[edge.nodeIDs[0]])
rIdx := uint(oldPos2NewPos[edge.nodeIDs[1]])
if (leftMask&(1<<lIdx)) > 0 && (rightMask&(1<<rIdx)) > 0 {
usedEdges = append(usedEdges, edge)
} else if (leftMask&(1<<rIdx)) > 0 && (rightMask&(1<<lIdx)) > 0 {
usedEdges = append(usedEdges, edge)
if ((leftMask&(1<<lIdx)) > 0 && (rightMask&(1<<rIdx)) > 0) || ((leftMask&(1<<rIdx)) > 0 && (rightMask&(1<<lIdx)) > 0) {
usedEqEdges = append(usedEqEdges, edge)
eurekaka marked this conversation as resolved.
Show resolved Hide resolved
}
}
for _, edge := range totalNonEqEdges {
// If the result is false, means that the current group hasn't covered the columns involved in the expression.
if edge.idMask&(leftMask|rightMask) != edge.idMask {
continue
}
// Check whether this expression is only built from one side of the join.
if edge.idMask&leftMask == 0 || edge.idMask&rightMask == 0 {
continue
}
otherConds = append(otherConds, edge.expr)
}
return usedEdges
return usedEqEdges, otherConds
}

func (s *joinReorderDPSolver) newJoinWithEdge(leftPlan, rightPlan LogicalPlan, edges []joinGroupEdge) (LogicalPlan, error) {
func (s *joinReorderDPSolver) newJoinWithEdge(leftPlan, rightPlan LogicalPlan, edges []joinGroupEqEdge, otherConds []expression.Expression) (LogicalPlan, error) {
var eqConds []*expression.ScalarFunction
for _, edge := range edges {
lCol := edge.edge.GetArgs()[0].(*expression.Column)
Expand All @@ -165,21 +241,32 @@ func (s *joinReorderDPSolver) newJoinWithEdge(leftPlan, rightPlan LogicalPlan, e
eqConds = append(eqConds, newSf)
}
}
join := s.newJoin(leftPlan, rightPlan, eqConds)
join := s.newJoin(leftPlan, rightPlan, eqConds, otherConds)
_, err := join.recursiveDeriveStats()
return join, err
}

// Make cartesian join as bushy tree.
func (s *joinReorderDPSolver) makeBushyJoin(cartesianJoinGroup []LogicalPlan) LogicalPlan {
func (s *joinReorderDPSolver) makeBushyJoin(cartesianJoinGroup []LogicalPlan, otherConds []expression.Expression) LogicalPlan {
for len(cartesianJoinGroup) > 1 {
resultJoinGroup := make([]LogicalPlan, 0, len(cartesianJoinGroup))
for i := 0; i < len(cartesianJoinGroup); i += 2 {
if i+1 == len(cartesianJoinGroup) {
resultJoinGroup = append(resultJoinGroup, cartesianJoinGroup[i])
break
}
resultJoinGroup = append(resultJoinGroup, s.newJoin(cartesianJoinGroup[i], cartesianJoinGroup[i+1], nil))
// TODO:Since the other condition may involve no less than two tables, e.g. t1.a = t2.b+t3.c.
winoros marked this conversation as resolved.
Show resolved Hide resolved
// So We'll need a extra stage to deal with it.
// Currently, we just add it when building cartesianJoinGroup.
mergedSchema := expression.MergeSchema(cartesianJoinGroup[i].Schema(), cartesianJoinGroup[i+1].Schema())
var usedOtherConds []expression.Expression
for i := len(otherConds) - 1; i >= 0; i-- {
winoros marked this conversation as resolved.
Show resolved Hide resolved
if expression.ExprFromSchema(otherConds[i], mergedSchema) {
usedOtherConds = append(usedOtherConds, otherConds[i])
otherConds = append(otherConds[:i], otherConds[i+1:]...)
}
}
resultJoinGroup = append(resultJoinGroup, s.newJoin(cartesianJoinGroup[i], cartesianJoinGroup[i+1], nil, usedOtherConds))
}
cartesianJoinGroup = resultJoinGroup
}
Expand All @@ -194,3 +281,14 @@ func findNodeIndexInGroup(group []LogicalPlan, col *expression.Column) (int, err
}
return -1, ErrUnknownColumn.GenWithStackByArgs(col, "JOIN REORDER RULE")
}

func (s *joinReorderDPSolver) newJoinWithConds(leftPlan, rightPlan LogicalPlan, eqConds []*expression.ScalarFunction, otherConds []expression.Expression) LogicalPlan {
join := s.newCartesianJoin(leftPlan, rightPlan)
join.EqualConditions = eqConds
join.OtherConditions = otherConds
for _, eqCond := range join.EqualConditions {
join.LeftJoinKeys = append(join.LeftJoinKeys, eqCond.GetArgs()[0].(*expression.Column))
join.RightJoinKeys = append(join.RightJoinKeys, eqCond.GetArgs()[1].(*expression.Column))
}
return join
}
Loading