Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#52424
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
hawkingrei authored and ti-chi-bot committed Apr 9, 2024
1 parent 428670a commit be10df0
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 3 deletions.
4 changes: 4 additions & 0 deletions pkg/planner/core/plan_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ func (collectPredicateColumnsPoint) optimize(_ context.Context, plan LogicalPlan
return plan, planChanged, nil
}
predicateNeeded := variable.EnableColumnTracking.Load()
<<<<<<< HEAD:pkg/planner/core/plan_stats.go
syncWait := plan.SCtx().GetSessionVars().StatsLoadSyncWait * time.Millisecond.Nanoseconds()
=======
syncWait := plan.SCtx().GetSessionVars().StatsLoadSyncWait.Load()
>>>>>>> 9bb3697349b (statistics: upgrade stats timeout checkpoint after it timeouts (#52424)):pkg/planner/core/rule_collect_plan_stats.go
histNeeded := syncWait > 0
predicateColumns, histNeededColumns := CollectColumnStatsUsage(plan, predicateNeeded, histNeeded)
if len(predicateColumns) > 0 {
Expand Down
8 changes: 6 additions & 2 deletions pkg/sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1309,7 +1309,7 @@ type SessionVars struct {
ReadConsistency ReadConsistencyLevel

// StatsLoadSyncWait indicates how long to wait for stats load before timeout.
StatsLoadSyncWait int64
StatsLoadSyncWait atomic.Int64

// SysdateIsNow indicates whether Sysdate is an alias of Now function
SysdateIsNow bool
Expand Down Expand Up @@ -2003,7 +2003,6 @@ func NewSessionVars(hctx HookContext) *SessionVars {
TMPTableSize: DefTiDBTmpTableMaxSize,
MPPStoreFailTTL: DefTiDBMPPStoreFailTTL,
Rng: mathutil.NewWithTime(),
StatsLoadSyncWait: StatsLoadSyncWait.Load(),
EnableLegacyInstanceScope: DefEnableLegacyInstanceScope,
RemoveOrderbyInSubquery: DefTiDBRemoveOrderbyInSubquery,
EnableSkewDistinctAgg: DefTiDBSkewDistinctAgg,
Expand Down Expand Up @@ -2067,6 +2066,11 @@ func NewSessionVars(hctx HookContext) *SessionVars {
vars.DiskTracker = disk.NewTracker(memory.LabelForSession, -1)
vars.MemTracker = memory.NewTracker(memory.LabelForSession, vars.MemQuotaQuery)
vars.MemTracker.IsRootTrackerOfSess = true
<<<<<<< HEAD
=======
vars.MemTracker.Killer = &vars.SQLKiller
vars.StatsLoadSyncWait.Store(StatsLoadSyncWait.Load())
>>>>>>> 9bb3697349b (statistics: upgrade stats timeout checkpoint after it timeouts (#52424))

for _, engine := range config.GetGlobalConfig().IsolationRead.Engines {
switch engine {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -2167,7 +2167,7 @@ var defaultSysVars = []*SysVar{
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBStatsLoadSyncWait, Value: strconv.Itoa(DefTiDBStatsLoadSyncWait), Type: TypeInt, MinValue: 0, MaxValue: math.MaxInt32,
SetSession: func(s *SessionVars, val string) error {
s.StatsLoadSyncWait = TidbOptInt64(val, DefTiDBStatsLoadSyncWait)
s.StatsLoadSyncWait.Store(TidbOptInt64(val, DefTiDBStatsLoadSyncWait))
return nil
},
GetGlobal: func(_ context.Context, s *SessionVars) (string, error) {
Expand Down
14 changes: 14 additions & 0 deletions pkg/statistics/handle/handle_hist.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,11 @@ func (h *Handle) HandleOneTask(sctx sessionctx.Context, lastTask *NeededItemTask
}
}()
if lastTask == nil {
<<<<<<< HEAD:pkg/statistics/handle/handle_hist.go
task, err = h.drainColTask(exit)
=======
task, err = s.drainColTask(sctx, exit)
>>>>>>> 9bb3697349b (statistics: upgrade stats timeout checkpoint after it timeouts (#52424)):pkg/statistics/handle/syncload/stats_syncload.go
if err != nil {
if err != errExit {
logutil.BgLogger().Error("Fail to drain task for stats loading.", zap.Error(err))
Expand All @@ -238,6 +242,7 @@ func (h *Handle) HandleOneTask(sctx sessionctx.Context, lastTask *NeededItemTask
}
return task, result.Err
case <-time.After(timeout):
task.ToTimeout.Add(time.Duration(sctx.GetSessionVars().StatsLoadSyncWait.Load()) * time.Microsecond)
return task, nil
}
}
Expand Down Expand Up @@ -380,7 +385,11 @@ func (*Handle) readStatsForOneItem(sctx sessionctx.Context, item model.TableItem
}

// drainColTask will hang until a column task can return, and either task or error will be returned.
<<<<<<< HEAD:pkg/statistics/handle/handle_hist.go
func (h *Handle) drainColTask(exit chan struct{}) (*NeededItemTask, error) {
=======
func (s *statsSyncLoad) drainColTask(sctx sessionctx.Context, exit chan struct{}) (*statstypes.NeededItemTask, error) {
>>>>>>> 9bb3697349b (statistics: upgrade stats timeout checkpoint after it timeouts (#52424)):pkg/statistics/handle/syncload/stats_syncload.go
// select NeededColumnsCh firstly, if no task, then select TimeoutColumnsCh
for {
select {
Expand All @@ -393,7 +402,12 @@ func (h *Handle) drainColTask(exit chan struct{}) (*NeededItemTask, error) {
// if the task has already timeout, no sql is sync-waiting for it,
// so do not handle it just now, put it to another channel with lower priority
if time.Now().After(task.ToTimeout) {
<<<<<<< HEAD:pkg/statistics/handle/handle_hist.go
h.writeToTimeoutChan(h.StatsLoad.TimeoutItemsCh, task)
=======
task.ToTimeout.Add(time.Duration(sctx.GetSessionVars().StatsLoadSyncWait.Load()) * time.Microsecond)
s.writeToTimeoutChan(s.StatsLoad.TimeoutItemsCh, task)
>>>>>>> 9bb3697349b (statistics: upgrade stats timeout checkpoint after it timeouts (#52424)):pkg/statistics/handle/syncload/stats_syncload.go
continue
}
return task, nil
Expand Down

0 comments on commit be10df0

Please sign in to comment.