Skip to content

Commit

Permalink
br: add integration test for pitr (#47740) (#47906)
Browse files Browse the repository at this point in the history
ref #47738
  • Loading branch information
ti-chi-bot authored Oct 31, 2023
1 parent 06c954c commit df8d7c9
Show file tree
Hide file tree
Showing 9 changed files with 383 additions and 172 deletions.
2 changes: 1 addition & 1 deletion br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2789,7 +2789,7 @@ func (rc *Client) InitSchemasReplaceForDDL(
dbReplace.TableMap[t.Info.ID] = &stream.TableReplace{
Name: newTableInfo.Name.O,
TableID: newTableInfo.ID,
PartitionMap: getTableIDMap(newTableInfo, t.Info),
PartitionMap: getPartitionIDMap(newTableInfo, t.Info),
IndexMap: getIndexIDMap(newTableInfo, t.Info),
}
}
Expand Down
10 changes: 8 additions & 2 deletions br/pkg/restore/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ type AppliedFile interface {
GetEndKey() []byte
}

// getTableIDMap creates a map maping old tableID to new tableID.
func getTableIDMap(newTable, oldTable *model.TableInfo) map[int64]int64 {
// getPartitionIDMap creates a map maping old physical ID to new physical ID.
func getPartitionIDMap(newTable, oldTable *model.TableInfo) map[int64]int64 {
tableIDMap := make(map[int64]int64)

if oldTable.Partition != nil && newTable.Partition != nil {
Expand All @@ -60,6 +60,12 @@ func getTableIDMap(newTable, oldTable *model.TableInfo) map[int64]int64 {
}
}

return tableIDMap
}

// getTableIDMap creates a map maping old tableID to new tableID.
func getTableIDMap(newTable, oldTable *model.TableInfo) map[int64]int64 {
tableIDMap := getPartitionIDMap(newTable, oldTable)
tableIDMap[oldTable.ID] = newTable.ID
return tableIDMap
}
Expand Down
155 changes: 56 additions & 99 deletions br/pkg/stream/rewrite_meta_rawkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ import (
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/restore/ingestrec"
"github.com/pingcap/tidb/br/pkg/restore/tiflashrec"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/tablecodec"
filter "github.com/pingcap/tidb/util/table-filter"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -685,14 +687,17 @@ func (sr *SchemasReplace) restoreFromHistory(job *model.Job, isSubJob bool) erro
if !job.IsCancelled() {
switch job.Type {
case model.ActionAddIndex, model.ActionAddPrimaryKey:
if job.State == model.JobStateRollbackDone {
return sr.deleteRange(job)
// AddJob would filter out the job state
if err := sr.ingestRecorder.AddJob(job, isSubJob); err != nil {
return err
}
err := sr.ingestRecorder.AddJob(job, isSubJob)
return errors.Trace(err)
case model.ActionDropSchema, model.ActionDropTable, model.ActionTruncateTable, model.ActionDropIndex, model.ActionDropPrimaryKey,
model.ActionDropTablePartition, model.ActionTruncateTablePartition, model.ActionDropColumn,
model.ActionDropColumns, model.ActionModifyColumn, model.ActionDropIndexes:
return sr.deleteRange(job)
case model.ActionDropSchema, model.ActionDropTable,
model.ActionTruncateTable, model.ActionDropIndex,
model.ActionDropPrimaryKey,
model.ActionDropTablePartition, model.ActionTruncateTablePartition,
model.ActionDropColumn, model.ActionModifyColumn,
model.ActionReorganizePartition:
return sr.deleteRange(job)
case model.ActionMultiSchemaChange:
for _, sub := range job.MultiSchemaInfo.SubJobs {
Expand All @@ -708,10 +713,11 @@ func (sr *SchemasReplace) restoreFromHistory(job *model.Job, isSubJob bool) erro
}

func (sr *SchemasReplace) deleteRange(job *model.Job) error {
lctx := logutil.ContextWithField(context.Background(), logutil.RedactAny("category", "ddl: rewrite delete range"))
dbReplace, exist := sr.DbMap[job.SchemaID]
if !exist {
// skip this mddljob, the same below
log.Debug("try to drop a non-existent range, missing oldDBID", zap.Int64("oldDBID", job.SchemaID))
logutil.CL(lctx).Warn("try to drop a non-existent range, missing oldDBID", zap.Int64("oldDBID", job.SchemaID))
return nil
}

Expand Down Expand Up @@ -747,14 +753,14 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
newTableIDs := make([]int64, 0, len(tableIDs))
for tableID, tableReplace := range dbReplace.TableMap {
if _, exist := argsSet[tableID]; !exist {
log.Debug("DropSchema: record a table, but it doesn't exist in job args",
logutil.CL(lctx).Warn("DropSchema: record a table, but it doesn't exist in job args",
zap.Int64("oldTableID", tableID))
continue
}
newTableIDs = append(newTableIDs, tableReplace.TableID)
for partitionID, newPartitionID := range tableReplace.PartitionMap {
if _, exist := argsSet[partitionID]; !exist {
log.Debug("DropSchema: record a partition, but it doesn't exist in job args",
logutil.CL(lctx).Warn("DropSchema: record a partition, but it doesn't exist in job args",
zap.Int64("oldPartitionID", partitionID))
continue
}
Expand All @@ -763,7 +769,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
}

if len(newTableIDs) != len(tableIDs) {
log.Debug(
logutil.CL(lctx).Warn(
"DropSchema: try to drop a non-existent table/partition, whose oldID doesn't exist in tableReplace")
// only drop newTableIDs' ranges
}
Expand All @@ -777,7 +783,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
case model.ActionDropTable, model.ActionTruncateTable:
tableReplace, exist := dbReplace.TableMap[job.TableID]
if !exist {
log.Debug("DropTable/TruncateTable: try to drop a non-existent table, missing oldTableID",
logutil.CL(lctx).Warn("DropTable/TruncateTable: try to drop a non-existent table, missing oldTableID",
zap.Int64("oldTableID", job.TableID))
return nil
}
Expand All @@ -790,28 +796,33 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
return errors.Trace(err)
}
if len(physicalTableIDs) > 0 {
// delete partition id instead of table id
for i := 0; i < len(physicalTableIDs); i++ {
newPid, exist := tableReplace.PartitionMap[physicalTableIDs[i]]
newPhysicalTableIDs := make([]int64, 0, len(physicalTableIDs))
// delete partition id
for _, oldPid := range physicalTableIDs {
newPid, exist := tableReplace.PartitionMap[oldPid]
if !exist {
log.Debug("DropTable/TruncateTable: try to drop a non-existent table, missing oldPartitionID",
zap.Int64("oldPartitionID", physicalTableIDs[i]))
logutil.CL(lctx).Warn("DropTable/TruncateTable: try to drop a non-existent table, missing oldPartitionID",
zap.Int64("oldPartitionID", oldPid))
continue
}
physicalTableIDs[i] = newPid
newPhysicalTableIDs = append(newPhysicalTableIDs, newPid)
}
if len(physicalTableIDs) > 0 {
sr.insertDeleteRangeForTable(newJobID, physicalTableIDs)

// logical table may contain global index regions, so delete the logical table range.
newPhysicalTableIDs = append(newPhysicalTableIDs, tableReplace.TableID)
if len(newPhysicalTableIDs) > 0 {
sr.insertDeleteRangeForTable(newJobID, newPhysicalTableIDs)
}

return nil
}

sr.insertDeleteRangeForTable(newJobID, []int64{tableReplace.TableID})
return nil
case model.ActionDropTablePartition, model.ActionTruncateTablePartition:
case model.ActionDropTablePartition, model.ActionTruncateTablePartition, model.ActionReorganizePartition:
tableReplace, exist := dbReplace.TableMap[job.TableID]
if !exist {
log.Debug(
logutil.CL(lctx).Warn(
"DropTablePartition/TruncateTablePartition: try to drop a non-existent table, missing oldTableID",
zap.Int64("oldTableID", job.TableID))
return nil
Expand All @@ -821,26 +832,26 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
return errors.Trace(err)
}

for i := 0; i < len(physicalTableIDs); i++ {
newPid, exist := tableReplace.PartitionMap[physicalTableIDs[i]]
newPhysicalTableIDs := make([]int64, 0, len(physicalTableIDs))
for _, oldPid := range physicalTableIDs {
newPid, exist := tableReplace.PartitionMap[oldPid]
if !exist {
log.Debug(
logutil.CL(lctx).Warn(
"DropTablePartition/TruncateTablePartition: try to drop a non-existent table, missing oldPartitionID",
zap.Int64("oldPartitionID", physicalTableIDs[i]))
zap.Int64("oldPartitionID", oldPid))
continue
}
physicalTableIDs[i] = newPid
newPhysicalTableIDs = append(newPhysicalTableIDs, newPid)
}
if len(physicalTableIDs) > 0 {
sr.insertDeleteRangeForTable(newJobID, physicalTableIDs)
if len(newPhysicalTableIDs) > 0 {
sr.insertDeleteRangeForTable(newJobID, newPhysicalTableIDs)
}
return nil
// ActionAddIndex, ActionAddPrimaryKey needs do it, because it needs to be rolled back when it's canceled.
case model.ActionAddIndex, model.ActionAddPrimaryKey:
// iff job.State = model.JobStateRollbackDone
tableReplace, exist := dbReplace.TableMap[job.TableID]
if !exist {
log.Debug("AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldTableID",
logutil.CL(lctx).Warn("AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldTableID",
zap.Int64("oldTableID", job.TableID))
return nil
}
Expand All @@ -851,14 +862,20 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
return errors.Trace(err)
}

tempIdxID := tablecodec.TempIndexPrefix | indexID
var elementID int64 = 1
indexIDs := []int64{indexID}
var indexIDs []int64
if job.State == model.JobStateRollbackDone {
indexIDs = []int64{indexID, tempIdxID}
} else {
indexIDs = []int64{tempIdxID}
}

if len(partitionIDs) > 0 {
for _, oldPid := range partitionIDs {
newPid, exist := tableReplace.PartitionMap[oldPid]
if !exist {
log.Debug(
logutil.CL(lctx).Warn(
"AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldPartitionID",
zap.Int64("oldPartitionID", oldPid))
continue
Expand All @@ -873,7 +890,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
case model.ActionDropIndex, model.ActionDropPrimaryKey:
tableReplace, exist := dbReplace.TableMap[job.TableID]
if !exist {
log.Debug("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
logutil.CL(lctx).Warn("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
return nil
}

Expand All @@ -892,7 +909,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
for _, oldPid := range partitionIDs {
newPid, exist := tableReplace.PartitionMap[oldPid]
if !exist {
log.Debug("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
logutil.CL(lctx).Warn("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
continue
}
// len(indexIDs) = 1
Expand All @@ -902,37 +919,6 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
sr.insertDeleteRangeForIndex(newJobID, &elementID, tableReplace.TableID, indexIDs)
}
return nil
case model.ActionDropIndexes: // // Deprecated, we use ActionMultiSchemaChange instead.
var indexIDs []int64
var partitionIDs []int64
if err := job.DecodeArgs(&[]model.CIStr{}, &[]bool{}, &indexIDs, &partitionIDs); err != nil {
return errors.Trace(err)
}
// Remove data in TiKV.
if len(indexIDs) == 0 {
return nil
}

tableReplace, exist := dbReplace.TableMap[job.TableID]
if !exist {
log.Debug("DropIndexes: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
return nil
}

var elementID int64 = 1
if len(partitionIDs) > 0 {
for _, oldPid := range partitionIDs {
newPid, exist := tableReplace.PartitionMap[oldPid]
if !exist {
log.Debug("DropIndexes: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
continue
}
sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs)
}
} else {
sr.insertDeleteRangeForIndex(newJobID, &elementID, tableReplace.TableID, indexIDs)
}
return nil
case model.ActionDropColumn:
var colName model.CIStr
var ifExists bool
Expand All @@ -944,7 +930,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
if len(indexIDs) > 0 {
tableReplace, exist := dbReplace.TableMap[job.TableID]
if !exist {
log.Debug("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
return nil
}

Expand All @@ -953,7 +939,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
for _, oldPid := range partitionIDs {
newPid, exist := tableReplace.PartitionMap[oldPid]
if !exist {
log.Debug("DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
continue
}
sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs)
Expand All @@ -963,35 +949,6 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
}
}
return nil
case model.ActionDropColumns: // Deprecated, we use ActionMultiSchemaChange instead.
var colNames []model.CIStr
var ifExists []bool
var indexIDs []int64
var partitionIDs []int64
if err := job.DecodeArgs(&colNames, &ifExists, &indexIDs, &partitionIDs); err != nil {
return errors.Trace(err)
}
if len(indexIDs) > 0 {
tableReplace, exist := dbReplace.TableMap[job.TableID]
if !exist {
log.Debug("DropColumns: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
return nil
}

var elementID int64 = 1
if len(partitionIDs) > 0 {
for _, oldPid := range partitionIDs {
newPid, exist := tableReplace.PartitionMap[oldPid]
if !exist {
log.Debug("DropColumns: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
continue
}
sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs)
}
} else {
sr.insertDeleteRangeForIndex(newJobID, &elementID, tableReplace.TableID, indexIDs)
}
}
case model.ActionModifyColumn:
var indexIDs []int64
var partitionIDs []int64
Expand All @@ -1003,7 +960,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
}
tableReplace, exist := dbReplace.TableMap[job.TableID]
if !exist {
log.Debug("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
return nil
}

Expand All @@ -1012,7 +969,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
for _, oldPid := range partitionIDs {
newPid, exist := tableReplace.PartitionMap[oldPid]
if !exist {
log.Debug("DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
continue
}
sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs)
Expand Down
Loading

0 comments on commit df8d7c9

Please sign in to comment.