Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: add global variable tidb_schema_version_cache_limit to control infoschema cache size #46558

Merged
merged 10 commits into from
Sep 8, 2023
2 changes: 1 addition & 1 deletion domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1053,7 +1053,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio
exit: make(chan struct{}),
sysSessionPool: newSessionPool(capacity, factory),
statsLease: statsLease,
infoCache: infoschema.NewCache(16),
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
infoCache: infoschema.NewCache(int(variable.SchemaVersionCacheLimit.Load())),
slowQuery: newTopNSlowQueries(config.GetGlobalConfig().InMemSlowQueryTopNNum, time.Hour*24*7, config.GetGlobalConfig().InMemSlowQueryRecentNum),
indexUsageSyncLease: idxUsageSyncLease,
dumpFileGcChecker: &dumpFileGcChecker{gcLease: dumpFileGcLease, paths: []string{replayer.GetPlanReplayerDirName(), GetOptimizerTraceDirName(), GetExtractTaskDirName()}},
Expand Down
1 change: 1 addition & 0 deletions domain/sysvar_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,5 +158,6 @@ func (do *Domain) rebuildSysVarCache(ctx sessionctx.Context) error {
defer do.sysVarCache.Unlock()
do.sysVarCache.session = newSessionCache
do.sysVarCache.global = newGlobalCache
do.infoCache.ReSize(int(variable.SchemaVersionCacheLimit.Load()))
return nil
}
17 changes: 17 additions & 0 deletions executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -880,6 +880,23 @@ func TestSetVar(t *testing.T) {
tk.MustQuery("select @@session.tidb_skip_missing_partition_stats").Check(testkit.Rows("0"))
tk.MustExec("set session tidb_skip_missing_partition_stats = 1")
tk.MustQuery("select @@session.tidb_skip_missing_partition_stats").Check(testkit.Rows("1"))

// test tidb_schema_version_cache_limit
tk.MustQuery("select @@global.tidb_schema_version_cache_limit").Check(testkit.Rows("16"))
tk.MustExec("set @@global.tidb_schema_version_cache_limit=64;")
tk.MustQuery("select @@global.tidb_schema_version_cache_limit").Check(testkit.Rows("64"))
tk.MustExec("set @@global.tidb_schema_version_cache_limit=2;")
tk.MustQuery("select @@global.tidb_schema_version_cache_limit").Check(testkit.Rows("2"))
tk.MustExec("set @@global.tidb_schema_version_cache_limit=256;")
tk.MustQuery("SHOW WARNINGS").Check(testkit.Rows("Warning 1292 Truncated incorrect tidb_schema_version_cache_limit value: '256'"))
tk.MustQuery("select @@global.tidb_schema_version_cache_limit").Check(testkit.Rows("255"))
tk.MustExec("set @@global.tidb_schema_version_cache_limit=0;")
tk.MustQuery("SHOW WARNINGS").Check(testkit.Rows("Warning 1292 Truncated incorrect tidb_schema_version_cache_limit value: '0'"))
tk.MustQuery("select @@global.tidb_schema_version_cache_limit").Check(testkit.Rows("2"))
tk.MustGetErrMsg("set @@global.tidb_schema_version_cache_limit='x';", "[variable:1232]Incorrect argument type to variable 'tidb_schema_version_cache_limit'")
tk.MustQuery("select @@global.tidb_schema_version_cache_limit").Check(testkit.Rows("2"))
tk.MustExec("set @@global.tidb_schema_version_cache_limit=64;")
tk.MustQuery("select @@global.tidb_schema_version_cache_limit").Check(testkit.Rows("64"))
}

func TestGetSetNoopVars(t *testing.T) {
Expand Down
24 changes: 24 additions & 0 deletions infoschema/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,30 @@ func NewCache(capacity int) *InfoCache {
}
}

// ReSize re-size the cache.
func (h *InfoCache) ReSize(capacity int) {
h.mu.Lock()
defer h.mu.Unlock()
if cap(h.cache) == capacity {
return
}
oldCache := h.cache
h.cache = make([]schemaAndTimestamp, 0, capacity)
for i, v := range oldCache {
if i >= capacity {
break
}
h.cache = append(h.cache, v)
}
}

// Size returns the size of the cache, export for test.
func (h *InfoCache) Size() int {
h.mu.Lock()
defer h.mu.Unlock()
return len(h.cache)
}

// Reset resets the cache.
func (h *InfoCache) Reset(capacity int) {
h.mu.Lock()
Expand Down
2 changes: 1 addition & 1 deletion infoschema/test/cachetest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ go_test(
"main_test.go",
],
flaky = True,
shard_count = 5,
shard_count = 6,
deps = [
"//infoschema",
"//testkit/testsetup",
Expand Down
34 changes: 34 additions & 0 deletions infoschema/test/cachetest/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,37 @@ func TestGetByTimestamp(t *testing.T) {
require.Equal(t, is3, ic.GetBySnapshotTS(3))
require.Equal(t, 3, ic.Len())
}

func TestReSize(t *testing.T) {
ic := infoschema.NewCache(2)
require.NotNil(t, ic)
is1 := infoschema.MockInfoSchemaWithSchemaVer(nil, 1)
ic.Insert(is1, 1)
is2 := infoschema.MockInfoSchemaWithSchemaVer(nil, 2)
ic.Insert(is2, 2)

ic.ReSize(3)
require.Equal(t, 2, ic.Size())
require.Equal(t, is1, ic.GetByVersion(1))
require.Equal(t, is2, ic.GetByVersion(2))
is3 := infoschema.MockInfoSchemaWithSchemaVer(nil, 3)
require.True(t, ic.Insert(is3, 3))
require.Equal(t, is1, ic.GetByVersion(1))
require.Equal(t, is2, ic.GetByVersion(2))
require.Equal(t, is3, ic.GetByVersion(3))

ic.ReSize(1)
require.Equal(t, 1, ic.Size())
require.Nil(t, ic.GetByVersion(1))
require.Nil(t, ic.GetByVersion(2))
require.Equal(t, is3, ic.GetByVersion(3))
require.False(t, ic.Insert(is2, 2))
require.Equal(t, 1, ic.Size())
is4 := infoschema.MockInfoSchemaWithSchemaVer(nil, 4)
require.True(t, ic.Insert(is4, 4))
require.Equal(t, 1, ic.Size())
require.Nil(t, ic.GetByVersion(1))
require.Nil(t, ic.GetByVersion(2))
require.Nil(t, ic.GetByVersion(3))
require.Equal(t, is4, ic.GetByVersion(4))
}
5 changes: 5 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -2852,6 +2852,11 @@ var defaultSysVars = []*SysVar{
}, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) {
return ServiceScope.Load(), nil
}},
{Scope: ScopeGlobal, Name: TiDBSchemaVersionCacheLimit, Value: strconv.Itoa(DefTiDBSchemaVersionCacheLimit), Type: TypeInt, MinValue: 2, MaxValue: math.MaxUint8, AllowEmpty: true,
SetGlobal: func(_ context.Context, s *SessionVars, val string) error {
SchemaVersionCacheLimit.Store(TidbOptInt64(val, DefTiDBSchemaVersionCacheLimit))
return nil
}},
}

func setTiFlashComputeDispatchPolicy(s *SessionVars, val string) error {
Expand Down
4 changes: 4 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -1097,6 +1097,8 @@ const (
TiDBSessionAlias = "tidb_session_alias"
// TiDBServiceScope indicates the role for tidb for distributed task framework.
TiDBServiceScope = "tidb_service_scope"
// TiDBSchemaVersionCacheLimit defines the capacity size of domain infoSchema cache.
TiDBSchemaVersionCacheLimit = "tidb_schema_version_cache_limit"
)

// TiDB intentional limits
Expand Down Expand Up @@ -1404,6 +1406,7 @@ const (
DefTiDBEnableCheckConstraint = false
DefTiDBSkipMissingPartitionStats = true
DefTiDBOptObjective = OptObjectiveModerate
DefTiDBSchemaVersionCacheLimit = 16
)

// Process global variables.
Expand Down Expand Up @@ -1500,6 +1503,7 @@ var (
EnableCheckConstraint = atomic.NewBool(DefTiDBEnableCheckConstraint)
SkipMissingPartitionStats = atomic.NewBool(DefTiDBSkipMissingPartitionStats)
ServiceScope = atomic.NewString("")
SchemaVersionCacheLimit = atomic.NewInt64(DefTiDBSchemaVersionCacheLimit)
)

var (
Expand Down