Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#53602
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
you06 authored and ti-chi-bot committed May 28, 2024
1 parent 5a66534 commit 0a14d80
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 2 deletions.
19 changes: 19 additions & 0 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2293,6 +2293,25 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err
},
"gcAnalyzeHistory",
)
do.wg.Run(
func() {
// The initStatsCtx is used to store the internal session for initializing stats,
// so we need the gc min start ts calculation to track it as an internal session.
// Since the session manager may not be ready at this moment, `infosync.StoreInternalSession` can fail.
// we need to retry until the session manager is ready or the init stats completes.
for !infosync.StoreInternalSession(initStatsCtx) {
waitRetry := time.After(time.Second)
select {
case <-do.StatsHandle().InitStatsDone:
return
case <-waitRetry:
}
}
<-do.StatsHandle().InitStatsDone
infosync.DeleteInternalSession(initStatsCtx)
},
"RemoveInitStatsFromInternalSessions",
)
return nil
}

Expand Down
10 changes: 8 additions & 2 deletions pkg/domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -1283,16 +1283,22 @@ func ConfigureTiFlashPDForPartitions(accel bool, definitions *[]model.PartitionD
}

// StoreInternalSession is the entry function for store an internal session to SessionManager.
<<<<<<< HEAD
func StoreInternalSession(se interface{}) {
=======
// return whether the session is stored successfully.
func StoreInternalSession(se any) bool {
>>>>>>> bf704fd635c (domain: make the transaction from `initStatsCtx` blocking gc (#53602))
is, err := getGlobalInfoSyncer()
if err != nil {
return
return false
}
sm := is.GetSessionManager()
if sm == nil {
return
return false
}
sm.StoreInternalSession(se)
return true
}

// DeleteInternalSession is the entry function for delete an internal session from SessionManager.
Expand Down
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ go_test(
"@com_github_prometheus_client_golang//prometheus/testutil",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//error",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//testutils",
"@com_github_tikv_client_go_v2//tikv",
"@org_uber_go_goleak//:goleak",
Expand Down
45 changes: 45 additions & 0 deletions pkg/server/stat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ import (
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/keyspace"
"github.com/pingcap/tidb/pkg/server/internal/util"
"github.com/pingcap/tidb/pkg/session"
"github.com/pingcap/tidb/pkg/store/mockstore"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
)

func TestUptime(t *testing.T) {
Expand Down Expand Up @@ -63,3 +65,46 @@ func TestUptime(t *testing.T) {
require.NoError(t, err)
require.GreaterOrEqual(t, stats[upTime].(int64), int64(time.Since(time.Unix(1282967700, 0)).Seconds()))
}

func TestInitStatsSessionBlockGC(t *testing.T) {
origConfig := config.GetGlobalConfig()
defer func() {
config.StoreGlobalConfig(origConfig)
}()
newConfig := *origConfig
for _, lite := range []bool{false, true} {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/statistics/handle/beforeInitStats", "pause"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/statistics/handle/beforeInitStatsLite", "pause"))
newConfig.Performance.LiteInitStats = lite
config.StoreGlobalConfig(&newConfig)

store, err := mockstore.NewMockStore()
require.NoError(t, err)
dom, err := session.BootstrapSession(store)
require.NoError(t, err)

infoSyncer := dom.InfoSyncer()
sv := CreateMockServer(t, store)
sv.SetDomain(dom)
infoSyncer.SetSessionManager(sv)
time.Sleep(time.Second)
require.Eventually(t, func() bool {
now := time.Now()
startTSList := sv.GetInternalSessionStartTSList()
for _, startTs := range startTSList {
if startTs != 0 {
startTime := oracle.GetTimeFromTS(startTs)
// test pass if the min_start_ts is blocked over 1s.
if now.Sub(startTime) > time.Second {
return true
}
}
}
return false
}, 10*time.Second, 10*time.Millisecond, "min_start_ts is not blocked over 1s")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/statistics/handle/beforeInitStats"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/statistics/handle/beforeInitStatsLite"))
dom.Close()
require.NoError(t, store.Close())
}
}
3 changes: 3 additions & 0 deletions pkg/statistics/handle/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,12 @@ go_library(
"//pkg/util/logutil",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
<<<<<<< HEAD
"@com_github_tiancaiamao_gp//:gp",
"@org_golang_x_sync//singleflight",
"@org_uber_go_atomic//:atomic",
=======
>>>>>>> bf704fd635c (domain: make the transaction from `initStatsCtx` blocking gc (#53602))
"@org_uber_go_zap//:zap",
],
)
Expand Down
3 changes: 3 additions & 0 deletions pkg/statistics/handle/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
Expand Down Expand Up @@ -700,6 +701,7 @@ func (h *Handle) InitStatsLite(is infoschema.InfoSchema) (err error) {
if err != nil {
return err
}
failpoint.Inject("beforeInitStatsLite", func() {})
cache, err := h.initStatsMeta(is)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -727,6 +729,7 @@ func (h *Handle) InitStats(is infoschema.InfoSchema) (err error) {
if err != nil {
return err
}
failpoint.Inject("beforeInitStats", func() {})
cache, err := h.initStatsMeta(is)
if err != nil {
return errors.Trace(err)
Expand Down

0 comments on commit 0a14d80

Please sign in to comment.