From cdfd97878586fbece1c3415102713f7e39b1f87a Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Wed, 11 May 2022 10:59:05 +0800 Subject: [PATCH 1/3] iterator reuse --- executor/join.go | 2 ++ util/chunk/iterator.go | 23 ++++++++++++++++++++++- 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/executor/join.go b/executor/join.go index 702a5cf23632c..8d5af5c4fcf89 100644 --- a/executor/join.go +++ b/executor/join.go @@ -497,6 +497,7 @@ func (e *HashJoinExec) joinMatchedProbeSideRow2ChunkForOuterHashJoin(workerID ui } iter := chunk.NewIterator4Slice(buildSideRows) + defer chunk.FreeIterator(iter) var outerMatchStatus []outerRowStatusFlag rowIdx, ok := 0, false for iter.Begin(); iter.Current() != iter.End(); { @@ -533,6 +534,7 @@ func (e *HashJoinExec) joinMatchedProbeSideRow2Chunk(workerID uint, probeKey uin return true, joinResult } iter := chunk.NewIterator4Slice(buildSideRows) + defer chunk.FreeIterator(iter) hasMatch, hasNull, ok := false, false, false for iter.Begin(); iter.Current() != iter.End(); { matched, isNull, err := e.joiners[workerID].tryToMatchInners(probeSideRow, iter, joinResult.chk) diff --git a/util/chunk/iterator.go b/util/chunk/iterator.go index ed9905b916a93..e62ef08212349 100644 --- a/util/chunk/iterator.go +++ b/util/chunk/iterator.go @@ -14,6 +14,8 @@ package chunk +import "sync" + var ( _ Iterator = (*Iterator4Chunk)(nil) _ Iterator = (*iterator4RowPtr)(nil) @@ -23,6 +25,22 @@ var ( _ Iterator = (*multiIterator)(nil) ) +var ( + iterator4SlicePool = &sync.Pool{New: func() any { return new(iterator4Slice) }} +) + +// FreeIterator try to free and reuse the iterator. +func FreeIterator(it any) { + switch it := it.(type) { + case *iterator4Slice: + it.rows = nil + it.cursor = 0 + iterator4SlicePool.Put(it) + default: + // Do Nothing. + } +} + // Iterator is used to iterate a number of rows. // // for row := it.Begin(); row != it.End(); row = it.Next() { @@ -53,7 +71,10 @@ type Iterator interface { // NewIterator4Slice returns a Iterator for Row slice. func NewIterator4Slice(rows []Row) Iterator { - return &iterator4Slice{rows: rows} + it := iterator4SlicePool.Get().(*iterator4Slice) + it.rows = rows + it.cursor = 0 + return it } type iterator4Slice struct { From 90ffaf2d3746e459390a294d60551d6525f2213c Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Wed, 11 May 2022 11:32:40 +0800 Subject: [PATCH 2/3] row&rowptrs reuse --- executor/hash_table.go | 15 ++++++++------- executor/hash_table_test.go | 2 +- executor/join.go | 17 ++++++++++++++--- 3 files changed, 23 insertions(+), 11 deletions(-) diff --git a/executor/hash_table.go b/executor/hash_table.go index adf6f65832770..cd108fabb5031 100644 --- a/executor/hash_table.go +++ b/executor/hash_table.go @@ -112,23 +112,24 @@ func (c *hashRowContainer) ShallowCopy() *hashRowContainer { // GetMatchedRowsAndPtrs get matched rows and Ptrs from probeRow. It can be called // in multiple goroutines while each goroutine should keep its own // h and buf. -func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk.Row, hCtx *hashContext) (matched []chunk.Row, matchedPtrs []chunk.RowPtr, err error) { +func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk.Row, hCtx *hashContext, matched []chunk.Row, matchedPtrs []chunk.RowPtr) ([]chunk.Row, []chunk.RowPtr, error) { + var err error innerPtrs := c.hashTable.Get(probeKey) if len(innerPtrs) == 0 { - return + return nil, nil, err } - matched = make([]chunk.Row, 0, len(innerPtrs)) + matched = matched[:0] var matchedRow chunk.Row - matchedPtrs = make([]chunk.RowPtr, 0, len(innerPtrs)) + matchedPtrs = matchedPtrs[:0] for _, ptr := range innerPtrs { matchedRow, err = c.rowContainer.GetRow(ptr) if err != nil { - return + return nil, nil, err } var ok bool ok, err = c.matchJoinKey(matchedRow, probeRow, hCtx) if err != nil { - return + return nil, nil, err } if !ok { atomic.AddInt64(&c.stat.probeCollision, 1) @@ -137,7 +138,7 @@ func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk matched = append(matched, matchedRow) matchedPtrs = append(matchedPtrs, ptr) } - return + return matched, matchedPtrs, err } // matchJoinKey checks if join keys of buildRow and probeRow are logically equal. diff --git a/executor/hash_table_test.go b/executor/hash_table_test.go index e365f0165b84a..f5e70291efee3 100644 --- a/executor/hash_table_test.go +++ b/executor/hash_table_test.go @@ -158,7 +158,7 @@ func testHashRowContainer(t *testing.T, hashFunc func() hash.Hash64, spill bool) } probeCtx.hasNull = make([]bool, 1) probeCtx.hashVals = append(hCtx.hashVals, hashFunc()) - matched, _, err := rowContainer.GetMatchedRowsAndPtrs(hCtx.hashVals[1].Sum64(), probeRow, probeCtx) + matched, _, err := rowContainer.GetMatchedRowsAndPtrs(hCtx.hashVals[1].Sum64(), probeRow, probeCtx, nil, nil) require.NoError(t, err) require.Equal(t, 2, len(matched)) require.Equal(t, chk0.GetRow(1).GetDatumRow(colTypes), matched[0].GetDatumRow(colTypes)) diff --git a/executor/join.go b/executor/join.go index 8d5af5c4fcf89..dd80cb457ec72 100644 --- a/executor/join.go +++ b/executor/join.go @@ -94,6 +94,9 @@ type HashJoinExec struct { finished atomic.Value stats *hashJoinRuntimeStats + + buildSideRows [][]chunk.Row + buildSideRowPtrs [][]chunk.RowPtr } // probeChkResource stores the result of the join probe side fetch worker, @@ -148,7 +151,8 @@ func (e *HashJoinExec) Close() error { terror.Call(e.rowContainer.Close) } e.outerMatchedStatus = e.outerMatchedStatus[:0] - + e.buildSideRows = nil + e.buildSideRowPtrs = nil if e.stats != nil && e.rowContainer != nil { e.stats.hashStat = *e.rowContainer.stat } @@ -328,6 +332,9 @@ func (e *HashJoinExec) initializeForProbe() { // e.joinResultCh is for transmitting the join result chunks to the main // thread. e.joinResultCh = make(chan *hashjoinWorkerResult, e.concurrency+1) + + e.buildSideRows = make([][]chunk.Row, e.concurrency) + e.buildSideRowPtrs = make([][]chunk.RowPtr, e.concurrency) } func (e *HashJoinExec) fetchAndProbeHashTable(ctx context.Context) { @@ -487,7 +494,9 @@ func (e *HashJoinExec) runJoinWorker(workerID uint, probeKeyColIdx []int) { } func (e *HashJoinExec) joinMatchedProbeSideRow2ChunkForOuterHashJoin(workerID uint, probeKey uint64, probeSideRow chunk.Row, hCtx *hashContext, rowContainer *hashRowContainer, joinResult *hashjoinWorkerResult) (bool, *hashjoinWorkerResult) { - buildSideRows, rowsPtrs, err := rowContainer.GetMatchedRowsAndPtrs(probeKey, probeSideRow, hCtx) + var err error + e.buildSideRows[workerID], e.buildSideRowPtrs[workerID], err = rowContainer.GetMatchedRowsAndPtrs(probeKey, probeSideRow, hCtx, e.buildSideRows[workerID], e.buildSideRowPtrs[workerID]) + buildSideRows, rowsPtrs := e.buildSideRows[workerID], e.buildSideRowPtrs[workerID] if err != nil { joinResult.err = err return false, joinResult @@ -524,7 +533,9 @@ func (e *HashJoinExec) joinMatchedProbeSideRow2ChunkForOuterHashJoin(workerID ui } func (e *HashJoinExec) joinMatchedProbeSideRow2Chunk(workerID uint, probeKey uint64, probeSideRow chunk.Row, hCtx *hashContext, rowContainer *hashRowContainer, joinResult *hashjoinWorkerResult) (bool, *hashjoinWorkerResult) { - buildSideRows, _, err := rowContainer.GetMatchedRowsAndPtrs(probeKey, probeSideRow, hCtx) + var err error + e.buildSideRows[workerID], e.buildSideRowPtrs[workerID], err = rowContainer.GetMatchedRowsAndPtrs(probeKey, probeSideRow, hCtx, e.buildSideRows[workerID], e.buildSideRowPtrs[workerID]) + buildSideRows := e.buildSideRows[workerID] if err != nil { joinResult.err = err return false, joinResult From 271a46b3dfc6449db529e5815419ab248b9fc20c Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Tue, 7 Jun 2022 14:19:52 +0800 Subject: [PATCH 3/3] add comments --- executor/join.go | 1 + 1 file changed, 1 insertion(+) diff --git a/executor/join.go b/executor/join.go index dd80cb457ec72..b2076d459d82c 100644 --- a/executor/join.go +++ b/executor/join.go @@ -95,6 +95,7 @@ type HashJoinExec struct { stats *hashJoinRuntimeStats + // We pre-alloc and reuse the Rows and RowPtrs for each probe goroutine, to avoid allocation frequently buildSideRows [][]chunk.Row buildSideRowPtrs [][]chunk.RowPtr }