Skip to content

Commit

Permalink
remove pipeline in shuffle join probe side (#18040)
Browse files Browse the repository at this point in the history
移除shuffle join probe端一个冗余的merge scope。

Approved by: @ouyuanning
  • Loading branch information
badboynt1 authored Aug 11, 2024
1 parent 94af48c commit 787bb7b
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 30 deletions.
26 changes: 1 addition & 25 deletions pkg/sql/compile/compile.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ import (
const (
DistributedThreshold uint64 = 10 * mpool.MB
SingleLineSizeEstimate uint64 = 300 * mpool.B
shuffleChannelBufferSize = 16
shuffleChannelBufferSize = 32

NoAccountId = -1
)
Expand Down Expand Up @@ -3682,30 +3682,6 @@ func (c *Compile) newShuffleJoinScopeList(left, right []*Scope, n *plan.Node) ([
return parent, children
}

func (c *Compile) newJoinProbeScopeWithBidx(s *Scope) *Scope {
rs := newScope(Merge)
mergeOp := merge.NewArgument()
mergeOp.SetIdx(vm.GetLeafOp(s.RootOp).GetOperatorBase().GetIdx())
mergeOp.SetIsFirst(true)
rs.setRootOperator(mergeOp)
rs.Proc = s.Proc.NewContextChildProc(s.BuildIdx)
for i := 0; i < s.BuildIdx; i++ {
regTransplant(s, rs, i, i)
}

s.Proc.Reg.MergeReceivers[0] = &process.WaitRegister{
Ctx: s.Proc.Ctx,
Ch: make(chan *process.RegisterMessage, shuffleChannelBufferSize),
}
rs.setRootOperator(
connector.NewArgument().
WithReg(s.Proc.Reg.MergeReceivers[0]),
)
s.Proc.Reg.MergeReceivers = s.Proc.Reg.MergeReceivers[:1]
rs.IsEnd = true
return rs
}

func (c *Compile) newBroadcastJoinProbeScope(s *Scope, ss []*Scope) *Scope {
rs := newScope(Merge)
mergeOp := merge.NewArgument()
Expand Down
6 changes: 1 addition & 5 deletions pkg/sql/compile/scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,11 +443,7 @@ func buildJoinParallelRun(s *Scope, c *Compile) (*Scope, error) {
if s.ShuffleIdx > 0 { //shuffle join
buildScope := c.newJoinBuildScope(s, 1)
s.PreScopes = append(s.PreScopes, buildScope)
if s.BuildIdx > 1 {
probeScope := c.newJoinProbeScopeWithBidx(s)
s.PreScopes = append(s.PreScopes, probeScope)
}
s.Proc.Reg.MergeReceivers = s.Proc.Reg.MergeReceivers[:1]
s.Proc.Reg.MergeReceivers = s.Proc.Reg.MergeReceivers[:s.BuildIdx]
return s, nil
}

Expand Down

0 comments on commit 787bb7b

Please sign in to comment.