Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: reuse iterator4Slice and Row/RowPtrs slice in HashJoin. #34878

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions executor/hash_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,23 +112,24 @@ func (c *hashRowContainer) ShallowCopy() *hashRowContainer {
// GetMatchedRowsAndPtrs get matched rows and Ptrs from probeRow. It can be called
// in multiple goroutines while each goroutine should keep its own
// h and buf.
func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk.Row, hCtx *hashContext) (matched []chunk.Row, matchedPtrs []chunk.RowPtr, err error) {
func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk.Row, hCtx *hashContext, matched []chunk.Row, matchedPtrs []chunk.RowPtr) ([]chunk.Row, []chunk.RowPtr, error) {
var err error
innerPtrs := c.hashTable.Get(probeKey)
if len(innerPtrs) == 0 {
return
return nil, nil, err
}
matched = make([]chunk.Row, 0, len(innerPtrs))
matched = matched[:0]
var matchedRow chunk.Row
matchedPtrs = make([]chunk.RowPtr, 0, len(innerPtrs))
matchedPtrs = matchedPtrs[:0]
for _, ptr := range innerPtrs {
matchedRow, err = c.rowContainer.GetRow(ptr)
if err != nil {
return
return nil, nil, err
}
var ok bool
ok, err = c.matchJoinKey(matchedRow, probeRow, hCtx)
if err != nil {
return
return nil, nil, err
}
if !ok {
atomic.AddInt64(&c.stat.probeCollision, 1)
Expand All @@ -137,7 +138,7 @@ func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk
matched = append(matched, matchedRow)
matchedPtrs = append(matchedPtrs, ptr)
}
return
return matched, matchedPtrs, err
}

// matchJoinKey checks if join keys of buildRow and probeRow are logically equal.
Expand Down
2 changes: 1 addition & 1 deletion executor/hash_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func testHashRowContainer(t *testing.T, hashFunc func() hash.Hash64, spill bool)
}
probeCtx.hasNull = make([]bool, 1)
probeCtx.hashVals = append(hCtx.hashVals, hashFunc())
matched, _, err := rowContainer.GetMatchedRowsAndPtrs(hCtx.hashVals[1].Sum64(), probeRow, probeCtx)
matched, _, err := rowContainer.GetMatchedRowsAndPtrs(hCtx.hashVals[1].Sum64(), probeRow, probeCtx, nil, nil)
require.NoError(t, err)
require.Equal(t, 2, len(matched))
require.Equal(t, chk0.GetRow(1).GetDatumRow(colTypes), matched[0].GetDatumRow(colTypes))
Expand Down
20 changes: 17 additions & 3 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ type HashJoinExec struct {
finished atomic.Value

stats *hashJoinRuntimeStats

// We pre-alloc and reuse the Rows and RowPtrs for each probe goroutine, to avoid allocation frequently
buildSideRows [][]chunk.Row
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comment for these two attributes.

buildSideRowPtrs [][]chunk.RowPtr
}

// probeChkResource stores the result of the join probe side fetch worker,
Expand Down Expand Up @@ -148,7 +152,8 @@ func (e *HashJoinExec) Close() error {
terror.Call(e.rowContainer.Close)
}
e.outerMatchedStatus = e.outerMatchedStatus[:0]

e.buildSideRows = nil
e.buildSideRowPtrs = nil
if e.stats != nil && e.rowContainer != nil {
e.stats.hashStat = *e.rowContainer.stat
}
Expand Down Expand Up @@ -328,6 +333,9 @@ func (e *HashJoinExec) initializeForProbe() {
// e.joinResultCh is for transmitting the join result chunks to the main
// thread.
e.joinResultCh = make(chan *hashjoinWorkerResult, e.concurrency+1)

e.buildSideRows = make([][]chunk.Row, e.concurrency)
e.buildSideRowPtrs = make([][]chunk.RowPtr, e.concurrency)
}

func (e *HashJoinExec) fetchAndProbeHashTable(ctx context.Context) {
Expand Down Expand Up @@ -487,7 +495,9 @@ func (e *HashJoinExec) runJoinWorker(workerID uint, probeKeyColIdx []int) {
}

func (e *HashJoinExec) joinMatchedProbeSideRow2ChunkForOuterHashJoin(workerID uint, probeKey uint64, probeSideRow chunk.Row, hCtx *hashContext, rowContainer *hashRowContainer, joinResult *hashjoinWorkerResult) (bool, *hashjoinWorkerResult) {
buildSideRows, rowsPtrs, err := rowContainer.GetMatchedRowsAndPtrs(probeKey, probeSideRow, hCtx)
var err error
e.buildSideRows[workerID], e.buildSideRowPtrs[workerID], err = rowContainer.GetMatchedRowsAndPtrs(probeKey, probeSideRow, hCtx, e.buildSideRows[workerID], e.buildSideRowPtrs[workerID])
buildSideRows, rowsPtrs := e.buildSideRows[workerID], e.buildSideRowPtrs[workerID]
if err != nil {
joinResult.err = err
return false, joinResult
Expand All @@ -497,6 +507,7 @@ func (e *HashJoinExec) joinMatchedProbeSideRow2ChunkForOuterHashJoin(workerID ui
}

iter := chunk.NewIterator4Slice(buildSideRows)
defer chunk.FreeIterator(iter)
var outerMatchStatus []outerRowStatusFlag
rowIdx, ok := 0, false
for iter.Begin(); iter.Current() != iter.End(); {
Expand All @@ -523,7 +534,9 @@ func (e *HashJoinExec) joinMatchedProbeSideRow2ChunkForOuterHashJoin(workerID ui
}
func (e *HashJoinExec) joinMatchedProbeSideRow2Chunk(workerID uint, probeKey uint64, probeSideRow chunk.Row, hCtx *hashContext,
rowContainer *hashRowContainer, joinResult *hashjoinWorkerResult) (bool, *hashjoinWorkerResult) {
buildSideRows, _, err := rowContainer.GetMatchedRowsAndPtrs(probeKey, probeSideRow, hCtx)
var err error
e.buildSideRows[workerID], e.buildSideRowPtrs[workerID], err = rowContainer.GetMatchedRowsAndPtrs(probeKey, probeSideRow, hCtx, e.buildSideRows[workerID], e.buildSideRowPtrs[workerID])
buildSideRows := e.buildSideRows[workerID]
if err != nil {
joinResult.err = err
return false, joinResult
Expand All @@ -533,6 +546,7 @@ func (e *HashJoinExec) joinMatchedProbeSideRow2Chunk(workerID uint, probeKey uin
return true, joinResult
}
iter := chunk.NewIterator4Slice(buildSideRows)
defer chunk.FreeIterator(iter)
hasMatch, hasNull, ok := false, false, false
for iter.Begin(); iter.Current() != iter.End(); {
matched, isNull, err := e.joiners[workerID].tryToMatchInners(probeSideRow, iter, joinResult.chk)
Expand Down
23 changes: 22 additions & 1 deletion util/chunk/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package chunk

import "sync"

var (
_ Iterator = (*Iterator4Chunk)(nil)
_ Iterator = (*iterator4RowPtr)(nil)
Expand All @@ -23,6 +25,22 @@ var (
_ Iterator = (*multiIterator)(nil)
)

var (
iterator4SlicePool = &sync.Pool{New: func() any { return new(iterator4Slice) }}
)

// FreeIterator try to free and reuse the iterator.
func FreeIterator(it any) {
switch it := it.(type) {
case *iterator4Slice:
it.rows = nil
it.cursor = 0
iterator4SlicePool.Put(it)
default:
// Do Nothing.
}
}

// Iterator is used to iterate a number of rows.
//
// for row := it.Begin(); row != it.End(); row = it.Next() {
Expand Down Expand Up @@ -53,7 +71,10 @@ type Iterator interface {

// NewIterator4Slice returns a Iterator for Row slice.
func NewIterator4Slice(rows []Row) Iterator {
return &iterator4Slice{rows: rows}
it := iterator4SlicePool.Get().(*iterator4Slice)
it.rows = rows
it.cursor = 0
return it
}

type iterator4Slice struct {
Expand Down