Skip to content

Commit

Permalink
add ddl job for ttlInfo
Browse files Browse the repository at this point in the history
Signed-off-by: YangKeao <[email protected]>
  • Loading branch information
YangKeao committed Nov 24, 2022
1 parent 84fa999 commit 85b0264
Show file tree
Hide file tree
Showing 18 changed files with 708 additions and 1 deletion.
2 changes: 2 additions & 0 deletions ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ go_library(
"stat.go",
"table.go",
"table_lock.go",
"ttl.go",
],
importpath = "github.com/pingcap/tidb/ddl",
visibility = [
Expand Down Expand Up @@ -195,6 +196,7 @@ go_test(
"table_split_test.go",
"table_test.go",
"tiflash_replica_test.go",
"ttl_test.go",
],
embed = [":ddl"],
flaky = True,
Expand Down
18 changes: 18 additions & 0 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,9 @@ func checkDropColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (*model.TableInfo,
if err = checkDropColumnWithForeignKeyConstraintInOwner(d, t, job, tblInfo, colName.L); err != nil {
return nil, nil, nil, false, errors.Trace(err)
}
if err = checkDropColumnWithTTLConfig(tblInfo, colName.L); err != nil {
return nil, nil, nil, false, errors.Trace(err)
}
idxInfos := listIndicesWithColumn(colName.L, tblInfo.Indices)
return tblInfo, colInfo, idxInfos, false, nil
}
Expand Down Expand Up @@ -858,6 +861,9 @@ func adjustTableInfoAfterModifyColumnWithData(tblInfo *model.TableInfo, pos *ast
indexesToRemove := filterIndexesToRemove(changingIdxs, newName, tblInfo)
replaceOldIndexes(tblInfo, indexesToRemove)
}
if tblInfo.TTLInfo != nil {
updateTTLInfoWhenModifyColumn(tblInfo, oldCol.Name, changingCol.Name)
}
// Move the new column to a correct offset.
destOffset, err := LocateOffsetToMove(changingCol.Offset, pos, tblInfo)
if err != nil {
Expand Down Expand Up @@ -932,6 +938,17 @@ func updateFKInfoWhenModifyColumn(tblInfo *model.TableInfo, oldCol, newCol model
}
}

func updateTTLInfoWhenModifyColumn(tblInfo *model.TableInfo, oldCol, newCol model.CIStr) {
if oldCol.L == newCol.L {
return
}
if tblInfo.TTLInfo != nil {
if tblInfo.TTLInfo.ColumnName.L == oldCol.L {
tblInfo.TTLInfo.ColumnName = newCol
}
}
}

// filterIndexesToRemove filters out the indexes that can be removed.
func filterIndexesToRemove(changingIdxs []*model.IndexInfo, colName model.CIStr, tblInfo *model.TableInfo) []*model.IndexInfo {
indexesToRemove := make([]*model.IndexInfo, 0, len(changingIdxs))
Expand Down Expand Up @@ -1474,6 +1491,7 @@ func adjustTableInfoAfterModifyColumn(
tblInfo.MoveColumnInfo(oldCol.Offset, destOffset)
updateNewIdxColsNameOffset(tblInfo.Indices, oldCol.Name, newCol)
updateFKInfoWhenModifyColumn(tblInfo, oldCol.Name, newCol.Name)
updateTTLInfoWhenModifyColumn(tblInfo, oldCol.Name, newCol.Name)
return nil
}

Expand Down
150 changes: 150 additions & 0 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2115,6 +2115,11 @@ func checkTableInfoValidWithStmt(ctx sessionctx.Context, tbInfo *model.TableInfo
}
}
}
if tbInfo.TTLInfo != nil {
if err := checkTTLInfoValid(ctx, tbInfo); err != nil {
return errors.Trace(err)
}
}

return nil
}
Expand Down Expand Up @@ -2193,6 +2198,10 @@ func BuildTableInfoWithLike(ctx sessionctx.Context, ident ast.Ident, referTblInf
copy(pi.Definitions, referTblInfo.Partition.Definitions)
tblInfo.Partition = &pi
}

if referTblInfo.TTLInfo != nil {
tblInfo.TTLInfo = referTblInfo.TTLInfo.Clone()
}
return &tblInfo, nil
}

Expand Down Expand Up @@ -3000,6 +3009,8 @@ func SetDirectPlacementOpt(placementSettings *model.PlacementSettings, placement

// handleTableOptions updates tableInfo according to table options.
func handleTableOptions(options []*ast.TableOption, tbInfo *model.TableInfo) error {
var handledTTLOrTTLEnable bool

for _, op := range options {
switch op.Tp {
case ast.TableOptionAutoIncrement:
Expand Down Expand Up @@ -3036,6 +3047,23 @@ func handleTableOptions(options []*ast.TableOption, tbInfo *model.TableInfo) err
tbInfo.PlacementPolicyRef = &model.PolicyRefInfo{
Name: model.NewCIStr(op.StrValue),
}
case ast.TableOptionTTL, ast.TableOptionTTLEnable:
if handledTTLOrTTLEnable {
continue
}

ttlInfo, ttlEnable, err := getTTLInfoInOptions(options)
if err != nil {
return err
}
// It's impossible that `ttlInfo` and `ttlEnable` are all nil, because we have met this option.
// After exclude the situation `ttlInfo == nil && ttlEnable != nil`, we could say `ttlInfo != nil`
if ttlInfo == nil && ttlEnable != nil {
return errors.Trace(dbterror.ErrSetTTLEnableForNonTTLTable)
}

tbInfo.TTLInfo = ttlInfo
handledTTLOrTTLEnable = true
}
}
shardingBits := shardingBits(tbInfo)
Expand Down Expand Up @@ -3227,6 +3255,7 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, stmt *ast
}
for _, spec := range validSpecs {
var handledCharsetOrCollate bool
var handledTTLOrTTLEnable bool
switch spec.Tp {
case ast.AlterTableAddColumns:
err = d.AddColumn(sctx, ident, spec)
Expand Down Expand Up @@ -3363,6 +3392,20 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, stmt *ast
Name: model.NewCIStr(opt.StrValue),
}
case ast.TableOptionEngine:
case ast.TableOptionTTL, ast.TableOptionTTLEnable:
var ttlInfo *model.TTLInfo
var ttlEnable *bool

if handledTTLOrTTLEnable {
continue
}
ttlInfo, ttlEnable, err = getTTLInfoInOptions(spec.Options)
if err != nil {
return err
}
err = d.AlterTableTTLInfoOrEnable(sctx, ident, ttlInfo, ttlEnable)

handledTTLOrTTLEnable = true
default:
err = dbterror.ErrUnsupportedAlterTableOption
}
Expand Down Expand Up @@ -3406,6 +3449,9 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, stmt *ast
case ast.AlterTableDisableKeys, ast.AlterTableEnableKeys:
// Nothing to do now, see https://github.com/pingcap/tidb/issues/1051
// MyISAM specific
case ast.AlterTableRemoveTTL:
// the parser makes sure we have only one `ast.AlterTableRemoveTTL` in an alter statement
err = d.AlterTableRemoveTTL(sctx, ident)
default:
err = errors.Trace(dbterror.ErrUnsupportedAlterTableSpec)
}
Expand Down Expand Up @@ -4238,6 +4284,11 @@ func checkIsDroppableColumn(ctx sessionctx.Context, is infoschema.InfoSchema, sc
if err != nil {
return false, errors.Trace(err)
}
// Check the column with TTL config
err = checkDropColumnWithTTLConfig(tblInfo, colName.L)
if err != nil {
return false, errors.Trace(err)
}
// We don't support dropping column with PK handle covered now.
if col.IsPKHandleColumn(tblInfo) {
return false, dbterror.ErrUnsupportedPKHandle
Expand Down Expand Up @@ -4724,6 +4775,13 @@ func GetModifiableColumnJob(
return nil, errors.Trace(err)
}

if t.Meta().TTLInfo != nil {
// the column referenced by TTL should be a time type
if t.Meta().TTLInfo.ColumnName.L == originalColName.L && !types.IsTypeTime(newCol.ColumnInfo.FieldType.GetType()) {
return nil, errors.Trace(dbterror.ErrUnsupportedColumnInTTLConfig.GenWithStackByArgs(newCol.ColumnInfo.Name.O))
}
}

var newAutoRandBits uint64
if newAutoRandBits, err = checkAutoRandom(t.Meta(), col, specNewColumn); err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -5262,6 +5320,98 @@ func (d *ddl) AlterTableSetTiFlashReplica(ctx sessionctx.Context, ident ast.Iden
return errors.Trace(err)
}

// AlterTableTTLInfoOrEnable submit ddl job to change table info according to the ttlInfo, or ttlEnable
// at least one of the `ttlInfo` or `ttlEnable` should be not nil.
// When `ttlInfo` is nil, and `ttlEnable` is not, it will use the original `.TTLInfo` in the table info and modify the
// `.Enable`. If the `.TTLInfo` in the table info is empty, this function will return an error.
// When `ttlInfo` is not nil, it simply submits the job with the `ttlInfo` and ignore the `ttlEnable`.
func (d *ddl) AlterTableTTLInfoOrEnable(ctx sessionctx.Context, ident ast.Ident, ttlInfo *model.TTLInfo, ttlEnable *bool) error {
is := d.infoCache.GetLatest()
schema, ok := is.SchemaByName(ident.Schema)
if !ok {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ident.Schema)
}

tb, err := is.TableByName(ident.Schema, ident.Name)
if err != nil {
return errors.Trace(infoschema.ErrTableNotExists.GenWithStackByArgs(ident.Schema, ident.Name))
}

tblInfo := tb.Meta().Clone()
tableID := tblInfo.ID
tableName := tblInfo.Name.L

var job *model.Job
if ttlInfo != nil {
tblInfo.TTLInfo = ttlInfo
err = checkTTLInfoValid(ctx, tblInfo)
if err != nil {
return err
}
job = &model.Job{
SchemaID: schema.ID,
TableID: tableID,
SchemaName: schema.Name.L,
TableName: tableName,
Type: model.ActionAlterTTLInfo,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{ttlInfo, ttlEnable},
}
} else {
if tblInfo.TTLInfo == nil {
return errors.Trace(dbterror.ErrSetTTLEnableForNonTTLTable)
}

job = &model.Job{
SchemaID: schema.ID,
TableID: tableID,
SchemaName: schema.Name.L,
TableName: tableName,
Type: model.ActionAlterTTLInfo,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{ttlInfo, ttlEnable},
}
}

err = d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)
return errors.Trace(err)
}

func (d *ddl) AlterTableRemoveTTL(ctx sessionctx.Context, ident ast.Ident) error {
is := d.infoCache.GetLatest()

schema, ok := is.SchemaByName(ident.Schema)
if !ok {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ident.Schema)
}

tb, err := is.TableByName(ident.Schema, ident.Name)
if err != nil {
return errors.Trace(infoschema.ErrTableNotExists.GenWithStackByArgs(ident.Schema, ident.Name))
}

tblInfo := tb.Meta().Clone()
tableID := tblInfo.ID
tableName := tblInfo.Name.L

if tblInfo.TTLInfo != nil {
job := &model.Job{
SchemaID: schema.ID,
TableID: tableID,
SchemaName: schema.Name.L,
TableName: tableName,
Type: model.ActionAlterTTLRemove,
BinlogInfo: &model.HistoryInfo{},
}
err = d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)
return errors.Trace(err)
}

return nil
}

func isTableTiFlashSupported(schema *model.DBInfo, tb table.Table) error {
// Memory tables and system tables are not supported by TiFlash
if util.IsMemOrSysDB(schema.Name.L) {
Expand Down
4 changes: 4 additions & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1287,6 +1287,10 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
ver, err = w.onFlashbackCluster(d, t, job)
case model.ActionMultiSchemaChange:
ver, err = onMultiSchemaChange(w, d, t, job)
case model.ActionAlterTTLInfo:
ver, err = onTTLInfoChange(d, t, job)
case model.ActionAlterTTLRemove:
ver, err = onTTLInfoRemove(d, t, job)
default:
// Invalid job, cancel it.
job.State = model.JobStateCancelled
Expand Down
81 changes: 81 additions & 0 deletions ddl/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/table"
Expand Down Expand Up @@ -371,3 +373,82 @@ func TestCreateTables(t *testing.T) {
testGetTable(t, domain, genIDs[1])
testGetTable(t, domain, genIDs[2])
}

func TestAlterTTL(t *testing.T) {
store, domain := testkit.CreateMockStoreAndDomainWithSchemaLease(t, testLease)

d := domain.DDL()

dbInfo, err := testSchemaInfo(store, "test_table")
require.NoError(t, err)
testCreateSchema(t, testkit.NewTestKit(t, store).Session(), d, dbInfo)

ctx := testkit.NewTestKit(t, store).Session()

// initialize a table with ttlInfo
tableName := "t"
tblInfo, err := testTableInfo(store, tableName, 2)
require.NoError(t, err)
tblInfo.Columns[0].FieldType = *types.NewFieldType(mysql.TypeDatetime)
tblInfo.Columns[1].FieldType = *types.NewFieldType(mysql.TypeDatetime)
tblInfo.TTLInfo = &model.TTLInfo{
ColumnName: tblInfo.Columns[0].Name,
IntervalExprStr: "5",
IntervalTimeUnit: int(ast.TimeUnitDay),
}

// create table
job := testCreateTable(t, ctx, d, dbInfo, tblInfo)
testCheckTableState(t, store, dbInfo, tblInfo, model.StatePublic)
testCheckJobDone(t, store, job.ID, true)

// submit ddl job to modify ttlInfo
tableInfoAfterAlterTTLInfo := tblInfo.Clone()
require.NoError(t, err)
tableInfoAfterAlterTTLInfo.TTLInfo = &model.TTLInfo{
ColumnName: tblInfo.Columns[1].Name,
IntervalExprStr: "1",
IntervalTimeUnit: int(ast.TimeUnitYear),
}

job = &model.Job{
SchemaID: dbInfo.ID,
TableID: tblInfo.ID,
Type: model.ActionAlterTTLInfo,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{&model.TTLInfo{
ColumnName: tblInfo.Columns[1].Name,
IntervalExprStr: "1",
IntervalTimeUnit: int(ast.TimeUnitYear),
}},
}
ctx.SetValue(sessionctx.QueryString, "skip")
require.NoError(t, d.DoDDLJob(ctx, job))

v := getSchemaVer(t, ctx)
checkHistoryJobArgs(t, ctx, job.ID, &historyJobArgs{ver: v, tbl: nil})

// assert the ddlInfo as expected
historyJob, err := ddl.GetHistoryJobByID(testkit.NewTestKit(t, store).Session(), job.ID)
require.NoError(t, err)
require.Equal(t, tableInfoAfterAlterTTLInfo.TTLInfo, historyJob.BinlogInfo.TableInfo.TTLInfo)

// submit a ddl job to modify ttlEnabled
job = &model.Job{
SchemaID: dbInfo.ID,
TableID: tblInfo.ID,
Type: model.ActionAlterTTLRemove,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{true},
}
ctx.SetValue(sessionctx.QueryString, "skip")
require.NoError(t, d.DoDDLJob(ctx, job))

v = getSchemaVer(t, ctx)
checkHistoryJobArgs(t, ctx, job.ID, &historyJobArgs{ver: v, tbl: nil})

// assert the ddlInfo as expected
historyJob, err = ddl.GetHistoryJobByID(testkit.NewTestKit(t, store).Session(), job.ID)
require.NoError(t, err)
require.Empty(t, historyJob.BinlogInfo.TableInfo.TTLInfo)
}
Loading

0 comments on commit 85b0264

Please sign in to comment.