Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: refactor joinResultGenerator to handle the unmatched outer records #7288

Merged
merged 15 commits into from
Aug 8, 2018
10 changes: 5 additions & 5 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,7 @@ func (b *executorBuilder) buildMergeJoin(v *plan.PhysicalMergeJoin) Executor {
e := &MergeJoinExec{
stmtCtx: b.ctx.GetSessionVars().StmtCtx,
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), leftExec, rightExec),
resultGenerator: newJoinResultGenerator(b.ctx, v.JoinType, v.JoinType == plan.RightOuterJoin,
resultGenerator: newRecordJoiner(b.ctx, v.JoinType, v.JoinType == plan.RightOuterJoin,
defaultValues, v.OtherConditions,
leftExec.retTypes(), rightExec.retTypes()),
}
Expand Down Expand Up @@ -818,9 +818,9 @@ func (b *executorBuilder) buildHashJoin(v *plan.PhysicalHashJoin) Executor {
defaultValues = make([]types.Datum, e.innerExec.Schema().Len())
}
}
e.resultGenerators = make([]joinResultGenerator, e.concurrency)
e.resultGenerators = make([]recordJoiner, e.concurrency)
for i := uint(0); i < e.concurrency; i++ {
e.resultGenerators[i] = newJoinResultGenerator(b.ctx, v.JoinType, v.InnerChildIdx == 0, defaultValues,
e.resultGenerators[i] = newRecordJoiner(b.ctx, v.JoinType, v.InnerChildIdx == 0, defaultValues,
v.OtherConditions, lhsTypes, rhsTypes)
}
metrics.ExecutorCounter.WithLabelValues("HashJoinExec").Inc()
Expand Down Expand Up @@ -1153,7 +1153,7 @@ func (b *executorBuilder) buildApply(apply *plan.PhysicalApply) *NestedLoopApply
if defaultValues == nil {
defaultValues = make([]types.Datum, v.Children()[v.InnerChildIdx].Schema().Len())
}
generator := newJoinResultGenerator(b.ctx, v.JoinType, v.InnerChildIdx == 0,
generator := newRecordJoiner(b.ctx, v.JoinType, v.InnerChildIdx == 0,
defaultValues, otherConditions, leftChild.retTypes(), rightChild.retTypes())
outerExec, innerExec := leftChild, rightChild
outerFilter, innerFilter := v.LeftConditions, v.RightConditions
Expand Down Expand Up @@ -1510,7 +1510,7 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plan.PhysicalIndexJoin) Execut
rowTypes: innerTypes,
},
workerWg: new(sync.WaitGroup),
resultGenerator: newJoinResultGenerator(b.ctx, v.JoinType, v.OuterIndex == 1, defaultValues, v.OtherConditions, leftTypes, rightTypes),
resultGenerator: newRecordJoiner(b.ctx, v.JoinType, v.OuterIndex == 1, defaultValues, v.OtherConditions, leftTypes, rightTypes),
indexRanges: v.Ranges,
keyOff2IdxOff: v.KeyOff2IdxOff,
}
Expand Down
24 changes: 14 additions & 10 deletions executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type IndexLookUpJoin struct {
joinResult *chunk.Chunk
innerIter chunk.Iterator

resultGenerator joinResultGenerator
resultGenerator recordJoiner
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The field name needs to be updated accordingly?


indexRanges []*ranger.Range
keyOff2IdxOff []int
Expand Down Expand Up @@ -90,8 +90,9 @@ type lookUpJoinTask struct {
lookupMap *mvmap.MVMap
matchedInners []chunk.Row

doneCh chan error
cursor int
doneCh chan error
cursor int
hasMatch bool

memTracker *memory.Tracker // track memory usage.
}
Expand Down Expand Up @@ -205,16 +206,19 @@ func (e *IndexLookUpJoin) Next(ctx context.Context, chk *chunk.Chunk) error {
}

outerRow := task.outerResult.GetRow(task.cursor)
if e.innerIter.Len() == 0 {
err = e.resultGenerator.emit(outerRow, nil, chk)
} else if e.innerIter.Current() != e.innerIter.End() {
err = e.resultGenerator.emit(outerRow, e.innerIter, chk)
}
if err != nil {
return errors.Trace(err)
if e.innerIter.Current() != e.innerIter.End() {
matched, err := e.resultGenerator.tryToMatch(outerRow, e.innerIter, chk)
if err != nil {
return errors.Trace(err)
}
task.hasMatch = task.hasMatch || matched
}
if e.innerIter.Current() == e.innerIter.End() {
if !task.hasMatch {
e.resultGenerator.onMissMatch(outerRow, chk)
}
task.cursor++
task.hasMatch = false
}
if chk.NumRows() == e.maxChunkSize {
return nil
Expand Down
50 changes: 27 additions & 23 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ type HashJoinExec struct {
innerIdx int

// We build individual resultGenerator for each join worker when use chunk-based execution,
// to avoid the concurrency of joinResultGenerator.chk and joinResultGenerator.selected.
resultGenerators []joinResultGenerator
// to avoid the concurrency of recordJoiner.chk and recordJoiner.selected.
resultGenerators []recordJoiner

outerKeyColIdx []int
innerKeyColIdx []int
Expand Down Expand Up @@ -385,20 +385,14 @@ func (e *HashJoinExec) joinMatchedOuterRow2Chunk(workerID uint, outerRow chunk.R
return false, joinResult
}
if hasNull {
err = e.resultGenerators[workerID].emit(outerRow, nil, joinResult.chk)
if err != nil {
joinResult.err = errors.Trace(err)
}
return err == nil, joinResult
e.resultGenerators[workerID].onMissMatch(outerRow, joinResult.chk)
return true, joinResult
}
e.hashTableValBufs[workerID] = e.hashTable.Get(joinKey, e.hashTableValBufs[workerID][:0])
innerPtrs := e.hashTableValBufs[workerID]
if len(innerPtrs) == 0 {
err = e.resultGenerators[workerID].emit(outerRow, nil, joinResult.chk)
if err != nil {
joinResult.err = errors.Trace(err)
}
return err == nil, joinResult
e.resultGenerators[workerID].onMissMatch(outerRow, joinResult.chk)
return true, joinResult
}
innerRows := make([]chunk.Row, 0, len(innerPtrs))
for _, b := range innerPtrs {
Expand All @@ -407,12 +401,15 @@ func (e *HashJoinExec) joinMatchedOuterRow2Chunk(workerID uint, outerRow chunk.R
innerRows = append(innerRows, matchedInner)
}
iter := chunk.NewIterator4Slice(innerRows)
hasMatch := false
for iter.Begin(); iter.Current() != iter.End(); {
err = e.resultGenerators[workerID].emit(outerRow, iter, joinResult.chk)
matched, err := e.resultGenerators[workerID].tryToMatch(outerRow, iter, joinResult.chk)
if err != nil {
joinResult.err = errors.Trace(err)
return false, joinResult
}
hasMatch = hasMatch || matched

if joinResult.chk.NumRows() == e.maxChunkSize {
ok := true
e.joinResultCh <- joinResult
Expand All @@ -422,6 +419,9 @@ func (e *HashJoinExec) joinMatchedOuterRow2Chunk(workerID uint, outerRow chunk.R
}
}
}
if !hasMatch {
e.resultGenerators[workerID].onMissMatch(outerRow, joinResult.chk)
}
return true, joinResult
}

Expand All @@ -448,11 +448,7 @@ func (e *HashJoinExec) join2Chunk(workerID uint, outerChk *chunk.Chunk, joinResu
}
for i := range selected {
if !selected[i] { // process unmatched outer rows
err = e.resultGenerators[workerID].emit(outerChk.GetRow(i), nil, joinResult.chk)
if err != nil {
joinResult.err = errors.Trace(err)
return false, joinResult
}
e.resultGenerators[workerID].onMissMatch(outerChk.GetRow(i), joinResult.chk)
} else { // process matched outer rows
ok, joinResult = e.joinMatchedOuterRow2Chunk(workerID, outerChk.GetRow(i), joinResult)
if !ok {
Expand Down Expand Up @@ -566,7 +562,7 @@ type NestedLoopApplyExec struct {
outerFilter expression.CNFExprs
outer bool

resultGenerator joinResultGenerator
resultGenerator recordJoiner

outerSchema []*expression.CorrelatedColumn

Expand All @@ -578,6 +574,7 @@ type NestedLoopApplyExec struct {
innerSelected []bool
innerIter chunk.Iterator
outerRow *chunk.Row
hasMatch bool

memTracker *memory.Tracker // track memory usage.
}
Expand Down Expand Up @@ -635,9 +632,9 @@ func (e *NestedLoopApplyExec) fetchSelectedOuterRow(ctx context.Context, chk *ch
if selected {
return &outerRow, nil
} else if e.outer {
err := e.resultGenerator.emit(outerRow, nil, chk)
if err != nil || chk.NumRows() == e.maxChunkSize {
return nil, errors.Trace(err)
e.resultGenerator.onMissMatch(outerRow, chk)
if chk.NumRows() == e.maxChunkSize {
return nil, nil
}
}
}
Expand Down Expand Up @@ -678,10 +675,15 @@ func (e *NestedLoopApplyExec) Next(ctx context.Context, chk *chunk.Chunk) (err e
chk.Reset()
for {
if e.innerIter == nil || e.innerIter.Current() == e.innerIter.End() {
if e.outerRow != nil && !e.hasMatch {
e.resultGenerator.onMissMatch(*e.outerRow, chk)
}
e.outerRow, err = e.fetchSelectedOuterRow(ctx, chk)
if e.outerRow == nil || err != nil {
return errors.Trace(err)
}
e.hasMatch = false

for _, col := range e.outerSchema {
*col.Data = e.outerRow.GetDatum(col.Index, col.RetType)
}
Expand All @@ -693,7 +695,9 @@ func (e *NestedLoopApplyExec) Next(ctx context.Context, chk *chunk.Chunk) (err e
e.innerIter.Begin()
}

err = e.resultGenerator.emit(*e.outerRow, e.innerIter, chk)
matched, err := e.resultGenerator.tryToMatch(*e.outerRow, e.innerIter, chk)
e.hasMatch = e.hasMatch || matched

if err != nil || chk.NumRows() == e.maxChunkSize {
return errors.Trace(err)
}
Expand Down
Loading