Skip to content

Commit

Permalink
executor: split hashjoin into workers(part2) (#39079)
Browse files Browse the repository at this point in the history
ref #39061
  • Loading branch information
XuHuaiyu authored Nov 11, 2022
1 parent 481f5ab commit b5e1f7a
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 83 deletions.
4 changes: 2 additions & 2 deletions executor/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -919,6 +919,7 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec Executor)
probeSideTupleFetcher: probeSideTupleFetcher{
probeSideExec: outerExec,
},
probeWorkers: make([]probeWorker, testCase.concurrency),
concurrency: uint(testCase.concurrency),
joinType: testCase.joinType, // 0 for InnerJoin, 1 for LeftOutersJoin, 2 for RightOuterJoin
isOuterJoin: false,
Expand All @@ -932,9 +933,8 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec Executor)
childrenUsedSchema := markChildrenUsedCols(e.Schema(), e.children[0].Schema(), e.children[1].Schema())
defaultValues := make([]types.Datum, e.buildSideExec.Schema().Len())
lhsTypes, rhsTypes := retTypes(innerExec), retTypes(outerExec)
e.probeWorker.joiners = make([]joiner, e.concurrency)
for i := uint(0); i < e.concurrency; i++ {
e.probeWorker.joiners[i] = newJoiner(testCase.ctx, e.joinType, true, defaultValues,
e.probeWorkers[i].joiner = newJoiner(testCase.ctx, e.joinType, true, defaultValues,
nil, lhsTypes, rhsTypes, childrenUsedSchema, false)
}
memLimit := int64(-1)
Expand Down
4 changes: 2 additions & 2 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1448,9 +1448,9 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executo
isNAJoin := len(v.LeftNAJoinKeys) > 0
e.buildSideEstCount = b.buildSideEstCount(v)
childrenUsedSchema := markChildrenUsedCols(v.Schema(), v.Children()[0].Schema(), v.Children()[1].Schema())
e.probeWorker.joiners = make([]joiner, e.concurrency)
e.probeWorkers = make([]probeWorker, e.concurrency)
for i := uint(0); i < e.concurrency; i++ {
e.probeWorker.joiners[i] = newJoiner(b.ctx, v.JoinType, v.InnerChildIdx == 0, defaultValues,
e.probeWorkers[i].joiner = newJoiner(b.ctx, v.JoinType, v.InnerChildIdx == 0, defaultValues,
v.OtherConditions, lhsTypes, rhsTypes, childrenUsedSchema, isNAJoin)
}
executorCountHashJoinExec.Inc()
Expand Down
Loading

0 comments on commit b5e1f7a

Please sign in to comment.