Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

domain: make the transaction from initStatsCtx blocking gc #53602

Merged
merged 8 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2256,6 +2256,25 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err
},
"analyzeJobsCleanupWorker",
)
do.wg.Run(
func() {
// The initStatsCtx is used to store the internal session for initializing stats,
// so we need the gc min start ts calculation to track it as an internal session.
// Since the session manager may not be ready at this moment, `infosync.StoreInternalSession` can fail.
// we need to retry until the session manager is ready or the init stats completes.
for !infosync.StoreInternalSession(initStatsCtx) {
waitRetry := time.After(time.Second)
select {
case <-do.StatsHandle().InitStatsDone:
return
case <-waitRetry:
}
}
<-do.StatsHandle().InitStatsDone
infosync.DeleteInternalSession(initStatsCtx)
},
"RemoveInitStatsFromInternalSessions",
)
return nil
}

Expand Down
8 changes: 5 additions & 3 deletions pkg/domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -1243,16 +1243,18 @@ func ConfigureTiFlashPDForPartitions(accel bool, definitions *[]model.PartitionD
}

// StoreInternalSession is the entry function for store an internal session to SessionManager.
func StoreInternalSession(se any) {
// return whether the session is stored successfully.
func StoreInternalSession(se any) bool {
is, err := getGlobalInfoSyncer()
if err != nil {
return
return false
}
sm := is.GetSessionManager()
if sm == nil {
return
return false
}
sm.StoreInternalSession(se)
return true
}

// DeleteInternalSession is the entry function for delete an internal session from SessionManager.
Expand Down
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ go_test(
"@com_github_prometheus_client_golang//prometheus/testutil",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//error",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//testutils",
"@com_github_tikv_client_go_v2//tikv",
"@org_uber_go_goleak//:goleak",
Expand Down
45 changes: 45 additions & 0 deletions pkg/server/stat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ import (
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/keyspace"
"github.com/pingcap/tidb/pkg/server/internal/util"
"github.com/pingcap/tidb/pkg/session"
"github.com/pingcap/tidb/pkg/store/mockstore"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
)

func TestUptime(t *testing.T) {
Expand Down Expand Up @@ -63,3 +65,46 @@ func TestUptime(t *testing.T) {
require.NoError(t, err)
require.GreaterOrEqual(t, stats[upTime].(int64), int64(time.Since(time.Unix(1282967700, 0)).Seconds()))
}

func TestInitStatsSessionBlockGC(t *testing.T) {
origConfig := config.GetGlobalConfig()
defer func() {
config.StoreGlobalConfig(origConfig)
}()
newConfig := *origConfig
for _, lite := range []bool{false, true} {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/statistics/handle/beforeInitStats", "pause"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/statistics/handle/beforeInitStatsLite", "pause"))
newConfig.Performance.LiteInitStats = lite
config.StoreGlobalConfig(&newConfig)

store, err := mockstore.NewMockStore()
require.NoError(t, err)
dom, err := session.BootstrapSession(store)
require.NoError(t, err)

infoSyncer := dom.InfoSyncer()
sv := CreateMockServer(t, store)
sv.SetDomain(dom)
infoSyncer.SetSessionManager(sv)
time.Sleep(time.Second)
require.Eventually(t, func() bool {
now := time.Now()
startTSList := sv.GetInternalSessionStartTSList()
for _, startTs := range startTSList {
if startTs != 0 {
startTime := oracle.GetTimeFromTS(startTs)
// test pass if the min_start_ts is blocked over 1s.
if now.Sub(startTime) > time.Second {
return true
}
}
}
return false
}, 10*time.Second, 10*time.Millisecond, "min_start_ts is not blocked over 1s")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/statistics/handle/beforeInitStats"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/statistics/handle/beforeInitStatsLite"))
dom.Close()
require.NoError(t, store.Close())
}
}
1 change: 1 addition & 0 deletions pkg/statistics/handle/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ go_library(
"//pkg/util/chunk",
"//pkg/util/logutil",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@org_uber_go_zap//:zap",
],
)
Expand Down
3 changes: 3 additions & 0 deletions pkg/statistics/handle/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"sync/atomic"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
Expand Down Expand Up @@ -690,6 +691,7 @@ func (h *Handle) InitStatsLite(is infoschema.InfoSchema) (err error) {
if err != nil {
return err
}
failpoint.Inject("beforeInitStatsLite", func() {})
cache, err := h.initStatsMeta(is)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -717,6 +719,7 @@ func (h *Handle) InitStats(is infoschema.InfoSchema) (err error) {
if err != nil {
return err
}
failpoint.Inject("beforeInitStats", func() {})
cache, err := h.initStatsMeta(is)
if err != nil {
return errors.Trace(err)
Expand Down