From 3bc942a2a997639ecd036a25a461862cb477cbca Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Tue, 28 Nov 2023 18:03:51 +0800 Subject: [PATCH 1/3] Revert "*: fix wrong result when to concurrency merge global stats (#48852)" This reverts commit 26db5909628fee0605d9e53d949b7b4d9e2b64e3. --- .../handle/globalstats/merge_worker.go | 59 +++++++++++-------- pkg/statistics/handle/globalstats/topn.go | 34 +++++++---- 2 files changed, 54 insertions(+), 39 deletions(-) diff --git a/pkg/statistics/handle/globalstats/merge_worker.go b/pkg/statistics/handle/globalstats/merge_worker.go index b702acaa797f7..7a63adee65fb2 100644 --- a/pkg/statistics/handle/globalstats/merge_worker.go +++ b/pkg/statistics/handle/globalstats/merge_worker.go @@ -43,11 +43,8 @@ type topnStatsMergeWorker struct { respCh chan<- *TopnStatsMergeResponse // the stats in the wrapper should only be read during the worker statsWrapper *StatsWrapper - // Different TopN structures may hold the same value, we have to merge them. - counter map[hack.MutableString]float64 // shardMutex is used to protect `statsWrapper.AllHg` shardMutex []sync.Mutex - mu sync.Mutex } // NewTopnStatsMergeWorker returns topn merge worker @@ -57,9 +54,8 @@ func NewTopnStatsMergeWorker( wrapper *StatsWrapper, killer *sqlkiller.SQLKiller) *topnStatsMergeWorker { worker := &topnStatsMergeWorker{ - taskCh: taskCh, - respCh: respCh, - counter: make(map[hack.MutableString]float64), + taskCh: taskCh, + respCh: respCh, } worker.statsWrapper = wrapper worker.shardMutex = make([]sync.Mutex, len(wrapper.AllHg)) @@ -83,11 +79,15 @@ func NewTopnStatsMergeTask(start, end int) *TopnStatsMergeTask { // TopnStatsMergeResponse indicates topn merge worker response type TopnStatsMergeResponse struct { - Err error + Err error + TopN *statistics.TopN + PopedTopn []statistics.TopNMeta } // Run runs topn merge like statistics.MergePartTopN2GlobalTopN -func (worker *topnStatsMergeWorker) Run(timeZone *time.Location, isIndex bool, version int) { +func (worker *topnStatsMergeWorker) Run(timeZone *time.Location, isIndex bool, + n uint32, + version int) { for task := range worker.taskCh { start := task.start end := task.end @@ -95,12 +95,17 @@ func (worker *topnStatsMergeWorker) Run(timeZone *time.Location, isIndex bool, v allTopNs := worker.statsWrapper.AllTopN allHists := worker.statsWrapper.AllHg resp := &TopnStatsMergeResponse{} - + if statistics.CheckEmptyTopNs(checkTopNs) { + worker.respCh <- resp + return + } partNum := len(allTopNs) - + // Different TopN structures may hold the same value, we have to merge them. + counter := make(map[hack.MutableString]float64) // datumMap is used to store the mapping from the string type to datum type. // The datum is used to find the value in the histogram. datumMap := statistics.NewDatumMapCache() + for i, topN := range checkTopNs { i = i + start if err := worker.killer.HandleSignal(); err != nil { @@ -113,15 +118,12 @@ func (worker *topnStatsMergeWorker) Run(timeZone *time.Location, isIndex bool, v } for _, val := range topN.TopN { encodedVal := hack.String(val.Encoded) - worker.mu.Lock() - _, exists := worker.counter[encodedVal] - worker.counter[encodedVal] += float64(val.Count) + _, exists := counter[encodedVal] + counter[encodedVal] += float64(val.Count) if exists { - worker.mu.Unlock() // We have already calculated the encodedVal from the histogram, so just continue to next topN value. continue } - worker.mu.Unlock() // We need to check whether the value corresponding to encodedVal is contained in other partition-level stats. // 1. 
Check the topN first. // 2. If the topN doesn't contain the value corresponding to encodedVal. We should check the histogram. @@ -145,26 +147,31 @@ func (worker *topnStatsMergeWorker) Run(timeZone *time.Location, isIndex bool, v } datum = d } - worker.shardMutex[j].Lock() // Get the row count which the value is equal to the encodedVal from histogram. count, _ := allHists[j].EqualRowCount(nil, datum, isIndex) if count != 0 { + counter[encodedVal] += count // Remove the value corresponding to encodedVal from the histogram. + worker.shardMutex[j].Lock() worker.statsWrapper.AllHg[j].BinarySearchRemoveVal(statistics.TopNMeta{Encoded: datum.GetBytes(), Count: uint64(count)}) - } - worker.shardMutex[j].Unlock() - if count != 0 { - worker.mu.Lock() - worker.counter[encodedVal] += count - worker.mu.Unlock() + worker.shardMutex[j].Unlock() } } } } + numTop := len(counter) + if numTop == 0 { + worker.respCh <- resp + continue + } + sorted := make([]statistics.TopNMeta, 0, numTop) + for value, cnt := range counter { + data := hack.Slice(string(value)) + sorted = append(sorted, statistics.TopNMeta{Encoded: data, Count: uint64(cnt)}) + } + globalTopN, leftTopN := statistics.GetMergedTopNFromSortedSlice(sorted, n) + resp.TopN = globalTopN + resp.PopedTopn = leftTopN worker.respCh <- resp } } - -func (worker *topnStatsMergeWorker) Result() map[hack.MutableString]float64 { - return worker.counter -} diff --git a/pkg/statistics/handle/globalstats/topn.go b/pkg/statistics/handle/globalstats/topn.go index 171756b82357b..9e9f14a068a54 100644 --- a/pkg/statistics/handle/globalstats/topn.go +++ b/pkg/statistics/handle/globalstats/topn.go @@ -30,12 +30,8 @@ import ( func mergeGlobalStatsTopN(gp *gp.Pool, sc sessionctx.Context, wrapper *StatsWrapper, timeZone *time.Location, version int, n uint32, isIndex bool) (*statistics.TopN, []statistics.TopNMeta, []*statistics.Histogram, error) { - if statistics.CheckEmptyTopNs(wrapper.AllTopN) { - return nil, nil, wrapper.AllHg, nil - } mergeConcurrency := sc.GetSessionVars().AnalyzePartitionMergeConcurrency killer := &sc.GetSessionVars().SQLKiller - // use original method if concurrency equals 1 or for version1 if mergeConcurrency < 2 { return MergePartTopN2GlobalTopN(timeZone, version, wrapper.AllTopN, n, wrapper.AllHg, isIndex, killer) @@ -82,12 +78,12 @@ func MergeGlobalStatsTopNByConcurrency( taskNum := len(tasks) taskCh := make(chan *TopnStatsMergeTask, taskNum) respCh := make(chan *TopnStatsMergeResponse, taskNum) - worker := NewTopnStatsMergeWorker(taskCh, respCh, wrapper, killer) for i := 0; i < mergeConcurrency; i++ { + worker := NewTopnStatsMergeWorker(taskCh, respCh, wrapper, killer) wg.Add(1) gp.Go(func() { defer wg.Done() - worker.Run(timeZone, isIndex, version) + worker.Run(timeZone, isIndex, n, version) }) } for _, task := range tasks { @@ -96,6 +92,8 @@ func MergeGlobalStatsTopNByConcurrency( close(taskCh) wg.Wait() close(respCh) + resps := make([]*TopnStatsMergeResponse, 0) + // handle Error hasErr := false errMsg := make([]string, 0) @@ -104,21 +102,27 @@ func MergeGlobalStatsTopNByConcurrency( hasErr = true errMsg = append(errMsg, resp.Err.Error()) } + resps = append(resps, resp) } if hasErr { return nil, nil, nil, errors.New(strings.Join(errMsg, ",")) } // fetch the response from each worker and merge them into global topn stats - counter := worker.Result() - numTop := len(counter) - sorted := make([]statistics.TopNMeta, 0, numTop) - for value, cnt := range counter { - data := hack.Slice(string(value)) - sorted = append(sorted, 
statistics.TopNMeta{Encoded: data, Count: uint64(cnt)}) + sorted := make([]statistics.TopNMeta, 0, mergeConcurrency) + leftTopn := make([]statistics.TopNMeta, 0) + for _, resp := range resps { + if resp.TopN != nil { + sorted = append(sorted, resp.TopN.TopN...) + } + leftTopn = append(leftTopn, resp.PopedTopn...) } + globalTopN, popedTopn := statistics.GetMergedTopNFromSortedSlice(sorted, n) - return globalTopN, popedTopn, wrapper.AllHg, nil + + result := append(leftTopn, popedTopn...) + statistics.SortTopnMeta(result) + return globalTopN, result, wrapper.AllHg, nil } // MergePartTopN2GlobalTopN is used to merge the partition-level topN to global-level topN. @@ -145,6 +149,10 @@ func MergePartTopN2GlobalTopN( isIndex bool, killer *sqlkiller.SQLKiller, ) (*statistics.TopN, []statistics.TopNMeta, []*statistics.Histogram, error) { + if statistics.CheckEmptyTopNs(topNs) { + return nil, nil, hists, nil + } + partNum := len(topNs) // Different TopN structures may hold the same value, we have to merge them. counter := make(map[hack.MutableString]float64) From 38f227da0e61d09be376f941f09741c1d71f2e54 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Tue, 28 Nov 2023 18:12:11 +0800 Subject: [PATCH 2/3] statistics: add more test for tidb_merge_partition_stats_concurrency Signed-off-by: Weizhen Wang --- pkg/sessionctx/variable/tidb_vars.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sessionctx/variable/tidb_vars.go b/pkg/sessionctx/variable/tidb_vars.go index 5e8f839450202..16d576e339f58 100644 --- a/pkg/sessionctx/variable/tidb_vars.go +++ b/pkg/sessionctx/variable/tidb_vars.go @@ -1361,7 +1361,7 @@ const ( DefTiDBOptRangeMaxSize = 64 * int64(size.MB) // 64 MB DefTiDBCostModelVer = 2 DefTiDBServerMemoryLimitSessMinSize = 128 << 20 - DefTiDBMergePartitionStatsConcurrency = 1 + DefTiDBMergePartitionStatsConcurrency = 2 DefTiDBServerMemoryLimitGCTrigger = 0.7 DefTiDBEnableGOGCTuner = true // DefTiDBGOGCTunerThreshold is to limit TiDBGOGCTunerThreshold. From afcdde81ba3b53c4122e6ca444768f9eb298c342 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Tue, 28 Nov 2023 18:29:09 +0800 Subject: [PATCH 3/3] update Signed-off-by: Weizhen Wang --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 26843e9739290..2079222160370 100644 --- a/Makefile +++ b/Makefile @@ -497,7 +497,7 @@ bazel_test: failpoint-enable bazel_prepare bazel_coverage_test: failpoint-enable bazel_ci_simple_prepare - bazel $(BAZEL_GLOBAL_CONFIG) --nohome_rc coverage $(BAZEL_CMD_CONFIG) $(BAZEL_INSTRUMENTATION_FILTER) --jobs=35 --build_tests_only --test_keep_going=false \ + bazel $(BAZEL_GLOBAL_CONFIG) --nohome_rc coverage -k $(BAZEL_CMD_CONFIG) $(BAZEL_INSTRUMENTATION_FILTER) --jobs=35 --build_tests_only --test_keep_going=false \ --@io_bazel_rules_go//go/config:cover_format=go_cover --define gotags=deadlock,intest \ -- //... -//cmd/... -//tests/graceshutdown/... \ -//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test -//tests/realtikvtest/...
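Reviewer note: the control flow this revert restores is a two-level merge. Each
worker owns a private counter map for its slice of the partition-level TopNs
and sends a partial top-n (plus the poped leftovers) over respCh; the
coordinator then concatenates the partials and runs
GetMergedTopNFromSortedSlice once more. The Go program below is a minimal,
self-contained sketch of that shape under simplified assumptions (no
histograms, no SQL kill signal, plain strings instead of encoded datums);
topNMeta, mergeWorker, and takeTopN are hypothetical stand-ins, not the types
in pkg/statistics/handle/globalstats.

package main

import (
	"fmt"
	"sort"
	"sync"
)

// topNMeta is an illustrative stand-in for statistics.TopNMeta.
type topNMeta struct {
	encoded string
	count   uint64
}

// takeTopN sorts the accumulated counts in descending order and keeps the
// n largest entries.
func takeTopN(counter map[string]uint64, n int) []topNMeta {
	sorted := make([]topNMeta, 0, len(counter))
	for val, cnt := range counter {
		sorted = append(sorted, topNMeta{encoded: val, count: cnt})
	}
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].count > sorted[j].count })
	if len(sorted) > n {
		sorted = sorted[:n]
	}
	return sorted
}

// mergeWorker folds the partition-level top-ns in parts[start:end] into a
// private counter and emits a partial top-n. Because every worker owns its
// counter, the counting step needs no mutex; the shardMutex in the patch
// protects only the shared histograms, which this sketch omits.
func mergeWorker(parts [][]topNMeta, start, end, n int, respCh chan<- []topNMeta, wg *sync.WaitGroup) {
	defer wg.Done()
	counter := make(map[string]uint64)
	for _, part := range parts[start:end] {
		for _, meta := range part {
			counter[meta.encoded] += meta.count
		}
	}
	respCh <- takeTopN(counter, n)
}

func main() {
	// Four partition-level top-ns; note that "b" straddles the two worker
	// slices, which is exactly the case the second-level merge has to handle.
	parts := [][]topNMeta{
		{{"a", 10}, {"b", 4}},
		{{"a", 3}, {"c", 7}},
		{{"b", 6}, {"c", 1}},
		{{"d", 9}},
	}
	const n, workers = 2, 2
	respCh := make(chan []topNMeta, workers)
	var wg sync.WaitGroup
	step := (len(parts) + workers - 1) / workers
	for start := 0; start < len(parts); start += step {
		end := start + step
		if end > len(parts) {
			end = len(parts)
		}
		wg.Add(1)
		go mergeWorker(parts, start, end, n, respCh, &wg)
	}
	wg.Wait()
	close(respCh)
	// Second-level merge: fold every partial top-n into one counter and
	// take the global top-n from it.
	global := make(map[string]uint64)
	for partial := range respCh {
		for _, meta := range partial {
			global[meta.encoded] += meta.count
		}
	}
	fmt.Println(takeTopN(global, n)) // prints [{a 13} {d 9}]
}

Running the sketch prints [{a 13} {d 9}], although the exact top-2 over all
four partitions is a:13, b:10: b's count is split across the two workers, and
the first worker's partial drops its share (b:4) before the second-level merge
sees it. The patch softens this by also carrying the dropped entries back as
PopedTopn (omitted above), while the reverted commit (#48852) avoided it
entirely by sharing one mutex-protected counter across all workers.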