From 0e071e11a482516798af74da0ec2f1474e174bcd Mon Sep 17 00:00:00 2001 From: yisaer Date: Mon, 24 Oct 2022 17:47:48 +0800 Subject: [PATCH] address the comment Signed-off-by: yisaer --- domain/domain.go | 6 +++--- executor/analyze.go | 18 ++++------------ executor/analyze_worker.go | 36 ++++++++++++++++++++------------ sessionctx/variable/tidb_vars.go | 2 +- 4 files changed, 31 insertions(+), 31 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index 47de7ceefc95f..339c55e90e20b 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -1580,8 +1580,8 @@ func (do *Domain) SetStatsUpdating(val bool) { } } -// ReturnAnalyzeExtraExec returned extra exec for Analyze -func (do *Domain) ReturnAnalyzeExtraExec(sctxs []sessionctx.Context) { +// AvailableAnalyzeExec make sctxs available +func (do *Domain) AvailableAnalyzeExec(sctxs []sessionctx.Context) { do.analyzeMu.Lock() defer do.analyzeMu.Unlock() for _, ctx := range sctxs { @@ -1595,7 +1595,7 @@ func (do *Domain) DemandAnalyzeExec(need int) []sessionctx.Context { return nil } count := 0 - r := make([]sessionctx.Context, 0) + r := make([]sessionctx.Context, 0, need) do.analyzeMu.Lock() defer do.analyzeMu.Unlock() for sctx, used := range do.analyzeMu.sctxs { diff --git a/executor/analyze.go b/executor/analyze.go index 6626a7431a6f4..3dc81940bfbda 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -226,13 +226,12 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n if len(subSctxs) > 0 { internalCtx := kv.WithInternalSourceType(ctx, kv.InternalTxnStats) err := e.handleResultsErrorWithConcurrency(internalCtx, concurrency, needGlobalStats, subSctxs, globalStatsMap, resultsCh) - dom.ReturnAnalyzeExtraExec(subSctxs) + dom.AvailableAnalyzeExec(subSctxs) return err } } // save analyze results in single-thread. - statsHandle := domain.GetDomain(e.ctx).StatsHandle() panicCnt := 0 var err error for panicCnt < concurrency { @@ -251,19 +250,10 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n continue } handleGlobalStats(needGlobalStats, globalStatsMap, results) - - if err1 := statsHandle.SaveTableStatsToStorage(results, e.ctx.GetSessionVars().EnableAnalyzeSnapshot); err1 != nil { - err = err1 - logutil.Logger(ctx).Error("save table stats to storage failed", zap.Error(err)) - finishJobWithLog(e.ctx, results.Job, err) - } else { - finishJobWithLog(e.ctx, results.Job, nil) - // Dump stats to historical storage. - if err := recordHistoricalStats(e.ctx, results.TableID.TableID); err != nil { - logutil.BgLogger().Error("record historical stats failed", zap.Error(err)) - } + err = saveTableStatsToStorage(ctx, e.ctx, results, e.ctx.GetSessionVars().EnableAnalyzeSnapshot) + if err != nil { + return err } - invalidInfoSchemaStatCache(results.TableID.GetStatisticsID()) } return err } diff --git a/executor/analyze_worker.go b/executor/analyze_worker.go index bb9a8e573b6b5..e151409061257 100644 --- a/executor/analyze_worker.go +++ b/executor/analyze_worker.go @@ -16,7 +16,6 @@ package executor import ( "context" - "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/statistics/handle" @@ -44,22 +43,33 @@ func newAnalyzeSaveStatsWorker( } func (worker *analyzeSaveStatsWorker) run(ctx context.Context, analyzeSnapshot bool) { + defer func() { + if r := recover(); r != nil { + logutil.BgLogger().Error("analyze save stats worker panicked", zap.Any("recover", r), zap.Stack("stack")) + worker.errCh <- getAnalyzePanicErr(r) + } + }() for results := range worker.resultsCh { - err := handle.SaveTableStatsToStorage(worker.sctx, results, analyzeSnapshot) + err := saveTableStatsToStorage(ctx, worker.sctx, results, analyzeSnapshot) if err != nil { - logutil.Logger(ctx).Error("save table stats to storage failed", zap.Error(err)) - finishJobWithLog(worker.sctx, results.Job, err) worker.errCh <- err - } else { - finishJobWithLog(worker.sctx, results.Job, nil) - // Dump stats to historical storage. - if err := recordHistoricalStats(worker.sctx, results.TableID.TableID); err != nil { - logutil.BgLogger().Error("record historical stats failed", zap.Error(err)) - } - } - invalidInfoSchemaStatCache(results.TableID.GetStatisticsID()) - if err != nil { return } } } + +func saveTableStatsToStorage(ctx context.Context, sctx sessionctx.Context, results *statistics.AnalyzeResults, analyzeSnapshot bool) error { + err := handle.SaveTableStatsToStorage(sctx, results, analyzeSnapshot) + if err != nil { + logutil.Logger(ctx).Error("save table stats to storage failed", zap.Error(err)) + finishJobWithLog(sctx, results.Job, err) + } else { + finishJobWithLog(sctx, results.Job, nil) + // Dump stats to historical storage. + if err := recordHistoricalStats(sctx, results.TableID.TableID); err != nil { + logutil.BgLogger().Error("record historical stats failed", zap.Error(err)) + } + } + invalidInfoSchemaStatCache(results.TableID.GetStatisticsID()) + return err +} diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 8497b9bcecfa2..84ec0bab0ffd0 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -1062,7 +1062,7 @@ const ( DefTiDBRcWriteCheckTs = false DefTiDBConstraintCheckInPlacePessimistic = true DefTiDBForeignKeyChecks = false - DefTiDBAnalyzePartitionConcurrency = 4 + DefTiDBAnalyzePartitionConcurrency = 1 DefTiDBOptRangeMaxSize = 64 * int64(size.MB) // 64 MB DefTiDBCostModelVer = 1 DefTiDBServerMemoryLimitSessMinSize = 128 << 20