From 1d3b53e68415f8746b63fce00139c38020cc1edf Mon Sep 17 00:00:00 2001 From: Liqi Geng Date: Mon, 23 Oct 2023 01:54:32 -0500 Subject: [PATCH] executor: optimize (left outer) (anti) semi join which has no other condition (#47764) close pingcap/tidb#47424 --- pkg/executor/cte.go | 9 ++-- pkg/executor/hash_table.go | 70 ++++++++++++++++++++------ pkg/executor/index_lookup_hash_join.go | 11 ++-- pkg/executor/join.go | 14 +++++- pkg/executor/joiner.go | 43 +++++++++++++++- 5 files changed, 117 insertions(+), 30 deletions(-) diff --git a/pkg/executor/cte.go b/pkg/executor/cte.go index 9928bc652b639..cb538c4ab1c29 100644 --- a/pkg/executor/cte.go +++ b/pkg/executor/cte.go @@ -633,13 +633,10 @@ func (p *cteProducer) checkHasDup(probeKey uint64, curChk *chunk.Chunk, storage cteutil.Storage, hashTbl baseHashTable) (hasDup bool, err error) { - ptrs := hashTbl.Get(probeKey) + entry := hashTbl.Get(probeKey) - if len(ptrs) == 0 { - return false, nil - } - - for _, ptr := range ptrs { + for ; entry != nil; entry = entry.next { + ptr := entry.ptr var matchedRow chunk.Row if curChk != nil { matchedRow = curChk.GetRow(int(ptr.RowIdx)) diff --git a/pkg/executor/hash_table.go b/pkg/executor/hash_table.go index ffed29de0da4c..fa1dff03935f7 100644 --- a/pkg/executor/hash_table.go +++ b/pkg/executor/hash_table.go @@ -152,6 +152,47 @@ func (c *hashRowContainer) GetMatchedRows(probeKey uint64, probeRow chunk.Row, h return matchedRows, err } +// GetOneMatchedRow gets one row that matches probeRow, or nil if no row matches. 
+func (c *hashRowContainer) GetOneMatchedRow(probeKey uint64, probeRow chunk.Row, hCtx *hashContext) (*chunk.Row, error) { + var err error + innerEntry := c.hashTable.Get(probeKey) + if innerEntry == nil { + return nil, err + } + var matchedRow chunk.Row + + if c.chkBuf != nil { + c.chkBuf.Reset() + } + capacity := 0 + + for i := 0; innerEntry != nil; i, innerEntry = i+1, innerEntry.next { + ptr := innerEntry.ptr + matchedRow, c.chkBuf, err = c.rowContainer.GetRowAndAppendToChunkIfInDisk(ptr, c.chkBuf) + if err != nil { + return nil, err + } + var ok bool + ok, err = c.matchJoinKey(matchedRow, probeRow, hCtx) + if err != nil { + return nil, err + } + if ok { + return &matchedRow, nil + } + atomic.AddInt64(&c.stat.probeCollision, 1) + if i == 0 { + capacity = c.chkBuf.Capacity() + if capacity < 128 { + capacity = 128 + } + } else if (i+1)%capacity == 0 { + c.chkBuf.Reset() + } + } + return nil, err +} + func (c *hashRowContainer) GetAllMatchedRows(probeHCtx *hashContext, probeSideRow chunk.Row, probeKeyNullBits *bitmap.ConcurrentBitmap, matched []chunk.Row, needCheckBuildColPos, needCheckProbeColPos []int, needCheckBuildTypes, needCheckProbeTypes []*types.FieldType) ([]chunk.Row, error) { // for NAAJ probe row with null, we should match them with all build rows. @@ -232,7 +273,11 @@ const rowPtrSize = int64(unsafe.Sizeof(chunk.RowPtr{})) // h and buf. 
func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk.Row, hCtx *hashContext, matched []chunk.Row, matchedPtrs []chunk.RowPtr, needPtr bool) ([]chunk.Row, []chunk.RowPtr, error) { var err error - innerPtrs := c.hashTable.Get(probeKey) + entry := c.hashTable.Get(probeKey) + var innerPtrs []chunk.RowPtr + for ; entry != nil; entry = entry.next { + innerPtrs = append(innerPtrs, entry.ptr) + } if len(innerPtrs) == 0 { return nil, nil, err } @@ -565,7 +610,12 @@ func (es *entryStore) GetStore() (e *entry, memDelta int64) { type baseHashTable interface { Put(hashKey uint64, rowPtr chunk.RowPtr) - Get(hashKey uint64) (rowPtrs []chunk.RowPtr) + // e := Get(hashKey) + // for ; e != nil; e = e.next { + // rowPtr := e.ptr + // ... + // } + Get(hashKey uint64) *entry Len() uint64 // GetAndCleanMemoryDelta gets and cleans the memDelta of the baseHashTable. Memory delta will be cleared after each fetch. // It indicates the memory delta of the baseHashTable since the last calling GetAndCleanMemoryDelta(). @@ -611,13 +661,9 @@ func (ht *unsafeHashTable) Put(hashKey uint64, rowPtr chunk.RowPtr) { } // Get gets the values of the "key" and appends them to "values". -func (ht *unsafeHashTable) Get(hashKey uint64) (rowPtrs []chunk.RowPtr) { +func (ht *unsafeHashTable) Get(hashKey uint64) *entry { entryAddr := ht.hashMap[hashKey] - for entryAddr != nil { - rowPtrs = append(rowPtrs, entryAddr.ptr) - entryAddr = entryAddr.next - } - return + return entryAddr } // Len returns the number of rowPtrs in the unsafeHashTable, the number of keys may be less than Len @@ -674,13 +720,9 @@ func (ht *concurrentMapHashTable) Put(hashKey uint64, rowPtr chunk.RowPtr) { } // Get gets the values of the "key" and appends them to "values". 
-func (ht *concurrentMapHashTable) Get(hashKey uint64) (rowPtrs []chunk.RowPtr) { +func (ht *concurrentMapHashTable) Get(hashKey uint64) *entry { entryAddr, _ := ht.hashMap.Get(hashKey) - for entryAddr != nil { - rowPtrs = append(rowPtrs, entryAddr.ptr) - entryAddr = entryAddr.next - } - return + return entryAddr } // Iter gets the every value of the hash table. diff --git a/pkg/executor/index_lookup_hash_join.go b/pkg/executor/index_lookup_hash_join.go index 22eb3a16ae366..93989d55f4790 100644 --- a/pkg/executor/index_lookup_hash_join.go +++ b/pkg/executor/index_lookup_hash_join.go @@ -87,7 +87,6 @@ type indexHashJoinOuterWorker struct { type indexHashJoinInnerWorker struct { innerWorker - matchedOuterPtrs []chunk.RowPtr joiner joiner joinChkResourceCh chan *chunk.Chunk // resultCh is valid only when indexNestedLoopHashJoin do not need to keep @@ -436,7 +435,6 @@ func (e *IndexNestedLoopHashJoin) newInnerWorker(taskCh chan *indexHashJoinTask, joiner: e.joiners[workerID], joinChkResourceCh: e.joinChkResourceCh[workerID], resultCh: e.resultCh, - matchedOuterPtrs: make([]chunk.RowPtr, 0, e.MaxChunkSize()), joinKeyBuf: make([]byte, 1), outerRowStatus: make([]outerRowStatusFlag, 0, e.MaxChunkSize()), rowIter: chunk.NewIterator4Slice([]chunk.Row{}).(*chunk.Iterator4Slice), @@ -712,15 +710,14 @@ func (iw *indexHashJoinInnerWorker) getMatchedOuterRows(innerRow chunk.Row, task if err != nil { return nil, nil, err } - iw.matchedOuterPtrs = task.lookupMap.Get(h.Sum64()) - if len(iw.matchedOuterPtrs) == 0 { + matchedOuterEntry := task.lookupMap.Get(h.Sum64()) + if matchedOuterEntry == nil { return nil, nil, nil } joinType := JoinerType(iw.joiner) isSemiJoin := joinType == plannercore.SemiJoin || joinType == plannercore.LeftOuterSemiJoin - matchedRows = make([]chunk.Row, 0, len(iw.matchedOuterPtrs)) - matchedRowPtr = make([]chunk.RowPtr, 0, len(iw.matchedOuterPtrs)) - for _, ptr := range iw.matchedOuterPtrs { + for ; matchedOuterEntry != nil; matchedOuterEntry = 
matchedOuterEntry.next { + ptr := matchedOuterEntry.ptr outerRow := task.outerResult.GetRow(ptr) ok, err := codec.EqualChunkRow(iw.ctx.GetSessionVars().StmtCtx, innerRow, iw.hashTypes, iw.hashCols, outerRow, iw.outerCtx.hashTypes, iw.outerCtx.hashCols) if err != nil { diff --git a/pkg/executor/join.go b/pkg/executor/join.go index efaa228b74168..8942c353db3f9 100644 --- a/pkg/executor/join.go +++ b/pkg/executor/join.go @@ -925,8 +925,18 @@ func (w *probeWorker) joinNAAJMatchProbeSideRow2Chunk(probeKey uint64, probeKeyN func (w *probeWorker) joinMatchedProbeSideRow2Chunk(probeKey uint64, probeSideRow chunk.Row, hCtx *hashContext, joinResult *hashjoinWorkerResult) (bool, *hashjoinWorkerResult) { var err error - w.buildSideRows, err = w.rowContainerForProbe.GetMatchedRows(probeKey, probeSideRow, hCtx, w.buildSideRows) - buildSideRows := w.buildSideRows + var buildSideRows []chunk.Row + if w.joiner.isSemiJoinWithoutCondition() { + var rowPtr *chunk.Row + rowPtr, err = w.rowContainerForProbe.GetOneMatchedRow(probeKey, probeSideRow, hCtx) + if rowPtr != nil { + buildSideRows = append(buildSideRows, *rowPtr) + } + } else { + w.buildSideRows, err = w.rowContainerForProbe.GetMatchedRows(probeKey, probeSideRow, hCtx, w.buildSideRows) + buildSideRows = w.buildSideRows + } + if err != nil { joinResult.err = err return false, joinResult diff --git a/pkg/executor/joiner.go b/pkg/executor/joiner.go index db46d6ccebb3f..381fe590aac6f 100644 --- a/pkg/executor/joiner.go +++ b/pkg/executor/joiner.go @@ -106,6 +106,11 @@ type joiner interface { // parameter passed to `onMissMatch`. onMissMatch(hasNull bool, outer chunk.Row, chk *chunk.Chunk) + // isSemiJoinWithoutCondition reports whether the joiner is a semi join with no other condition. + // If true, one matched row is enough for each probe row, which avoids collecting every match when + // there are a lot of matched rows. + isSemiJoinWithoutCondition() bool + // Clone deep copies a joiner. 
Clone() joiner } @@ -426,6 +431,10 @@ func (j *semiJoiner) tryToMatchOuters(outers chunk.Iterator, inner chunk.Row, ch func (*semiJoiner) onMissMatch(bool, chunk.Row, *chunk.Chunk) {} +func (j *semiJoiner) isSemiJoinWithoutCondition() bool { + return len(j.conditions) == 0 +} + // Clone implements joiner interface. func (j *semiJoiner) Clone() joiner { return &semiJoiner{baseJoiner: j.baseJoiner.Clone()} @@ -490,6 +499,10 @@ func (naaj *nullAwareAntiSemiJoiner) onMissMatch(_ bool, outer chunk.Row, chk *c chk.AppendRowByColIdxs(outer, naaj.lUsed) } +func (naaj *nullAwareAntiSemiJoiner) isSemiJoinWithoutCondition() bool { + return len(naaj.conditions) == 0 +} + func (naaj *nullAwareAntiSemiJoiner) Clone() joiner { return &nullAwareAntiSemiJoiner{baseJoiner: naaj.baseJoiner.Clone()} } @@ -559,6 +572,10 @@ func (j *antiSemiJoiner) onMissMatch(hasNull bool, outer chunk.Row, chk *chunk.C } } +func (j *antiSemiJoiner) isSemiJoinWithoutCondition() bool { + return len(j.conditions) == 0 +} + func (j *antiSemiJoiner) Clone() joiner { return &antiSemiJoiner{baseJoiner: j.baseJoiner.Clone()} } @@ -641,6 +658,10 @@ func (j *leftOuterSemiJoiner) onMissMatch(hasNull bool, outer chunk.Row, chk *ch } } +func (j *leftOuterSemiJoiner) isSemiJoinWithoutCondition() bool { + return len(j.conditions) == 0 +} + func (j *leftOuterSemiJoiner) Clone() joiner { return &leftOuterSemiJoiner{baseJoiner: j.baseJoiner.Clone()} } @@ -713,8 +734,12 @@ func (*nullAwareAntiLeftOuterSemiJoiner) tryToMatchOuters(chunk.Iterator, chunk. 
return nil, err } +func (naal *nullAwareAntiLeftOuterSemiJoiner) isSemiJoinWithoutCondition() bool { + return len(naal.conditions) == 0 +} + func (naal *nullAwareAntiLeftOuterSemiJoiner) Clone() joiner { - return &antiLeftOuterSemiJoiner{baseJoiner: naal.baseJoiner.Clone()} + return &nullAwareAntiLeftOuterSemiJoiner{baseJoiner: naal.baseJoiner.Clone()} } type antiLeftOuterSemiJoiner struct { @@ -798,6 +823,10 @@ func (j *antiLeftOuterSemiJoiner) onMissMatch(hasNull bool, outer chunk.Row, chk } } +func (j *antiLeftOuterSemiJoiner) isSemiJoinWithoutCondition() bool { + return len(j.conditions) == 0 +} + func (j *antiLeftOuterSemiJoiner) Clone() joiner { return &antiLeftOuterSemiJoiner{baseJoiner: j.baseJoiner.Clone()} } @@ -877,6 +906,10 @@ func (j *leftOuterJoiner) onMissMatch(_ bool, outer chunk.Row, chk *chunk.Chunk) chk.AppendPartialRowByColIdxs(lWide, j.defaultInner, j.rUsed) } +func (*leftOuterJoiner) isSemiJoinWithoutCondition() bool { + return false +} + func (j *leftOuterJoiner) Clone() joiner { return &leftOuterJoiner{baseJoiner: j.baseJoiner.Clone()} } @@ -952,6 +985,10 @@ func (j *rightOuterJoiner) onMissMatch(_ bool, outer chunk.Row, chk *chunk.Chunk chk.AppendPartialRowByColIdxs(lWide, outer, j.rUsed) } +func (*rightOuterJoiner) isSemiJoinWithoutCondition() bool { + return false +} + func (j *rightOuterJoiner) Clone() joiner { return &rightOuterJoiner{baseJoiner: j.baseJoiner.Clone()} } @@ -1035,6 +1072,10 @@ func (j *innerJoiner) tryToMatchOuters(outers chunk.Iterator, inner chunk.Row, c func (*innerJoiner) onMissMatch(bool, chunk.Row, *chunk.Chunk) {} +func (*innerJoiner) isSemiJoinWithoutCondition() bool { + return false +} + func (j *innerJoiner) Clone() joiner { return &innerJoiner{baseJoiner: j.baseJoiner.Clone()} }