Skip to content

Commit

Permalink
Revert "fix unnecessary merge scope in broadcast join (#17778)" (#17787)
Browse files Browse the repository at this point in the history
先revert引入问题的pr,再继续调查

Approved by: @ouyuanning
  • Loading branch information
badboynt1 authored Jul 30, 2024
1 parent 5a98c53 commit c37e5be
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 26 deletions.
57 changes: 31 additions & 26 deletions pkg/sql/compile/compile.go
Original file line number Diff line number Diff line change
Expand Up @@ -2450,6 +2450,10 @@ func (c *Compile) compileBroadcastJoin(node, left, right *plan.Node, ns []*plan.
leftTyps[i] = dupType(&expr.Typ)
}

if plan2.IsShuffleChildren(left, ns) {
probeScopes = c.mergeShuffleJoinScopeList(probeScopes)
}

switch node.JoinType {
case plan.Node_INNER:
rs = c.newBroadcastJoinScopeList(probeScopes, buildScopes, node)
Expand Down Expand Up @@ -3677,29 +3681,20 @@ func (c *Compile) newJoinScopeListWithBucket(rs, left, right []*Scope, n *plan.N
return rs
}

// findScopeByAddr scans rs in order and returns the first scope whose
// NodeInfo.Addr is reported by isSameCN as matching addr. It returns nil
// when no scope in rs matches.
func findScopeByAddr(addr string, rs []*Scope) *Scope {
	for _, candidate := range rs {
		if !isSameCN(candidate.NodeInfo.Addr, addr) {
			continue
		}
		return candidate
	}
	return nil
}

func (c *Compile) newBroadcastJoinScopeList(probeScopes []*Scope, buildScopes []*Scope, n *plan.Node) []*Scope {
rs := make([]*Scope, 0, 1)
length := len(probeScopes)
rs := make([]*Scope, length)
idx := 0
for i := range probeScopes {
s := findScopeByAddr(probeScopes[i].NodeInfo.Addr, rs)
if s == nil {
s = newScope(Remote)
s.IsJoin = true
s.NodeInfo = probeScopes[i].NodeInfo
s.NodeInfo.Mcpu = c.generateCPUNumber(ncpu, int(n.Stats.BlockNum))
s.BuildIdx = 1
s.Proc = process.NewFromProc(c.proc, c.proc.Ctx, 2)
rs = append(rs, s)
}
s.PreScopes = append(s.PreScopes, probeScopes[i])
rs[i] = newScope(Remote)
rs[i].IsJoin = true
rs[i].NodeInfo = probeScopes[i].NodeInfo
rs[i].BuildIdx = 1
if isSameCN(rs[i].NodeInfo.Addr, c.addr) {
idx = i
}
rs[i].PreScopes = []*Scope{probeScopes[i]}
rs[i].Proc = process.NewFromProc(c.proc, c.proc.Ctx, 2)
probeScopes[i].setRootOperator(
connector.NewArgument().
WithReg(rs[i].Proc.Reg.MergeReceivers[0]))
Expand All @@ -3712,13 +3707,10 @@ func (c *Compile) newBroadcastJoinScopeList(probeScopes []*Scope, buildScopes []
} else {
c.anal.isFirst = false
mergeChildren := c.newMergeScope(buildScopes)

mergeChildren.setRootOperator(constructDispatch(1, rs, c.addr, n, false))
mergeChildren.IsEnd = true
for i := range rs {
if isSameCN(rs[i].NodeInfo.Addr, c.addr) {
rs[i].PreScopes = append(rs[i].PreScopes, mergeChildren)
}
}
rs[idx].PreScopes = append(rs[idx].PreScopes, mergeChildren)
}

for i := range rs {
Expand All @@ -3729,6 +3721,19 @@ func (c *Compile) newBroadcastJoinScopeList(probeScopes []*Scope, buildScopes []
return rs
}

// mergeShuffleJoinScopeList groups the child scopes into one merge scope per
// entry of c.cnList. The children are cut into len(c.cnList) consecutive
// groups of len(child)/len(c.cnList) scopes, and each group is wrapped with
// newMergeRemoteScope for the CN at the same position in c.cnList.
//
// Edge cases:
//   - If c.cnList is empty, child is returned unchanged rather than panicking
//     on an integer division by zero.
//   - If len(child) is not a multiple of len(c.cnList), the leftover trailing
//     scopes are added to the last CN's group so that no child scope is
//     silently dropped.
//
// NOTE(review): the grouping assumes child is ordered CN by CN, in the same
// order as c.cnList — confirm against the caller.
func (c *Compile) mergeShuffleJoinScopeList(child []*Scope) []*Scope {
	lenCN := len(c.cnList)
	if lenCN == 0 {
		// No CN to merge onto; avoid dividing by zero below.
		return child
	}

	dop := len(child) / lenCN
	mergeScope := make([]*Scope, 0, lenCN)
	for i, n := range c.cnList {
		start := i * dop
		end := start + dop
		if i == lenCN-1 {
			// The last group absorbs any remainder left by the integer
			// division; for an evenly divisible input this equals start+dop.
			end = len(child)
		}
		mergeScope = append(mergeScope, c.newMergeRemoteScope(child[start:end], n))
	}
	return mergeScope
}

func (c *Compile) newShuffleJoinScopeList(left, right []*Scope, n *plan.Node) ([]*Scope, []*Scope) {
single := len(c.cnList) <= 1
if single {
Expand Down
12 changes: 12 additions & 0 deletions pkg/sql/plan/shuffle.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,3 +581,15 @@ func shouldUseShuffleRanges(s *pb.ShuffleRange) []float64 {
}
return s.Result
}

// IsShuffleChildren reports whether n is a JOIN node whose
// Stats.HashmapStats.Shuffle flag is set, looking through any chain of FILTER
// nodes by following their first child. ns is the node table that the
// Children indices refer to. Any other node type yields false.
func IsShuffleChildren(n *plan.Node, ns []*plan.Node) bool {
	// Walk down through FILTER nodes to the first non-filter descendant.
	for n.NodeType == plan.Node_FILTER {
		n = ns[n.Children[0]]
	}
	return n.NodeType == plan.Node_JOIN && n.Stats.HashmapStats.Shuffle
}

0 comments on commit c37e5be

Please sign in to comment.