Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ttl: Implement scan and delete task for TTL (#39481) #39615

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -2127,6 +2127,45 @@ var defaultSysVars = []*SysVar{
s.EnableReuseCheck = TiDBOptOn(val)
return nil
}},
{Scope: ScopeGlobal, Name: TiDBTTLJobEnable, Value: BoolToOnOff(DefTiDBTTLJobEnable), Type: TypeBool, SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error {
EnableTTLJob.Store(TiDBOptOn(s))
return nil
}, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) {
return BoolToOnOff(EnableTTLJob.Load()), nil
}},
{Scope: ScopeGlobal, Name: TiDBTTLScanBatchSize, Value: strconv.Itoa(DefTiDBTTLScanBatchSize), Type: TypeInt, MinValue: DefTiDBTTLScanBatchMinSize, MaxValue: DefTiDBTTLScanBatchMaxSize, SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error {
val, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return err
}
TTLScanBatchSize.Store(val)
return nil
}, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) {
val := TTLScanBatchSize.Load()
return strconv.FormatInt(val, 10), nil
}},
{Scope: ScopeGlobal, Name: TiDBTTLDeleteBatchSize, Value: strconv.Itoa(DefTiDBTTLDeleteBatchSize), Type: TypeInt, MinValue: DefTiDBTTLDeleteBatchMinSize, MaxValue: DefTiDBTTLDeleteBatchMaxSize, SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error {
val, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return err
}
TTLDeleteBatchSize.Store(val)
return nil
}, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) {
val := TTLDeleteBatchSize.Load()
return strconv.FormatInt(val, 10), nil
}},
{Scope: ScopeGlobal, Name: TiDBTTLDeleteRateLimit, Value: strconv.Itoa(DefTiDBTTLDeleteRateLimit), Type: TypeInt, MinValue: 0, MaxValue: math.MaxInt64, SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error {
val, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return err
}
TTLDeleteRateLimit.Store(val)
return nil
}, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) {
val := TTLDeleteRateLimit.Load()
return strconv.FormatInt(val, 10), nil
}},
{
Scope: ScopeGlobal | ScopeSession, Name: TiDBStoreBatchSize, Value: strconv.FormatInt(DefTiDBStoreBatchSize, 10),
Type: TypeInt, MinValue: 0, MaxValue: 25000, SetSession: func(s *SessionVars, val string) error {
Expand Down
20 changes: 20 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -868,6 +868,14 @@ const (
TiDBGOGCTunerThreshold = "tidb_gogc_tuner_threshold"
// TiDBExternalTS is the ts to read through when the `TiDBEnableExternalTsRead` is on
TiDBExternalTS = "tidb_external_ts"
// TiDBTTLJobEnable is used to enable/disable scheduling ttl job
TiDBTTLJobEnable = "tidb_ttl_job_enable"
// TiDBTTLScanBatchSize is used to control the batch size in the SELECT statement for TTL jobs
TiDBTTLScanBatchSize = "tidb_ttl_scan_batch_size"
// TiDBTTLDeleteBatchSize is used to control the batch size in the DELETE statement for TTL jobs
TiDBTTLDeleteBatchSize = "tidb_ttl_delete_batch_size"
// TiDBTTLDeleteRateLimit is used to control the delete rate limit for TTL jobs in each node
TiDBTTLDeleteRateLimit = "tidb_ttl_delete_rate_limit"
// PasswordReuseHistory limit a few passwords to reuse.
PasswordReuseHistory = "password_history"
// PasswordReuseTime limit how long passwords can be reused.
Expand Down Expand Up @@ -1115,6 +1123,14 @@ const (
DefTiDBUseAlloc = false
DefTiDBEnablePlanReplayerCapture = false
DefTiDBIndexMergeIntersectionConcurrency = ConcurrencyUnset
DefTiDBTTLJobEnable = true
DefTiDBTTLScanBatchSize = 500
DefTiDBTTLScanBatchMaxSize = 10240
DefTiDBTTLScanBatchMinSize = 1
DefTiDBTTLDeleteBatchSize = 500
DefTiDBTTLDeleteBatchMaxSize = 10240
DefTiDBTTLDeleteBatchMinSize = 1
DefTiDBTTLDeleteRateLimit = 0
DefPasswordReuseHistory = 0
DefPasswordReuseTime = 0
DefTiDBStoreBatchSize = 0
Expand Down Expand Up @@ -1180,6 +1196,10 @@ var (
PasswordValidationMixedCaseCount = atomic.NewInt32(1)
PasswordValidtaionNumberCount = atomic.NewInt32(1)
PasswordValidationSpecialCharCount = atomic.NewInt32(1)
EnableTTLJob = atomic.NewBool(DefTiDBTTLJobEnable)
TTLScanBatchSize = atomic.NewInt64(DefTiDBTTLScanBatchSize)
TTLDeleteBatchSize = atomic.NewInt64(DefTiDBTTLDeleteBatchSize)
TTLDeleteRateLimit = atomic.NewInt64(DefTiDBTTLDeleteRateLimit)
PasswordHistory = atomic.NewInt64(DefPasswordReuseHistory)
PasswordReuseInterval = atomic.NewInt64(DefPasswordReuseTime)
IsSandBoxModeEnabled = atomic.NewBool(false)
Expand Down
7 changes: 7 additions & 0 deletions ttl/cache/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ func getTableKeyColumns(tbl *model.TableInfo) ([]*model.ColumnInfo, []*types.Fie

// PhysicalTable is used to provide some information for a physical table in TTL job
type PhysicalTable struct {
// ID is the physical ID of the table
ID int64
// Schema is the database name of the table
Schema model.CIStr
*model.TableInfo
Expand Down Expand Up @@ -92,11 +94,13 @@ func NewPhysicalTable(schema model.CIStr, tbl *model.TableInfo, partition model.
return nil, err
}

var physicalID int64
var partitionDef *model.PartitionDefinition
if tbl.Partition == nil {
if partition.L != "" {
return nil, errors.Errorf("table '%s.%s' is not a partitioned table", schema, tbl.Name)
}
physicalID = tbl.ID
} else {
if partition.L == "" {
return nil, errors.Errorf("partition name is required, table '%s.%s' is a partitioned table", schema, tbl.Name)
Expand All @@ -112,9 +116,12 @@ func NewPhysicalTable(schema model.CIStr, tbl *model.TableInfo, partition model.
if partitionDef == nil {
return nil, errors.Errorf("partition '%s' is not found in ttl table '%s.%s'", partition.O, schema, tbl.Name)
}

physicalID = partitionDef.ID
}

return &PhysicalTable{
ID: physicalID,
Schema: schema,
TableInfo: tbl,
Partition: partition,
Expand Down
4 changes: 3 additions & 1 deletion ttl/cache/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestNewTTLTable(t *testing.T) {
physicalTbls = append(physicalTbls, ttlTbl)
} else {
for _, partition := range tblInfo.Partition.Definitions {
ttlTbl, err := cache.NewPhysicalTable(model.NewCIStr(c.db), tblInfo, model.NewCIStr(partition.Name.O))
ttlTbl, err := cache.NewPhysicalTable(model.NewCIStr(c.db), tblInfo, partition.Name)
if c.timeCol == "" {
require.Error(t, err)
continue
Expand All @@ -131,10 +131,12 @@ func TestNewTTLTable(t *testing.T) {
require.Same(t, timeColumn, ttlTbl.TimeColumn)

if tblInfo.Partition == nil {
require.Equal(t, ttlTbl.TableInfo.ID, ttlTbl.ID)
require.Equal(t, "", ttlTbl.Partition.L)
require.Nil(t, ttlTbl.PartitionDef)
} else {
def := tblInfo.Partition.Definitions[i]
require.Equal(t, def.ID, ttlTbl.ID)
require.Equal(t, def.Name.L, ttlTbl.Partition.L)
require.Equal(t, def, *(ttlTbl.PartitionDef))
}
Expand Down
5 changes: 4 additions & 1 deletion ttl/session/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
"//kv",
"//parser/terror",
"//sessionctx",
"//sessionctx/variable",
"//sessiontxn",
"//util/chunk",
"//util/sqlexec",
Expand All @@ -22,10 +23,12 @@ go_test(
srcs = [
"main_test.go",
"session_test.go",
"sysvar_test.go",
],
embed = [":session"],
flaky = True,
deps = [
":session",
"//sessionctx/variable",
"//testkit",
"//testkit/testsetup",
"@com_github_pingcap_errors//:errors",
Expand Down
24 changes: 24 additions & 0 deletions ttl/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/sqlexec"
Expand All @@ -36,6 +37,8 @@ type Session interface {
ExecuteSQL(ctx context.Context, sql string, args ...interface{}) ([]chunk.Row, error)
// RunInTxn executes the specified function in a txn
RunInTxn(ctx context.Context, fn func() error) (err error)
// ResetWithGlobalTimeZone resets the session time zone to global time zone
ResetWithGlobalTimeZone(ctx context.Context) error
// Close closes the session
Close()
}
Expand Down Expand Up @@ -112,6 +115,27 @@ func (s *session) RunInTxn(ctx context.Context, fn func() error) (err error) {
return err
}

// ResetWithGlobalTimeZone resets the session time zone to global time zone
func (s *session) ResetWithGlobalTimeZone(ctx context.Context) error {
sessVar := s.GetSessionVars()
globalTZ, err := sessVar.GetGlobalSystemVar(ctx, variable.TimeZone)
if err != nil {
return err
}

tz, err := sessVar.GetSessionOrGlobalSystemVar(ctx, variable.TimeZone)
if err != nil {
return err
}

if globalTZ == tz {
return nil
}

_, err = s.ExecuteSQL(ctx, "SET @@time_zone=@@global.time_zone")
return err
}

// Close closes the session
func (s *session) Close() {
if s.closeFn != nil {
Expand Down
17 changes: 15 additions & 2 deletions ttl/session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package session
package session_test

import (
"context"
"testing"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/ttl/session"
"github.com/stretchr/testify/require"
)

Expand All @@ -28,7 +29,7 @@ func TestSessionRunInTxn(t *testing.T) {
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t(id int primary key, v int)")
se := NewSession(tk.Session(), tk.Session(), nil)
se := session.NewSession(tk.Session(), tk.Session(), nil)
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")

Expand All @@ -50,3 +51,15 @@ func TestSessionRunInTxn(t *testing.T) {
}))
tk2.MustQuery("select * from t order by id asc").Check(testkit.Rows("1 10", "3 30"))
}

func TestSessionResetTimeZone(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@global.time_zone='UTC'")
tk.MustExec("set @@time_zone='Asia/Shanghai'")

se := session.NewSession(tk.Session(), tk.Session(), nil)
tk.MustQuery("select @@time_zone").Check(testkit.Rows("Asia/Shanghai"))
require.NoError(t, se.ResetWithGlobalTimeZone(context.TODO()))
tk.MustQuery("select @@time_zone").Check(testkit.Rows("UTC"))
}
125 changes: 125 additions & 0 deletions ttl/session/sysvar_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package session_test

import (
"fmt"
"strconv"
"testing"

"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/require"
)

func TestSysVarTTLJobEnable(t *testing.T) {
origEnableDDL := variable.EnableTTLJob.Load()
defer func() {
variable.EnableTTLJob.Store(origEnableDDL)
}()

store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@global.tidb_ttl_job_enable=0")
require.False(t, variable.EnableTTLJob.Load())
tk.MustQuery("select @@global.tidb_ttl_job_enable").Check(testkit.Rows("0"))
tk.MustQuery("select @@tidb_ttl_job_enable").Check(testkit.Rows("0"))

tk.MustExec("set @@global.tidb_ttl_job_enable=1")
require.True(t, variable.EnableTTLJob.Load())
tk.MustQuery("select @@global.tidb_ttl_job_enable").Check(testkit.Rows("1"))
tk.MustQuery("select @@tidb_ttl_job_enable").Check(testkit.Rows("1"))

tk.MustExec("set @@global.tidb_ttl_job_enable=0")
require.False(t, variable.EnableTTLJob.Load())
tk.MustQuery("select @@global.tidb_ttl_job_enable").Check(testkit.Rows("0"))
tk.MustQuery("select @@tidb_ttl_job_enable").Check(testkit.Rows("0"))
}

func TestSysVarTTLScanBatchSize(t *testing.T) {
origScanBatchSize := variable.TTLScanBatchSize.Load()
defer func() {
variable.TTLScanBatchSize.Store(origScanBatchSize)
}()

store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@global.tidb_ttl_scan_batch_size=789")
require.Equal(t, int64(789), variable.TTLScanBatchSize.Load())
tk.MustQuery("select @@global.tidb_ttl_scan_batch_size").Check(testkit.Rows("789"))
tk.MustQuery("select @@tidb_ttl_scan_batch_size").Check(testkit.Rows("789"))

tk.MustExec("set @@global.tidb_ttl_scan_batch_size=0")
require.Equal(t, int64(1), variable.TTLScanBatchSize.Load())
tk.MustQuery("select @@global.tidb_ttl_scan_batch_size").Check(testkit.Rows("1"))
tk.MustQuery("select @@tidb_ttl_scan_batch_size").Check(testkit.Rows("1"))

maxVal := int64(variable.DefTiDBTTLScanBatchMaxSize)
tk.MustExec(fmt.Sprintf("set @@global.tidb_ttl_scan_batch_size=%d", maxVal+1))
require.Equal(t, maxVal, variable.TTLScanBatchSize.Load())
tk.MustQuery("select @@global.tidb_ttl_scan_batch_size").Check(testkit.Rows(strconv.FormatInt(maxVal, 10)))
tk.MustQuery("select @@tidb_ttl_scan_batch_size").Check(testkit.Rows(strconv.FormatInt(maxVal, 10)))
}

func TestSysVarTTLScanDeleteBatchSize(t *testing.T) {
origScanBatchSize := variable.TTLScanBatchSize.Load()
defer func() {
variable.TTLScanBatchSize.Store(origScanBatchSize)
}()

store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@global.tidb_ttl_delete_batch_size=789")
require.Equal(t, int64(789), variable.TTLDeleteBatchSize.Load())
tk.MustQuery("select @@global.tidb_ttl_delete_batch_size").Check(testkit.Rows("789"))
tk.MustQuery("select @@tidb_ttl_delete_batch_size").Check(testkit.Rows("789"))

tk.MustExec("set @@global.tidb_ttl_delete_batch_size=0")
require.Equal(t, int64(1), variable.TTLDeleteBatchSize.Load())
tk.MustQuery("select @@global.tidb_ttl_delete_batch_size").Check(testkit.Rows("1"))
tk.MustQuery("select @@tidb_ttl_delete_batch_size").Check(testkit.Rows("1"))

maxVal := int64(variable.DefTiDBTTLDeleteBatchMaxSize)
tk.MustExec(fmt.Sprintf("set @@global.tidb_ttl_delete_batch_size=%d", maxVal+1))
require.Equal(t, maxVal, variable.TTLDeleteBatchSize.Load())
tk.MustQuery("select @@global.tidb_ttl_delete_batch_size").Check(testkit.Rows(strconv.FormatInt(maxVal, 10)))
tk.MustQuery("select @@tidb_ttl_delete_batch_size").Check(testkit.Rows(strconv.FormatInt(maxVal, 10)))
}

func TestSysVarTTLScanDeleteLimit(t *testing.T) {
origDeleteLimit := variable.TTLDeleteRateLimit.Load()
defer func() {
variable.TTLDeleteRateLimit.Store(origDeleteLimit)
}()

store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustQuery("select @@global.tidb_ttl_delete_rate_limit").Check(testkit.Rows("0"))

tk.MustExec("set @@global.tidb_ttl_delete_rate_limit=100000")
require.Equal(t, int64(100000), variable.TTLDeleteRateLimit.Load())
tk.MustQuery("select @@global.tidb_ttl_delete_rate_limit").Check(testkit.Rows("100000"))
tk.MustQuery("select @@tidb_ttl_delete_rate_limit").Check(testkit.Rows("100000"))

tk.MustExec("set @@global.tidb_ttl_delete_rate_limit=0")
require.Equal(t, int64(0), variable.TTLDeleteRateLimit.Load())
tk.MustQuery("select @@global.tidb_ttl_delete_rate_limit").Check(testkit.Rows("0"))
tk.MustQuery("select @@tidb_ttl_delete_rate_limit").Check(testkit.Rows("0"))

tk.MustExec("set @@global.tidb_ttl_delete_rate_limit=-1")
require.Equal(t, int64(0), variable.TTLDeleteRateLimit.Load())
tk.MustQuery("select @@global.tidb_ttl_delete_rate_limit").Check(testkit.Rows("0"))
tk.MustQuery("select @@tidb_ttl_delete_rate_limit").Check(testkit.Rows("0"))
}
Loading