Skip to content

Commit

Permalink
address the comment
Browse files Browse the repository at this point in the history
Signed-off-by: yisaer <[email protected]>
  • Loading branch information
Yisaer committed Oct 24, 2022
1 parent 591ecdd commit 0e071e1
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 31 deletions.
6 changes: 3 additions & 3 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1580,8 +1580,8 @@ func (do *Domain) SetStatsUpdating(val bool) {
}
}

// ReturnAnalyzeExtraExec returned extra exec for Analyze
func (do *Domain) ReturnAnalyzeExtraExec(sctxs []sessionctx.Context) {
// AvailableAnalyzeExec make sctxs available
func (do *Domain) AvailableAnalyzeExec(sctxs []sessionctx.Context) {
do.analyzeMu.Lock()
defer do.analyzeMu.Unlock()
for _, ctx := range sctxs {
Expand All @@ -1595,7 +1595,7 @@ func (do *Domain) DemandAnalyzeExec(need int) []sessionctx.Context {
return nil
}
count := 0
r := make([]sessionctx.Context, 0)
r := make([]sessionctx.Context, 0, need)
do.analyzeMu.Lock()
defer do.analyzeMu.Unlock()
for sctx, used := range do.analyzeMu.sctxs {
Expand Down
18 changes: 4 additions & 14 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,13 +226,12 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n
if len(subSctxs) > 0 {
internalCtx := kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
err := e.handleResultsErrorWithConcurrency(internalCtx, concurrency, needGlobalStats, subSctxs, globalStatsMap, resultsCh)
dom.ReturnAnalyzeExtraExec(subSctxs)
dom.AvailableAnalyzeExec(subSctxs)
return err
}
}

// save analyze results in single-thread.
statsHandle := domain.GetDomain(e.ctx).StatsHandle()
panicCnt := 0
var err error
for panicCnt < concurrency {
Expand All @@ -251,19 +250,10 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n
continue
}
handleGlobalStats(needGlobalStats, globalStatsMap, results)

if err1 := statsHandle.SaveTableStatsToStorage(results, e.ctx.GetSessionVars().EnableAnalyzeSnapshot); err1 != nil {
err = err1
logutil.Logger(ctx).Error("save table stats to storage failed", zap.Error(err))
finishJobWithLog(e.ctx, results.Job, err)
} else {
finishJobWithLog(e.ctx, results.Job, nil)
// Dump stats to historical storage.
if err := recordHistoricalStats(e.ctx, results.TableID.TableID); err != nil {
logutil.BgLogger().Error("record historical stats failed", zap.Error(err))
}
err = saveTableStatsToStorage(ctx, e.ctx, results, e.ctx.GetSessionVars().EnableAnalyzeSnapshot)
if err != nil {
return err
}
invalidInfoSchemaStatCache(results.TableID.GetStatisticsID())
}
return err
}
Expand Down
36 changes: 23 additions & 13 deletions executor/analyze_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package executor

import (
"context"

"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/statistics/handle"
Expand Down Expand Up @@ -44,22 +43,33 @@ func newAnalyzeSaveStatsWorker(
}

func (worker *analyzeSaveStatsWorker) run(ctx context.Context, analyzeSnapshot bool) {
defer func() {
if r := recover(); r != nil {
logutil.BgLogger().Error("analyze save stats worker panicked", zap.Any("recover", r), zap.Stack("stack"))
worker.errCh <- getAnalyzePanicErr(r)
}
}()
for results := range worker.resultsCh {
err := handle.SaveTableStatsToStorage(worker.sctx, results, analyzeSnapshot)
err := saveTableStatsToStorage(ctx, worker.sctx, results, analyzeSnapshot)
if err != nil {
logutil.Logger(ctx).Error("save table stats to storage failed", zap.Error(err))
finishJobWithLog(worker.sctx, results.Job, err)
worker.errCh <- err
} else {
finishJobWithLog(worker.sctx, results.Job, nil)
// Dump stats to historical storage.
if err := recordHistoricalStats(worker.sctx, results.TableID.TableID); err != nil {
logutil.BgLogger().Error("record historical stats failed", zap.Error(err))
}
}
invalidInfoSchemaStatCache(results.TableID.GetStatisticsID())
if err != nil {
return
}
}
}

func saveTableStatsToStorage(ctx context.Context, sctx sessionctx.Context, results *statistics.AnalyzeResults, analyzeSnapshot bool) error {
err := handle.SaveTableStatsToStorage(sctx, results, analyzeSnapshot)
if err != nil {
logutil.Logger(ctx).Error("save table stats to storage failed", zap.Error(err))
finishJobWithLog(sctx, results.Job, err)
} else {
finishJobWithLog(sctx, results.Job, nil)
// Dump stats to historical storage.
if err := recordHistoricalStats(sctx, results.TableID.TableID); err != nil {
logutil.BgLogger().Error("record historical stats failed", zap.Error(err))
}
}
invalidInfoSchemaStatCache(results.TableID.GetStatisticsID())
return err
}
2 changes: 1 addition & 1 deletion sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -1062,7 +1062,7 @@ const (
DefTiDBRcWriteCheckTs = false
DefTiDBConstraintCheckInPlacePessimistic = true
DefTiDBForeignKeyChecks = false
DefTiDBAnalyzePartitionConcurrency = 4
DefTiDBAnalyzePartitionConcurrency = 1
DefTiDBOptRangeMaxSize = 64 * int64(size.MB) // 64 MB
DefTiDBCostModelVer = 1
DefTiDBServerMemoryLimitSessMinSize = 128 << 20
Expand Down

0 comments on commit 0e071e1

Please sign in to comment.