Skip to content

Commit

Permalink
add info schema cache and ttl status table cache
Browse files Browse the repository at this point in the history
Signed-off-by: YangKeao <[email protected]>
  • Loading branch information
YangKeao committed Dec 5, 2022
1 parent a7c4c71 commit 04cd9b8
Show file tree
Hide file tree
Showing 9 changed files with 652 additions and 4 deletions.
18 changes: 16 additions & 2 deletions ttl/cache/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,49 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "cache",
srcs = ["table.go"],
srcs = [
"base.go",
"infoschema.go",
"table.go",
"ttlstatus.go",
],
importpath = "github.com/pingcap/tidb/ttl/cache",
visibility = ["//visibility:public"],
deps = [
"//infoschema",
"//parser/ast",
"//parser/model",
"//parser/mysql",
"//sessionctx",
"//table/tables",
"//ttl/session",
"//types",
"//util/chunk",
"//util/logutil",
"@com_github_pingcap_errors//:errors",
"@org_uber_go_zap//:zap",
],
)

go_test(
name = "cache_test",
srcs = [
"base_test.go",
"infoschema_test.go",
"main_test.go",
"table_test.go",
"ttlstatus_test.go",
],
embed = [":cache"],
flaky = True,
deps = [
":cache",
"//parser",
"//parser/model",
"//server",
"//testkit",
"//testkit/testsetup",
"//ttl/session",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_uber_go_goleak//:goleak",
],
Expand Down
41 changes: 41 additions & 0 deletions ttl/cache/base.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cache

import (
"time"
)

type baseCache struct {
interval time.Duration

updateTime time.Time
}

func newBaseCache(interval time.Duration) baseCache {
return baseCache{
interval: interval,
}
}

// ShouldUpdate returns whether this cache needs update
func (bc *baseCache) ShouldUpdate() bool {
return time.Since(bc.updateTime) > bc.interval
}

// SetInterval sets the interval of updating cache
func (bc *baseCache) SetInterval(interval time.Duration) {
bc.interval = interval
}
33 changes: 33 additions & 0 deletions ttl/cache/base_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cache

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestBaseCache(t *testing.T) {
baseCache := newBaseCache(time.Nanosecond)
time.Sleep(time.Microsecond)

assert.True(t, baseCache.ShouldUpdate())

baseCache.updateTime = time.Now()
baseCache.SetInterval(time.Hour)
assert.False(t, baseCache.ShouldUpdate())
}
115 changes: 115 additions & 0 deletions ttl/cache/infoschema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cache

import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)

// InfoSchemaCache is the cache for InfoSchema, it builds a map from physical table id to physical table information
type InfoSchemaCache struct {
baseCache

schemaVer int64
Tables map[int64]*PhysicalTable
}

// NewInfoSchemaCache creates the cache for info schema
func NewInfoSchemaCache(updateInterval time.Duration) *InfoSchemaCache {
return &InfoSchemaCache{
baseCache: newBaseCache(updateInterval),
}
}

// Update updates the info schema cache
func (isc *InfoSchemaCache) Update(sctx sessionctx.Context) error {
is, ok := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
if !ok {
return errors.New("fail to get domain info schema from session")
}

ext, ok := is.(*infoschema.SessionExtendedInfoSchema)
if !ok {
return errors.New("fail to get extended info schema")
}

if isc.schemaVer == ext.SchemaMetaVersion() {
return nil
}

newTables := make(map[int64]*PhysicalTable, len(isc.Tables))
for _, db := range is.AllSchemas() {
for _, tbl := range is.SchemaTables(db.Name) {
tblInfo := tbl.Meta()
if !isTTLTable(tblInfo) {
continue
}

logger := logutil.BgLogger().With(zap.String("schema", db.Name.L), zap.Int64("tableID", tblInfo.ID), zap.String("tableName", tblInfo.Name.L))

if tblInfo.Partition == nil {
ttlTable, err := isc.newTable(db.Name, tblInfo, nil)
if err != nil {
logger.Warn("fail to build info schema cache")
continue
}
newTables[tblInfo.ID] = ttlTable
continue
}

for _, par := range tblInfo.Partition.Definitions {
par := par
ttlTable, err := isc.newTable(db.Name, tblInfo, &par)
if err != nil {
logger.Warn("fail to build info schema cache", zap.Int64("partitionID", par.ID), zap.String("partition", par.Name.L))
continue
}
newTables[par.ID] = ttlTable
}
}
}

isc.schemaVer = is.SchemaMetaVersion()
isc.Tables = newTables
isc.updateTime = time.Now()
return nil
}

func (isc *InfoSchemaCache) newTable(schema model.CIStr, tblInfo *model.TableInfo, par *model.PartitionDefinition) (*PhysicalTable, error) {
id := tblInfo.ID
if par != nil {
id = par.ID
}

if isc.Tables != nil {
ttlTable, ok := isc.Tables[id]
if ok && ttlTable.TableInfo == tblInfo {
return ttlTable, nil
}
}

partitionName := model.NewCIStr("")
if par != nil {
partitionName = par.Name
}
return NewPhysicalTable(schema, tblInfo, partitionName)
}
74 changes: 74 additions & 0 deletions ttl/cache/infoschema_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cache_test

import (
"testing"
"time"

"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/server"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/ttl/cache"
"github.com/stretchr/testify/assert"
)

func TestInfoSchemaCache(t *testing.T) {
parser.TTLFeatureGate = true

store, dom := testkit.CreateMockStoreAndDomain(t)
sv := server.CreateMockServer(t, store)
sv.SetDomain(dom)
defer sv.Close()

conn := server.CreateMockConn(t, sv)
sctx := conn.Context().Session
tk := testkit.NewTestKitWithSession(t, store, sctx)

isc := cache.NewInfoSchemaCache(time.Hour)

// test should update
assert.True(t, isc.ShouldUpdate())
assert.NoError(t, isc.Update(sctx))
assert.False(t, isc.ShouldUpdate())

// test new tables are synced
assert.Equal(t, 0, len(isc.Tables))
tk.MustExec("create table test.t(created_at datetime) ttl = created_at + INTERVAL 5 YEAR")
assert.NoError(t, isc.Update(sctx))
assert.Equal(t, 1, len(isc.Tables))
for _, table := range isc.Tables {
assert.Equal(t, "t", table.TableInfo.Name.L)
}

// test new partitioned table are synced
tk.MustExec("drop table test.t")
tk.MustExec(`create table test.t(created_at datetime)
ttl = created_at + INTERVAL 5 YEAR
partition by range (YEAR(created_at)) (
partition p0 values less than (1991),
partition p1 values less than (2000)
)
`)
assert.NoError(t, isc.Update(sctx))
assert.Equal(t, 2, len(isc.Tables))
partitions := []string{}
for id, table := range isc.Tables {
assert.Equal(t, "t", table.TableInfo.Name.L)
assert.Equal(t, id, table.PartitionDef.ID)
partitions = append(partitions, table.PartitionDef.Name.L)
}
assert.ElementsMatch(t, []string{"p0", "p1"}, partitions)
}
2 changes: 1 addition & 1 deletion ttl/cache/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
Expand Down
6 changes: 5 additions & 1 deletion ttl/cache/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
Expand Down Expand Up @@ -132,6 +132,10 @@ func NewPhysicalTable(schema model.CIStr, tbl *model.TableInfo, partition model.
}, nil
}

func isTTLTable(tbl *model.TableInfo) bool {
return tbl.TTLInfo != nil && tbl.State == model.StatePublic
}

// ValidateKey validates a key
func (t *PhysicalTable) ValidateKey(key []types.Datum) error {
if len(t.KeyColumns) != len(key) {
Expand Down
Loading

0 comments on commit 04cd9b8

Please sign in to comment.