Skip to content

Commit

Permalink
Revert "executor: reuse iterator4Slice and Row/RowPtrs slice in HashJ…
Browse files Browse the repository at this point in the history
…oin. (pingcap#34878)"

This reverts commit 8a4d457.
  • Loading branch information
XuHuaiyu committed Aug 2, 2022
1 parent 29fbcdf commit 2bbda78
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 48 deletions.
15 changes: 7 additions & 8 deletions executor/hash_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,24 +115,23 @@ func (c *hashRowContainer) ShallowCopy() *hashRowContainer {
// GetMatchedRowsAndPtrs get matched rows and Ptrs from probeRow. It can be called
// in multiple goroutines while each goroutine should keep its own
// h and buf.
func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk.Row, hCtx *hashContext, matched []chunk.Row, matchedPtrs []chunk.RowPtr) ([]chunk.Row, []chunk.RowPtr, error) {
var err error
func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk.Row, hCtx *hashContext) (matched []chunk.Row, matchedPtrs []chunk.RowPtr, err error) {
innerPtrs := c.hashTable.Get(probeKey)
if len(innerPtrs) == 0 {
return nil, nil, err
return
}
matched = matched[:0]
matched = make([]chunk.Row, 0, len(innerPtrs))
var matchedRow chunk.Row
matchedPtrs = matchedPtrs[:0]
matchedPtrs = make([]chunk.RowPtr, 0, len(innerPtrs))
for _, ptr := range innerPtrs {
matchedRow, c.chkBuf, err = c.rowContainer.GetRowAndAppendToChunk(ptr, c.chkBuf)
if err != nil {
return nil, nil, err
return
}
var ok bool
ok, err = c.matchJoinKey(matchedRow, probeRow, hCtx)
if err != nil {
return nil, nil, err
return
}
if !ok {
atomic.AddInt64(&c.stat.probeCollision, 1)
Expand All @@ -141,7 +140,7 @@ func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk
matched = append(matched, matchedRow)
matchedPtrs = append(matchedPtrs, ptr)
}
return matched, matchedPtrs, err
return
}

// matchJoinKey checks if join keys of buildRow and probeRow are logically equal.
Expand Down
2 changes: 1 addition & 1 deletion executor/hash_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func testHashRowContainer(t *testing.T, hashFunc func() hash.Hash64, spill bool)
}
probeCtx.hasNull = make([]bool, 1)
probeCtx.hashVals = append(hCtx.hashVals, hashFunc())
matched, _, err := rowContainer.GetMatchedRowsAndPtrs(hCtx.hashVals[1].Sum64(), probeRow, probeCtx, nil, nil)
matched, _, err := rowContainer.GetMatchedRowsAndPtrs(hCtx.hashVals[1].Sum64(), probeRow, probeCtx)
require.NoError(t, err)
require.Equal(t, 2, len(matched))
require.Equal(t, chk0.GetRow(1).GetDatumRow(colTypes), matched[0].GetDatumRow(colTypes))
Expand Down
20 changes: 3 additions & 17 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,6 @@ type HashJoinExec struct {
finished atomic.Value

stats *hashJoinRuntimeStats

// We pre-alloc and reuse the Rows and RowPtrs for each probe goroutine, to avoid allocation frequently
buildSideRows [][]chunk.Row
buildSideRowPtrs [][]chunk.RowPtr
}

// probeChkResource stores the result of the join probe side fetch worker,
Expand Down Expand Up @@ -152,8 +148,7 @@ func (e *HashJoinExec) Close() error {
terror.Call(e.rowContainer.Close)
}
e.outerMatchedStatus = e.outerMatchedStatus[:0]
e.buildSideRows = nil
e.buildSideRowPtrs = nil

if e.stats != nil && e.rowContainer != nil {
e.stats.hashStat = *e.rowContainer.stat
}
Expand Down Expand Up @@ -333,9 +328,6 @@ func (e *HashJoinExec) initializeForProbe() {
// e.joinResultCh is for transmitting the join result chunks to the main
// thread.
e.joinResultCh = make(chan *hashjoinWorkerResult, e.concurrency+1)

e.buildSideRows = make([][]chunk.Row, e.concurrency)
e.buildSideRowPtrs = make([][]chunk.RowPtr, e.concurrency)
}

func (e *HashJoinExec) fetchAndProbeHashTable(ctx context.Context) {
Expand Down Expand Up @@ -495,9 +487,7 @@ func (e *HashJoinExec) runJoinWorker(workerID uint, probeKeyColIdx []int) {
}

func (e *HashJoinExec) joinMatchedProbeSideRow2ChunkForOuterHashJoin(workerID uint, probeKey uint64, probeSideRow chunk.Row, hCtx *hashContext, rowContainer *hashRowContainer, joinResult *hashjoinWorkerResult) (bool, *hashjoinWorkerResult) {
var err error
e.buildSideRows[workerID], e.buildSideRowPtrs[workerID], err = rowContainer.GetMatchedRowsAndPtrs(probeKey, probeSideRow, hCtx, e.buildSideRows[workerID], e.buildSideRowPtrs[workerID])
buildSideRows, rowsPtrs := e.buildSideRows[workerID], e.buildSideRowPtrs[workerID]
buildSideRows, rowsPtrs, err := rowContainer.GetMatchedRowsAndPtrs(probeKey, probeSideRow, hCtx)
if err != nil {
joinResult.err = err
return false, joinResult
Expand All @@ -507,7 +497,6 @@ func (e *HashJoinExec) joinMatchedProbeSideRow2ChunkForOuterHashJoin(workerID ui
}

iter := chunk.NewIterator4Slice(buildSideRows)
defer chunk.FreeIterator(iter)
var outerMatchStatus []outerRowStatusFlag
rowIdx, ok := 0, false
for iter.Begin(); iter.Current() != iter.End(); {
Expand All @@ -534,9 +523,7 @@ func (e *HashJoinExec) joinMatchedProbeSideRow2ChunkForOuterHashJoin(workerID ui
}
func (e *HashJoinExec) joinMatchedProbeSideRow2Chunk(workerID uint, probeKey uint64, probeSideRow chunk.Row, hCtx *hashContext,
rowContainer *hashRowContainer, joinResult *hashjoinWorkerResult) (bool, *hashjoinWorkerResult) {
var err error
e.buildSideRows[workerID], e.buildSideRowPtrs[workerID], err = rowContainer.GetMatchedRowsAndPtrs(probeKey, probeSideRow, hCtx, e.buildSideRows[workerID], e.buildSideRowPtrs[workerID])
buildSideRows := e.buildSideRows[workerID]
buildSideRows, _, err := rowContainer.GetMatchedRowsAndPtrs(probeKey, probeSideRow, hCtx)
if err != nil {
joinResult.err = err
return false, joinResult
Expand All @@ -546,7 +533,6 @@ func (e *HashJoinExec) joinMatchedProbeSideRow2Chunk(workerID uint, probeKey uin
return true, joinResult
}
iter := chunk.NewIterator4Slice(buildSideRows)
defer chunk.FreeIterator(iter)
hasMatch, hasNull, ok := false, false, false
for iter.Begin(); iter.Current() != iter.End(); {
matched, isNull, err := e.joiners[workerID].tryToMatchInners(probeSideRow, iter, joinResult.chk)
Expand Down
23 changes: 1 addition & 22 deletions util/chunk/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

package chunk

import "sync"

var (
_ Iterator = (*Iterator4Chunk)(nil)
_ Iterator = (*iterator4RowPtr)(nil)
Expand All @@ -25,22 +23,6 @@ var (
_ Iterator = (*multiIterator)(nil)
)

var (
iterator4SlicePool = &sync.Pool{New: func() any { return new(iterator4Slice) }}
)

// FreeIterator try to free and reuse the iterator.
func FreeIterator(it any) {
switch it := it.(type) {
case *iterator4Slice:
it.rows = nil
it.cursor = 0
iterator4SlicePool.Put(it)
default:
// Do Nothing.
}
}

// Iterator is used to iterate a number of rows.
//
// for row := it.Begin(); row != it.End(); row = it.Next() {
Expand Down Expand Up @@ -71,10 +53,7 @@ type Iterator interface {

// NewIterator4Slice returns a Iterator for Row slice.
func NewIterator4Slice(rows []Row) Iterator {
it := iterator4SlicePool.Get().(*iterator4Slice)
it.rows = rows
it.cursor = 0
return it
return &iterator4Slice{rows: rows}
}

type iterator4Slice struct {
Expand Down

0 comments on commit 2bbda78

Please sign in to comment.