From a14be1be7ad866da96365584995720bbe3007dde Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Fri, 11 Nov 2022 11:18:36 +0800 Subject: [PATCH 1/6] executor: split hashjoin into workers(part2) --- executor/benchmark_test.go | 4 +- executor/builder.go | 3 +- executor/join.go | 156 ++++++++++++++++++------------------- 3 files changed, 80 insertions(+), 83 deletions(-) diff --git a/executor/benchmark_test.go b/executor/benchmark_test.go index 0cb1f7284159a..9216123ed4c82 100644 --- a/executor/benchmark_test.go +++ b/executor/benchmark_test.go @@ -932,9 +932,9 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec Executor) childrenUsedSchema := markChildrenUsedCols(e.Schema(), e.children[0].Schema(), e.children[1].Schema()) defaultValues := make([]types.Datum, e.buildSideExec.Schema().Len()) lhsTypes, rhsTypes := retTypes(innerExec), retTypes(outerExec) - e.probeWorker.joiners = make([]joiner, e.concurrency) + e.probeWorker.joiner = make([]joiner, e.concurrency) for i := uint(0); i < e.concurrency; i++ { - e.probeWorker.joiners[i] = newJoiner(testCase.ctx, e.joinType, true, defaultValues, + e.probeWorker.joiner[i] = newJoiner(testCase.ctx, e.joinType, true, defaultValues, nil, lhsTypes, rhsTypes, childrenUsedSchema, false) } memLimit := int64(-1) diff --git a/executor/builder.go b/executor/builder.go index 5c1fd12535e3b..22cb434a3e288 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1448,9 +1448,8 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executo isNAJoin := len(v.LeftNAJoinKeys) > 0 e.buildSideEstCount = b.buildSideEstCount(v) childrenUsedSchema := markChildrenUsedCols(v.Schema(), v.Children()[0].Schema(), v.Children()[1].Schema()) - e.probeWorker.joiners = make([]joiner, e.concurrency) for i := uint(0); i < e.concurrency; i++ { - e.probeWorker.joiners[i] = newJoiner(b.ctx, v.JoinType, v.InnerChildIdx == 0, defaultValues, + e.probeWorkers[i].joiner = newJoiner(b.ctx, v.JoinType, v.InnerChildIdx == 0, defaultValues, v.OtherConditions, lhsTypes, rhsTypes, childrenUsedSchema, isNAJoin) } executorCountHashJoinExec.Inc() diff --git a/executor/join.go b/executor/join.go index 456d337830e78..e41a2f70f3dc5 100644 --- a/executor/join.go +++ b/executor/join.go @@ -56,17 +56,17 @@ type probeSideTupleFetcher struct { type probeWorker struct { // We pre-alloc and reuse the Rows and RowPtrs for each probe goroutine, to avoid allocation frequently - buildSideRows [][]chunk.Row - buildSideRowPtrs [][]chunk.RowPtr + buildSideRows []chunk.Row + buildSideRowPtrs []chunk.RowPtr // We build individual joiner for each join worker when use chunk-based // execution, to avoid the concurrency of joiner.chk and joiner.selected. - joiners []joiner - rowIters []*chunk.Iterator4Slice + joiner joiner + rowIters *chunk.Iterator4Slice + rowContainerForProbe *hashRowContainer // for every naaj probe worker, pre-allocate the int slice for store the join column index to check. - needCheckBuildRowPos [][]int - needCheckProbeRowPos [][]int - rowContainerForProbe []*hashRowContainer + needCheckBuildRowPos []int + needCheckProbeRowPos []int } // HashJoinExec implements the hash join algorithm. @@ -74,7 +74,7 @@ type HashJoinExec struct { baseExecutor probeSideTupleFetcher - probeWorker + probeWorkers []probeWorker buildSideExec Executor buildSideEstCount float64 outerFilter expression.CNFExprs @@ -162,10 +162,13 @@ func (e *HashJoinExec) Close() error { e.waiter.Wait() } e.outerMatchedStatus = e.outerMatchedStatus[:0] - e.probeWorker.buildSideRows = nil - e.probeWorker.buildSideRowPtrs = nil - e.probeWorker.needCheckBuildRowPos = nil - e.probeWorker.needCheckProbeRowPos = nil + for _, w := range e.probeWorkers { + w.buildSideRows = nil + w.buildSideRowPtrs = nil + w.needCheckBuildRowPos = nil + w.needCheckProbeRowPos = nil + } + if e.stats != nil && e.rowContainer != nil { e.stats.hashStat = *e.rowContainer.stat } @@ -200,7 +203,7 @@ func (e *HashJoinExec) Open(ctx context.Context) error { } if e.runtimeStats != nil { e.stats = &hashJoinRuntimeStats{ - concurrent: cap(e.probeWorker.joiners), + concurrent: int(e.concurrency), } e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) } @@ -259,8 +262,8 @@ func (e *HashJoinExec) fetchProbeSideChunks(ctx context.Context) { } // after building is finished. the hash null bucket slice is allocated and determined. // copy it for multi probe worker. - for i := range e.probeWorker.rowContainerForProbe { - e.probeWorker.rowContainerForProbe[i].hashNANullBucket = e.rowContainer.hashNANullBucket + for _, w := range e.probeWorkers { + w.rowContainerForProbe.hashNANullBucket = e.rowContainer.hashNANullBucket } hasWaitedForBuild = true } @@ -355,11 +358,7 @@ func (e *HashJoinExec) initializeForProbe() { // e.joinResultCh is for transmitting the join result chunks to the main // thread. e.joinResultCh = make(chan *hashjoinWorkerResult, e.concurrency+1) - - e.probeWorker.buildSideRows = make([][]chunk.Row, e.concurrency) - e.probeWorker.buildSideRowPtrs = make([][]chunk.RowPtr, e.concurrency) - e.probeWorker.needCheckBuildRowPos = make([][]int, e.concurrency) - e.probeWorker.needCheckProbeRowPos = make([][]int, e.concurrency) + e.probeWorkers = make([]probeWorker, e.concurrency) } func (e *HashJoinExec) fetchAndProbeHashTable(ctx context.Context) { @@ -420,7 +419,7 @@ func (e *HashJoinExec) handleUnmatchedRowsFromHashTable(workerID uint) { } for j := 0; j < chk.NumRows(); j++ { if !e.outerMatchedStatus[i].UnsafeIsSet(j) { // process unmatched outer rows - e.probeWorker.joiners[workerID].onMissMatch(false, chk.GetRow(j), joinResult.chk) + e.probeWorkers[workerID].joiner.onMissMatch(false, chk.GetRow(j), joinResult.chk) } if joinResult.chk.IsFull() { e.joinResultCh <- joinResult @@ -497,9 +496,9 @@ func (e *HashJoinExec) runJoinWorker(workerID uint, probeKeyColIdx, probeNAKeyCo } start := time.Now() if e.useOuterToBuild { - ok, joinResult = e.join2ChunkForOuterHashJoin(workerID, probeSideResult, hCtx, e.probeWorker.rowContainerForProbe[workerID], joinResult) + ok, joinResult = e.join2ChunkForOuterHashJoin(workerID, probeSideResult, hCtx, e.probeWorkers[workerID].rowContainerForProbe, joinResult) } else { - ok, joinResult = e.join2Chunk(workerID, probeSideResult, hCtx, e.probeWorker.rowContainerForProbe[workerID], joinResult, selected) + ok, joinResult = e.join2Chunk(workerID, probeSideResult, hCtx, e.probeWorkers[workerID].rowContainerForProbe, joinResult, selected) } probeTime += int64(time.Since(start)) if !ok { @@ -521,8 +520,8 @@ func (e *HashJoinExec) runJoinWorker(workerID uint, probeKeyColIdx, probeNAKeyCo func (e *HashJoinExec) joinMatchedProbeSideRow2ChunkForOuterHashJoin(workerID uint, probeKey uint64, probeSideRow chunk.Row, hCtx *hashContext, rowContainer *hashRowContainer, joinResult *hashjoinWorkerResult) (bool, *hashjoinWorkerResult) { var err error - e.probeWorker.buildSideRows[workerID], e.probeWorker.buildSideRowPtrs[workerID], err = rowContainer.GetMatchedRowsAndPtrs(probeKey, probeSideRow, hCtx, e.probeWorker.buildSideRows[workerID], e.probeWorker.buildSideRowPtrs[workerID], true) - buildSideRows, rowsPtrs := e.probeWorker.buildSideRows[workerID], e.probeWorker.buildSideRowPtrs[workerID] + e.probeWorkers[workerID].buildSideRows, e.probeWorkers[workerID].buildSideRowPtrs, err = rowContainer.GetMatchedRowsAndPtrs(probeKey, probeSideRow, hCtx, e.probeWorkers[workerID].buildSideRows, e.probeWorkers[workerID].buildSideRowPtrs, true) + buildSideRows, rowsPtrs := e.probeWorkers[workerID].buildSideRows, e.probeWorkers[workerID].buildSideRowPtrs if err != nil { joinResult.err = err return false, joinResult @@ -531,12 +530,12 @@ func (e *HashJoinExec) joinMatchedProbeSideRow2ChunkForOuterHashJoin(workerID ui return true, joinResult } - iter := e.probeWorker.rowIters[workerID] + iter := e.probeWorkers[workerID].rowIters iter.Reset(buildSideRows) var outerMatchStatus []outerRowStatusFlag rowIdx, ok := 0, false for iter.Begin(); iter.Current() != iter.End(); { - outerMatchStatus, err = e.probeWorker.joiners[workerID].tryToMatchOuters(iter, probeSideRow, joinResult.chk, outerMatchStatus) + outerMatchStatus, err = e.probeWorkers[workerID].joiner.tryToMatchOuters(iter, probeSideRow, joinResult.chk, outerMatchStatus) if err != nil { joinResult.err = err return false, joinResult @@ -570,17 +569,17 @@ func (e *HashJoinExec) joinNAALOSJMatchProbeSideRow2Chunk(workerID uint, probeKe // because AntiLeftOuterSemiJoin cares about the scalar value. If we both have a match from null // bucket and same key bucket, we should return the result as from same-key bucket // rather than from null bucket. - e.probeWorker.buildSideRows[workerID], err = rowContainer.GetMatchedRows(probeKey, probeSideRow, hCtx, e.probeWorker.buildSideRows[workerID]) - buildSideRows := e.probeWorker.buildSideRows[workerID] + e.probeWorkers[workerID].buildSideRows, err = rowContainer.GetMatchedRows(probeKey, probeSideRow, hCtx, e.probeWorkers[workerID].buildSideRows) + buildSideRows := e.probeWorkers[workerID].buildSideRows if err != nil { joinResult.err = err return false, joinResult } if len(buildSideRows) != 0 { - iter1 := e.probeWorker.rowIters[workerID] + iter1 := e.probeWorkers[workerID].rowIters iter1.Reset(buildSideRows) for iter1.Begin(); iter1.Current() != iter1.End(); { - matched, _, err := e.probeWorker.joiners[workerID].tryToMatchInners(probeSideRow, iter1, joinResult.chk, LeftNotNullRightNotNull) + matched, _, err := e.probeWorkers[workerID].joiner.tryToMatchInners(probeSideRow, iter1, joinResult.chk, LeftNotNullRightNotNull) if err != nil { joinResult.err = err return false, joinResult @@ -600,8 +599,8 @@ func (e *HashJoinExec) joinNAALOSJMatchProbeSideRow2Chunk(workerID uint, probeKe } } // step2: match the null bucket secondly. - e.probeWorker.buildSideRows[workerID], err = rowContainer.GetNullBucketRows(hCtx, probeSideRow, probeKeyNullBits, e.probeWorker.buildSideRows[workerID], e.probeWorker.needCheckBuildRowPos[workerID], e.probeWorker.needCheckProbeRowPos[workerID]) - buildSideRows = e.probeWorker.buildSideRows[workerID] + e.probeWorkers[workerID].buildSideRows, err = rowContainer.GetNullBucketRows(hCtx, probeSideRow, probeKeyNullBits, e.probeWorkers[workerID].buildSideRows, e.probeWorkers[workerID].needCheckBuildRowPos, e.probeWorkers[workerID].needCheckProbeRowPos) + buildSideRows = e.probeWorkers[workerID].buildSideRows if err != nil { joinResult.err = err return false, joinResult @@ -609,13 +608,13 @@ func (e *HashJoinExec) joinNAALOSJMatchProbeSideRow2Chunk(workerID uint, probeKe if len(buildSideRows) == 0 { // when reach here, it means we couldn't find a valid same key match from same-key bucket yet // and the null bucket is empty. so the result should be . - e.probeWorker.joiners[workerID].onMissMatch(false, probeSideRow, joinResult.chk) + e.probeWorkers[workerID].joiner.onMissMatch(false, probeSideRow, joinResult.chk) return true, joinResult } - iter2 := e.probeWorker.rowIters[workerID] + iter2 := e.probeWorkers[workerID].rowIters iter2.Reset(buildSideRows) for iter2.Begin(); iter2.Current() != iter2.End(); { - matched, _, err := e.probeWorker.joiners[workerID].tryToMatchInners(probeSideRow, iter2, joinResult.chk, LeftNotNullRightHasNull) + matched, _, err := e.probeWorkers[workerID].joiner.tryToMatchInners(probeSideRow, iter2, joinResult.chk, LeftNotNullRightHasNull) if err != nil { joinResult.err = err return false, joinResult @@ -637,7 +636,7 @@ func (e *HashJoinExec) joinNAALOSJMatchProbeSideRow2Chunk(workerID uint, probeKe // case1: x NOT IN (empty set): if other key bucket don't have the valid rows yet. // case2: x NOT IN (l,m,n...): if other key bucket do have the valid rows. // both cases mean the result should be - e.probeWorker.joiners[workerID].onMissMatch(false, probeSideRow, joinResult.chk) + e.probeWorkers[workerID].joiner.onMissMatch(false, probeSideRow, joinResult.chk) return true, joinResult } // when left side has null values, all we want is to find a valid build side rows (past other condition) @@ -645,17 +644,17 @@ func (e *HashJoinExec) joinNAALOSJMatchProbeSideRow2Chunk(workerID uint, probeKe // case1: NOT IN (empty set): ----------------------> result is . // case2: NOT IN (at least a valid inner row) ------------------> result is . // Step1: match null bucket (assumption that null bucket is quite smaller than all hash table bucket rows) - e.probeWorker.buildSideRows[workerID], err = rowContainer.GetNullBucketRows(hCtx, probeSideRow, probeKeyNullBits, e.probeWorker.buildSideRows[workerID], e.probeWorker.needCheckBuildRowPos[workerID], e.probeWorker.needCheckProbeRowPos[workerID]) - buildSideRows := e.probeWorker.buildSideRows[workerID] + e.probeWorkers[workerID].buildSideRows, err = rowContainer.GetNullBucketRows(hCtx, probeSideRow, probeKeyNullBits, e.probeWorkers[workerID].buildSideRows, e.probeWorkers[workerID].needCheckBuildRowPos, e.probeWorkers[workerID].needCheckProbeRowPos) + buildSideRows := e.probeWorkers[workerID].buildSideRows if err != nil { joinResult.err = err return false, joinResult } if len(buildSideRows) != 0 { - iter1 := e.probeWorker.rowIters[workerID] + iter1 := e.probeWorkers[workerID].rowIters iter1.Reset(buildSideRows) for iter1.Begin(); iter1.Current() != iter1.End(); { - matched, _, err := e.probeWorker.joiners[workerID].tryToMatchInners(probeSideRow, iter1, joinResult.chk, LeftHasNullRightHasNull) + matched, _, err := e.probeWorkers[workerID].joiner.tryToMatchInners(probeSideRow, iter1, joinResult.chk, LeftHasNullRightHasNull) if err != nil { joinResult.err = err return false, joinResult @@ -675,8 +674,8 @@ func (e *HashJoinExec) joinNAALOSJMatchProbeSideRow2Chunk(workerID uint, probeKe } } // Step2: match all hash table bucket build rows (use probeKeyNullBits to filter if any). - e.probeWorker.buildSideRows[workerID], err = rowContainer.GetAllMatchedRows(hCtx, probeSideRow, probeKeyNullBits, e.probeWorker.buildSideRows[workerID], e.probeWorker.needCheckBuildRowPos[workerID], e.probeWorker.needCheckProbeRowPos[workerID]) - buildSideRows = e.probeWorker.buildSideRows[workerID] + e.probeWorkers[workerID].buildSideRows, err = rowContainer.GetAllMatchedRows(hCtx, probeSideRow, probeKeyNullBits, e.probeWorkers[workerID].buildSideRows, e.probeWorkers[workerID].needCheckBuildRowPos, e.probeWorkers[workerID].needCheckProbeRowPos) + buildSideRows = e.probeWorkers[workerID].buildSideRows if err != nil { joinResult.err = err return false, joinResult @@ -684,13 +683,13 @@ func (e *HashJoinExec) joinNAALOSJMatchProbeSideRow2Chunk(workerID uint, probeKe if len(buildSideRows) == 0 { // when reach here, it means we couldn't return it quickly in null bucket, and same-bucket is empty, // which means x NOT IN (empty set) or x NOT IN (l,m,n), the result should be - e.probeWorker.joiners[workerID].onMissMatch(false, probeSideRow, joinResult.chk) + e.probeWorkers[workerID].joiner.onMissMatch(false, probeSideRow, joinResult.chk) return true, joinResult } - iter2 := e.probeWorker.rowIters[workerID] + iter2 := e.probeWorkers[workerID].rowIters iter2.Reset(buildSideRows) for iter2.Begin(); iter2.Current() != iter2.End(); { - matched, _, err := e.probeWorker.joiners[workerID].tryToMatchInners(probeSideRow, iter2, joinResult.chk, LeftHasNullRightNotNull) + matched, _, err := e.probeWorkers[workerID].joiner.tryToMatchInners(probeSideRow, iter2, joinResult.chk, LeftHasNullRightNotNull) if err != nil { joinResult.err = err return false, joinResult @@ -711,7 +710,7 @@ func (e *HashJoinExec) joinNAALOSJMatchProbeSideRow2Chunk(workerID uint, probeKe // step3: if we couldn't return it quickly in null bucket and all hash bucket, here means only one cases: // case1: NOT IN (empty set): // empty set comes from no rows from all bucket can pass other condition. the result should be - e.probeWorker.joiners[workerID].onMissMatch(false, probeSideRow, joinResult.chk) + e.probeWorkers[workerID].joiner.onMissMatch(false, probeSideRow, joinResult.chk) return true, joinResult } @@ -725,17 +724,17 @@ func (e *HashJoinExec) joinNAASJMatchProbeSideRow2Chunk(workerID uint, probeKey if probeKeyNullBits == nil { // step1: match null bucket first. // need fetch the "valid" rows every time. (nullBits map check is necessary) - e.probeWorker.buildSideRows[workerID], err = rowContainer.GetNullBucketRows(hCtx, probeSideRow, probeKeyNullBits, e.probeWorker.buildSideRows[workerID], e.probeWorker.needCheckBuildRowPos[workerID], e.probeWorker.needCheckProbeRowPos[workerID]) - buildSideRows := e.probeWorker.buildSideRows[workerID] + e.probeWorkers[workerID].buildSideRows, err = rowContainer.GetNullBucketRows(hCtx, probeSideRow, probeKeyNullBits, e.probeWorkers[workerID].buildSideRows, e.probeWorkers[workerID].needCheckBuildRowPos, e.probeWorkers[workerID].needCheckProbeRowPos) + buildSideRows := e.probeWorkers[workerID].buildSideRows if err != nil { joinResult.err = err return false, joinResult } if len(buildSideRows) != 0 { - iter1 := e.probeWorker.rowIters[workerID] + iter1 := e.probeWorkers[workerID].rowIters iter1.Reset(buildSideRows) for iter1.Begin(); iter1.Current() != iter1.End(); { - matched, _, err := e.probeWorker.joiners[workerID].tryToMatchInners(probeSideRow, iter1, joinResult.chk) + matched, _, err := e.probeWorkers[workerID].joiner.tryToMatchInners(probeSideRow, iter1, joinResult.chk) if err != nil { joinResult.err = err return false, joinResult @@ -755,8 +754,8 @@ func (e *HashJoinExec) joinNAASJMatchProbeSideRow2Chunk(workerID uint, probeKey } } // step2: then same key bucket. - e.probeWorker.buildSideRows[workerID], err = rowContainer.GetMatchedRows(probeKey, probeSideRow, hCtx, e.probeWorker.buildSideRows[workerID]) - buildSideRows = e.probeWorker.buildSideRows[workerID] + e.probeWorkers[workerID].buildSideRows, err = rowContainer.GetMatchedRows(probeKey, probeSideRow, hCtx, e.probeWorkers[workerID].buildSideRows) + buildSideRows = e.probeWorkers[workerID].buildSideRows if err != nil { joinResult.err = err return false, joinResult @@ -764,13 +763,13 @@ func (e *HashJoinExec) joinNAASJMatchProbeSideRow2Chunk(workerID uint, probeKey if len(buildSideRows) == 0 { // when reach here, it means we couldn't return it quickly in null bucket, and same-bucket is empty, // which means x NOT IN (empty set), accept the rhs row. - e.probeWorker.joiners[workerID].onMissMatch(false, probeSideRow, joinResult.chk) + e.probeWorkers[workerID].joiner.onMissMatch(false, probeSideRow, joinResult.chk) return true, joinResult } - iter2 := e.probeWorker.rowIters[workerID] + iter2 := e.probeWorkers[workerID].rowIters iter2.Reset(buildSideRows) for iter2.Begin(); iter2.Current() != iter2.End(); { - matched, _, err := e.probeWorker.joiners[workerID].tryToMatchInners(probeSideRow, iter2, joinResult.chk) + matched, _, err := e.probeWorkers[workerID].joiner.tryToMatchInners(probeSideRow, iter2, joinResult.chk) if err != nil { joinResult.err = err return false, joinResult @@ -792,7 +791,7 @@ func (e *HashJoinExec) joinNAASJMatchProbeSideRow2Chunk(workerID uint, probeKey // case1: x NOT IN (empty set): if other key bucket don't have the valid rows yet. // case2: x NOT IN (l,m,n...): if other key bucket do have the valid rows. // both cases should accept the rhs row. - e.probeWorker.joiners[workerID].onMissMatch(false, probeSideRow, joinResult.chk) + e.probeWorkers[workerID].joiner.onMissMatch(false, probeSideRow, joinResult.chk) return true, joinResult } // when left side has null values, all we want is to find a valid build side rows (passed from other condition) @@ -800,17 +799,17 @@ func (e *HashJoinExec) joinNAASJMatchProbeSideRow2Chunk(workerID uint, probeKey // case1: NOT IN (empty set): ----------------------> accept rhs row. // case2: NOT IN (at least a valid inner row) ------------------> unknown result, refuse rhs row. // Step1: match null bucket (assumption that null bucket is quite smaller than all hash table bucket rows) - e.probeWorker.buildSideRows[workerID], err = rowContainer.GetNullBucketRows(hCtx, probeSideRow, probeKeyNullBits, e.probeWorker.buildSideRows[workerID], e.probeWorker.needCheckBuildRowPos[workerID], e.probeWorker.needCheckProbeRowPos[workerID]) - buildSideRows := e.probeWorker.buildSideRows[workerID] + e.probeWorkers[workerID].buildSideRows, err = rowContainer.GetNullBucketRows(hCtx, probeSideRow, probeKeyNullBits, e.probeWorkers[workerID].buildSideRows, e.probeWorkers[workerID].needCheckBuildRowPos, e.probeWorkers[workerID].needCheckProbeRowPos) + buildSideRows := e.probeWorkers[workerID].buildSideRows if err != nil { joinResult.err = err return false, joinResult } if len(buildSideRows) != 0 { - iter1 := e.probeWorker.rowIters[workerID] + iter1 := e.probeWorkers[workerID].rowIters iter1.Reset(buildSideRows) for iter1.Begin(); iter1.Current() != iter1.End(); { - matched, _, err := e.probeWorker.joiners[workerID].tryToMatchInners(probeSideRow, iter1, joinResult.chk) + matched, _, err := e.probeWorkers[workerID].joiner.tryToMatchInners(probeSideRow, iter1, joinResult.chk) if err != nil { joinResult.err = err return false, joinResult @@ -830,8 +829,8 @@ func (e *HashJoinExec) joinNAASJMatchProbeSideRow2Chunk(workerID uint, probeKey } } // Step2: match all hash table bucket build rows. - e.probeWorker.buildSideRows[workerID], err = rowContainer.GetAllMatchedRows(hCtx, probeSideRow, probeKeyNullBits, e.probeWorker.buildSideRows[workerID], e.probeWorker.needCheckBuildRowPos[workerID], e.probeWorker.needCheckProbeRowPos[workerID]) - buildSideRows = e.probeWorker.buildSideRows[workerID] + e.probeWorkers[workerID].buildSideRows, err = rowContainer.GetAllMatchedRows(hCtx, probeSideRow, probeKeyNullBits, e.probeWorkers[workerID].buildSideRows, e.probeWorkers[workerID].needCheckBuildRowPos, e.probeWorkers[workerID].needCheckProbeRowPos) + buildSideRows = e.probeWorkers[workerID].buildSideRows if err != nil { joinResult.err = err return false, joinResult @@ -839,13 +838,13 @@ func (e *HashJoinExec) joinNAASJMatchProbeSideRow2Chunk(workerID uint, probeKey if len(buildSideRows) == 0 { // when reach here, it means we couldn't return it quickly in null bucket, and same-bucket is empty, // which means NOT IN (empty set) or NOT IN (no valid rows) accept the rhs row. - e.probeWorker.joiners[workerID].onMissMatch(false, probeSideRow, joinResult.chk) + e.probeWorkers[workerID].joiner.onMissMatch(false, probeSideRow, joinResult.chk) return true, joinResult } - iter2 := e.probeWorker.rowIters[workerID] + iter2 := e.probeWorkers[workerID].rowIters iter2.Reset(buildSideRows) for iter2.Begin(); iter2.Current() != iter2.End(); { - matched, _, err := e.probeWorker.joiners[workerID].tryToMatchInners(probeSideRow, iter2, joinResult.chk) + matched, _, err := e.probeWorkers[workerID].joiner.tryToMatchInners(probeSideRow, iter2, joinResult.chk) if err != nil { joinResult.err = err return false, joinResult @@ -866,7 +865,7 @@ func (e *HashJoinExec) joinNAASJMatchProbeSideRow2Chunk(workerID uint, probeKey // step3: if we couldn't return it quickly in null bucket and all hash bucket, here means only one cases: // case1: NOT IN (empty set): // empty set comes from no rows from all bucket can pass other condition. we should accept the rhs row. - e.probeWorker.joiners[workerID].onMissMatch(false, probeSideRow, joinResult.chk) + e.probeWorkers[workerID].joiner.onMissMatch(false, probeSideRow, joinResult.chk) return true, joinResult } @@ -906,21 +905,21 @@ func (e *HashJoinExec) joinNAAJMatchProbeSideRow2Chunk(workerID uint, probeKey u func (e *HashJoinExec) joinMatchedProbeSideRow2Chunk(workerID uint, probeKey uint64, probeSideRow chunk.Row, hCtx *hashContext, rowContainer *hashRowContainer, joinResult *hashjoinWorkerResult) (bool, *hashjoinWorkerResult) { var err error - e.probeWorker.buildSideRows[workerID], err = rowContainer.GetMatchedRows(probeKey, probeSideRow, hCtx, e.probeWorker.buildSideRows[workerID]) - buildSideRows := e.probeWorker.buildSideRows[workerID] + e.probeWorkers[workerID].buildSideRows, err = rowContainer.GetMatchedRows(probeKey, probeSideRow, hCtx, e.probeWorkers[workerID].buildSideRows) + buildSideRows := e.probeWorkers[workerID].buildSideRows if err != nil { joinResult.err = err return false, joinResult } if len(buildSideRows) == 0 { - e.probeWorker.joiners[workerID].onMissMatch(false, probeSideRow, joinResult.chk) + e.probeWorkers[workerID].joiner.onMissMatch(false, probeSideRow, joinResult.chk) return true, joinResult } - iter := e.probeWorker.rowIters[workerID] + iter := e.probeWorkers[workerID].rowIters iter.Reset(buildSideRows) hasMatch, hasNull, ok := false, false, false for iter.Begin(); iter.Current() != iter.End(); { - matched, isNull, err := e.probeWorker.joiners[workerID].tryToMatchInners(probeSideRow, iter, joinResult.chk) + matched, isNull, err := e.probeWorkers[workerID].joiner.tryToMatchInners(probeSideRow, iter, joinResult.chk) if err != nil { joinResult.err = err return false, joinResult @@ -937,7 +936,7 @@ func (e *HashJoinExec) joinMatchedProbeSideRow2Chunk(workerID uint, probeKey uin } } if !hasMatch { - e.probeWorker.joiners[workerID].onMissMatch(hasNull, probeSideRow, joinResult.chk) + e.probeWorkers[workerID].joiner.onMissMatch(hasNull, probeSideRow, joinResult.chk) } return true, joinResult } @@ -1011,7 +1010,7 @@ func (e *HashJoinExec) join2Chunk(workerID uint, probeSideChk *chunk.Chunk, hCtx if isNAAJ { if !selected[i] { // since this is the case of using inner to build, so for an outer row unselected, we should fill the result when it's outer join. - e.probeWorker.joiners[workerID].onMissMatch(false, probeSideChk.GetRow(i), joinResult.chk) + e.probeWorkers[workerID].joiner.onMissMatch(false, probeSideChk.GetRow(i), joinResult.chk) } if hCtx.naHasNull[i] { // here means the probe join connecting column has null value in it and this is special for matching all the hash buckets @@ -1033,7 +1032,7 @@ func (e *HashJoinExec) join2Chunk(workerID uint, probeSideChk *chunk.Chunk, hCtx } else { // since this is the case of using inner to build, so for an outer row unselected, we should fill the result when it's outer join. if !selected[i] || hCtx.hasNull[i] { // process unmatched probe side rows - e.probeWorker.joiners[workerID].onMissMatch(false, probeSideChk.GetRow(i), joinResult.chk) + e.probeWorkers[workerID].joiner.onMissMatch(false, probeSideChk.GetRow(i), joinResult.chk) } else { // process matched probe side rows probeKey, probeRow := hCtx.hashVals[i].Sum64(), probeSideChk.GetRow(i) ok, joinResult = e.joinMatchedProbeSideRow2Chunk(workerID, probeKey, probeRow, hCtx, rowContainer, joinResult) @@ -1112,16 +1111,15 @@ func (e *HashJoinExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { } e.rowContainer = newHashRowContainer(e.ctx, int(e.buildSideEstCount), hCtx, retTypes(e.buildSideExec)) // we shallow copies rowContainer for each probe worker to avoid lock contention - e.probeWorker.rowContainerForProbe = make([]*hashRowContainer, e.concurrency) for i := uint(0); i < e.concurrency; i++ { if i == 0 { - e.probeWorker.rowContainerForProbe[i] = e.rowContainer + e.probeWorkers[i].rowContainerForProbe = e.rowContainer } else { - e.probeWorker.rowContainerForProbe[i] = e.rowContainer.ShallowCopy() + e.probeWorkers[i].rowContainerForProbe = e.rowContainer.ShallowCopy() } } for i := uint(0); i < e.concurrency; i++ { - e.probeWorker.rowIters = append(e.probeWorker.rowIters, chunk.NewIterator4Slice([]chunk.Row{}).(*chunk.Iterator4Slice)) + e.probeWorkers[i].rowIters = chunk.NewIterator4Slice([]chunk.Row{}).(*chunk.Iterator4Slice) } e.worker.RunWithRecover(func() { defer trace.StartRegion(ctx, "HashJoinHashTableBuilder").End() From f501b2a2dedae5e5236b879b03c78246315c991b Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Fri, 11 Nov 2022 11:21:47 +0800 Subject: [PATCH 2/6] tiny change --- executor/join.go | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/executor/join.go b/executor/join.go index e41a2f70f3dc5..98c6d2a93ba39 100644 --- a/executor/join.go +++ b/executor/join.go @@ -61,8 +61,8 @@ type probeWorker struct { // We build individual joiner for each join worker when use chunk-based // execution, to avoid the concurrency of joiner.chk and joiner.selected. - joiner joiner - rowIters *chunk.Iterator4Slice + joiner joiner + rowIters *chunk.Iterator4Slice rowContainerForProbe *hashRowContainer // for every naaj probe worker, pre-allocate the int slice for store the join column index to check. needCheckBuildRowPos []int @@ -162,12 +162,7 @@ func (e *HashJoinExec) Close() error { e.waiter.Wait() } e.outerMatchedStatus = e.outerMatchedStatus[:0] - for _, w := range e.probeWorkers { - w.buildSideRows = nil - w.buildSideRowPtrs = nil - w.needCheckBuildRowPos = nil - w.needCheckProbeRowPos = nil - } + e.probeWorkers = nil if e.stats != nil && e.rowContainer != nil { e.stats.hashStat = *e.rowContainer.stat From 0f203c3e20e003d17fd895098c8b7d87ead11125 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Fri, 11 Nov 2022 11:47:19 +0800 Subject: [PATCH 3/6] fix ci --- executor/benchmark_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/executor/benchmark_test.go b/executor/benchmark_test.go index 9216123ed4c82..99bb6ceec9103 100644 --- a/executor/benchmark_test.go +++ b/executor/benchmark_test.go @@ -919,6 +919,7 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec Executor) probeSideTupleFetcher: probeSideTupleFetcher{ probeSideExec: outerExec, }, + probeWorkers: make([]probeWorker, testCase.concurrency), concurrency: uint(testCase.concurrency), joinType: testCase.joinType, // 0 for InnerJoin, 1 for LeftOutersJoin, 2 for RightOuterJoin isOuterJoin: false, @@ -932,9 +933,8 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec Executor) childrenUsedSchema := markChildrenUsedCols(e.Schema(), e.children[0].Schema(), e.children[1].Schema()) defaultValues := make([]types.Datum, e.buildSideExec.Schema().Len()) lhsTypes, rhsTypes := retTypes(innerExec), retTypes(outerExec) - e.probeWorker.joiner = make([]joiner, e.concurrency) for i := uint(0); i < e.concurrency; i++ { - e.probeWorker.joiner[i] = newJoiner(testCase.ctx, e.joinType, true, defaultValues, + e.probeWorkers[i].joiner = newJoiner(testCase.ctx, e.joinType, true, defaultValues, nil, lhsTypes, rhsTypes, childrenUsedSchema, false) } memLimit := int64(-1) From 95a1817cb78486cfeab4763f388b4484bf6981c8 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Fri, 11 Nov 2022 14:30:40 +0800 Subject: [PATCH 4/6] fix ci --- executor/builder.go | 1 + executor/join.go | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/builder.go b/executor/builder.go index 22cb434a3e288..4300ca70c454e 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1448,6 +1448,7 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executo isNAJoin := len(v.LeftNAJoinKeys) > 0 e.buildSideEstCount = b.buildSideEstCount(v) childrenUsedSchema := markChildrenUsedCols(v.Schema(), v.Children()[0].Schema(), v.Children()[1].Schema()) + e.probeWorkers = make([]probeWorker, e.concurrency) for i := uint(0); i < e.concurrency; i++ { e.probeWorkers[i].joiner = newJoiner(b.ctx, v.JoinType, v.InnerChildIdx == 0, defaultValues, v.OtherConditions, lhsTypes, rhsTypes, childrenUsedSchema, isNAJoin) diff --git a/executor/join.go b/executor/join.go index 98c6d2a93ba39..dd7a539c94dbe 100644 --- a/executor/join.go +++ b/executor/join.go @@ -353,7 +353,6 @@ func (e *HashJoinExec) initializeForProbe() { // e.joinResultCh is for transmitting the join result chunks to the main // thread. e.joinResultCh = make(chan *hashjoinWorkerResult, e.concurrency+1) - e.probeWorkers = make([]probeWorker, e.concurrency) } func (e *HashJoinExec) fetchAndProbeHashTable(ctx context.Context) { From cd0841594d5c4dee6377185e94e25748af590a1d Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Fri, 11 Nov 2022 15:06:56 +0800 Subject: [PATCH 5/6] fix ci --- executor/join.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/executor/join.go b/executor/join.go index dd7a539c94dbe..d566dce05c330 100644 --- a/executor/join.go +++ b/executor/join.go @@ -162,6 +162,12 @@ func (e *HashJoinExec) Close() error { e.waiter.Wait() } e.outerMatchedStatus = e.outerMatchedStatus[:0] + for _, w := range e.probeWorkers { + w.buildSideRows = nil + w.buildSideRowPtrs = nil + w.needCheckBuildRowPos = nil + w.needCheckProbeRowPos = nil + } e.probeWorkers = nil if e.stats != nil && e.rowContainer != nil { From 0cf2b61ed6a49d1edca552e667fa0976b662b5c7 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Fri, 11 Nov 2022 15:12:38 +0800 Subject: [PATCH 6/6] fix ci --- executor/join.go | 1 - 1 file changed, 1 deletion(-) diff --git a/executor/join.go b/executor/join.go index d566dce05c330..95ecee42c02d4 100644 --- a/executor/join.go +++ b/executor/join.go @@ -168,7 +168,6 @@ func (e *HashJoinExec) Close() error { w.needCheckBuildRowPos = nil w.needCheckProbeRowPos = nil } - e.probeWorkers = nil if e.stats != nil && e.rowContainer != nil { e.stats.hashStat = *e.rowContainer.stat