Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: add integration test for pitr (#47740) #47907

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2242,7 +2242,7 @@ func (rc *Client) InitSchemasReplaceForDDL(
dbReplace.TableMap[t.Info.ID] = &stream.TableReplace{
OldTableInfo: t.Info,
NewTableID: newTableInfo.ID,
PartitionMap: getTableIDMap(newTableInfo, t.Info),
PartitionMap: getPartitionIDMap(newTableInfo, t.Info),
IndexMap: getIndexIDMap(newTableInfo, t.Info),
}
}
Expand Down
10 changes: 8 additions & 2 deletions br/pkg/restore/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ type AppliedFile interface {
GetEndKey() []byte
}

// getTableIDMap creates a map maping old tableID to new tableID.
func getTableIDMap(newTable, oldTable *model.TableInfo) map[int64]int64 {
// getPartitionIDMap creates a map maping old physical ID to new physical ID.
func getPartitionIDMap(newTable, oldTable *model.TableInfo) map[int64]int64 {
tableIDMap := make(map[int64]int64)

if oldTable.Partition != nil && newTable.Partition != nil {
Expand All @@ -60,6 +60,12 @@ func getTableIDMap(newTable, oldTable *model.TableInfo) map[int64]int64 {
}
}

return tableIDMap
}

// getTableIDMap creates a map maping old tableID to new tableID.
func getTableIDMap(newTable, oldTable *model.TableInfo) map[int64]int64 {
tableIDMap := getPartitionIDMap(newTable, oldTable)
tableIDMap[oldTable.ID] = newTable.ID
return tableIDMap
}
Expand Down
96 changes: 57 additions & 39 deletions br/pkg/stream/rewrite_meta_rawkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/tablecodec"
filter "github.com/pingcap/tidb/util/table-filter"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -566,13 +568,9 @@ func (sr *SchemasReplace) RewriteKvEntry(e *kv.Entry, cf string) (*kv.Entry, err
func (sr *SchemasReplace) tryToGCJob(job *model.Job) error {
if !job.IsCancelled() {
switch job.Type {
case model.ActionAddIndex, model.ActionAddPrimaryKey:
if job.State == model.JobStateRollbackDone {
return sr.deleteRange(job)
}
return nil
case model.ActionDropSchema, model.ActionDropTable, model.ActionTruncateTable, model.ActionDropIndex, model.ActionDropPrimaryKey,
model.ActionDropTablePartition, model.ActionTruncateTablePartition, model.ActionDropColumn, model.ActionDropColumns, model.ActionModifyColumn, model.ActionDropIndexes:
model.ActionDropTablePartition, model.ActionTruncateTablePartition, model.ActionDropColumn, model.ActionDropColumns, model.ActionModifyColumn, model.ActionDropIndexes,
model.ActionAddIndex, model.ActionAddPrimaryKey:
return sr.deleteRange(job)
case model.ActionMultiSchemaChange:
for _, sub := range job.MultiSchemaInfo.SubJobs {
Expand All @@ -587,10 +585,11 @@ func (sr *SchemasReplace) tryToGCJob(job *model.Job) error {
}

func (sr *SchemasReplace) deleteRange(job *model.Job) error {
lctx := logutil.ContextWithField(context.Background(), logutil.RedactAny("category", "ddl: rewrite delete range"))
dbReplace, exist := sr.DbMap[job.SchemaID]
if !exist {
// skip this mddljob, the same below
log.Debug("try to drop a non-existent range, missing oldDBID", zap.Int64("oldDBID", job.SchemaID))
logutil.CL(lctx).Warn("try to drop a non-existent range, missing oldDBID", zap.Int64("oldDBID", job.SchemaID))
return nil
}

Expand Down Expand Up @@ -626,21 +625,24 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
newTableIDs := make([]int64, 0, len(tableIDs))
for tableID, tableReplace := range dbReplace.TableMap {
if _, exist := argsSet[tableID]; !exist {
log.Debug("DropSchema: record a table, but it doesn't exist in job args", zap.Int64("oldTableID", tableID))
logutil.CL(lctx).Warn("DropSchema: record a table, but it doesn't exist in job args",
zap.Int64("oldTableID", tableID))
continue
}
newTableIDs = append(newTableIDs, tableReplace.NewTableID)
for partitionID, newPartitionID := range tableReplace.PartitionMap {
if _, exist := argsSet[partitionID]; !exist {
log.Debug("DropSchema: record a partition, but it doesn't exist in job args", zap.Int64("oldPartitionID", partitionID))
logutil.CL(lctx).Warn("DropSchema: record a partition, but it doesn't exist in job args",
zap.Int64("oldPartitionID", partitionID))
continue
}
newTableIDs = append(newTableIDs, newPartitionID)
}
}

if len(newTableIDs) != len(tableIDs) {
log.Debug("DropSchema: try to drop a non-existent table/partition, whose oldID doesn't exist in tableReplace")
logutil.CL(lctx).Warn(
"DropSchema: try to drop a non-existent table/partition, whose oldID doesn't exist in tableReplace")
// only drop newTableIDs' ranges
}

Expand All @@ -653,7 +655,8 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
case model.ActionDropTable, model.ActionTruncateTable:
tableReplace, exist := dbReplace.TableMap[job.TableID]
if !exist {
log.Debug("DropTable/TruncateTable: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
logutil.CL(lctx).Warn("DropTable/TruncateTable: try to drop a non-existent table, missing oldTableID",
zap.Int64("oldTableID", job.TableID))
return nil
}

Expand All @@ -665,17 +668,19 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
return errors.Trace(err)
}
if len(physicalTableIDs) > 0 {
// delete partition id instead of table id
for i := 0; i < len(physicalTableIDs); i++ {
newPid, exist := tableReplace.PartitionMap[physicalTableIDs[i]]
newPhysicalTableIDs := make([]int64, 0, len(physicalTableIDs))
// delete partition id
for _, oldPid := range physicalTableIDs {
newPid, exist := tableReplace.PartitionMap[oldPid]
if !exist {
log.Debug("DropTable/TruncateTable: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", physicalTableIDs[i]))
logutil.CL(lctx).Warn("DropTable/TruncateTable: try to drop a non-existent table, missing oldPartitionID",
zap.Int64("oldPartitionID", oldPid))
continue
}
physicalTableIDs[i] = newPid
newPhysicalTableIDs = append(newPhysicalTableIDs, newPid)
}
if len(physicalTableIDs) > 0 {
sr.insertDeleteRangeForTable(newJobID, physicalTableIDs)
if len(newPhysicalTableIDs) > 0 {
sr.insertDeleteRangeForTable(newJobID, newPhysicalTableIDs)
}
return nil
}
Expand All @@ -685,32 +690,37 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
case model.ActionDropTablePartition, model.ActionTruncateTablePartition:
tableReplace, exist := dbReplace.TableMap[job.TableID]
if !exist {
log.Debug("DropTablePartition/TruncateTablePartition: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
logutil.CL(lctx).Warn(
"DropTablePartition/TruncateTablePartition: try to drop a non-existent table, missing oldTableID",
zap.Int64("oldTableID", job.TableID))
return nil
}
var physicalTableIDs []int64
if err := job.DecodeArgs(&physicalTableIDs); err != nil {
return errors.Trace(err)
}

for i := 0; i < len(physicalTableIDs); i++ {
newPid, exist := tableReplace.PartitionMap[physicalTableIDs[i]]
newPhysicalTableIDs := make([]int64, 0, len(physicalTableIDs))
for _, oldPid := range physicalTableIDs {
newPid, exist := tableReplace.PartitionMap[oldPid]
if !exist {
log.Debug("DropTablePartition/TruncateTablePartition: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", physicalTableIDs[i]))
logutil.CL(lctx).Warn(
"DropTablePartition/TruncateTablePartition: try to drop a non-existent table, missing oldPartitionID",
zap.Int64("oldPartitionID", oldPid))
continue
}
physicalTableIDs[i] = newPid
newPhysicalTableIDs = append(newPhysicalTableIDs, newPid)
}
if len(physicalTableIDs) > 0 {
sr.insertDeleteRangeForTable(newJobID, physicalTableIDs)
if len(newPhysicalTableIDs) > 0 {
sr.insertDeleteRangeForTable(newJobID, newPhysicalTableIDs)
}
return nil
// ActionAddIndex, ActionAddPrimaryKey needs do it, because it needs to be rolled back when it's canceled.
case model.ActionAddIndex, model.ActionAddPrimaryKey:
// iff job.State = model.JobStateRollbackDone
tableReplace, exist := dbReplace.TableMap[job.TableID]
if !exist {
log.Debug("AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
logutil.CL(lctx).Warn("AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldTableID",
zap.Int64("oldTableID", job.TableID))
return nil
}
var indexID int64
Expand All @@ -720,14 +730,22 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
return errors.Trace(err)
}

tempIdxID := tablecodec.TempIndexPrefix | indexID
var elementID int64 = 1
indexIDs := []int64{indexID}
var indexIDs []int64
if job.State == model.JobStateRollbackDone {
indexIDs = []int64{indexID, tempIdxID}
} else {
indexIDs = []int64{tempIdxID}
}

if len(partitionIDs) > 0 {
for _, oldPid := range partitionIDs {
newPid, exist := tableReplace.PartitionMap[oldPid]
if !exist {
log.Debug("AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
logutil.CL(lctx).Warn(
"AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldPartitionID",
zap.Int64("oldPartitionID", oldPid))
continue
}

Expand All @@ -740,7 +758,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
case model.ActionDropIndex, model.ActionDropPrimaryKey:
tableReplace, exist := dbReplace.TableMap[job.TableID]
if !exist {
log.Debug("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
logutil.CL(lctx).Warn("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
return nil
}

Expand All @@ -759,7 +777,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
for _, oldPid := range partitionIDs {
newPid, exist := tableReplace.PartitionMap[oldPid]
if !exist {
log.Debug("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
logutil.CL(lctx).Warn("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
continue
}
// len(indexIDs) = 1
Expand All @@ -782,7 +800,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {

tableReplace, exist := dbReplace.TableMap[job.TableID]
if !exist {
log.Debug("DropIndexes: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
logutil.CL(lctx).Warn("DropIndexes: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
return nil
}

Expand All @@ -791,7 +809,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
for _, oldPid := range partitionIDs {
newPid, exist := tableReplace.PartitionMap[oldPid]
if !exist {
log.Debug("DropIndexes: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
logutil.CL(lctx).Warn("DropIndexes: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
continue
}
sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs)
Expand All @@ -811,7 +829,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
if len(indexIDs) > 0 {
tableReplace, exist := dbReplace.TableMap[job.TableID]
if !exist {
log.Debug("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
return nil
}

Expand All @@ -820,7 +838,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
for _, oldPid := range partitionIDs {
newPid, exist := tableReplace.PartitionMap[oldPid]
if !exist {
log.Debug("DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
continue
}
sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs)
Expand All @@ -841,7 +859,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
if len(indexIDs) > 0 {
tableReplace, exist := dbReplace.TableMap[job.TableID]
if !exist {
log.Debug("DropColumns: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
logutil.CL(lctx).Warn("DropColumns: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
return nil
}

Expand All @@ -850,7 +868,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
for _, oldPid := range partitionIDs {
newPid, exist := tableReplace.PartitionMap[oldPid]
if !exist {
log.Debug("DropColumns: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
logutil.CL(lctx).Warn("DropColumns: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
continue
}
sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs)
Expand All @@ -870,7 +888,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
}
tableReplace, exist := dbReplace.TableMap[job.TableID]
if !exist {
log.Debug("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
return nil
}

Expand All @@ -879,7 +897,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
for _, oldPid := range partitionIDs {
newPid, exist := tableReplace.PartitionMap[oldPid]
if !exist {
log.Debug("DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
continue
}
sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs)
Expand Down
36 changes: 30 additions & 6 deletions br/pkg/stream/rewrite_meta_rawkv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/tablecodec"
filter "github.com/pingcap/tidb/util/table-filter"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -435,8 +436,10 @@ var (
dropTable0Job = &model.Job{Type: model.ActionDropTable, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`["",[72,73,74],[""]]`)}
dropTable1Job = &model.Job{Type: model.ActionDropTable, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`["",[],[""]]`)}
dropTable0Partition1Job = &model.Job{Type: model.ActionDropTablePartition, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[73]]`)}
rollBackTable0IndexJob = &model.Job{Type: model.ActionAddIndex, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[2,false,[72,73,74]]`)}
rollBackTable1IndexJob = &model.Job{Type: model.ActionAddIndex, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[2,false,[]]`)}
rollBackTable0IndexJob = &model.Job{Type: model.ActionAddIndex, State: model.JobStateRollbackDone, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[2,false,[72,73,74]]`)}
rollBackTable1IndexJob = &model.Job{Type: model.ActionAddIndex, State: model.JobStateRollbackDone, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[2,false,[]]`)}
addTable0IndexJob = &model.Job{Type: model.ActionAddIndex, State: model.JobStateSynced, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[2,false,[72,73,74]]`)}
addTable1IndexJob = &model.Job{Type: model.ActionAddIndex, State: model.JobStateSynced, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[2,false,[]]`)}
dropTable0IndexJob = &model.Job{Type: model.ActionDropIndex, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`["",false,2,[72,73,74]]`)}
dropTable1IndexJob = &model.Job{Type: model.ActionDropIndex, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`["",false,2,[]]`)}
dropTable0IndexesJob = &model.Job{Type: model.ActionDropIndexes, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[],[],[2,3],[72,73,74]]`)}
Expand Down Expand Up @@ -581,23 +584,44 @@ func TestDeleteRangeForMDDLJob(t *testing.T) {
require.Equal(t, targs.tableIDs[0], mDDLJobPartition1NewID)

// roll back add index for table0
err = schemaReplace.deleteRange(rollBackTable0IndexJob)
err = schemaReplace.tryToGCJob(rollBackTable0IndexJob)
require.NoError(t, err)
for i := 0; i < len(mDDLJobALLNewPartitionIDSet); i++ {
iargs = <-midr.indexCh
_, exist := mDDLJobALLNewPartitionIDSet[iargs.tableID]
require.True(t, exist)
require.Equal(t, len(iargs.indexIDs), 1)
require.Equal(t, len(iargs.indexIDs), 2)
require.Equal(t, iargs.indexIDs[0], int64(2))
require.Equal(t, iargs.indexIDs[1], int64(tablecodec.TempIndexPrefix|2))
}

// roll back add index for table1
err = schemaReplace.deleteRange(rollBackTable1IndexJob)
err = schemaReplace.tryToGCJob(rollBackTable1IndexJob)
require.NoError(t, err)
iargs = <-midr.indexCh
require.Equal(t, iargs.tableID, mDDLJobTable1NewID)
require.Equal(t, len(iargs.indexIDs), 1)
require.Equal(t, len(iargs.indexIDs), 2)
require.Equal(t, iargs.indexIDs[0], int64(2))
require.Equal(t, iargs.indexIDs[1], int64(tablecodec.TempIndexPrefix|2))

// add index for table 0
err = schemaReplace.tryToGCJob(addTable0IndexJob)
require.NoError(t, err)
for i := 0; i < len(mDDLJobALLNewPartitionIDSet); i++ {
iargs = <-midr.indexCh
_, exist := mDDLJobALLNewPartitionIDSet[iargs.tableID]
require.True(t, exist)
require.Equal(t, len(iargs.indexIDs), 1)
require.Equal(t, iargs.indexIDs[0], int64(tablecodec.TempIndexPrefix|2))
}

// add index for table 1
err = schemaReplace.tryToGCJob(addTable1IndexJob)
require.NoError(t, err)
iargs = <-midr.indexCh
require.Equal(t, iargs.tableID, mDDLJobTable1NewID)
require.Equal(t, len(iargs.indexIDs), 1)
require.Equal(t, iargs.indexIDs[0], int64(tablecodec.TempIndexPrefix|2))

// drop index for table0
err = schemaReplace.deleteRange(dropTable0IndexJob)
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/stream/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ func TestDateFormat(t *testing.T) {
434605479096221697,
"2022-07-15 20:32:12.734 +0800",
},
{
434605478903808000,
"2022-07-15 20:32:12 +0800",
},
}

timeZone, _ := time.LoadLocation("Asia/Shanghai")
Expand Down
Loading