diff --git a/ttl/cache/BUILD.bazel b/ttl/cache/BUILD.bazel index 5ca508152da7b..e632f113a580b 100644 --- a/ttl/cache/BUILD.bazel +++ b/ttl/cache/BUILD.bazel @@ -2,35 +2,49 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "cache", - srcs = ["table.go"], + srcs = [ + "base.go", + "infoschema.go", + "table.go", + "ttlstatus.go", + ], importpath = "github.com/pingcap/tidb/ttl/cache", visibility = ["//visibility:public"], deps = [ + "//infoschema", "//parser/ast", "//parser/model", "//parser/mysql", + "//sessionctx", "//table/tables", "//ttl/session", "//types", "//util/chunk", + "//util/logutil", "@com_github_pingcap_errors//:errors", + "@org_uber_go_zap//:zap", ], ) go_test( name = "cache_test", srcs = [ + "base_test.go", + "infoschema_test.go", "main_test.go", "table_test.go", + "ttlstatus_test.go", ], + embed = [":cache"], flaky = True, deps = [ - ":cache", "//parser", "//parser/model", + "//server", "//testkit", "//testkit/testsetup", "//ttl/session", + "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@org_uber_go_goleak//:goleak", ], diff --git a/ttl/cache/base.go b/ttl/cache/base.go new file mode 100644 index 0000000000000..cc2ece5c9bdc1 --- /dev/null +++ b/ttl/cache/base.go @@ -0,0 +1,41 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cache + +import ( + "time" +) + +type baseCache struct { + interval time.Duration + + updateTime time.Time +} + +func newBaseCache(interval time.Duration) baseCache { + return baseCache{ + interval: interval, + } +} + +// ShouldUpdate returns whether this cache needs update +func (bc *baseCache) ShouldUpdate() bool { + return time.Since(bc.updateTime) > bc.interval +} + +// SetInterval sets the interval of updating cache +func (bc *baseCache) SetInterval(interval time.Duration) { + bc.interval = interval +} diff --git a/ttl/cache/base_test.go b/ttl/cache/base_test.go new file mode 100644 index 0000000000000..838cfbbf55329 --- /dev/null +++ b/ttl/cache/base_test.go @@ -0,0 +1,33 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cache + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestBaseCache(t *testing.T) { + baseCache := newBaseCache(time.Nanosecond) + time.Sleep(time.Microsecond) + + assert.True(t, baseCache.ShouldUpdate()) + + baseCache.updateTime = time.Now() + baseCache.SetInterval(time.Hour) + assert.False(t, baseCache.ShouldUpdate()) +} diff --git a/ttl/cache/infoschema.go b/ttl/cache/infoschema.go new file mode 100644 index 0000000000000..afff0ea725847 --- /dev/null +++ b/ttl/cache/infoschema.go @@ -0,0 +1,115 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cache + +import ( + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/util/logutil" + "go.uber.org/zap" +) + +// InfoSchemaCache is the cache for InfoSchema, it builds a map from physical table id to physical table information +type InfoSchemaCache struct { + baseCache + + schemaVer int64 + Tables map[int64]*PhysicalTable +} + +// NewInfoSchemaCache creates the cache for info schema +func NewInfoSchemaCache(updateInterval time.Duration) *InfoSchemaCache { + return &InfoSchemaCache{ + baseCache: newBaseCache(updateInterval), + } +} + +// Update updates the info schema cache +func (isc *InfoSchemaCache) Update(sctx sessionctx.Context) error { + is, ok := sctx.GetDomainInfoSchema().(infoschema.InfoSchema) + if !ok { + return errors.New("fail to get domain info schema from session") + } + + ext, ok := is.(*infoschema.SessionExtendedInfoSchema) + if !ok { + return errors.New("fail to get extended info schema") + } + + if isc.schemaVer == ext.SchemaMetaVersion() { + return nil + } + + newTables := make(map[int64]*PhysicalTable, len(isc.Tables)) + for _, db := range is.AllSchemas() { + for _, tbl := range is.SchemaTables(db.Name) { + tblInfo := tbl.Meta() + if tblInfo.TTLInfo == nil || tblInfo.State != model.StatePublic { + continue + } + + logger := logutil.BgLogger().With(zap.String("schema", db.Name.L), zap.Int64("tableID", tblInfo.ID), zap.String("tableName", tblInfo.Name.L)) + + if tblInfo.Partition == nil { + ttlTable, err := isc.newTable(db.Name, tblInfo, nil) + if err != nil { + logger.Warn("fail to build info schema cache", zap.Error(err)) + continue + } + newTables[tblInfo.ID] = ttlTable + continue + } + + for _, par := range tblInfo.Partition.Definitions { + par := par + ttlTable, err := isc.newTable(db.Name, tblInfo, &par) + if err != nil { + logger.Warn("fail to build info schema cache", zap.Int64("partitionID", par.ID), zap.String("partition", par.Name.L), zap.Error(err)) + continue + } + newTables[par.ID] = ttlTable + } + } + } + + isc.schemaVer = is.SchemaMetaVersion() + isc.Tables = newTables + isc.updateTime = time.Now() + return nil +} + +func (isc *InfoSchemaCache) newTable(schema model.CIStr, tblInfo *model.TableInfo, par *model.PartitionDefinition) (*PhysicalTable, error) { + id := tblInfo.ID + if par != nil { + id = par.ID + } + + if isc.Tables != nil { + ttlTable, ok := isc.Tables[id] + if ok && ttlTable.TableInfo == tblInfo { + return ttlTable, nil + } + } + + partitionName := model.NewCIStr("") + if par != nil { + partitionName = par.Name + } + return NewPhysicalTable(schema, tblInfo, partitionName) +} diff --git a/ttl/cache/infoschema_test.go b/ttl/cache/infoschema_test.go new file mode 100644 index 0000000000000..5ba99a2b69703 --- /dev/null +++ b/ttl/cache/infoschema_test.go @@ -0,0 +1,74 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cache_test + +import ( + "testing" + "time" + + "github.com/pingcap/tidb/parser" + "github.com/pingcap/tidb/server" + "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/ttl/cache" + "github.com/stretchr/testify/assert" +) + +func TestInfoSchemaCache(t *testing.T) { + parser.TTLFeatureGate = true + + store, dom := testkit.CreateMockStoreAndDomain(t) + sv := server.CreateMockServer(t, store) + sv.SetDomain(dom) + defer sv.Close() + + conn := server.CreateMockConn(t, sv) + sctx := conn.Context().Session + tk := testkit.NewTestKitWithSession(t, store, sctx) + + isc := cache.NewInfoSchemaCache(time.Hour) + + // test should update + assert.True(t, isc.ShouldUpdate()) + assert.NoError(t, isc.Update(sctx)) + assert.False(t, isc.ShouldUpdate()) + + // test new tables are synced + assert.Equal(t, 0, len(isc.Tables)) + tk.MustExec("create table test.t(created_at datetime) ttl = created_at + INTERVAL 5 YEAR") + assert.NoError(t, isc.Update(sctx)) + assert.Equal(t, 1, len(isc.Tables)) + for _, table := range isc.Tables { + assert.Equal(t, "t", table.TableInfo.Name.L) + } + + // test new partitioned table are synced + tk.MustExec("drop table test.t") + tk.MustExec(`create table test.t(created_at datetime) + ttl = created_at + INTERVAL 5 YEAR + partition by range (YEAR(created_at)) ( + partition p0 values less than (1991), + partition p1 values less than (2000) + ) + `) + assert.NoError(t, isc.Update(sctx)) + assert.Equal(t, 2, len(isc.Tables)) + partitions := []string{} + for id, table := range isc.Tables { + assert.Equal(t, "t", table.TableInfo.Name.L) + assert.Equal(t, id, table.PartitionDef.ID) + partitions = append(partitions, table.PartitionDef.Name.L) + } + assert.ElementsMatch(t, []string{"p0", "p1"}, partitions) +} diff --git a/ttl/cache/main_test.go b/ttl/cache/main_test.go index 9a9867d13d47d..e3846871bb55b 100644 --- a/ttl/cache/main_test.go +++ b/ttl/cache/main_test.go @@ -4,7 +4,7 @@ // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, diff --git a/ttl/cache/table.go b/ttl/cache/table.go index ab8c6c4b0fbd5..e1637ac33906e 100644 --- a/ttl/cache/table.go +++ b/ttl/cache/table.go @@ -4,7 +4,7 @@ // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, diff --git a/ttl/cache/ttlstatus.go b/ttl/cache/ttlstatus.go new file mode 100644 index 0000000000000..5222ba3025433 --- /dev/null +++ b/ttl/cache/ttlstatus.go @@ -0,0 +1,186 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cache + +import ( + "context" + "time" + + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/ttl/session" + "github.com/pingcap/tidb/util/chunk" +) + +// JobStatus represents the current status of a job +type JobStatus string + +const ( + // JobStatusWaiting means the job hasn't started + JobStatusWaiting JobStatus = "waiting" + // JobStatusRunning means this job is running + JobStatusRunning = "running" + // JobStatusCancelling means this job is being canceled, but not canceled yet + JobStatusCancelling = "cancelling" + // JobStatusCancelled means this job has been canceled successfully + JobStatusCancelled = "cancelled" + // JobStatusError means this job is in error status + JobStatusError = "error" +) + +const selectFromTTLTableStatus = "SELECT table_id,parent_table_id,table_statistics,last_job_id,last_job_start_time,last_job_finish_time,last_job_ttl_expire,last_job_summary,current_job_id,current_job_owner_id,current_job_owner_addr,current_job_owner_hb_time,current_job_start_time,current_job_ttl_expire,current_job_state,current_job_status,current_job_status_update_time FROM mysql.tidb_ttl_table_status" + +// TableStatus contains the corresponding information in the system table `mysql.tidb_ttl_table_status` +type TableStatus struct { + TableID int64 + ParentTableID int64 + + TableStatistics string + + LastJobID string + LastJobStartTime time.Time + LastJobFinishTime time.Time + LastJobTTLExpire time.Time + LastJobSummary string + + CurrentJobID string + CurrentJobOwnerID string + CurrentJobOwnerAddr string + CurrentJobOwnerHBTime time.Time + CurrentJobStartTime time.Time + CurrentJobTTLExpire time.Time + + CurrentJobState string + CurrentJobStatus JobStatus + CurrentJobStatusUpdateTime time.Time +} + +// TableStatusCache is the cache for ttl table status, it builds a map from physical table id to the table status +type TableStatusCache struct { + baseCache + + Tables map[int64]*TableStatus +} + +// NewTableStatusCache creates cache for ttl table status +func NewTableStatusCache(updateInterval time.Duration) *TableStatusCache { + return &TableStatusCache{ + baseCache: newBaseCache(updateInterval), + } +} + +// Update updates the table status cache +func (tsc *TableStatusCache) Update(ctx context.Context, se session.Session) error { + rows, err := se.ExecuteSQL(ctx, selectFromTTLTableStatus) + if err != nil { + return err + } + + newTables := make(map[int64]*TableStatus, len(rows)) + for _, row := range rows { + status, err := rowToTableStatus(se, row) + if err != nil { + return err + } + + newTables[status.TableID] = status + } + tsc.Tables = newTables + tsc.updateTime = time.Now() + return nil +} + +func rowToTableStatus(sctx sessionctx.Context, row chunk.Row) (*TableStatus, error) { + var err error + timeZone := sctx.GetSessionVars().TimeZone + + status := &TableStatus{ + TableID: row.GetInt64(0), + } + if !row.IsNull(1) { + status.ParentTableID = row.GetInt64(1) + } + if !row.IsNull(2) { + status.TableStatistics = row.GetString(2) + } + if !row.IsNull(3) { + status.LastJobID = row.GetString(3) + } + if !row.IsNull(4) { + status.LastJobStartTime, err = row.GetTime(4).GoTime(timeZone) + if err != nil { + return nil, err + } + } + if !row.IsNull(5) { + status.LastJobFinishTime, err = row.GetTime(5).GoTime(timeZone) + if err != nil { + return nil, err + } + } + if !row.IsNull(6) { + status.LastJobTTLExpire, err = row.GetTime(6).GoTime(timeZone) + if err != nil { + return nil, err + } + } + if !row.IsNull(7) { + status.LastJobSummary = row.GetString(7) + } + if !row.IsNull(8) { + status.CurrentJobID = row.GetString(8) + } + if !row.IsNull(9) { + status.CurrentJobOwnerID = row.GetString(9) + } + if !row.IsNull(10) { + status.CurrentJobOwnerAddr = row.GetString(10) + } + if !row.IsNull(11) { + status.CurrentJobOwnerHBTime, err = row.GetTime(11).GoTime(timeZone) + if err != nil { + return nil, err + } + } + if !row.IsNull(12) { + status.CurrentJobStartTime, err = row.GetTime(12).GoTime(timeZone) + if err != nil { + return nil, err + } + } + if !row.IsNull(13) { + status.CurrentJobTTLExpire, err = row.GetTime(13).GoTime(timeZone) + if err != nil { + return nil, err + } + } + if !row.IsNull(14) { + status.CurrentJobState = row.GetString(14) + } + if !row.IsNull(15) { + jobStatus := row.GetString(15) + if len(jobStatus) == 0 { + jobStatus = "waiting" + } + status.CurrentJobStatus = JobStatus(jobStatus) + } + if !row.IsNull(16) { + status.CurrentJobStatusUpdateTime, err = row.GetTime(16).GoTime(timeZone) + if err != nil { + return nil, err + } + } + + return status, nil +} diff --git a/ttl/cache/ttlstatus_test.go b/ttl/cache/ttlstatus_test.go new file mode 100644 index 0000000000000..134faf3a201a3 --- /dev/null +++ b/ttl/cache/ttlstatus_test.go @@ -0,0 +1,181 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cache_test + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/pingcap/tidb/server" + "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/ttl/cache" + "github.com/pingcap/tidb/ttl/session" + "github.com/stretchr/testify/assert" +) + +func TestTTLStatusCache(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + sv := server.CreateMockServer(t, store) + sv.SetDomain(dom) + defer sv.Close() + + conn := server.CreateMockConn(t, sv) + sctx := conn.Context().Session + tk := testkit.NewTestKitWithSession(t, store, sctx) + ttlSession := session.NewSession(sctx, tk.Session(), func() {}) + + isc := cache.NewTableStatusCache(time.Hour) + + // test should update + assert.True(t, isc.ShouldUpdate()) + assert.NoError(t, isc.Update(context.Background(), ttlSession)) + assert.False(t, isc.ShouldUpdate()) + + // test new entries are synced + tk.MustExec("insert into mysql.tidb_ttl_table_status(table_id, parent_table_id) values (1, 2)") + assert.NoError(t, isc.Update(context.Background(), ttlSession)) + assert.Equal(t, 1, len(isc.Tables)) + tk.MustExec("delete from mysql.tidb_ttl_table_status where table_id = 1") + assert.NoError(t, isc.Update(context.Background(), ttlSession)) + assert.Equal(t, 0, len(isc.Tables)) + + timeZone := tk.Session().GetSessionVars().TimeZone + + // test every field of tidb_ttl_table_status can be extracted well + testCases := []struct { + columnName string + sqlLiteral string + assert func(table *cache.TableStatus) + }{ + { + "parent_table_id", + "2", + func(table *cache.TableStatus) { assert.Equal(t, int64(2), table.ParentTableID) }, + }, + { + "table_statistics", + "'test str'", + func(table *cache.TableStatus) { assert.Equal(t, "test str", table.TableStatistics) }, + }, + { + "last_job_id", + "'test job id'", + func(table *cache.TableStatus) { assert.Equal(t, "test job id", table.LastJobID) }, + }, + { + "last_job_start_time", + "'2022-12-01 16:49:01'", + func(table *cache.TableStatus) { + expectedTime, err := time.ParseInLocation("2006-01-02 15:04:05", "2022-12-01 16:49:01", timeZone) + assert.NoError(t, err) + assert.Equal(t, expectedTime, table.LastJobStartTime) + }, + }, + { + "last_job_finish_time", + "'2022-12-01 16:50:01'", + func(table *cache.TableStatus) { + expectedTime, err := time.ParseInLocation("2006-01-02 15:04:05", "2022-12-01 16:50:01", timeZone) + assert.NoError(t, err) + assert.Equal(t, expectedTime, table.LastJobFinishTime) + }, + }, + { + "last_job_ttl_expire", + "'2022-12-01 16:51:01'", + func(table *cache.TableStatus) { + expectedTime, err := time.ParseInLocation("2006-01-02 15:04:05", "2022-12-01 16:51:01", timeZone) + assert.NoError(t, err) + assert.Equal(t, expectedTime, table.LastJobTTLExpire) + }, + }, + { + "last_job_summary", + "'test summary'", + func(table *cache.TableStatus) { assert.Equal(t, "test summary", table.LastJobSummary) }, + }, + { + "current_job_id", + "'test current job id'", + func(table *cache.TableStatus) { assert.Equal(t, "test current job id", table.CurrentJobID) }, + }, + { + "current_job_owner_id", + "'test current job owner id'", + func(table *cache.TableStatus) { assert.Equal(t, "test current job owner id", table.CurrentJobOwnerID) }, + }, + { + "current_job_owner_hb_time", + "'2022-12-01 16:52:01'", + func(table *cache.TableStatus) { + expectedTime, err := time.ParseInLocation("2006-01-02 15:04:05", "2022-12-01 16:52:01", timeZone) + assert.NoError(t, err) + assert.Equal(t, expectedTime, table.CurrentJobOwnerHBTime) + }, + }, + { + "current_job_start_time", + "'2022-12-01 16:53:01'", + func(table *cache.TableStatus) { + expectedTime, err := time.ParseInLocation("2006-01-02 15:04:05", "2022-12-01 16:53:01", timeZone) + assert.NoError(t, err) + assert.Equal(t, expectedTime, table.CurrentJobStartTime) + }, + }, + { + "current_job_ttl_expire", + "'2022-12-01 16:54:01'", + func(table *cache.TableStatus) { + expectedTime, err := time.ParseInLocation("2006-01-02 15:04:05", "2022-12-01 16:54:01", timeZone) + assert.NoError(t, err) + assert.Equal(t, expectedTime, table.CurrentJobTTLExpire) + }, + }, + { + "current_job_state", + "'test state'", + func(table *cache.TableStatus) { assert.Equal(t, "test state", table.CurrentJobState) }, + }, + { + "current_job_status", + "'test status'", + func(table *cache.TableStatus) { + assert.Equal(t, cache.JobStatus("test status"), table.CurrentJobStatus) + }, + }, + { + "current_job_status_update_time", + "'2022-12-01 16:55:01'", + func(table *cache.TableStatus) { + expectedTime, err := time.ParseInLocation("2006-01-02 15:04:05", "2022-12-01 16:55:01", timeZone) + assert.NoError(t, err) + assert.Equal(t, expectedTime, table.CurrentJobStatusUpdateTime) + }, + }, + } + for index, testCase := range testCases { + t.Run(testCase.columnName, func(t *testing.T) { + sql := fmt.Sprintf(`insert into mysql.tidb_ttl_table_status (table_id, %s) values (%d, %s)`, + testCase.columnName, index, testCase.sqlLiteral) + + tk.MustExec(sql) + assert.NoError(t, isc.Update(context.Background(), ttlSession)) + assert.Equal(t, index+1, len(isc.Tables)) + testCase.assert(isc.Tables[int64(index)]) + }) + } +}