diff --git a/statistics/handle/BUILD.bazel b/statistics/handle/BUILD.bazel
index 34b15ad7d9c41..95667a4e63b84 100644
--- a/statistics/handle/BUILD.bazel
+++ b/statistics/handle/BUILD.bazel
@@ -80,6 +80,7 @@ go_test(
         "//domain",
         "//parser/model",
         "//parser/mysql",
+        "//sessionctx",
         "//sessionctx/stmtctx",
         "//sessionctx/variable",
         "//statistics",
diff --git a/statistics/handle/handle_hist.go b/statistics/handle/handle_hist.go
index 231a748cead9c..19625c2057a13 100644
--- a/statistics/handle/handle_hist.go
+++ b/statistics/handle/handle_hist.go
@@ -214,7 +214,7 @@ func (h *Handle) SubLoadWorker(ctx sessionctx.Context, exit chan struct{}, exitW
 	// if the last task is not successfully handled in last round for error or panic, pass it to this round to retry
 	var lastTask *NeededItemTask
 	for {
-		task, err := h.HandleOneTask(lastTask, readerCtx, ctx.(sqlexec.RestrictedSQLExecutor), exit)
+		task, err := h.HandleOneTask(ctx, lastTask, readerCtx, ctx.(sqlexec.RestrictedSQLExecutor), exit)
 		lastTask = task
 		if err != nil {
 			switch err {
@@ -235,7 +235,7 @@ func (h *Handle) SubLoadWorker(ctx sessionctx.Context, exit chan struct{}, exitW
 // - If the task is handled successfully, return nil, nil.
 // - If the task is timeout, return the task and nil. The caller should retry the timeout task without sleep.
 // - If the task is failed, return the task, error. The caller should retry the timeout task with sleep.
-func (h *Handle) HandleOneTask(lastTask *NeededItemTask, readerCtx *StatsReaderContext, ctx sqlexec.RestrictedSQLExecutor, exit chan struct{}) (task *NeededItemTask, err error) {
+func (h *Handle) HandleOneTask(sctx sessionctx.Context, lastTask *NeededItemTask, readerCtx *StatsReaderContext, ctx sqlexec.RestrictedSQLExecutor, exit chan struct{}) (task *NeededItemTask, err error) {
 	defer func() {
 		// recover for each task, worker keeps working
 		if r := recover(); r != nil {
@@ -244,7 +244,7 @@ func (h *Handle) HandleOneTask(lastTask *NeededItemTask, readerCtx *StatsReaderC
 		}
 	}()
 	if lastTask == nil {
-		task, err = h.drainColTask(exit)
+		task, err = h.drainColTask(sctx, exit)
 		if err != nil {
 			if err != errExit {
 				logutil.BgLogger().Error("Fail to drain task for stats loading.", zap.Error(err))
@@ -447,7 +447,7 @@ func (h *Handle) readStatsForOneItem(item model.TableItemID, w *statsWrapper, re
 }

 // drainColTask will hang until a column task can return, and either task or error will be returned.
-func (h *Handle) drainColTask(exit chan struct{}) (*NeededItemTask, error) {
+func (h *Handle) drainColTask(sctx sessionctx.Context, exit chan struct{}) (*NeededItemTask, error) {
 	// select NeededColumnsCh firstly, if no task, then select TimeoutColumnsCh
 	for {
 		select {
@@ -460,6 +460,7 @@ func (h *Handle) drainColTask(exit chan struct{}) (*NeededItemTask, error) {
 			// if the task has already timeout, no sql is sync-waiting for it,
 			// so do not handle it just now, put it to another channel with lower priority
 			if time.Now().After(task.ToTimeout) {
+				task.ToTimeout = task.ToTimeout.Add(time.Duration(sctx.GetSessionVars().StatsLoadSyncWait.Load()) * time.Microsecond)
 				h.writeToTimeoutChan(h.StatsLoad.TimeoutItemsCh, task)
 				continue
 			}
diff --git a/statistics/handle/handle_hist_test.go b/statistics/handle/handle_hist_test.go
index 6189281d99c42..b0a2aa38cebc4 100644
--- a/statistics/handle/handle_hist_test.go
+++ b/statistics/handle/handle_hist_test.go
@@ -21,6 +21,7 @@ import (
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/parser/model"
+	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/sessionctx/stmtctx"
 	"github.com/pingcap/tidb/statistics/handle"
 	"github.com/pingcap/tidb/testkit"
@@ -205,7 +206,7 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {
 	exitCh := make(chan struct{})

 	require.NoError(t, failpoint.Enable(fp.failPath, fp.inTerms))
-	task1, err1 := h.HandleOneTask(nil, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
+	task1, err1 := h.HandleOneTask(testKit.Session().(sessionctx.Context), nil, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
 	require.Error(t, err1)
 	require.NotNil(t, task1)
 	for _, resultCh := range stmtCtx1.StatsLoad.ResultCh {
@@ -226,7 +227,7 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {
 	}
 	require.NoError(t, failpoint.Disable(fp.failPath))

-	task3, err3 := h.HandleOneTask(task1, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
+	task3, err3 := h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
 	require.NoError(t, err3)
 	require.Nil(t, task3)

@@ -305,7 +306,8 @@ func TestRetry(t *testing.T) {
 	)
 	readerCtx := &handle.StatsReaderContext{}
 	for i := 0; i < handle.RetryCount; i++ {
-		task1, err1 = h.HandleOneTask(task1, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
+		task1, err1 = h.HandleOneTask(testKit.Session().(sessionctx.Context),
+			task1, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
 		require.Error(t, err1)
 		require.NotNil(t, task1)
 		select {
@@ -315,7 +317,8 @@
 		default:
 		}
 	}
-	result, err1 := h.HandleOneTask(task1, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
+	result, err1 := h.HandleOneTask(testKit.Session().(sessionctx.Context),
+		task1, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
 	require.NoError(t, err1)
 	require.Nil(t, result)
 	for _, resultCh := range stmtCtx1.StatsLoad.ResultCh {
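Note on the drainColTask hunk above: when a task pulled from the high-priority NeededItemsCh has already passed its deadline, its ToTimeout is pushed forward by the session's StatsLoadSyncWait before the task is re-queued on the lower-priority TimeoutItemsCh. Below is a minimal, self-contained sketch of that re-queue pattern; the task struct, channel, and wait value are simplified stand-ins for illustration, not the actual handle types or session variables.

package main

import (
	"fmt"
	"time"
)

// neededItemTask is a stripped-down stand-in for handle.NeededItemTask;
// only the deadline field touched by this change is kept.
type neededItemTask struct {
	ToTimeout time.Time
}

func main() {
	// Stand-in for the session's StatsLoadSyncWait budget (illustrative
	// value and unit; the real one is read from sctx.GetSessionVars()).
	syncWait := 100 * time.Millisecond

	// Lower-priority channel for tasks whose waiters have already timed out.
	timeoutCh := make(chan *neededItemTask, 1)

	// A task whose deadline has already passed.
	task := &neededItemTask{ToTimeout: time.Now().Add(-time.Second)}

	if time.Now().After(task.ToTimeout) {
		// time.Time.Add returns a new value, so the extended deadline must
		// be assigned back before the task is re-queued.
		task.ToTimeout = task.ToTimeout.Add(syncWait)
		timeoutCh <- task
	}

	fmt.Println("re-queued with extended deadline:", (<-timeoutCh).ToTimeout)
}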