diff --git a/pkg/statistics/handle/syncload/BUILD.bazel b/pkg/statistics/handle/syncload/BUILD.bazel index f0bcd2e4af2ec..c46cc54c4995c 100644 --- a/pkg/statistics/handle/syncload/BUILD.bazel +++ b/pkg/statistics/handle/syncload/BUILD.bazel @@ -33,7 +33,7 @@ go_test( srcs = ["stats_syncload_test.go"], flaky = True, race = "on", - shard_count = 5, + shard_count = 6, deps = [ ":syncload", "//pkg/config", diff --git a/pkg/statistics/handle/syncload/stats_syncload.go b/pkg/statistics/handle/syncload/stats_syncload.go index f7525c6f08371..952aaf844cb86 100644 --- a/pkg/statistics/handle/syncload/stats_syncload.go +++ b/pkg/statistics/handle/syncload/stats_syncload.go @@ -40,7 +40,7 @@ import ( ) // RetryCount is the max retry count for a sync load task. -const RetryCount = 3 +const RetryCount = 2 type statsSyncLoad struct { statsHandle statstypes.StatsHandle @@ -96,9 +96,13 @@ func (s *statsSyncLoad) SendLoadRequests(sc *stmtctx.StatementContext, neededHis } select { case s.StatsLoad.NeededItemsCh <- task: - result, ok := <-task.ResultCh - intest.Assert(ok, "task.ResultCh cannot be closed") - return result, nil + select { + case <-timer.C: + return nil, errors.New("sync load took too long to return") + case result, ok := <-task.ResultCh: + intest.Assert(ok, "task.ResultCh cannot be closed") + return result, nil + } case <-timer.C: return nil, errors.New("sync load stats channel is full and timeout sending task to channel") } diff --git a/pkg/statistics/handle/syncload/stats_syncload_test.go b/pkg/statistics/handle/syncload/stats_syncload_test.go index 8a8929d9d93e5..3483a94f5225c 100644 --- a/pkg/statistics/handle/syncload/stats_syncload_test.go +++ b/pkg/statistics/handle/syncload/stats_syncload_test.go @@ -344,3 +344,47 @@ func TestRetry(t *testing.T) { } require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/statistics/handle/syncload/mockReadStatsForOneFail")) } + +func TestSendLoadRequestsWaitTooLong(t *testing.T) { + originConfig := config.GetGlobalConfig() + newConfig := config.NewConfig() + newConfig.Performance.StatsLoadConcurrency = 0 // no worker to consume channel + newConfig.Performance.StatsLoadQueueSize = 10000 + config.StoreGlobalConfig(newConfig) + defer config.StoreGlobalConfig(originConfig) + store, dom := testkit.CreateMockStoreAndDomain(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(a int, b int, c int, primary key(a), key idx(b,c))") + tk.MustExec("insert into t values (1,1,1),(2,2,2),(3,3,3)") + + oriLease := dom.StatsHandle().Lease() + dom.StatsHandle().SetLease(1) + defer func() { + dom.StatsHandle().SetLease(oriLease) + }() + tk.MustExec("analyze table t all columns") + h := dom.StatsHandle() + is := dom.InfoSchema() + tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + tableInfo := tbl.Meta() + neededColumns := make([]model.StatsLoadItem, 0, len(tableInfo.Columns)) + for _, col := range tableInfo.Columns { + neededColumns = append(neededColumns, model.StatsLoadItem{TableItemID: model.TableItemID{TableID: tableInfo.ID, ID: col.ID, IsIndex: false}, FullLoad: true}) + } + stmtCtx := stmtctx.NewStmtCtx() + timeout := time.Nanosecond * 100 + require.NoError(t, h.SendLoadRequests(stmtCtx, neededColumns, timeout)) + for _, resultCh := range stmtCtx.StatsLoad.ResultCh { + rs1 := <-resultCh + require.Error(t, rs1.Err) + } + stmtCtx1 := stmtctx.NewStmtCtx() + require.NoError(t, h.SendLoadRequests(stmtCtx1, neededColumns, timeout)) + for _, resultCh := range stmtCtx1.StatsLoad.ResultCh { + rs1 := <-resultCh + require.Error(t, rs1.Err) + } +}