diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go index 99bf7c4d86b31..11cb020eb340b 100644 --- a/pkg/domain/domain.go +++ b/pkg/domain/domain.go @@ -2293,6 +2293,25 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err }, "gcAnalyzeHistory", ) + do.wg.Run( + func() { + // The initStatsCtx is used to store the internal session for initializing stats, + // so we need the gc min start ts calculation to track it as an internal session. + // Since the session manager may not be ready at this moment, `infosync.StoreInternalSession` can fail. + // we need to retry until the session manager is ready or the init stats completes. + for !infosync.StoreInternalSession(initStatsCtx) { + waitRetry := time.After(time.Second) + select { + case <-do.StatsHandle().InitStatsDone: + return + case <-waitRetry: + } + } + <-do.StatsHandle().InitStatsDone + infosync.DeleteInternalSession(initStatsCtx) + }, + "RemoveInitStatsFromInternalSessions", + ) return nil } diff --git a/pkg/domain/infosync/info.go b/pkg/domain/infosync/info.go index 5c4ee1889d58a..ed054cbb8bd7b 100644 --- a/pkg/domain/infosync/info.go +++ b/pkg/domain/infosync/info.go @@ -1283,16 +1283,22 @@ func ConfigureTiFlashPDForPartitions(accel bool, definitions *[]model.PartitionD } // StoreInternalSession is the entry function for store an internal session to SessionManager. +<<<<<<< HEAD func StoreInternalSession(se interface{}) { +======= +// return whether the session is stored successfully. +func StoreInternalSession(se any) bool { +>>>>>>> bf704fd635c (domain: make the transaction from `initStatsCtx` blocking gc (#53602)) is, err := getGlobalInfoSyncer() if err != nil { - return + return false } sm := is.GetSessionManager() if sm == nil { - return + return false } sm.StoreInternalSession(se) + return true } // DeleteInternalSession is the entry function for delete an internal session from SessionManager. diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 9a051f117b75e..cb54511209d19 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -186,6 +186,7 @@ go_test( "@com_github_prometheus_client_golang//prometheus/testutil", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//error", + "@com_github_tikv_client_go_v2//oracle", "@com_github_tikv_client_go_v2//testutils", "@com_github_tikv_client_go_v2//tikv", "@org_uber_go_goleak//:goleak", diff --git a/pkg/server/stat_test.go b/pkg/server/stat_test.go index 31f7983cd000d..d0ffd2800685f 100644 --- a/pkg/server/stat_test.go +++ b/pkg/server/stat_test.go @@ -20,12 +20,14 @@ import ( "time" "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/domain/infosync" "github.com/pingcap/tidb/pkg/keyspace" "github.com/pingcap/tidb/pkg/server/internal/util" "github.com/pingcap/tidb/pkg/session" "github.com/pingcap/tidb/pkg/store/mockstore" "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/oracle" ) func TestUptime(t *testing.T) { @@ -63,3 +65,46 @@ func TestUptime(t *testing.T) { require.NoError(t, err) require.GreaterOrEqual(t, stats[upTime].(int64), int64(time.Since(time.Unix(1282967700, 0)).Seconds())) } + +func TestInitStatsSessionBlockGC(t *testing.T) { + origConfig := config.GetGlobalConfig() + defer func() { + config.StoreGlobalConfig(origConfig) + }() + newConfig := *origConfig + for _, lite := range []bool{false, true} { + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/statistics/handle/beforeInitStats", "pause")) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/statistics/handle/beforeInitStatsLite", "pause")) + newConfig.Performance.LiteInitStats = lite + config.StoreGlobalConfig(&newConfig) + + store, err := mockstore.NewMockStore() + require.NoError(t, err) + dom, err := session.BootstrapSession(store) + require.NoError(t, err) + + infoSyncer := dom.InfoSyncer() + sv := CreateMockServer(t, store) + sv.SetDomain(dom) + infoSyncer.SetSessionManager(sv) + time.Sleep(time.Second) + require.Eventually(t, func() bool { + now := time.Now() + startTSList := sv.GetInternalSessionStartTSList() + for _, startTs := range startTSList { + if startTs != 0 { + startTime := oracle.GetTimeFromTS(startTs) + // test pass if the min_start_ts is blocked over 1s. + if now.Sub(startTime) > time.Second { + return true + } + } + } + return false + }, 10*time.Second, 10*time.Millisecond, "min_start_ts is not blocked over 1s") + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/statistics/handle/beforeInitStats")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/statistics/handle/beforeInitStatsLite")) + dom.Close() + require.NoError(t, store.Close()) + } +} diff --git a/pkg/statistics/handle/BUILD.bazel b/pkg/statistics/handle/BUILD.bazel index 4ed865fdfaced..628e4b0cfef3a 100644 --- a/pkg/statistics/handle/BUILD.bazel +++ b/pkg/statistics/handle/BUILD.bazel @@ -40,9 +40,12 @@ go_library( "//pkg/util/logutil", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", +<<<<<<< HEAD "@com_github_tiancaiamao_gp//:gp", "@org_golang_x_sync//singleflight", "@org_uber_go_atomic//:atomic", +======= +>>>>>>> bf704fd635c (domain: make the transaction from `initStatsCtx` blocking gc (#53602)) "@org_uber_go_zap//:zap", ], ) diff --git a/pkg/statistics/handle/bootstrap.go b/pkg/statistics/handle/bootstrap.go index abd0dc5d00d79..1cc5441298dda 100644 --- a/pkg/statistics/handle/bootstrap.go +++ b/pkg/statistics/handle/bootstrap.go @@ -21,6 +21,7 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/kv" @@ -700,6 +701,7 @@ func (h *Handle) InitStatsLite(is infoschema.InfoSchema) (err error) { if err != nil { return err } + failpoint.Inject("beforeInitStatsLite", func() {}) cache, err := h.initStatsMeta(is) if err != nil { return errors.Trace(err) @@ -727,6 +729,7 @@ func (h *Handle) InitStats(is infoschema.InfoSchema) (err error) { if err != nil { return err } + failpoint.Inject("beforeInitStats", func() {}) cache, err := h.initStatsMeta(is) if err != nil { return errors.Trace(err)