From a0d24096ca893244ae7c3eab33e2c148ae8679b8 Mon Sep 17 00:00:00 2001
From: xzhangxian1008
Date: Fri, 3 Nov 2023 10:59:09 +0800
Subject: [PATCH] executor: improve parallel hash aggregation (#47428)

close pingcap/tidb#47427
---
 .../aggregate/agg_hash_base_worker.go         |   2 +-
 pkg/executor/aggregate/agg_hash_executor.go   |  39 ++++--
 .../aggregate/agg_hash_final_worker.go        | 101 +++++++-------
 .../aggregate/agg_hash_partial_worker.go      | 106 +++++++++------
 pkg/executor/aggregate/agg_util.go            |  15 +--
 pkg/executor/test/aggregate/BUILD.bazel       |   2 +-
 pkg/executor/test/aggregate/aggregate_test.go | 123 ++++++++++++++++++
 7 files changed, 272 insertions(+), 116 deletions(-)

diff --git a/pkg/executor/aggregate/agg_hash_base_worker.go b/pkg/executor/aggregate/agg_hash_base_worker.go
index 67fa32f356a26..b2ed73cd06f0a 100644
--- a/pkg/executor/aggregate/agg_hash_base_worker.go
+++ b/pkg/executor/aggregate/agg_hash_base_worker.go
@@ -84,7 +84,7 @@ func (w *baseHashAggWorker) getPartialResult(_ *stmtctx.StatementContext, groupK
 
 func (w *baseHashAggWorker) getPartialResultSliceLenConsiderByteAlign() int {
 	length := len(w.aggFuncs)
-	if len(w.aggFuncs) == 1 {
+	if length == 1 {
 		return 1
 	}
 	return length + length&1
diff --git a/pkg/executor/aggregate/agg_hash_executor.go b/pkg/executor/aggregate/agg_hash_executor.go
index 9c4283dc6920b..a3bc512ece2fb 100644
--- a/pkg/executor/aggregate/agg_hash_executor.go
+++ b/pkg/executor/aggregate/agg_hash_executor.go
@@ -104,7 +104,7 @@ type HashAggExec struct {
 
 	finishCh         chan struct{}
 	finalOutputCh    chan *AfFinalResult
-	partialOutputChs []chan *HashAggIntermData
+	partialOutputChs []chan *AggPartialResultMapper
 	inputCh          chan *HashAggInput
 	partialInputChs  []chan *chunk.Chunk
 	partialWorkers   []HashAggPartialWorker
@@ -264,9 +264,9 @@ func (e *HashAggExec) initForParallelExec(_ sessionctx.Context) {
 	for i := range e.partialInputChs {
 		e.partialInputChs[i] = make(chan *chunk.Chunk, 1)
 	}
-	e.partialOutputChs = make([]chan *HashAggIntermData, finalConcurrency)
+	e.partialOutputChs = make([]chan *AggPartialResultMapper, finalConcurrency)
 	for i := range e.partialOutputChs {
-		e.partialOutputChs[i] = make(chan *HashAggIntermData, partialConcurrency)
+		e.partialOutputChs[i] = make(chan *AggPartialResultMapper, partialConcurrency)
 	}
 
 	e.partialWorkers = make([]HashAggPartialWorker, partialConcurrency)
@@ -275,17 +275,30 @@ func (e *HashAggExec) initForParallelExec(_ sessionctx.Context) {
 
 	// Init partial workers.
 	for i := 0; i < partialConcurrency; i++ {
+		partialResultsMap := make([]AggPartialResultMapper, finalConcurrency)
+		for i := 0; i < finalConcurrency; i++ {
+			partialResultsMap[i] = make(AggPartialResultMapper)
+		}
+
 		w := HashAggPartialWorker{
-			baseHashAggWorker: newBaseHashAggWorker(e.Ctx(), e.finishCh, e.PartialAggFuncs, e.MaxChunkSize(), e.memTracker),
-			inputCh:           e.partialInputChs[i],
-			outputChs:         e.partialOutputChs,
-			giveBackCh:        e.inputCh,
-			globalOutputCh:    e.finalOutputCh,
-			partialResultsMap: make(AggPartialResultMapper),
-			groupByItems:      e.GroupByItems,
-			chk:               exec.TryNewCacheChunk(e.Children(0)),
-			groupKey:          make([][]byte, 0, 8),
+			baseHashAggWorker:    newBaseHashAggWorker(e.Ctx(), e.finishCh, e.PartialAggFuncs, e.MaxChunkSize(), e.memTracker),
+			inputCh:              e.partialInputChs[i],
+			outputChs:            e.partialOutputChs,
+			giveBackCh:           e.inputCh,
+			BInMaps:              make([]int, finalConcurrency),
+			partialResultsBuffer: make([][]aggfuncs.PartialResult, 0, 2048),
+			globalOutputCh:       e.finalOutputCh,
+			partialResultsMap:    partialResultsMap,
+			groupByItems:         e.GroupByItems,
+			chk:                  exec.TryNewCacheChunk(e.Children(0)),
+			groupKey:             make([][]byte, 0, 8),
 		}
+
+		w.partialResultNumInRow = w.getPartialResultSliceLenConsiderByteAlign()
+		for i := 0; i < finalConcurrency; i++ {
+			w.BInMaps[i] = 0
+		}
+
 		// There is a bucket in the empty partialResultsMap.
 		failpoint.Inject("ConsumeRandomPanic", nil)
 		e.memTracker.Consume(hack.DefBucketMemoryUsageForMapStrToSlice * (1 << w.BInMap))
@@ -309,6 +322,8 @@ func (e *HashAggExec) initForParallelExec(_ sessionctx.Context) {
 		w := HashAggFinalWorker{
 			baseHashAggWorker:   newBaseHashAggWorker(e.Ctx(), e.finishCh, e.FinalAggFuncs, e.MaxChunkSize(), e.memTracker),
 			partialResultMap:    make(AggPartialResultMapper),
+			BInMap:              0,
+			isFirstInput:        true,
 			groupSet:            groupSet,
 			inputCh:             e.partialOutputChs[i],
 			outputCh:            e.finalOutputCh,
diff --git a/pkg/executor/aggregate/agg_hash_final_worker.go b/pkg/executor/aggregate/agg_hash_final_worker.go
index a2e9f2edae1f5..ab139f491508a 100644
--- a/pkg/executor/aggregate/agg_hash_final_worker.go
+++ b/pkg/executor/aggregate/agg_hash_final_worker.go
@@ -19,10 +19,10 @@ import (
 	"time"
 
 	"github.com/pingcap/failpoint"
-	"github.com/pingcap/tidb/pkg/executor/aggfuncs"
 	"github.com/pingcap/tidb/pkg/sessionctx"
 	"github.com/pingcap/tidb/pkg/types"
 	"github.com/pingcap/tidb/pkg/util/chunk"
+	"github.com/pingcap/tidb/pkg/util/hack"
 	"github.com/pingcap/tidb/pkg/util/logutil"
 	"github.com/pingcap/tidb/pkg/util/set"
 	"go.uber.org/zap"
@@ -43,14 +43,16 @@ type HashAggFinalWorker struct {
 	rowBuffer           []types.Datum
 	mutableRow          chunk.MutRow
 	partialResultMap    AggPartialResultMapper
+	BInMap              int
+	isFirstInput        bool
 	groupSet            set.StringSetWithMemoryUsage
-	inputCh             chan *HashAggIntermData
+	inputCh             chan *AggPartialResultMapper
 	outputCh            chan *AfFinalResult
 	finalResultHolderCh chan *chunk.Chunk
 	groupKeys           [][]byte
 }
 
-func (w *HashAggFinalWorker) getPartialInput() (input *HashAggIntermData, ok bool) {
+func (w *HashAggFinalWorker) getPartialInput() (input *AggPartialResultMapper, ok bool) {
 	select {
 	case <-w.finishCh:
 		return nil, false
@@ -62,55 +64,60 @@ func (w *HashAggFinalWorker) getPartialInput() (input *HashAggIntermData, ok boo
 	return
 }
 
+func (w *HashAggFinalWorker) initBInMap() {
+	w.BInMap = 0
+	mapLen := len(w.partialResultMap)
+	for mapLen > (1<<w.BInMap)*hack.LoadFactorNum/hack.LoadFactorDen {
+		w.BInMap++
+	}
+}

+			// Map will expand when count > bucketNum * loadFactor. The memory usage will double.
+			if len(w.partialResultMap)+1 > (1<<w.BInMap)*hack.LoadFactorNum/hack.LoadFactorDen {

+			// Map will expand when count > bucketNum * loadFactor. The memory usage will double.
+			if len(mapper[finalWorkerIdx])+1 > (1<<w.BInMaps[finalWorkerIdx])*hack.LoadFactorNum/hack.LoadFactorDen {
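The central change in this patch is that each HashAggPartialWorker now keeps one AggPartialResultMapper per final worker (with a per-map BInMaps counter for memory accounting) and sends those maps over partialOutputChs, so every final worker receives only the groups it owns and merges ready-made maps instead of HashAggIntermData. The Go sketch below illustrates that routing idea only; the names partialResult, resultMapper, and shuffle, and the FNV hash, are stand-ins assumed for illustration, not the identifiers or hash function used by the PR.

package main

import (
	"fmt"
	"hash/fnv"
)

// partialResult is a stand-in for a per-group aggregation state.
type partialResult []int64

// resultMapper is a stand-in for AggPartialResultMapper: group key -> partial result.
type resultMapper map[string]partialResult

// shuffle hashes every group key to pick a final worker index, building one
// mapper per final worker so each final worker only sees the groups it owns.
func shuffle(groups map[string][]int64, finalConcurrency int) []resultMapper {
	mappers := make([]resultMapper, finalConcurrency)
	for i := range mappers {
		mappers[i] = make(resultMapper)
	}
	for key, vals := range groups {
		h := fnv.New64()
		h.Write([]byte(key))
		idx := int(h.Sum64() % uint64(finalConcurrency))
		mappers[idx][key] = partialResult(vals)
	}
	return mappers
}

func main() {
	groups := map[string][]int64{"a": {1, 2}, "b": {3}, "c": {4, 5, 6}}
	for i, m := range shuffle(groups, 2) {
		fmt.Printf("final worker %d owns %d groups\n", i, len(m))
	}
}

Because the same key always hashes to the same index, each group is merged by exactly one final worker, which is what lets the final stage skip re-partitioning the intermediate data.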