Skip to content

Commit

Permalink
log-backup: support the ddl(exchange partition) when PiTR (#37050)
Browse files Browse the repository at this point in the history
close #34742
  • Loading branch information
joccau authored Aug 25, 2022
1 parent f5d3df3 commit 87a6106
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 27 deletions.
68 changes: 50 additions & 18 deletions br/pkg/stream/rewrite_meta_rawkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ type DBReplace struct {

type SchemasReplace struct {
DbMap map[OldID]*DBReplace
RewriteTS uint64
TableFilter filter.Filter
globalTableIdMap map[OldID]NewID
RewriteTS uint64 // used to rewrite commit ts in meta kv.
TableFilter filter.Filter // used to filter schema/table
genGenGlobalID func(ctx context.Context) (int64, error)
genGenGlobalIDs func(ctx context.Context, n int) ([]int64, error)
insertDeleteRangeForTable func(jobID int64, tableIDs []int64)
Expand Down Expand Up @@ -92,8 +93,19 @@ func NewSchemasReplace(
insertDeleteRangeForTable func(jobID int64, tableIDs []int64),
insertDeleteRangeForIndex func(jobID int64, elementID *int64, tableID int64, indexIDs []int64),
) *SchemasReplace {
globalTableIdMap := make(map[OldID]NewID)
for _, dr := range dbMap {
for tblID, tr := range dr.TableMap {
globalTableIdMap[tblID] = tr.NewTableID
for oldpID, newpID := range tr.PartitionMap {
globalTableIdMap[oldpID] = newpID
}
}
}

return &SchemasReplace{
DbMap: dbMap,
globalTableIdMap: globalTableIdMap,
RewriteTS: restoreTS,
TableFilter: tableFilter,
genGenGlobalID: genID,
Expand Down Expand Up @@ -199,6 +211,11 @@ func (sr *SchemasReplace) rewriteKeyForTable(
parseField func([]byte) (tableID int64, err error),
encodeField func(tableID int64) []byte,
) ([]byte, bool, error) {
var (
err error
newID int64
exist bool
)
rawMetaKey, err := ParseTxnMetaKeyFrom(key)
if err != nil {
return nil, false, errors.Trace(err)
Expand All @@ -216,7 +233,7 @@ func (sr *SchemasReplace) rewriteKeyForTable(

dbReplace, exist := sr.DbMap[dbID]
if !exist {
newID, err := sr.genGenGlobalID(context.Background())
newID, err = sr.genGenGlobalID(context.Background())
if err != nil {
return nil, false, errors.Trace(err)
}
Expand All @@ -226,9 +243,13 @@ func (sr *SchemasReplace) rewriteKeyForTable(

tableReplace, exist := dbReplace.TableMap[tableID]
if !exist {
newID, err := sr.genGenGlobalID(context.Background())
if err != nil {
return nil, false, errors.Trace(err)
newID, exist = sr.globalTableIdMap[tableID]
if !exist {
newID, err = sr.genGenGlobalID(context.Background())
if err != nil {
return nil, false, errors.Trace(err)
}
sr.globalTableIdMap[tableID] = newID
}
tableReplace = NewTableReplace(nil, newID)
dbReplace.TableMap[tableID] = tableReplace
Expand All @@ -243,15 +264,20 @@ func (sr *SchemasReplace) rewriteKeyForTable(
}

func (sr *SchemasReplace) rewriteTableInfo(value []byte, dbID int64) ([]byte, bool, error) {
var tableInfo model.TableInfo
var (
tableInfo model.TableInfo
err error
newID int64
exist bool
)
if err := json.Unmarshal(value, &tableInfo); err != nil {
return nil, false, errors.Trace(err)
}

// update table ID
dbReplace, exist := sr.DbMap[dbID]
if !exist {
newID, err := sr.genGenGlobalID(context.Background())
newID, err = sr.genGenGlobalID(context.Background())
if err != nil {
return nil, false, errors.Trace(err)
}
Expand All @@ -261,10 +287,15 @@ func (sr *SchemasReplace) rewriteTableInfo(value []byte, dbID int64) ([]byte, bo

tableReplace, exist := dbReplace.TableMap[tableInfo.ID]
if !exist {
newID, err := sr.genGenGlobalID(context.Background())
if err != nil {
return nil, false, errors.Trace(err)
newID, exist = sr.globalTableIdMap[tableInfo.ID]
if !exist {
newID, err = sr.genGenGlobalID(context.TODO())
if err != nil {
return nil, false, errors.Trace(err)
}
sr.globalTableIdMap[tableInfo.ID] = newID
}

tableReplace = NewTableReplace(&tableInfo, newID)
dbReplace.TableMap[tableInfo.ID] = tableReplace
} else {
Expand All @@ -287,12 +318,15 @@ func (sr *SchemasReplace) rewriteTableInfo(value []byte, dbID int64) ([]byte, bo
partitions := newTableInfo.GetPartitionInfo()
if partitions != nil {
for i, tbl := range partitions.Definitions {
newID, exist := tableReplace.PartitionMap[tbl.ID]
newID, exist = tableReplace.PartitionMap[tbl.ID]
if !exist {
var err error
newID, err = sr.genGenGlobalID(context.Background())
if err != nil {
return nil, false, errors.Trace(err)
newID, exist = sr.globalTableIdMap[tbl.ID]
if !exist {
newID, err = sr.genGenGlobalID(context.Background())
if err != nil {
return nil, false, errors.Trace(err)
}
sr.globalTableIdMap[tbl.ID] = newID
}
tableReplace.PartitionMap[tbl.ID] = newID
}
Expand Down Expand Up @@ -470,8 +504,6 @@ func (sr *SchemasReplace) tryToGCJob(job *model.Job) error {
return err
}
}
case model.ActionExchangeTablePartition:
return errors.Errorf("restore of ddl `exchange-table-partition` is not supported")
}
}
return nil
Expand Down
110 changes: 101 additions & 9 deletions br/pkg/stream/rewrite_meta_rawkv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,6 @@ func mockGenGenGlobalID(ctx context.Context) (int64, error) {
return increaseID, nil
}

func ProduceValue(tableName string, dbID int64) ([]byte, error) {
tableInfo := model.TableInfo{
ID: dbID,
Name: model.NewCIStr(tableName),
}

return json.Marshal(tableInfo)
}

func MockEmptySchemasReplace(midr *mockInsertDeleteRange) *SchemasReplace {
dbMap := make(map[OldID]*DBReplace)
if midr == nil {
Expand Down Expand Up @@ -211,6 +202,107 @@ func TestRewriteValueForPartitionTable(t *testing.T) {
require.Equal(t, tableInfo.Partition.Definitions[1].ID, newID2)
}

func TestRewriteValueForExchangePartition(t *testing.T) {
var (
dbID1 int64 = 100
tableID1 int64 = 101
pt1ID int64 = 102
pt2ID int64 = 103
tableName1 = "t1"
pt1Name = "pt1"
pt2Name = "pt2"

dbID2 int64 = 105
tableID2 int64 = 106
tableName2 = "t2"
tableInfo model.TableInfo
)

// construct the partition table t1
pt1 := model.PartitionDefinition{
ID: pt1ID,
Name: model.NewCIStr(pt1Name),
}
pt2 := model.PartitionDefinition{
ID: pt2ID,
Name: model.NewCIStr(pt2Name),
}

pi := model.PartitionInfo{
Enable: true,
Definitions: make([]model.PartitionDefinition, 0),
}
pi.Definitions = append(pi.Definitions, pt1, pt2)
t1 := model.TableInfo{
ID: tableID1,
Name: model.NewCIStr(tableName1),
Partition: &pi,
}
db1 := model.DBInfo{
ID: dbID1,
}

// construct the no partition table t2
t2 := model.TableInfo{
ID: tableID2,
Name: model.NewCIStr(tableName2),
}
db2 := model.DBInfo{
ID: dbID2,
}

// construct the SchemaReplace
dbMap := make(map[OldID]*DBReplace)
dbMap[dbID1] = NewDBReplace(&db1, dbID1+100)
dbMap[dbID1].TableMap[tableID1] = NewTableReplace(&t1, tableID1+100)
dbMap[dbID1].TableMap[tableID1].PartitionMap[pt1ID] = pt1ID + 100
dbMap[dbID1].TableMap[tableID1].PartitionMap[pt2ID] = pt2ID + 100

dbMap[dbID2] = NewDBReplace(&db2, dbID2+100)
dbMap[dbID2].TableMap[tableID2] = NewTableReplace(&t2, tableID2+100)

sr := NewSchemasReplace(
dbMap,
0,
filter.All(),
mockGenGenGlobalID,
nil,
nil,
nil,
)
require.Equal(t, len(sr.globalTableIdMap), 4)

//exchange parition, t1 parition0 with the t2
t1Copy := t1.Clone()
t1Copy.Partition = t1.Partition.Clone()
t2Copy := t2.Clone()

t1Copy.Partition.Definitions[0].ID = tableID2
t2Copy.ID = pt1ID

// rewrite partition table
value, err := json.Marshal(&t1Copy)
require.Nil(t, err)
value, needRewrite, err := sr.rewriteTableInfo(value, dbID1)
require.Nil(t, err)
require.True(t, needRewrite)
err = json.Unmarshal(value, &tableInfo)
require.Nil(t, err)
require.Equal(t, tableInfo.ID, tableID1+100)
require.Equal(t, tableInfo.Partition.Definitions[0].ID, tableID2+100)
require.Equal(t, tableInfo.Partition.Definitions[1].ID, pt2ID+100)

// rewrite no partition table
value, err = json.Marshal(&t2Copy)
require.Nil(t, err)
value, needRewrite, err = sr.rewriteTableInfo(value, dbID2)
require.Nil(t, err)
require.True(t, needRewrite)
err = json.Unmarshal(value, &tableInfo)
require.Nil(t, err)
require.Equal(t, tableInfo.ID, pt1ID+100)
}

// db:70->80 -
// | - t0:71->81 -
// | | - p0:72->82
Expand Down

0 comments on commit 87a6106

Please sign in to comment.