Skip to content

Commit

Permalink
domain: make the transaction from initStatsCtx blocking gc (#53602) (
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Nov 7, 2024
1 parent ec98fc7 commit 5e6bbb5
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 3 deletions.
19 changes: 19 additions & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2199,6 +2199,25 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err
do.wg.Run(func() {
do.handleDDLEvent()
}, "handleDDLEvent")
do.wg.Run(
func() {
// The initStatsCtx is used to store the internal session for initializing stats,
// so we need the gc min start ts calculation to track it as an internal session.
// Since the session manager may not be ready at this moment, `infosync.StoreInternalSession` can fail.
// we need to retry until the session manager is ready or the init stats completes.
for !infosync.StoreInternalSession(initStatsCtx) {
waitRetry := time.After(time.Second)
select {
case <-do.StatsHandle().InitStatsDone:
return
case <-waitRetry:
}
}
<-do.StatsHandle().InitStatsDone
infosync.DeleteInternalSession(initStatsCtx)
},
"RemoveInitStatsFromInternalSessions",
)
return nil
}

Expand Down
8 changes: 5 additions & 3 deletions domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -1339,16 +1339,18 @@ func ConfigureTiFlashPDForPartitions(accel bool, definitions *[]model.PartitionD
}

// StoreInternalSession is the entry function for store an internal session to SessionManager.
func StoreInternalSession(se interface{}) {
// return whether the session is stored successfully.
func StoreInternalSession(se interface{}) bool {
is, err := getGlobalInfoSyncer()
if err != nil {
return
return false
}
sm := is.GetSessionManager()
if sm == nil {
return
return false
}
sm.StoreInternalSession(se)
return true
}

// DeleteInternalSession is the entry function for delete an internal session from SessionManager.
Expand Down
1 change: 1 addition & 0 deletions server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ go_test(
"@com_github_pingcap_log//:log",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//error",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//testutils",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//tikvrpc",
Expand Down
46 changes: 46 additions & 0 deletions server/stat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ import (
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/keyspace"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/mockstore"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
)

func TestUptime(t *testing.T) {
Expand Down Expand Up @@ -62,3 +64,47 @@ func TestUptime(t *testing.T) {
require.NoError(t, err)
require.GreaterOrEqual(t, stats[upTime].(int64), int64(time.Since(time.Unix(1282967700, 0)).Seconds()))
}

func TestInitStatsSessionBlockGC(t *testing.T) {
origConfig := config.GetGlobalConfig()
defer func() {
config.StoreGlobalConfig(origConfig)
}()
newConfig := *origConfig
for _, lite := range []bool{false, true} {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/statistics/handle/beforeInitStats", "pause"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/statistics/handle/beforeInitStatsLite", "pause"))
newConfig.Performance.LiteInitStats = lite
config.StoreGlobalConfig(&newConfig)
session.SetStatsLease(3 * time.Second)

store, err := mockstore.NewMockStore()
require.NoError(t, err)
dom, err := session.BootstrapSession(store)
require.NoError(t, err)

infoSyncer := dom.InfoSyncer()
sv := CreateMockServer(t, store)
sv.SetDomain(dom)
infoSyncer.SetSessionManager(sv)
time.Sleep(time.Second)
require.Eventually(t, func() bool {
now := time.Now()
startTSList := sv.GetInternalSessionStartTSList()
for _, startTs := range startTSList {
if startTs != 0 {
startTime := oracle.GetTimeFromTS(startTs)
// test pass if the min_start_ts is blocked over 1s.
if now.Sub(startTime) > time.Second {
return true
}
}
}
return false
}, 10*time.Second, 10*time.Millisecond, "min_start_ts is not blocked over 1s")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/statistics/handle/beforeInitStats"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/statistics/handle/beforeInitStatsLite"))
dom.Close()
require.NoError(t, store.Close())
}
}
3 changes: 3 additions & 0 deletions statistics/handle/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -470,6 +471,7 @@ func (h *Handle) InitStatsLite(is infoschema.InfoSchema) (err error) {
if err != nil {
return err
}
failpoint.Inject("beforeInitStatsLite", func() {})
cache, err := h.initStatsMeta(is)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -504,6 +506,7 @@ func (h *Handle) InitStats(is infoschema.InfoSchema) (err error) {
if err != nil {
return err
}
failpoint.Inject("beforeInitStats", func() {})
cache, err := h.initStatsMeta(is)
if err != nil {
return errors.Trace(err)
Expand Down

0 comments on commit 5e6bbb5

Please sign in to comment.