From 73fa967c49457aaed93c10fa21470df49081497b Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Mon, 8 Apr 2024 22:50:49 +0800 Subject: [PATCH 1/3] statistics: upgrade stats timeout checkpoint after it timeouts Signed-off-by: Weizhen Wang --- pkg/statistics/handle/syncload/stats_syncload.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/statistics/handle/syncload/stats_syncload.go b/pkg/statistics/handle/syncload/stats_syncload.go index 4a7956b1147ae..d5a8659ac77bf 100644 --- a/pkg/statistics/handle/syncload/stats_syncload.go +++ b/pkg/statistics/handle/syncload/stats_syncload.go @@ -239,6 +239,7 @@ func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statsty } return task, result.Err case <-time.After(timeout): + task.ToTimeout.Add(100 * time.Microsecond) return task, nil } } @@ -409,6 +410,7 @@ func (s *statsSyncLoad) drainColTask(exit chan struct{}) (*statstypes.NeededItem // if the task has already timeout, no sql is sync-waiting for it, // so do not handle it just now, put it to another channel with lower priority if time.Now().After(task.ToTimeout) { + task.ToTimeout.Add(100 * time.Microsecond) s.writeToTimeoutChan(s.StatsLoad.TimeoutItemsCh, task) continue } From 52d73d671cd69a08dc48a58e50af86c6f3ef47fd Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Mon, 8 Apr 2024 23:47:40 +0800 Subject: [PATCH 2/3] statistics: upgrade stats timeout checkpoint after it timeouts Signed-off-by: Weizhen Wang --- pkg/statistics/handle/syncload/stats_syncload.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/statistics/handle/syncload/stats_syncload.go b/pkg/statistics/handle/syncload/stats_syncload.go index d5a8659ac77bf..499dd65e0b8e5 100644 --- a/pkg/statistics/handle/syncload/stats_syncload.go +++ b/pkg/statistics/handle/syncload/stats_syncload.go @@ -213,7 +213,7 @@ func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statsty } }() if lastTask == nil { - task, err = s.drainColTask(exit) + task, err = s.drainColTask(sctx, exit) if err != nil { if err != errExit { logutil.BgLogger().Error("Fail to drain task for stats loading.", zap.Error(err)) @@ -239,7 +239,7 @@ func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statsty } return task, result.Err case <-time.After(timeout): - task.ToTimeout.Add(100 * time.Microsecond) + task.ToTimeout.Add(time.Duration(sctx.GetSessionVars().StatsLoadSyncWait) * time.Microsecond) return task, nil } } @@ -397,7 +397,7 @@ func (*statsSyncLoad) readStatsForOneItem(sctx sessionctx.Context, item model.Ta } // drainColTask will hang until a column task can return, and either task or error will be returned. -func (s *statsSyncLoad) drainColTask(exit chan struct{}) (*statstypes.NeededItemTask, error) { +func (s *statsSyncLoad) drainColTask(sctx sessionctx.Context, exit chan struct{}) (*statstypes.NeededItemTask, error) { // select NeededColumnsCh firstly, if no task, then select TimeoutColumnsCh for { select { @@ -410,7 +410,7 @@ func (s *statsSyncLoad) drainColTask(exit chan struct{}) (*statstypes.NeededItem // if the task has already timeout, no sql is sync-waiting for it, // so do not handle it just now, put it to another channel with lower priority if time.Now().After(task.ToTimeout) { - task.ToTimeout.Add(100 * time.Microsecond) + task.ToTimeout.Add(time.Duration(sctx.GetSessionVars().StatsLoadSyncWait) * time.Microsecond) s.writeToTimeoutChan(s.StatsLoad.TimeoutItemsCh, task) continue } From 54524c4203f48dec7bd556eb1320727fe1ce8821 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Tue, 9 Apr 2024 09:41:53 +0800 Subject: [PATCH 3/3] statistics: upgrade stats timeout checkpoint after it timeouts Signed-off-by: Weizhen Wang --- pkg/planner/core/rule_collect_plan_stats.go | 2 +- pkg/sessionctx/variable/session.go | 4 ++-- pkg/sessionctx/variable/sysvar.go | 2 +- pkg/statistics/handle/syncload/stats_syncload.go | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/planner/core/rule_collect_plan_stats.go b/pkg/planner/core/rule_collect_plan_stats.go index 027522a111d7f..eee774baefed6 100644 --- a/pkg/planner/core/rule_collect_plan_stats.go +++ b/pkg/planner/core/rule_collect_plan_stats.go @@ -38,7 +38,7 @@ func (collectPredicateColumnsPoint) optimize(_ context.Context, plan LogicalPlan return plan, planChanged, nil } predicateNeeded := variable.EnableColumnTracking.Load() - syncWait := plan.SCtx().GetSessionVars().StatsLoadSyncWait + syncWait := plan.SCtx().GetSessionVars().StatsLoadSyncWait.Load() histNeeded := syncWait > 0 predicateColumns, histNeededColumns, visitedPhysTblIDs := CollectColumnStatsUsage(plan, predicateNeeded, histNeeded) if len(predicateColumns) > 0 { diff --git a/pkg/sessionctx/variable/session.go b/pkg/sessionctx/variable/session.go index f5f427a3d734a..c4bb81ee8d6ff 100644 --- a/pkg/sessionctx/variable/session.go +++ b/pkg/sessionctx/variable/session.go @@ -1307,7 +1307,7 @@ type SessionVars struct { ReadConsistency ReadConsistencyLevel // StatsLoadSyncWait indicates how long to wait for stats load before timeout. - StatsLoadSyncWait int64 + StatsLoadSyncWait atomic.Int64 // EnableParallelHashaggSpill indicates if parallel hash agg could spill. EnableParallelHashaggSpill bool @@ -2026,7 +2026,6 @@ func NewSessionVars(hctx HookContext) *SessionVars { TMPTableSize: DefTiDBTmpTableMaxSize, MPPStoreFailTTL: DefTiDBMPPStoreFailTTL, Rng: mathutil.NewWithTime(), - StatsLoadSyncWait: StatsLoadSyncWait.Load(), EnableLegacyInstanceScope: DefEnableLegacyInstanceScope, RemoveOrderbyInSubquery: DefTiDBRemoveOrderbyInSubquery, EnableSkewDistinctAgg: DefTiDBSkewDistinctAgg, @@ -2094,6 +2093,7 @@ func NewSessionVars(hctx HookContext) *SessionVars { vars.MemTracker = memory.NewTracker(memory.LabelForSession, vars.MemQuotaQuery) vars.MemTracker.IsRootTrackerOfSess = true vars.MemTracker.Killer = &vars.SQLKiller + vars.StatsLoadSyncWait.Store(StatsLoadSyncWait.Load()) for _, engine := range config.GetGlobalConfig().IsolationRead.Engines { switch engine { diff --git a/pkg/sessionctx/variable/sysvar.go b/pkg/sessionctx/variable/sysvar.go index ecefaf3be377a..5dfc001483299 100644 --- a/pkg/sessionctx/variable/sysvar.go +++ b/pkg/sessionctx/variable/sysvar.go @@ -2336,7 +2336,7 @@ var defaultSysVars = []*SysVar{ }}, {Scope: ScopeGlobal | ScopeSession, Name: TiDBStatsLoadSyncWait, Value: strconv.Itoa(DefTiDBStatsLoadSyncWait), Type: TypeInt, MinValue: 0, MaxValue: math.MaxInt32, SetSession: func(s *SessionVars, val string) error { - s.StatsLoadSyncWait = TidbOptInt64(val, DefTiDBStatsLoadSyncWait) + s.StatsLoadSyncWait.Store(TidbOptInt64(val, DefTiDBStatsLoadSyncWait)) return nil }, GetGlobal: func(_ context.Context, s *SessionVars) (string, error) { diff --git a/pkg/statistics/handle/syncload/stats_syncload.go b/pkg/statistics/handle/syncload/stats_syncload.go index 499dd65e0b8e5..ec89ed7b52361 100644 --- a/pkg/statistics/handle/syncload/stats_syncload.go +++ b/pkg/statistics/handle/syncload/stats_syncload.go @@ -239,7 +239,7 @@ func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statsty } return task, result.Err case <-time.After(timeout): - task.ToTimeout.Add(time.Duration(sctx.GetSessionVars().StatsLoadSyncWait) * time.Microsecond) + task.ToTimeout.Add(time.Duration(sctx.GetSessionVars().StatsLoadSyncWait.Load()) * time.Microsecond) return task, nil } } @@ -410,7 +410,7 @@ func (s *statsSyncLoad) drainColTask(sctx sessionctx.Context, exit chan struct{} // if the task has already timeout, no sql is sync-waiting for it, // so do not handle it just now, put it to another channel with lower priority if time.Now().After(task.ToTimeout) { - task.ToTimeout.Add(time.Duration(sctx.GetSessionVars().StatsLoadSyncWait) * time.Microsecond) + task.ToTimeout.Add(time.Duration(sctx.GetSessionVars().StatsLoadSyncWait.Load()) * time.Microsecond) s.writeToTimeoutChan(s.StatsLoad.TimeoutItemsCh, task) continue }