diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go index 54fb27c0de2e2..f506f8329cae1 100644 --- a/pkg/domain/domain.go +++ b/pkg/domain/domain.go @@ -2256,6 +2256,25 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err }, "analyzeJobsCleanupWorker", ) + do.wg.Run( + func() { + // The initStatsCtx is used to store the internal session for initializing stats, + // so we need the gc min start ts calculation to track it as an internal session. + // Since the session manager may not be ready at this moment, `infosync.StoreInternalSession` can fail. + // we need to retry until the session manager is ready or the init stats completes. + for !infosync.StoreInternalSession(initStatsCtx) { + waitRetry := time.After(time.Second) + select { + case <-do.StatsHandle().InitStatsDone: + return + case <-waitRetry: + } + } + <-do.StatsHandle().InitStatsDone + infosync.DeleteInternalSession(initStatsCtx) + }, + "RemoveInitStatsFromInternalSessions", + ) return nil } diff --git a/pkg/domain/infosync/info.go b/pkg/domain/infosync/info.go index 51dfbcac6e16b..908d84df8aced 100644 --- a/pkg/domain/infosync/info.go +++ b/pkg/domain/infosync/info.go @@ -1243,16 +1243,18 @@ func ConfigureTiFlashPDForPartitions(accel bool, definitions *[]model.PartitionD } // StoreInternalSession is the entry function for store an internal session to SessionManager. -func StoreInternalSession(se any) { +// return whether the session is stored successfully. +func StoreInternalSession(se any) bool { is, err := getGlobalInfoSyncer() if err != nil { - return + return false } sm := is.GetSessionManager() if sm == nil { - return + return false } sm.StoreInternalSession(se) + return true } // DeleteInternalSession is the entry function for delete an internal session from SessionManager. diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index cc6391de1ddc8..f9cf4a2fd8d4c 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -203,6 +203,7 @@ go_test( "@com_github_prometheus_client_golang//prometheus/testutil", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//error", + "@com_github_tikv_client_go_v2//oracle", "@com_github_tikv_client_go_v2//testutils", "@com_github_tikv_client_go_v2//tikv", "@org_uber_go_goleak//:goleak", diff --git a/pkg/server/stat_test.go b/pkg/server/stat_test.go index 5b0fca478780d..edc26eed42277 100644 --- a/pkg/server/stat_test.go +++ b/pkg/server/stat_test.go @@ -20,12 +20,14 @@ import ( "time" "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/domain/infosync" "github.com/pingcap/tidb/pkg/keyspace" "github.com/pingcap/tidb/pkg/server/internal/util" "github.com/pingcap/tidb/pkg/session" "github.com/pingcap/tidb/pkg/store/mockstore" "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/oracle" ) func TestUptime(t *testing.T) { @@ -63,3 +65,46 @@ func TestUptime(t *testing.T) { require.NoError(t, err) require.GreaterOrEqual(t, stats[upTime].(int64), int64(time.Since(time.Unix(1282967700, 0)).Seconds())) } + +func TestInitStatsSessionBlockGC(t *testing.T) { + origConfig := config.GetGlobalConfig() + defer func() { + config.StoreGlobalConfig(origConfig) + }() + newConfig := *origConfig + for _, lite := range []bool{false, true} { + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/statistics/handle/beforeInitStats", "pause")) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/statistics/handle/beforeInitStatsLite", "pause")) + newConfig.Performance.LiteInitStats = lite + config.StoreGlobalConfig(&newConfig) + + store, err := mockstore.NewMockStore() + require.NoError(t, err) + dom, err := session.BootstrapSession(store) + require.NoError(t, err) + + infoSyncer := dom.InfoSyncer() + sv := CreateMockServer(t, store) + sv.SetDomain(dom) + infoSyncer.SetSessionManager(sv) + time.Sleep(time.Second) + require.Eventually(t, func() bool { + now := time.Now() + startTSList := sv.GetInternalSessionStartTSList() + for _, startTs := range startTSList { + if startTs != 0 { + startTime := oracle.GetTimeFromTS(startTs) + // test pass if the min_start_ts is blocked over 1s. + if now.Sub(startTime) > time.Second { + return true + } + } + } + return false + }, 10*time.Second, 10*time.Millisecond, "min_start_ts is not blocked over 1s") + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/statistics/handle/beforeInitStats")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/statistics/handle/beforeInitStatsLite")) + dom.Close() + require.NoError(t, store.Close()) + } +} diff --git a/pkg/statistics/handle/BUILD.bazel b/pkg/statistics/handle/BUILD.bazel index f5787bc595d13..9b8d1bda85aa0 100644 --- a/pkg/statistics/handle/BUILD.bazel +++ b/pkg/statistics/handle/BUILD.bazel @@ -35,6 +35,7 @@ go_library( "//pkg/util/chunk", "//pkg/util/logutil", "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", "@org_uber_go_zap//:zap", ], ) diff --git a/pkg/statistics/handle/bootstrap.go b/pkg/statistics/handle/bootstrap.go index d238c620fec8d..4eb70b7b63eea 100644 --- a/pkg/statistics/handle/bootstrap.go +++ b/pkg/statistics/handle/bootstrap.go @@ -20,6 +20,7 @@ import ( "sync/atomic" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/kv" @@ -690,6 +691,7 @@ func (h *Handle) InitStatsLite(is infoschema.InfoSchema) (err error) { if err != nil { return err } + failpoint.Inject("beforeInitStatsLite", func() {}) cache, err := h.initStatsMeta(is) if err != nil { return errors.Trace(err) @@ -717,6 +719,7 @@ func (h *Handle) InitStats(is infoschema.InfoSchema) (err error) { if err != nil { return err } + failpoint.Inject("beforeInitStats", func() {}) cache, err := h.initStatsMeta(is) if err != nil { return errors.Trace(err)