From 2bbda78617a5753418b2f311049e4bb1492270ec Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Tue, 2 Aug 2022 15:28:00 +0800 Subject: [PATCH] Revert "executor: reuse iterator4Slice and Row/RowPtrs slice in HashJoin. (#34878)" This reverts commit 8a4d45706872141fe37ba5ea35d6b1833ffb4cad. --- executor/hash_table.go | 15 +++++++-------- executor/hash_table_test.go | 2 +- executor/join.go | 20 +++----------------- util/chunk/iterator.go | 23 +---------------------- 4 files changed, 12 insertions(+), 48 deletions(-) diff --git a/executor/hash_table.go b/executor/hash_table.go index 2794b3f2c2d83..c57405597aa71 100644 --- a/executor/hash_table.go +++ b/executor/hash_table.go @@ -115,24 +115,23 @@ func (c *hashRowContainer) ShallowCopy() *hashRowContainer { // GetMatchedRowsAndPtrs get matched rows and Ptrs from probeRow. It can be called // in multiple goroutines while each goroutine should keep its own // h and buf. -func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk.Row, hCtx *hashContext, matched []chunk.Row, matchedPtrs []chunk.RowPtr) ([]chunk.Row, []chunk.RowPtr, error) { - var err error +func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk.Row, hCtx *hashContext) (matched []chunk.Row, matchedPtrs []chunk.RowPtr, err error) { innerPtrs := c.hashTable.Get(probeKey) if len(innerPtrs) == 0 { - return nil, nil, err + return } - matched = matched[:0] + matched = make([]chunk.Row, 0, len(innerPtrs)) var matchedRow chunk.Row - matchedPtrs = matchedPtrs[:0] + matchedPtrs = make([]chunk.RowPtr, 0, len(innerPtrs)) for _, ptr := range innerPtrs { matchedRow, c.chkBuf, err = c.rowContainer.GetRowAndAppendToChunk(ptr, c.chkBuf) if err != nil { - return nil, nil, err + return } var ok bool ok, err = c.matchJoinKey(matchedRow, probeRow, hCtx) if err != nil { - return nil, nil, err + return } if !ok { atomic.AddInt64(&c.stat.probeCollision, 1) @@ -141,7 +140,7 @@ func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk matched = append(matched, matchedRow) matchedPtrs = append(matchedPtrs, ptr) } - return matched, matchedPtrs, err + return } // matchJoinKey checks if join keys of buildRow and probeRow are logically equal. diff --git a/executor/hash_table_test.go b/executor/hash_table_test.go index f5e70291efee3..e365f0165b84a 100644 --- a/executor/hash_table_test.go +++ b/executor/hash_table_test.go @@ -158,7 +158,7 @@ func testHashRowContainer(t *testing.T, hashFunc func() hash.Hash64, spill bool) } probeCtx.hasNull = make([]bool, 1) probeCtx.hashVals = append(hCtx.hashVals, hashFunc()) - matched, _, err := rowContainer.GetMatchedRowsAndPtrs(hCtx.hashVals[1].Sum64(), probeRow, probeCtx, nil, nil) + matched, _, err := rowContainer.GetMatchedRowsAndPtrs(hCtx.hashVals[1].Sum64(), probeRow, probeCtx) require.NoError(t, err) require.Equal(t, 2, len(matched)) require.Equal(t, chk0.GetRow(1).GetDatumRow(colTypes), matched[0].GetDatumRow(colTypes)) diff --git a/executor/join.go b/executor/join.go index b2076d459d82c..702a5cf23632c 100644 --- a/executor/join.go +++ b/executor/join.go @@ -94,10 +94,6 @@ type HashJoinExec struct { finished atomic.Value stats *hashJoinRuntimeStats - - // We pre-alloc and reuse the Rows and RowPtrs for each probe goroutine, to avoid allocation frequently - buildSideRows [][]chunk.Row - buildSideRowPtrs [][]chunk.RowPtr } // probeChkResource stores the result of the join probe side fetch worker, @@ -152,8 +148,7 @@ func (e *HashJoinExec) Close() error { terror.Call(e.rowContainer.Close) } e.outerMatchedStatus = e.outerMatchedStatus[:0] - e.buildSideRows = nil - e.buildSideRowPtrs = nil + if e.stats != nil && e.rowContainer != nil { e.stats.hashStat = *e.rowContainer.stat } @@ -333,9 +328,6 @@ func (e *HashJoinExec) initializeForProbe() { // e.joinResultCh is for transmitting the join result chunks to the main // thread. e.joinResultCh = make(chan *hashjoinWorkerResult, e.concurrency+1) - - e.buildSideRows = make([][]chunk.Row, e.concurrency) - e.buildSideRowPtrs = make([][]chunk.RowPtr, e.concurrency) } func (e *HashJoinExec) fetchAndProbeHashTable(ctx context.Context) { @@ -495,9 +487,7 @@ func (e *HashJoinExec) runJoinWorker(workerID uint, probeKeyColIdx []int) { } func (e *HashJoinExec) joinMatchedProbeSideRow2ChunkForOuterHashJoin(workerID uint, probeKey uint64, probeSideRow chunk.Row, hCtx *hashContext, rowContainer *hashRowContainer, joinResult *hashjoinWorkerResult) (bool, *hashjoinWorkerResult) { - var err error - e.buildSideRows[workerID], e.buildSideRowPtrs[workerID], err = rowContainer.GetMatchedRowsAndPtrs(probeKey, probeSideRow, hCtx, e.buildSideRows[workerID], e.buildSideRowPtrs[workerID]) - buildSideRows, rowsPtrs := e.buildSideRows[workerID], e.buildSideRowPtrs[workerID] + buildSideRows, rowsPtrs, err := rowContainer.GetMatchedRowsAndPtrs(probeKey, probeSideRow, hCtx) if err != nil { joinResult.err = err return false, joinResult @@ -507,7 +497,6 @@ func (e *HashJoinExec) joinMatchedProbeSideRow2ChunkForOuterHashJoin(workerID ui } iter := chunk.NewIterator4Slice(buildSideRows) - defer chunk.FreeIterator(iter) var outerMatchStatus []outerRowStatusFlag rowIdx, ok := 0, false for iter.Begin(); iter.Current() != iter.End(); { @@ -534,9 +523,7 @@ func (e *HashJoinExec) joinMatchedProbeSideRow2ChunkForOuterHashJoin(workerID ui } func (e *HashJoinExec) joinMatchedProbeSideRow2Chunk(workerID uint, probeKey uint64, probeSideRow chunk.Row, hCtx *hashContext, rowContainer *hashRowContainer, joinResult *hashjoinWorkerResult) (bool, *hashjoinWorkerResult) { - var err error - e.buildSideRows[workerID], e.buildSideRowPtrs[workerID], err = rowContainer.GetMatchedRowsAndPtrs(probeKey, probeSideRow, hCtx, e.buildSideRows[workerID], e.buildSideRowPtrs[workerID]) - buildSideRows := e.buildSideRows[workerID] + buildSideRows, _, err := rowContainer.GetMatchedRowsAndPtrs(probeKey, probeSideRow, hCtx) if err != nil { joinResult.err = err return false, joinResult @@ -546,7 +533,6 @@ func (e *HashJoinExec) joinMatchedProbeSideRow2Chunk(workerID uint, probeKey uin return true, joinResult } iter := chunk.NewIterator4Slice(buildSideRows) - defer chunk.FreeIterator(iter) hasMatch, hasNull, ok := false, false, false for iter.Begin(); iter.Current() != iter.End(); { matched, isNull, err := e.joiners[workerID].tryToMatchInners(probeSideRow, iter, joinResult.chk) diff --git a/util/chunk/iterator.go b/util/chunk/iterator.go index 7928f748fc4b4..8b16c43e28dcc 100644 --- a/util/chunk/iterator.go +++ b/util/chunk/iterator.go @@ -14,8 +14,6 @@ package chunk -import "sync" - var ( _ Iterator = (*Iterator4Chunk)(nil) _ Iterator = (*iterator4RowPtr)(nil) @@ -25,22 +23,6 @@ var ( _ Iterator = (*multiIterator)(nil) ) -var ( - iterator4SlicePool = &sync.Pool{New: func() any { return new(iterator4Slice) }} -) - -// FreeIterator try to free and reuse the iterator. -func FreeIterator(it any) { - switch it := it.(type) { - case *iterator4Slice: - it.rows = nil - it.cursor = 0 - iterator4SlicePool.Put(it) - default: - // Do Nothing. - } -} - // Iterator is used to iterate a number of rows. // // for row := it.Begin(); row != it.End(); row = it.Next() { @@ -71,10 +53,7 @@ type Iterator interface { // NewIterator4Slice returns a Iterator for Row slice. func NewIterator4Slice(rows []Row) Iterator { - it := iterator4SlicePool.Get().(*iterator4Slice) - it.rows = rows - it.cursor = 0 - return it + return &iterator4Slice{rows: rows} } type iterator4Slice struct {