Skip to content

Commit

Permalink
executor: optimize (left outer) (anti) semi join which has no other condition (#47764)
Browse files Browse the repository at this point in the history

close #47424
  • Loading branch information
gengliqi authored Oct 23, 2023
1 parent 98b6c43 commit 1d3b53e
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 30 deletions.
9 changes: 3 additions & 6 deletions pkg/executor/cte.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,13 +633,10 @@ func (p *cteProducer) checkHasDup(probeKey uint64,
curChk *chunk.Chunk,
storage cteutil.Storage,
hashTbl baseHashTable) (hasDup bool, err error) {
ptrs := hashTbl.Get(probeKey)
entry := hashTbl.Get(probeKey)

if len(ptrs) == 0 {
return false, nil
}

for _, ptr := range ptrs {
for ; entry != nil; entry = entry.next {
ptr := entry.ptr
var matchedRow chunk.Row
if curChk != nil {
matchedRow = curChk.GetRow(int(ptr.RowIdx))
Expand Down
70 changes: 56 additions & 14 deletions pkg/executor/hash_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,47 @@ func (c *hashRowContainer) GetMatchedRows(probeKey uint64, probeRow chunk.Row, h
return matchedRows, err
}

// GetOneMatchedRow returns a pointer to the first row in the hash chain of
// probeKey whose join key matches probeRow. It returns (nil, nil) when the
// chain is empty or when no row in the chain matches.
func (c *hashRowContainer) GetOneMatchedRow(probeKey uint64, probeRow chunk.Row, hCtx *hashContext) (*chunk.Row, error) {
	head := c.hashTable.Get(probeKey)
	if head == nil {
		return nil, nil
	}

	if c.chkBuf != nil {
		c.chkBuf.Reset()
	}

	var (
		candidate  chunk.Row
		resetEvery int
		err        error
	)
	visited := 0
	for cur := head; cur != nil; cur = cur.next {
		candidate, c.chkBuf, err = c.rowContainer.GetRowAndAppendToChunkIfInDisk(cur.ptr, c.chkBuf)
		if err != nil {
			return nil, err
		}
		var same bool
		if same, err = c.matchJoinKey(candidate, probeRow, hCtx); err != nil {
			return nil, err
		}
		if same {
			// The first match is enough; the rest of the chain is not visited.
			return &candidate, nil
		}
		// The entry shares the hash key but its join key differs.
		atomic.AddInt64(&c.stat.probeCollision, 1)

		// Reset the buffer once every resetEvery visited entries, where
		// resetEvery is the buffer capacity observed after the first entry,
		// but at least 128.
		// NOTE(review): presumably this bounds how many rejected rows the
		// buffer accumulates when rows are read from disk — confirm.
		if visited == 0 {
			resetEvery = c.chkBuf.Capacity()
			if resetEvery < 128 {
				resetEvery = 128
			}
		} else if (visited+1)%resetEvery == 0 {
			c.chkBuf.Reset()
		}
		visited++
	}
	return nil, nil
}

func (c *hashRowContainer) GetAllMatchedRows(probeHCtx *hashContext, probeSideRow chunk.Row,
probeKeyNullBits *bitmap.ConcurrentBitmap, matched []chunk.Row, needCheckBuildColPos, needCheckProbeColPos []int, needCheckBuildTypes, needCheckProbeTypes []*types.FieldType) ([]chunk.Row, error) {
// for NAAJ probe row with null, we should match them with all build rows.
Expand Down Expand Up @@ -232,7 +273,11 @@ const rowPtrSize = int64(unsafe.Sizeof(chunk.RowPtr{}))
// h and buf.
func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk.Row, hCtx *hashContext, matched []chunk.Row, matchedPtrs []chunk.RowPtr, needPtr bool) ([]chunk.Row, []chunk.RowPtr, error) {
var err error
innerPtrs := c.hashTable.Get(probeKey)
entry := c.hashTable.Get(probeKey)
var innerPtrs []chunk.RowPtr
for ; entry != nil; entry = entry.next {
innerPtrs = append(innerPtrs, entry.ptr)
}
if len(innerPtrs) == 0 {
return nil, nil, err
}
Expand Down Expand Up @@ -565,7 +610,12 @@ func (es *entryStore) GetStore() (e *entry, memDelta int64) {

type baseHashTable interface {
Put(hashKey uint64, rowPtr chunk.RowPtr)
Get(hashKey uint64) (rowPtrs []chunk.RowPtr)
// e := Get(hashKey)
// for ; e != nil; e = e.next {
// rowPtr := e.ptr
// ...
// }
Get(hashKey uint64) *entry
Len() uint64
// GetAndCleanMemoryDelta gets and cleans the memDelta of the baseHashTable. Memory delta will be cleared after each fetch.
// It indicates the memory delta of the baseHashTable since the last calling GetAndCleanMemoryDelta().
Expand Down Expand Up @@ -611,13 +661,9 @@ func (ht *unsafeHashTable) Put(hashKey uint64, rowPtr chunk.RowPtr) {
}

// Get gets the values of the "key" and appends them to "values".
func (ht *unsafeHashTable) Get(hashKey uint64) (rowPtrs []chunk.RowPtr) {
func (ht *unsafeHashTable) Get(hashKey uint64) *entry {
entryAddr := ht.hashMap[hashKey]
for entryAddr != nil {
rowPtrs = append(rowPtrs, entryAddr.ptr)
entryAddr = entryAddr.next
}
return
return entryAddr
}

// Len returns the number of rowPtrs in the unsafeHashTable, the number of keys may be less than Len
Expand Down Expand Up @@ -674,13 +720,9 @@ func (ht *concurrentMapHashTable) Put(hashKey uint64, rowPtr chunk.RowPtr) {
}

// Get gets the values of the "key" and appends them to "values".
func (ht *concurrentMapHashTable) Get(hashKey uint64) (rowPtrs []chunk.RowPtr) {
func (ht *concurrentMapHashTable) Get(hashKey uint64) *entry {
entryAddr, _ := ht.hashMap.Get(hashKey)
for entryAddr != nil {
rowPtrs = append(rowPtrs, entryAddr.ptr)
entryAddr = entryAddr.next
}
return
return entryAddr
}

// Iter gets the every value of the hash table.
Expand Down
11 changes: 4 additions & 7 deletions pkg/executor/index_lookup_hash_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ type indexHashJoinOuterWorker struct {

type indexHashJoinInnerWorker struct {
innerWorker
matchedOuterPtrs []chunk.RowPtr
joiner joiner
joinChkResourceCh chan *chunk.Chunk
// resultCh is valid only when indexNestedLoopHashJoin do not need to keep
Expand Down Expand Up @@ -436,7 +435,6 @@ func (e *IndexNestedLoopHashJoin) newInnerWorker(taskCh chan *indexHashJoinTask,
joiner: e.joiners[workerID],
joinChkResourceCh: e.joinChkResourceCh[workerID],
resultCh: e.resultCh,
matchedOuterPtrs: make([]chunk.RowPtr, 0, e.MaxChunkSize()),
joinKeyBuf: make([]byte, 1),
outerRowStatus: make([]outerRowStatusFlag, 0, e.MaxChunkSize()),
rowIter: chunk.NewIterator4Slice([]chunk.Row{}).(*chunk.Iterator4Slice),
Expand Down Expand Up @@ -712,15 +710,14 @@ func (iw *indexHashJoinInnerWorker) getMatchedOuterRows(innerRow chunk.Row, task
if err != nil {
return nil, nil, err
}
iw.matchedOuterPtrs = task.lookupMap.Get(h.Sum64())
if len(iw.matchedOuterPtrs) == 0 {
matchedOuterEntry := task.lookupMap.Get(h.Sum64())
if matchedOuterEntry == nil {
return nil, nil, nil
}
joinType := JoinerType(iw.joiner)
isSemiJoin := joinType == plannercore.SemiJoin || joinType == plannercore.LeftOuterSemiJoin
matchedRows = make([]chunk.Row, 0, len(iw.matchedOuterPtrs))
matchedRowPtr = make([]chunk.RowPtr, 0, len(iw.matchedOuterPtrs))
for _, ptr := range iw.matchedOuterPtrs {
for ; matchedOuterEntry != nil; matchedOuterEntry = matchedOuterEntry.next {
ptr := matchedOuterEntry.ptr
outerRow := task.outerResult.GetRow(ptr)
ok, err := codec.EqualChunkRow(iw.ctx.GetSessionVars().StmtCtx, innerRow, iw.hashTypes, iw.hashCols, outerRow, iw.outerCtx.hashTypes, iw.outerCtx.hashCols)
if err != nil {
Expand Down
14 changes: 12 additions & 2 deletions pkg/executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -925,8 +925,18 @@ func (w *probeWorker) joinNAAJMatchProbeSideRow2Chunk(probeKey uint64, probeKeyN
func (w *probeWorker) joinMatchedProbeSideRow2Chunk(probeKey uint64, probeSideRow chunk.Row, hCtx *hashContext,
joinResult *hashjoinWorkerResult) (bool, *hashjoinWorkerResult) {
var err error
w.buildSideRows, err = w.rowContainerForProbe.GetMatchedRows(probeKey, probeSideRow, hCtx, w.buildSideRows)
buildSideRows := w.buildSideRows
var buildSideRows []chunk.Row
if w.joiner.isSemiJoinWithoutCondition() {
var rowPtr *chunk.Row
rowPtr, err = w.rowContainerForProbe.GetOneMatchedRow(probeKey, probeSideRow, hCtx)
if rowPtr != nil {
buildSideRows = append(buildSideRows, *rowPtr)
}
} else {
w.buildSideRows, err = w.rowContainerForProbe.GetMatchedRows(probeKey, probeSideRow, hCtx, w.buildSideRows)
buildSideRows = w.buildSideRows
}

if err != nil {
joinResult.err = err
return false, joinResult
Expand Down
43 changes: 42 additions & 1 deletion pkg/executor/joiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ type joiner interface {
// parameter passed to `onMissMatch`.
onMissMatch(hasNull bool, outer chunk.Row, chk *chunk.Chunk)

// isSemiJoinWithoutCondition returns if it's a semi join and has no condition.
// If true, at most one matched row is needed to match inners, which can optimize a lot when
// there are a lot of matched rows.
isSemiJoinWithoutCondition() bool

// Clone deep copies a joiner.
Clone() joiner
}
Expand Down Expand Up @@ -426,6 +431,10 @@ func (j *semiJoiner) tryToMatchOuters(outers chunk.Iterator, inner chunk.Row, ch

// onMissMatch implements joiner interface.
// It is a no-op for semiJoiner: the body is empty and all arguments are ignored.
func (*semiJoiner) onMissMatch(bool, chunk.Row, *chunk.Chunk) {}

// isSemiJoinWithoutCondition implements joiner interface.
// It returns true exactly when j.conditions is empty.
func (j *semiJoiner) isSemiJoinWithoutCondition() bool {
return len(j.conditions) == 0
}

// Clone implements joiner interface.
func (j *semiJoiner) Clone() joiner {
return &semiJoiner{baseJoiner: j.baseJoiner.Clone()}
Expand Down Expand Up @@ -490,6 +499,10 @@ func (naaj *nullAwareAntiSemiJoiner) onMissMatch(_ bool, outer chunk.Row, chk *c
chk.AppendRowByColIdxs(outer, naaj.lUsed)
}

// isSemiJoinWithoutCondition implements joiner interface.
// It returns true exactly when naaj.conditions is empty.
func (naaj *nullAwareAntiSemiJoiner) isSemiJoinWithoutCondition() bool {
return len(naaj.conditions) == 0
}

// Clone implements joiner interface.
// The result is a new nullAwareAntiSemiJoiner whose baseJoiner is a clone of
// naaj's baseJoiner.
func (naaj *nullAwareAntiSemiJoiner) Clone() joiner {
	cloned := new(nullAwareAntiSemiJoiner)
	cloned.baseJoiner = naaj.baseJoiner.Clone()
	return cloned
}
Expand Down Expand Up @@ -559,6 +572,10 @@ func (j *antiSemiJoiner) onMissMatch(hasNull bool, outer chunk.Row, chk *chunk.C
}
}

// isSemiJoinWithoutCondition implements joiner interface.
// It returns true exactly when j.conditions is empty.
func (j *antiSemiJoiner) isSemiJoinWithoutCondition() bool {
return len(j.conditions) == 0
}

// Clone implements joiner interface.
// The result is a new antiSemiJoiner whose baseJoiner is a clone of j's baseJoiner.
func (j *antiSemiJoiner) Clone() joiner {
	cloned := new(antiSemiJoiner)
	cloned.baseJoiner = j.baseJoiner.Clone()
	return cloned
}
Expand Down Expand Up @@ -641,6 +658,10 @@ func (j *leftOuterSemiJoiner) onMissMatch(hasNull bool, outer chunk.Row, chk *ch
}
}

// isSemiJoinWithoutCondition implements joiner interface.
// It returns true exactly when j.conditions is empty.
func (j *leftOuterSemiJoiner) isSemiJoinWithoutCondition() bool {
return len(j.conditions) == 0
}

// Clone implements joiner interface.
// The result is a new leftOuterSemiJoiner whose baseJoiner is a clone of j's
// baseJoiner.
func (j *leftOuterSemiJoiner) Clone() joiner {
	cloned := new(leftOuterSemiJoiner)
	cloned.baseJoiner = j.baseJoiner.Clone()
	return cloned
}
Expand Down Expand Up @@ -713,8 +734,12 @@ func (*nullAwareAntiLeftOuterSemiJoiner) tryToMatchOuters(chunk.Iterator, chunk.
return nil, err
}

// isSemiJoinWithoutCondition implements joiner interface.
// It returns true exactly when naal.conditions is empty.
func (naal *nullAwareAntiLeftOuterSemiJoiner) isSemiJoinWithoutCondition() bool {
return len(naal.conditions) == 0
}

// Clone implements joiner interface.
// The result is a new nullAwareAntiLeftOuterSemiJoiner whose baseJoiner is a
// clone of naal's baseJoiner. The clone must have the same concrete type as
// the receiver; returning an antiLeftOuterSemiJoiner here would give the
// copy a different joiner implementation.
func (naal *nullAwareAntiLeftOuterSemiJoiner) Clone() joiner {
	return &nullAwareAntiLeftOuterSemiJoiner{baseJoiner: naal.baseJoiner.Clone()}
}

type antiLeftOuterSemiJoiner struct {
Expand Down Expand Up @@ -798,6 +823,10 @@ func (j *antiLeftOuterSemiJoiner) onMissMatch(hasNull bool, outer chunk.Row, chk
}
}

// isSemiJoinWithoutCondition implements joiner interface.
// It returns true exactly when j.conditions is empty.
func (j *antiLeftOuterSemiJoiner) isSemiJoinWithoutCondition() bool {
return len(j.conditions) == 0
}

// Clone implements joiner interface.
// The result is a new antiLeftOuterSemiJoiner whose baseJoiner is a clone of
// j's baseJoiner.
func (j *antiLeftOuterSemiJoiner) Clone() joiner {
	cloned := new(antiLeftOuterSemiJoiner)
	cloned.baseJoiner = j.baseJoiner.Clone()
	return cloned
}
Expand Down Expand Up @@ -877,6 +906,10 @@ func (j *leftOuterJoiner) onMissMatch(_ bool, outer chunk.Row, chk *chunk.Chunk)
chk.AppendPartialRowByColIdxs(lWide, j.defaultInner, j.rUsed)
}

// isSemiJoinWithoutCondition implements joiner interface.
// It always returns false for leftOuterJoiner.
func (*leftOuterJoiner) isSemiJoinWithoutCondition() bool {
return false
}

// Clone implements joiner interface.
// The result is a new leftOuterJoiner whose baseJoiner is a clone of j's baseJoiner.
func (j *leftOuterJoiner) Clone() joiner {
	cloned := new(leftOuterJoiner)
	cloned.baseJoiner = j.baseJoiner.Clone()
	return cloned
}
Expand Down Expand Up @@ -952,6 +985,10 @@ func (j *rightOuterJoiner) onMissMatch(_ bool, outer chunk.Row, chk *chunk.Chunk
chk.AppendPartialRowByColIdxs(lWide, outer, j.rUsed)
}

// isSemiJoinWithoutCondition implements joiner interface.
// It always returns false for rightOuterJoiner.
func (*rightOuterJoiner) isSemiJoinWithoutCondition() bool {
return false
}

// Clone implements joiner interface.
// The result is a new rightOuterJoiner whose baseJoiner is a clone of j's baseJoiner.
func (j *rightOuterJoiner) Clone() joiner {
	cloned := new(rightOuterJoiner)
	cloned.baseJoiner = j.baseJoiner.Clone()
	return cloned
}
Expand Down Expand Up @@ -1035,6 +1072,10 @@ func (j *innerJoiner) tryToMatchOuters(outers chunk.Iterator, inner chunk.Row, c

// onMissMatch implements joiner interface.
// It is a no-op for innerJoiner: the body is empty and all arguments are ignored.
func (*innerJoiner) onMissMatch(bool, chunk.Row, *chunk.Chunk) {}

// isSemiJoinWithoutCondition implements joiner interface.
// It always returns false for innerJoiner.
func (*innerJoiner) isSemiJoinWithoutCondition() bool {
return false
}

// Clone implements joiner interface.
// The result is a new innerJoiner whose baseJoiner is a clone of j's baseJoiner.
func (j *innerJoiner) Clone() joiner {
	cloned := new(innerJoiner)
	cloned.baseJoiner = j.baseJoiner.Clone()
	return cloned
}

0 comments on commit 1d3b53e

Please sign in to comment.