From 343ed5469ed483333764702e0e351e33077d8dbf Mon Sep 17 00:00:00 2001 From: Zhang Jian Date: Wed, 8 Aug 2018 20:35:52 +0800 Subject: [PATCH] executor: refactor `joinResultGenerator` to handle the unmatched outer records (#7288) (#7315) --- executor/index_lookup_join.go | 22 ++- executor/join.go | 44 ++--- executor/join_result_generators.go | 277 ++++++++++++++++------------- executor/join_test.go | 13 ++ executor/merge_join.go | 15 +- 5 files changed, 217 insertions(+), 154 deletions(-) diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index f57079d2972b1..f5f048af2f734 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -90,8 +90,9 @@ type lookUpJoinTask struct { lookupMap *mvmap.MVMap matchedInners []chunk.Row - doneCh chan error - cursor int + doneCh chan error + cursor int + hasMatch bool memTracker *memory.Tracker // track memory usage. } @@ -205,16 +206,19 @@ func (e *IndexLookUpJoin) Next(ctx context.Context, chk *chunk.Chunk) error { } outerRow := task.outerResult.GetRow(task.cursor) - if e.innerIter.Len() == 0 { - err = e.resultGenerator.emit(outerRow, nil, chk) - } else if e.innerIter.Current() != e.innerIter.End() { - err = e.resultGenerator.emit(outerRow, e.innerIter, chk) - } - if err != nil { - return errors.Trace(err) + if e.innerIter.Current() != e.innerIter.End() { + matched, err := e.resultGenerator.tryToMatch(outerRow, e.innerIter, chk) + if err != nil { + return errors.Trace(err) + } + task.hasMatch = task.hasMatch || matched } if e.innerIter.Current() == e.innerIter.End() { + if !task.hasMatch { + e.resultGenerator.onMissMatch(outerRow, chk) + } task.cursor++ + task.hasMatch = false } if chk.NumRows() == e.maxChunkSize { return nil diff --git a/executor/join.go b/executor/join.go index 1047002cce571..ceac4250e70f3 100644 --- a/executor/join.go +++ b/executor/join.go @@ -356,20 +356,14 @@ func (e *HashJoinExec) joinMatchedOuterRow2Chunk(workerID uint, outerRow chunk.R return false, joinResult } if hasNull { - err = e.resultGenerators[workerID].emit(outerRow, nil, joinResult.chk) - if err != nil { - joinResult.err = errors.Trace(err) - } - return err == nil, joinResult + e.resultGenerators[workerID].onMissMatch(outerRow, joinResult.chk) + return true, joinResult } e.hashTableValBufs[workerID] = e.hashTable.Get(joinKey, e.hashTableValBufs[workerID][:0]) innerPtrs := e.hashTableValBufs[workerID] if len(innerPtrs) == 0 { - err = e.resultGenerators[workerID].emit(outerRow, nil, joinResult.chk) - if err != nil { - joinResult.err = errors.Trace(err) - } - return err == nil, joinResult + e.resultGenerators[workerID].onMissMatch(outerRow, joinResult.chk) + return true, joinResult } innerRows := make([]chunk.Row, 0, len(innerPtrs)) for _, b := range innerPtrs { @@ -378,12 +372,15 @@ func (e *HashJoinExec) joinMatchedOuterRow2Chunk(workerID uint, outerRow chunk.R innerRows = append(innerRows, matchedInner) } iter := chunk.NewIterator4Slice(innerRows) + hasMatch := false for iter.Begin(); iter.Current() != iter.End(); { - err = e.resultGenerators[workerID].emit(outerRow, iter, joinResult.chk) + matched, err := e.resultGenerators[workerID].tryToMatch(outerRow, iter, joinResult.chk) if err != nil { joinResult.err = errors.Trace(err) return false, joinResult } + hasMatch = hasMatch || matched + if joinResult.chk.NumRows() == e.maxChunkSize { ok := true e.joinResultCh <- joinResult @@ -393,6 +390,9 @@ func (e *HashJoinExec) joinMatchedOuterRow2Chunk(workerID uint, outerRow chunk.R } } } + if !hasMatch { + e.resultGenerators[workerID].onMissMatch(outerRow, joinResult.chk) + } return true, joinResult } @@ -419,11 +419,7 @@ func (e *HashJoinExec) join2Chunk(workerID uint, outerChk *chunk.Chunk, joinResu } for i := range selected { if !selected[i] { // process unmatched outer rows - err = e.resultGenerators[workerID].emit(outerChk.GetRow(i), nil, joinResult.chk) - if err != nil { - joinResult.err = errors.Trace(err) - return false, joinResult - } + e.resultGenerators[workerID].onMissMatch(outerChk.GetRow(i), joinResult.chk) } else { // process matched outer rows ok, joinResult = e.joinMatchedOuterRow2Chunk(workerID, outerChk.GetRow(i), joinResult) if !ok { @@ -530,6 +526,7 @@ type NestedLoopApplyExec struct { innerSelected []bool innerIter chunk.Iterator outerRow *chunk.Row + hasMatch bool memTracker *memory.Tracker // track memory usage. } @@ -587,9 +584,9 @@ func (e *NestedLoopApplyExec) fetchSelectedOuterRow(ctx context.Context, chk *ch if selected { return &outerRow, nil } else if e.outer { - err := e.resultGenerator.emit(outerRow, nil, chk) - if err != nil || chk.NumRows() == e.maxChunkSize { - return nil, errors.Trace(err) + e.resultGenerator.onMissMatch(outerRow, chk) + if chk.NumRows() == e.maxChunkSize { + return nil, nil } } } @@ -630,10 +627,15 @@ func (e *NestedLoopApplyExec) Next(ctx context.Context, chk *chunk.Chunk) (err e chk.Reset() for { if e.innerIter == nil || e.innerIter.Current() == e.innerIter.End() { + if e.outerRow != nil && !e.hasMatch { + e.resultGenerator.onMissMatch(*e.outerRow, chk) + } e.outerRow, err = e.fetchSelectedOuterRow(ctx, chk) if e.outerRow == nil || err != nil { return errors.Trace(err) } + e.hasMatch = false + for _, col := range e.outerSchema { *col.Data = e.outerRow.GetDatum(col.Index, col.RetType) } @@ -645,7 +647,9 @@ func (e *NestedLoopApplyExec) Next(ctx context.Context, chk *chunk.Chunk) (err e e.innerIter.Begin() } - err = e.resultGenerator.emit(*e.outerRow, e.innerIter, chk) + matched, err := e.resultGenerator.tryToMatch(*e.outerRow, e.innerIter, chk) + e.hasMatch = e.hasMatch || matched + if err != nil || chk.NumRows() == e.maxChunkSize { return errors.Trace(err) } diff --git a/executor/join_result_generators.go b/executor/join_result_generators.go index 62e6d2a2321fc..de4c94e8b3103 100644 --- a/executor/join_result_generators.go +++ b/executor/join_result_generators.go @@ -32,21 +32,51 @@ var ( _ joinResultGenerator = &innerJoinResultGenerator{} ) -// joinResultGenerator is used to generate join results according the join type, see every implementor for detailed information. +// joinResultGenerator is used to generate join results according to the join +// type. A typical instruction flow is: +// +// hasMatch := false +// for innerIter.Current() != innerIter.End() { +// matched, err := g.tryToMatch(outer, innerIter, chk) +// // handle err +// hasMatch = hasMatch || matched +// } +// if !hasMatch { +// g.onMissMatch(outer) +// } +// +// NOTE: This interface is **not** thread-safe. type joinResultGenerator interface { - // emit tries to join an outer row with a batch of inner rows. - // When inners == nil or inners.Len() == 0, it means that the outer row can not be joined with any inner row: - // 1. SemiJoin: unmatched outer row is ignored. - // 2. AntiSemiJoin: unmatched outer row is appended to the result buffer. - // 3. LeftOuterSemiJoin: unmatched outer row is appended with 0 and appended to the result buffer. - // 4. AntiLeftOuterSemiJoin: unmatched outer row is appended with 1 and appended to the result buffer. - // 5. LeftOuterJoin: unmatched outer row is joined with a row of NULLs and appended to the result buffer. - // 6. RightOuterJoin: unmatched outer row is joined with a row of NULLs and appended to the result buffer. - // 7. InnerJoin: unmatched outer row is ignored. - // When inners.Len != 0 but all the joined rows are filtered, this means that the outer row is unmatched and the above action is tacked as well. - // Otherwise, the outer row is matched and some joined rows is appended to the `chk`. - // The size of `chk` is MaxChunkSize at most. - emit(outer chunk.Row, inners chunk.Iterator, chk *chunk.Chunk) error + // tryToMatch tries to join an outer row with a batch of inner rows. When + // 'inners.Len != 0' but all the joined rows are filtered, the outer row is + // considered unmatched. Otherwise, the outer row is matched and some joined + // rows are appended to `chk`. The size of `chk` is limited to MaxChunkSize. + // + // NOTE: Callers need to call this function multiple times to consume all + // the inner rows for an outer row, and dicide whether the outer row can be + // matched with at lease one inner row. + tryToMatch(outer chunk.Row, inners chunk.Iterator, chk *chunk.Chunk) (bool, error) + + // onMissMatch operates on the unmatched outer row according to the join + // type. An outer row can be considered miss matched if: + // 1. it can not pass the filter on the outer table side. + // 2. there is no inner row with the same join key. + // 3. all the joined rows can not pass the filter on the join result. + // + // On these conditions, the caller calls this function to handle the + // unmatched outer rows according to the current join type: + // 1. 'SemiJoin': ignores the unmatched outer row. + // 2. 'AntiSemiJoin': appends the unmatched outer row to the result buffer. + // 3. 'LeftOuterSemiJoin': concats the unmatched outer row with 0 and + // appends it to the result buffer. + // 4. 'AntiLeftOuterSemiJoin': concats the unmatched outer row with 0 and + // appends it to the result buffer. + // 5. 'LeftOuterJoin': concats the unmatched outer row with a row of NULLs + // and appends it to the result buffer. + // 6. 'RightOuterJoin': concats the unmatched outer row with a row of NULLs + // and appends it to the result buffer. + // 7. 'InnerJoin': ignores the unmatched outer row. + onMissMatch(outer chunk.Row, chk *chunk.Chunk) } func newJoinResultGenerator(ctx sessionctx.Context, joinType plan.JoinType, @@ -89,8 +119,6 @@ func newJoinResultGenerator(ctx sessionctx.Context, joinType plan.JoinType, panic("unsupported join type in func newJoinResultGenerator()") } -// baseJoinResultGenerator is not thread-safe, -// so we should build individual generator for every join goroutine. type baseJoinResultGenerator struct { ctx sessionctx.Context conditions []expression.Expression @@ -133,15 +161,15 @@ type semiJoinResultGenerator struct { baseJoinResultGenerator } -// emit implements joinResultGenerator interface. -func (outputer *semiJoinResultGenerator) emit(outer chunk.Row, inners chunk.Iterator, chk *chunk.Chunk) error { - if inners == nil || inners.Len() == 0 { - return nil +func (outputer *semiJoinResultGenerator) tryToMatch(outer chunk.Row, inners chunk.Iterator, chk *chunk.Chunk) (matched bool, err error) { + if inners.Len() == 0 { + return false, nil } - defer inners.ReachEnd() + if len(outputer.conditions) == 0 { chk.AppendPartialRow(0, outer) - return nil + inners.ReachEnd() + return true, nil } for inner := inners.Current(); inner != inners.End(); inner = inners.Next() { @@ -151,31 +179,36 @@ func (outputer *semiJoinResultGenerator) emit(outer chunk.Row, inners chunk.Iter } else { outputer.makeJoinRowToChunk(outputer.chk, outer, inner) } - selected, err := expression.EvalBool(outputer.ctx, outputer.conditions, outputer.chk.GetRow(0)) + + matched, err = expression.EvalBool(outputer.ctx, outputer.conditions, outputer.chk.GetRow(0)) if err != nil { - return errors.Trace(err) + return false, errors.Trace(err) } - if selected { - chk.AppendRow(outer) - return nil + if matched { + chk.AppendPartialRow(0, outer) + inners.ReachEnd() + return true, nil } } - return nil + return false, nil +} + +func (outputer *semiJoinResultGenerator) onMissMatch(outer chunk.Row, chk *chunk.Chunk) { } type antiSemiJoinResultGenerator struct { baseJoinResultGenerator } -// emit implements joinResultGenerator interface. -func (outputer *antiSemiJoinResultGenerator) emit(outer chunk.Row, inners chunk.Iterator, chk *chunk.Chunk) error { - if inners == nil || inners.Len() == 0 { - chk.AppendRow(outer) - return nil +// tryToMatch implements joinResultGenerator interface. +func (outputer *antiSemiJoinResultGenerator) tryToMatch(outer chunk.Row, inners chunk.Iterator, chk *chunk.Chunk) (matched bool, err error) { + if inners.Len() == 0 { + return false, nil } - defer inners.ReachEnd() + if len(outputer.conditions) == 0 { - return nil + inners.ReachEnd() + return true, nil } for inner := inners.Current(); inner != inners.End(); inner = inners.Next() { @@ -186,183 +219,186 @@ func (outputer *antiSemiJoinResultGenerator) emit(outer chunk.Row, inners chunk. outputer.makeJoinRowToChunk(outputer.chk, outer, inner) } - matched, err := expression.EvalBool(outputer.ctx, outputer.conditions, outputer.chk.GetRow(0)) + matched, err = expression.EvalBool(outputer.ctx, outputer.conditions, outputer.chk.GetRow(0)) if err != nil { - return errors.Trace(err) + return false, errors.Trace(err) } if matched { - return nil + inners.ReachEnd() + return true, nil } } + return false, nil +} + +func (outputer *antiSemiJoinResultGenerator) onMissMatch(outer chunk.Row, chk *chunk.Chunk) { chk.AppendRow(outer) - return nil } type leftOuterSemiJoinResultGenerator struct { baseJoinResultGenerator } -// emit implements joinResultGenerator interface. -func (outputer *leftOuterSemiJoinResultGenerator) emit(outer chunk.Row, inners chunk.Iterator, chk *chunk.Chunk) error { - if inners == nil || inners.Len() == 0 { - chk.AppendPartialRow(0, outer) - chk.AppendInt64(outer.Len(), 0) - return nil +// tryToMatch implements joinResultGenerator interface. +func (outputer *leftOuterSemiJoinResultGenerator) tryToMatch(outer chunk.Row, inners chunk.Iterator, chk *chunk.Chunk) (matched bool, err error) { + if inners.Len() == 0 { + return false, nil } - defer inners.ReachEnd() if len(outputer.conditions) == 0 { - chk.AppendPartialRow(0, outer) - chk.AppendInt64(outer.Len(), 1) - return nil + outputer.onMatch(outer, chk) + inners.ReachEnd() + return true, nil } for inner := inners.Current(); inner != inners.End(); inner = inners.Next() { outputer.chk.Reset() outputer.makeJoinRowToChunk(outputer.chk, outer, inner) - matched, err := expression.EvalBool(outputer.ctx, outputer.conditions, outputer.chk.GetRow(0)) + + matched, err = expression.EvalBool(outputer.ctx, outputer.conditions, outputer.chk.GetRow(0)) if err != nil { - return errors.Trace(err) + return false, errors.Trace(err) } if matched { - chk.AppendPartialRow(0, outer) - chk.AppendInt64(outer.Len(), 1) - return nil + outputer.onMatch(outer, chk) + inners.ReachEnd() + return true, nil } } + return false, nil +} + +func (outputer *leftOuterSemiJoinResultGenerator) onMatch(outer chunk.Row, chk *chunk.Chunk) { + chk.AppendPartialRow(0, outer) + chk.AppendInt64(outer.Len(), 1) +} + +func (outputer *leftOuterSemiJoinResultGenerator) onMissMatch(outer chunk.Row, chk *chunk.Chunk) { chk.AppendPartialRow(0, outer) chk.AppendInt64(outer.Len(), 0) - return nil } type antiLeftOuterSemiJoinResultGenerator struct { baseJoinResultGenerator } -// emit implements joinResultGenerator interface. -func (outputer *antiLeftOuterSemiJoinResultGenerator) emit(outer chunk.Row, inners chunk.Iterator, chk *chunk.Chunk) error { - // outer row can not be joined with any inner row. - if inners == nil || inners.Len() == 0 { - chk.AppendPartialRow(0, outer) - chk.AppendInt64(outer.Len(), 1) - return nil +// tryToMatch implements joinResultGenerator interface. +func (outputer *antiLeftOuterSemiJoinResultGenerator) tryToMatch(outer chunk.Row, inners chunk.Iterator, chk *chunk.Chunk) (matched bool, err error) { + if inners.Len() == 0 { + return false, nil } - defer inners.ReachEnd() - // outer row can be joined with an inner row. if len(outputer.conditions) == 0 { - chk.AppendPartialRow(0, outer) - chk.AppendInt64(outer.Len(), 0) - return nil + outputer.onMatch(outer, chk) + inners.ReachEnd() + return true, nil } for inner := inners.Current(); inner != inners.End(); inner = inners.Next() { outputer.chk.Reset() outputer.makeJoinRowToChunk(outputer.chk, outer, inner) matched, err := expression.EvalBool(outputer.ctx, outputer.conditions, outputer.chk.GetRow(0)) + if err != nil { - return errors.Trace(err) + return false, errors.Trace(err) } - // outer row can be joined with an inner row. if matched { - chk.AppendPartialRow(0, outer) - chk.AppendInt64(outer.Len(), 0) - return nil + outputer.onMatch(outer, chk) + inners.ReachEnd() + return true, nil } } + return false, nil +} - // outer row can not be joined with any inner row. +func (outputer *antiLeftOuterSemiJoinResultGenerator) onMatch(outer chunk.Row, chk *chunk.Chunk) { + chk.AppendPartialRow(0, outer) + chk.AppendInt64(outer.Len(), 0) +} + +func (outputer *antiLeftOuterSemiJoinResultGenerator) onMissMatch(outer chunk.Row, chk *chunk.Chunk) { chk.AppendPartialRow(0, outer) chk.AppendInt64(outer.Len(), 1) - return nil } type leftOuterJoinResultGenerator struct { baseJoinResultGenerator } -// emit implements joinResultGenerator interface. -func (outputer *leftOuterJoinResultGenerator) emit(outer chunk.Row, inners chunk.Iterator, chk *chunk.Chunk) error { - // outer row can not be joined with any inner row. - if inners == nil || inners.Len() == 0 { - chk.AppendPartialRow(0, outer) - chk.AppendPartialRow(outer.Len(), outputer.defaultInner) - return nil +// tryToMatch implements joinResultGenerator interface. +func (outputer *leftOuterJoinResultGenerator) tryToMatch(outer chunk.Row, inners chunk.Iterator, chk *chunk.Chunk) (bool, error) { + if inners.Len() == 0 { + return false, nil } + outputer.chk.Reset() chkForJoin := outputer.chk if len(outputer.conditions) == 0 { chkForJoin = chk } + numToAppend := outputer.maxChunkSize - chk.NumRows() for ; inners.Current() != inners.End() && numToAppend > 0; numToAppend-- { outputer.makeJoinRowToChunk(chkForJoin, outer, inners.Current()) inners.Next() } if len(outputer.conditions) == 0 { - return nil + return true, nil } + // reach here, chkForJoin is outputer.chk matched, err := outputer.filter(chkForJoin, chk) - if err != nil { - return errors.Trace(err) - } - chkForJoin.Reset() - if !matched { - // outer row can not be joined with any inner row. - chk.AppendPartialRow(0, outer) - chk.AppendPartialRow(outer.Len(), outputer.defaultInner) - } - return nil + return matched, errors.Trace(err) +} + +func (outputer *leftOuterJoinResultGenerator) onMissMatch(outer chunk.Row, chk *chunk.Chunk) { + chk.AppendPartialRow(0, outer) + chk.AppendPartialRow(outer.Len(), outputer.defaultInner) } type rightOuterJoinResultGenerator struct { baseJoinResultGenerator } -// emit implements joinResultGenerator interface. -func (outputer *rightOuterJoinResultGenerator) emit(outer chunk.Row, inners chunk.Iterator, chk *chunk.Chunk) error { - // outer row can not be joined with any inner row. - if inners == nil || inners.Len() == 0 { - chk.AppendPartialRow(0, outputer.defaultInner) - chk.AppendPartialRow(outputer.defaultInner.Len(), outer) - return nil +// tryToMatch implements joinResultGenerator interface. +func (outputer *rightOuterJoinResultGenerator) tryToMatch(outer chunk.Row, inners chunk.Iterator, chk *chunk.Chunk) (bool, error) { + if inners.Len() == 0 { + return false, nil } + outputer.chk.Reset() chkForJoin := outputer.chk if len(outputer.conditions) == 0 { chkForJoin = chk } + numToAppend := outputer.maxChunkSize - chk.NumRows() for ; inners.Current() != inners.End() && numToAppend > 0; numToAppend-- { outputer.makeJoinRowToChunk(chkForJoin, inners.Current(), outer) inners.Next() } if len(outputer.conditions) == 0 { - return nil + return true, nil } + // reach here, chkForJoin is outputer.chk matched, err := outputer.filter(chkForJoin, chk) - if err != nil { - return errors.Trace(err) - } - chkForJoin.Reset() - // outer row can not be joined with any inner row. - if !matched { - chk.AppendPartialRow(0, outputer.defaultInner) - chk.AppendPartialRow(outputer.defaultInner.Len(), outer) - } - return nil + return matched, errors.Trace(err) +} + +func (outputer *rightOuterJoinResultGenerator) onMissMatch(outer chunk.Row, chk *chunk.Chunk) { + chk.AppendPartialRow(0, outputer.defaultInner) + chk.AppendPartialRow(outputer.defaultInner.Len(), outer) } type innerJoinResultGenerator struct { baseJoinResultGenerator } -// emit implements joinResultGenerator interface. -func (outputer *innerJoinResultGenerator) emit(outer chunk.Row, inners chunk.Iterator, chk *chunk.Chunk) error { - if inners == nil || inners.Len() == 0 { - return nil +// tryToMatch implements joinResultGenerator interface. +func (outputer *innerJoinResultGenerator) tryToMatch(outer chunk.Row, inners chunk.Iterator, chk *chunk.Chunk) (bool, error) { + if inners.Len() == 0 { + return false, nil } outputer.chk.Reset() chkForJoin := outputer.chk @@ -378,14 +414,13 @@ func (outputer *innerJoinResultGenerator) emit(outer chunk.Row, inners chunk.Ite } } if len(outputer.conditions) == 0 { - return nil + return true, nil } + // reach here, chkForJoin is outputer.chk - _, err := outputer.filter(chkForJoin, chk) - if err != nil { - return errors.Trace(err) - } - chkForJoin.Reset() + matched, err := outputer.filter(chkForJoin, chk) + return matched, errors.Trace(err) +} - return nil +func (outputer *innerJoinResultGenerator) onMissMatch(outer chunk.Row, chk *chunk.Chunk) { } diff --git a/executor/join_test.go b/executor/join_test.go index 0b36b2b9ad2ec..a78f6ac93a30d 100644 --- a/executor/join_test.go +++ b/executor/join_test.go @@ -853,6 +853,19 @@ func (s *testSuite) TestIndexLookupJoin(c *C) { `1.01`, `2.02`, )) + + tk.MustExec(`drop table if exists t;`) + tk.MustExec(`create table t(a bigint, b bigint, unique key idx1(a, b));`) + tk.MustExec(`insert into t values(1, 1), (1, 2), (1, 3), (1, 4), (1, 5), (1, 6);`) + tk.MustExec(`set @@tidb_max_chunk_size = 2;`) + tk.MustQuery(`select /*+ TIDB_INLJ(t1) */ * from t t1 left join t t2 on t1.a = t2.a and t1.b = t2.b + 4;`).Check(testkit.Rows( + `1 1 `, + `1 2 `, + `1 3 `, + `1 4 `, + `1 5 1 1`, + `1 6 1 2`, + )) } func (s *testSuite) TestMergejoinOrder(c *C) { diff --git a/executor/merge_join.go b/executor/merge_join.go index eaf39429f2dca..4be9c9facb94c 100644 --- a/executor/merge_join.go +++ b/executor/merge_join.go @@ -56,8 +56,9 @@ type mergeJoinOuterTable struct { chk *chunk.Chunk selected []bool - iter *chunk.Iterator4Chunk - row chunk.Row + iter *chunk.Iterator4Chunk + row chunk.Row + hasMatch bool } // mergeJoinInnerTable represents the inner table of merge join. @@ -290,12 +291,13 @@ func (e *MergeJoinExec) joinToChunk(ctx context.Context, chk *chunk.Chunk) (hasM } if cmpResult < 0 { - err = e.resultGenerator.emit(e.outerTable.row, nil, chk) + e.resultGenerator.onMissMatch(e.outerTable.row, chk) if err != nil { return false, errors.Trace(err) } e.outerTable.row = e.outerTable.iter.Next() + e.outerTable.hasMatch = false if chk.NumRows() == e.maxChunkSize { return true, nil @@ -303,14 +305,19 @@ func (e *MergeJoinExec) joinToChunk(ctx context.Context, chk *chunk.Chunk) (hasM continue } - err = e.resultGenerator.emit(e.outerTable.row, e.innerIter4Row, chk) + matched, err := e.resultGenerator.tryToMatch(e.outerTable.row, e.innerIter4Row, chk) if err != nil { return false, errors.Trace(err) } + e.outerTable.hasMatch = e.outerTable.hasMatch || matched if e.innerIter4Row.Current() == e.innerIter4Row.End() { + if !e.outerTable.hasMatch { + e.resultGenerator.onMissMatch(e.outerTable.row, chk) + } e.outerTable.row = e.outerTable.iter.Next() e.innerIter4Row.Begin() + e.outerTable.hasMatch = false } if chk.NumRows() >= e.maxChunkSize {