Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ttl: introduce ttl.PhysicalTable to provide some common methods for TTL #39429

Merged
merged 13 commits into from
Nov 29, 2022
2 changes: 2 additions & 0 deletions kv/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,4 +167,6 @@ const (
InternalTxnBR = InternalTxnTools
// InternalTxnTrace handles the trace statement.
InternalTxnTrace = "Trace"
// InternalTxnTTL is the type of TTL usage
InternalTxnTTL = "TTL"
)
12 changes: 11 additions & 1 deletion ttl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,24 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "ttl",
srcs = [
"session.go",
"sql.go",
"table.go",
],
importpath = "github.com/pingcap/tidb/ttl",
visibility = ["//visibility:public"],
deps = [
"//kv",
"//parser/ast",
"//parser/format",
"//parser/model",
"//parser/mysql",
"//parser/terror",
"//sessionctx",
"//sessionctx/variable",
"//table/tables",
"//types",
"//util/chunk",
"//util/sqlexec",
"@com_github_pingcap_errors//:errors",
"@com_github_pkg_errors//:errors",
Expand All @@ -24,11 +31,13 @@ go_test(
name = "ttl_test",
srcs = [
"main_test.go",
"session_test.go",
"sql_test.go",
"table_test.go",
],
embed = [":ttl"],
flaky = True,
deps = [
":ttl",
"//kv",
"//parser",
"//parser/ast",
Expand All @@ -38,6 +47,7 @@ go_test(
"//testkit/testsetup",
"//types",
"//util/sqlexec",
"@com_github_pingcap_errors//:errors",
"@com_github_stretchr_testify//require",
"@org_uber_go_goleak//:goleak",
],
Expand Down
97 changes: 97 additions & 0 deletions ttl/session.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ttl

import (
"context"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/sqlexec"
)

// Session is used to execute queries for TTL case
type Session struct {
Sctx sessionctx.Context
SQLExec sqlexec.SQLExecutor
CloseFn func()
}

// GetSessionVars returns the sessionVars
func (s *Session) GetSessionVars() *variable.SessionVars {
if s.Sctx != nil {
return s.Sctx.GetSessionVars()
}
return nil
}

// ExecuteSQL executes the sql
func (s *Session) ExecuteSQL(ctx context.Context, sql string, args ...interface{}) ([]chunk.Row, error) {
if s.SQLExec == nil {
return nil, errors.New("session is closed")
}

ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnTTL)
rs, err := s.SQLExec.ExecuteInternal(ctx, sql, args...)
if err != nil {
return nil, err
}

if rs == nil {
return nil, nil
}

defer func() {
terror.Log(rs.Close())
}()

return sqlexec.DrainRecordSet(ctx, rs, 8)
}

// RunInTxn executes the specified function in a txn
func (s *Session) RunInTxn(ctx context.Context, fn func() error) (err error) {
if _, err = s.ExecuteSQL(ctx, "BEGIN"); err != nil {
return err
}

success := false
defer func() {
if !success {
lcwangchao marked this conversation as resolved.
Show resolved Hide resolved
_, err = s.ExecuteSQL(ctx, "ROLLBACK")
bb7133 marked this conversation as resolved.
Show resolved Hide resolved
terror.Log(err)
} else {
_, err = s.ExecuteSQL(ctx, "COMMIT")
terror.Log(err)
}
}()

err = fn()
success = err == nil
return err
}

// Close closed the session
func (s *Session) Close() {
if s.CloseFn != nil {
s.CloseFn()
s.Sctx = nil
s.SQLExec = nil
s.CloseFn = nil
}
}
55 changes: 55 additions & 0 deletions ttl/session_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ttl

import (
"context"
"testing"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/require"
)

func TestSessionRunInTxn(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t(id int primary key, v int)")
se := &Session{
Sctx: tk.Session(),
SQLExec: tk.Session(),
}
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")

require.NoError(t, se.RunInTxn(context.TODO(), func() error {
tk.MustExec("insert into t values (1, 10)")
return nil
}))
tk2.MustQuery("select * from t order by id asc").Check(testkit.Rows("1 10"))

require.NoError(t, se.RunInTxn(context.TODO(), func() error {
tk.MustExec("insert into t values (2, 20)")
return errors.New("err")
}))
tk2.MustQuery("select * from t order by id asc").Check(testkit.Rows("1 10"))

require.NoError(t, se.RunInTxn(context.TODO(), func() error {
tk.MustExec("insert into t values (3, 30)")
return nil
}))
tk2.MustQuery("select * from t order by id asc").Check(testkit.Rows("1 10", "3 30"))
}
4 changes: 3 additions & 1 deletion ttl/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
"github.com/pkg/errors"
)

const dateTimeFormat = "2006-01-02 15:04:05.999999"

func writeHex(in io.Writer, d types.Datum) error {
_, err := fmt.Fprintf(in, "x'%s'", hex.EncodeToString(d.GetBytes()))
return err
Expand Down Expand Up @@ -179,7 +181,7 @@ func (b *SQLBuilder) WriteExpireCondition(expire time.Time) error {
b.writeColNames([]*model.ColumnInfo{b.tbl.TimeColumn}, false)
b.restoreCtx.WritePlain(" < ")
b.restoreCtx.WritePlain("'")
b.restoreCtx.WritePlain(expire.Format("2006-01-02 15:04:05.999999"))
b.restoreCtx.WritePlain(expire.Format(dateTimeFormat))
b.restoreCtx.WritePlain("'")
b.hasWriteExpireCond = true
return nil
Expand Down
114 changes: 114 additions & 0 deletions ttl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,141 @@
package ttl

import (
"context"
"fmt"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
)

func getTableKeyColumns(tbl *model.TableInfo) ([]*model.ColumnInfo, []*types.FieldType, error) {
if tbl.PKIsHandle {
for i, col := range tbl.Columns {
if mysql.HasPriKeyFlag(col.GetFlag()) {
return []*model.ColumnInfo{tbl.Columns[i]}, []*types.FieldType{&tbl.Columns[i].FieldType}, nil
}
}
return nil, nil, errors.Errorf("Cannot find primary key for table: %s", tbl.Name)
}

if tbl.IsCommonHandle {
idxInfo := tables.FindPrimaryIndex(tbl)
columns := make([]*model.ColumnInfo, len(idxInfo.Columns))
fieldTypes := make([]*types.FieldType, len(idxInfo.Columns))
for i, idxCol := range idxInfo.Columns {
columns[i] = tbl.Columns[idxCol.Offset]
fieldTypes[i] = &tbl.Columns[idxCol.Offset].FieldType
}
return columns, fieldTypes, nil
}

extraHandleColInfo := model.NewExtraHandleColInfo()
return []*model.ColumnInfo{extraHandleColInfo}, []*types.FieldType{&extraHandleColInfo.FieldType}, nil
}

// PhysicalTable is used to provide some information for a physical table in TTL job
type PhysicalTable struct {
// Schema is the database name of the table
Schema model.CIStr
*model.TableInfo
// Partition is the partition name
Partition model.CIStr
// PartitionDef is the partition definition
PartitionDef *model.PartitionDefinition
// KeyColumns is the cluster index key columns for the table
KeyColumns []*model.ColumnInfo
// KeyColumnTypes is the types of the key columns
KeyColumnTypes []*types.FieldType
// TimeColum is the time column used for TTL
TimeColumn *model.ColumnInfo
}

// NewPhysicalTable create a new PhysicalTable
func NewPhysicalTable(schema model.CIStr, tbl *model.TableInfo, partition model.CIStr) (*PhysicalTable, error) {
if tbl.State != model.StatePublic {
return nil, errors.Errorf("table '%s.%s' is not a public table", schema, tbl.Name)
}

ttlInfo := tbl.TTLInfo
if ttlInfo == nil {
return nil, errors.Errorf("table '%s.%s' is not a ttl table", schema, tbl.Name)
}

timeColumn := tbl.FindPublicColumnByName(ttlInfo.ColumnName.L)
if timeColumn == nil {
return nil, errors.Errorf("time column '%s' is not public in ttl table '%s.%s'", ttlInfo.ColumnName, schema, tbl.Name)
}

keyColumns, keyColumTypes, err := getTableKeyColumns(tbl)
if err != nil {
return nil, err
}

var partitionDef *model.PartitionDefinition
if tbl.Partition == nil {
if partition.L != "" {
return nil, errors.Errorf("table '%s.%s' is not a partitioned table", schema, tbl.Name)
}
} else {
if partition.L == "" {
return nil, errors.Errorf("partition name is required, table '%s.%s' is a partitioned table", schema, tbl.Name)
}

for i := range tbl.Partition.Definitions {
def := &tbl.Partition.Definitions[i]
if def.Name.L == partition.L {
partitionDef = def
}
}

if partitionDef == nil {
return nil, errors.Errorf("partition '%s' is not found in ttl table '%s.%s'", partition.O, schema, tbl.Name)
}
}

return &PhysicalTable{
Schema: schema,
TableInfo: tbl,
Partition: partition,
PartitionDef: partitionDef,
KeyColumns: keyColumns,
KeyColumnTypes: keyColumTypes,
TimeColumn: timeColumn,
}, nil
}

// ValidateKey validates a key
func (t *PhysicalTable) ValidateKey(key []types.Datum) error {
if len(t.KeyColumns) != len(key) {
return errors.Errorf("invalid key length: %d, expected %d", len(key), len(t.KeyColumns))
}
return nil
}

// EvalExpireTime returns the expired time
func (t *PhysicalTable) EvalExpireTime(ctx context.Context, se *Session, now time.Time) (expire time.Time, err error) {
tz := se.GetSessionVars().TimeZone

expireExpr := t.TTLInfo.IntervalExprStr
unit := ast.TimeUnitType(t.TTLInfo.IntervalTimeUnit)

var rows []chunk.Row
rows, err = se.ExecuteSQL(
ctx,
// FROM_UNIXTIME does not support negative value, so we use `FROM_UNIXTIME(0) + INTERVAL <current_ts>` to present current time
fmt.Sprintf("SELECT FROM_UNIXTIME(0) + INTERVAL %d SECOND - INTERVAL %s %s", now.Unix(), expireExpr, unit.String()),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the difference between SELECT FROM_UNIXTIME(0) + INTERVAL %d SECOND - INTERVAL %s %s and SELECT NOW() - INTERVAL %s %s?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The difference is that the now is an argument of the function for easy to mock in unit test and SELECT NOW() ... is a hard code to use the real time

)

if err != nil {
return
}

tm := rows[0].GetTime(0)
return tm.CoreTime().GoTime(tz)
}
Loading