diff --git a/br/pkg/backup/BUILD.bazel b/br/pkg/backup/BUILD.bazel
index 487c91b717108..c49a79081de31 100644
--- a/br/pkg/backup/BUILD.bazel
+++ b/br/pkg/backup/BUILD.bazel
@@ -25,6 +25,7 @@ go_library(
         "//br/pkg/storage",
         "//br/pkg/summary",
         "//br/pkg/utils",
+        "//br/pkg/version",
         "//ddl",
         "//distsql",
         "//kv",
diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go
index ab52aac9ab692..b15255b41733d 100644
--- a/br/pkg/backup/client.go
+++ b/br/pkg/backup/client.go
@@ -36,6 +36,7 @@ import (
     "github.com/pingcap/tidb/br/pkg/storage"
     "github.com/pingcap/tidb/br/pkg/summary"
     "github.com/pingcap/tidb/br/pkg/utils"
+    "github.com/pingcap/tidb/br/pkg/version"
     "github.com/pingcap/tidb/ddl"
     "github.com/pingcap/tidb/distsql"
     "github.com/pingcap/tidb/kv"
@@ -537,6 +538,15 @@ func BuildBackupRangeAndInitSchema(
     hasTable := false
     err = m.IterTables(dbInfo.ID, func(tableInfo *model.TableInfo) error {
+        if tableInfo.Version > version.CURRENT_BACKUP_SUPPORT_TABLE_INFO_VERSION {
+            // Normally this shouldn't happen in a production environment:
+            // a unit test guards against the table info version being bumped silently,
+            // and the version is checked before running backup.
+            return errors.Errorf("backup doesn't support table %s with version %d, maybe try a new version of br",
+                tableInfo.Name.String(),
+                tableInfo.Version,
+            )
+        }
         if !tableFilter.MatchTable(dbInfo.Name.O, tableInfo.Name.O) {
             // Skip tables other than the given table.
             return nil
diff --git a/br/pkg/stream/rewrite_meta_rawkv.go b/br/pkg/stream/rewrite_meta_rawkv.go
index 4b21309dc439e..86d23f8be9d4c 100644
--- a/br/pkg/stream/rewrite_meta_rawkv.go
+++ b/br/pkg/stream/rewrite_meta_rawkv.go
@@ -519,6 +519,20 @@ func (sr *SchemasReplace) rewriteEntryForTable(e *kv.Entry, cf string) (*kv.Entr
     return &kv.Entry{Key: newKey, Value: result.NewValue}, nil
 }
 
+func (sr *SchemasReplace) rewriteEntryForAutoIncrementIDKey(e *kv.Entry, cf string) (*kv.Entry, error) {
+    newKey, err := sr.rewriteKeyForTable(
+        e.Key,
+        cf,
+        meta.ParseAutoIncrementIDKey,
+        meta.AutoIncrementIDKey,
+    )
+    if err != nil {
+        return nil, errors.Trace(err)
+    }
+
+    return &kv.Entry{Key: newKey, Value: e.Value}, nil
+}
+
 func (sr *SchemasReplace) rewriteEntryForAutoTableIDKey(e *kv.Entry, cf string) (*kv.Entry, error) {
     newKey, err := sr.rewriteKeyForTable(
         e.Key,
@@ -652,6 +666,8 @@ func (sr *SchemasReplace) RewriteKvEntry(e *kv.Entry, cf string) (*kv.Entry, err
     }
     if meta.IsTableKey(rawKey.Field) {
         return sr.rewriteEntryForTable(e, cf)
+    } else if meta.IsAutoIncrementIDKey(rawKey.Field) {
+        return sr.rewriteEntryForAutoIncrementIDKey(e, cf)
     } else if meta.IsAutoTableIDKey(rawKey.Field) {
         return sr.rewriteEntryForAutoTableIDKey(e, cf)
     } else if meta.IsSequenceKey(rawKey.Field) {
diff --git a/br/pkg/stream/rewrite_meta_rawkv_test.go b/br/pkg/stream/rewrite_meta_rawkv_test.go
index 790a73fe4d90e..fe8045c2a1d59 100644
--- a/br/pkg/stream/rewrite_meta_rawkv_test.go
+++ b/br/pkg/stream/rewrite_meta_rawkv_test.go
@@ -208,49 +208,76 @@ func TestRewriteKeyForTable(t *testing.T) {
         tableID int64  = 57
         ts      uint64 = 400036290571534337
     )
-    encodedKey := encodeTxnMetaKey(meta.DBkey(dbID), meta.TableKey(tableID), ts)
-
-    // create schemasReplace.
-    sr := MockEmptySchemasReplace(nil)
-
-    // set preConstruct status and construct map information.
-    sr.SetPreConstructMapStatus()
-    newKey, err := sr.rewriteKeyForTable(encodedKey, WriteCF, meta.ParseTableKey, meta.TableKey)
-    require.Nil(t, err)
-    require.Nil(t, newKey)
-    require.Equal(t, len(sr.DbMap), 1)
-    require.Equal(t, len(sr.DbMap[dbID].TableMap), 1)
-    downStreamDbID := sr.DbMap[dbID].DbID
-    downStreamTblID := sr.DbMap[dbID].TableMap[tableID].TableID
-
-    // set restoreKV status and rewrite it.
-    sr.SetRestoreKVStatus()
-    newKey, err = sr.rewriteKeyForTable(encodedKey, DefaultCF, meta.ParseTableKey, meta.TableKey)
-    require.Nil(t, err)
-    decodedKey, err := ParseTxnMetaKeyFrom(newKey)
-    require.Nil(t, err)
-    require.Equal(t, decodedKey.Ts, ts)
-
-    newDbID, err := meta.ParseDBKey(decodedKey.Key)
-    require.Nil(t, err)
-    require.Equal(t, newDbID, downStreamDbID)
-    newTblID, err := meta.ParseTableKey(decodedKey.Field)
-    require.Nil(t, err)
-    require.Equal(t, newTblID, downStreamTblID)
-
-    // rewrite it again, and get the same result.
-    newKey, err = sr.rewriteKeyForTable(encodedKey, WriteCF, meta.ParseTableKey, meta.TableKey)
-    require.Nil(t, err)
-    decodedKey, err = ParseTxnMetaKeyFrom(newKey)
-    require.Nil(t, err)
-    require.Equal(t, decodedKey.Ts, sr.RewriteTS)
+    cases := []struct {
+        encodeTableFn func(int64) []byte
+        decodeTableFn func([]byte) (int64, error)
+    }{
+        {
+            meta.TableKey,
+            meta.ParseTableKey,
+        },
+        {
+            meta.AutoIncrementIDKey,
+            meta.ParseAutoIncrementIDKey,
+        },
+        {
+            meta.AutoTableIDKey,
+            meta.ParseAutoTableIDKey,
+        },
+        {
+            meta.AutoRandomTableIDKey,
+            meta.ParseAutoRandomTableIDKey,
+        },
+        {
+            meta.SequenceKey,
+            meta.ParseSequenceKey,
+        },
+    }
 
-    newDbID, err = meta.ParseDBKey(decodedKey.Key)
-    require.Nil(t, err)
-    require.Equal(t, newDbID, downStreamDbID)
-    newTblID, err = meta.ParseTableKey(decodedKey.Field)
-    require.Nil(t, err)
-    require.Equal(t, newTblID, downStreamTblID)
+    for _, ca := range cases {
+        encodedKey := encodeTxnMetaKey(meta.DBkey(dbID), ca.encodeTableFn(tableID), ts)
+        // create schemasReplace.
+        sr := MockEmptySchemasReplace(nil)
+
+        // set preConstruct status and construct map information.
+        sr.SetPreConstructMapStatus()
+        newKey, err := sr.rewriteKeyForTable(encodedKey, WriteCF, ca.decodeTableFn, ca.encodeTableFn)
+        require.Nil(t, err)
+        require.Nil(t, newKey)
+        require.Equal(t, len(sr.DbMap), 1)
+        require.Equal(t, len(sr.DbMap[dbID].TableMap), 1)
+        downStreamDbID := sr.DbMap[dbID].DbID
+        downStreamTblID := sr.DbMap[dbID].TableMap[tableID].TableID
+
+        // set restoreKV status and rewrite it.
+        sr.SetRestoreKVStatus()
+        newKey, err = sr.rewriteKeyForTable(encodedKey, DefaultCF, ca.decodeTableFn, ca.encodeTableFn)
+        require.Nil(t, err)
+        decodedKey, err := ParseTxnMetaKeyFrom(newKey)
+        require.Nil(t, err)
+        require.Equal(t, decodedKey.Ts, ts)
+
+        newDbID, err := meta.ParseDBKey(decodedKey.Key)
+        require.Nil(t, err)
+        require.Equal(t, newDbID, downStreamDbID)
+        newTblID, err := ca.decodeTableFn(decodedKey.Field)
+        require.Nil(t, err)
+        require.Equal(t, newTblID, downStreamTblID)
+
+        // rewrite it again, and get the same result.
+        newKey, err = sr.rewriteKeyForTable(encodedKey, WriteCF, ca.decodeTableFn, ca.encodeTableFn)
+        require.Nil(t, err)
+        decodedKey, err = ParseTxnMetaKeyFrom(newKey)
+        require.Nil(t, err)
+        require.Equal(t, decodedKey.Ts, sr.RewriteTS)
+
+        newDbID, err = meta.ParseDBKey(decodedKey.Key)
+        require.Nil(t, err)
+        require.Equal(t, newDbID, downStreamDbID)
+        newTblID, err = ca.decodeTableFn(decodedKey.Field)
+        require.Nil(t, err)
+        require.Equal(t, newTblID, downStreamTblID)
+    }
 }
 
 func TestRewriteTableInfo(t *testing.T) {
diff --git a/br/pkg/task/common.go b/br/pkg/task/common.go
index 0436fa506b0b1..4f5466b8e5bd5 100644
--- a/br/pkg/task/common.go
+++ b/br/pkg/task/common.go
@@ -28,6 +28,7 @@ import (
     "github.com/pingcap/tidb/br/pkg/metautil"
     "github.com/pingcap/tidb/br/pkg/storage"
     "github.com/pingcap/tidb/br/pkg/utils"
+    "github.com/pingcap/tidb/parser/model"
     "github.com/pingcap/tidb/sessionctx/variable"
     filter "github.com/pingcap/tidb/util/table-filter"
     "github.com/spf13/cobra"
@@ -103,6 +104,12 @@ const (
     flagFullBackupType = "type"
 )
 
+const (
+    // Once the TableInfoVersion is updated, BR needs to check compatibility with the
+    // new TableInfoVersion. Both snapshot restore and PITR need to be checked.
+    CURRENT_BACKUP_SUPPORT_TABLE_INFO_VERSION = model.TableInfoVersion5
+)
+
 // FullBackupType type when doing full backup or restore
 type FullBackupType string
diff --git a/br/pkg/version/BUILD.bazel b/br/pkg/version/BUILD.bazel
index 7262e5ea3d854..a984fd3870020 100644
--- a/br/pkg/version/BUILD.bazel
+++ b/br/pkg/version/BUILD.bazel
@@ -10,6 +10,7 @@ go_library(
         "//br/pkg/logutil",
         "//br/pkg/utils",
        "//br/pkg/version/build",
+        "//parser/model",
         "//util/engine",
         "@com_github_coreos_go_semver//semver",
         "@com_github_pingcap_errors//:errors",
@@ -26,9 +27,10 @@ go_test(
     srcs = ["version_test.go"],
     embed = [":version"],
     flaky = True,
-    shard_count = 9,
+    shard_count = 10,
     deps = [
         "//br/pkg/version/build",
+        "//parser/model",
         "@com_github_coreos_go_semver//semver",
         "@com_github_data_dog_go_sqlmock//:go-sqlmock",
         "@com_github_pingcap_kvproto//pkg/metapb",
diff --git a/br/pkg/version/version.go b/br/pkg/version/version.go
index 7bd629fb2c540..1d251f03f2e76 100644
--- a/br/pkg/version/version.go
+++ b/br/pkg/version/version.go
@@ -18,6 +18,7 @@ import (
     "github.com/pingcap/tidb/br/pkg/logutil"
     "github.com/pingcap/tidb/br/pkg/utils"
     "github.com/pingcap/tidb/br/pkg/version/build"
+    "github.com/pingcap/tidb/parser/model"
     "github.com/pingcap/tidb/util/engine"
     pd "github.com/tikv/pd/client"
     "go.uber.org/zap"
@@ -35,6 +36,10 @@ var (
     checkpointSupportError error = nil
     // pitrSupportBatchKVFiles specifies whether TiKV-server supports batch PITR.
     pitrSupportBatchKVFiles bool = false
+
+    // Once the TableInfoVersion is updated, BR needs to check compatibility with the
+    // new TableInfoVersion. Both snapshot restore and PITR need to be checked.
+    CURRENT_BACKUP_SUPPORT_TABLE_INFO_VERSION = model.TableInfoVersion5
 )
 
 // NextMajorVersion returns the next major version.
diff --git a/br/pkg/version/version_test.go b/br/pkg/version/version_test.go
index 927eeee119d5b..5a2d0d40e8ecd 100644
--- a/br/pkg/version/version_test.go
+++ b/br/pkg/version/version_test.go
@@ -13,6 +13,7 @@ import (
     "github.com/coreos/go-semver/semver"
     "github.com/pingcap/kvproto/pkg/metapb"
     "github.com/pingcap/tidb/br/pkg/version/build"
+    "github.com/pingcap/tidb/parser/model"
     "github.com/stretchr/testify/require"
     pd "github.com/tikv/pd/client"
 )
@@ -622,3 +623,11 @@ Check Table Before Drop: false`
     require.NoError(t, err)
     require.Equal(t, "5.7.25", versionStr)
 }
+
+func TestEnsureSupportVersion(t *testing.T) {
+    // If this test fails, please check the compatibility carefully.
+    // *** Don't change this test lightly. ***
+    require.Equal(t,
+        CURRENT_BACKUP_SUPPORT_TABLE_INFO_VERSION,
+        model.CurrLatestTableInfoVersion)
+}
diff --git a/meta/meta.go b/meta/meta.go
index 6aee783cbf192..5f58731f26fa0 100644
--- a/meta/meta.go
+++ b/meta/meta.go
@@ -313,9 +313,30 @@ func ParseAutoTableIDKey(key []byte) (int64, error) {
 }
 
 func (*Meta) autoIncrementIDKey(tableID int64) []byte {
+    return AutoIncrementIDKey(tableID)
+}
+
+// AutoIncrementIDKey encodes the auto increment ID key for the given table ID.
+func AutoIncrementIDKey(tableID int64) []byte {
     return []byte(fmt.Sprintf("%s:%d", mIncIDPrefix, tableID))
 }
 
+// IsAutoIncrementIDKey checks whether the key is an auto increment ID key.
+func IsAutoIncrementIDKey(key []byte) bool {
+    return strings.HasPrefix(string(key), mIncIDPrefix+":")
+}
+
+// ParseAutoIncrementIDKey decodes the table ID from the auto increment ID key.
+func ParseAutoIncrementIDKey(key []byte) (int64, error) {
+    if !IsAutoIncrementIDKey(key) {
+        return 0, ErrInvalidString.GenWithStack("fail to parse autoIncrementKey")
+    }
+
+    tableID := strings.TrimPrefix(string(key), mIncIDPrefix+":")
+    id, err := strconv.Atoi(tableID)
+    return int64(id), err
+}
+
 func (*Meta) autoRandomTableIDKey(tableID int64) []byte {
     return AutoRandomTableIDKey(tableID)
 }
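Not part of the diff: the new `meta` helpers above are plain string encode/parse round-trips, so their contract can be checked in isolation. Below is a minimal sketch of such a check; the test name is illustrative, it assumes the `testify/require` helpers already used in the repository's tests, and it reuses `meta.DBkey` from the diff as an example of a key that is expected not to match the auto increment prefix.

```go
package meta_test

import (
	"testing"

	"github.com/pingcap/tidb/meta"
	"github.com/stretchr/testify/require"
)

// TestAutoIncrementIDKeyRoundTrip (illustrative name) checks that encoding a
// table ID into an auto increment ID key and parsing it back yields the same ID.
func TestAutoIncrementIDKeyRoundTrip(t *testing.T) {
	var tableID int64 = 57

	// Encode the key and confirm the prefix check recognizes it.
	key := meta.AutoIncrementIDKey(tableID)
	require.True(t, meta.IsAutoIncrementIDKey(key))

	// Parse it back and expect the original table ID.
	parsed, err := meta.ParseAutoIncrementIDKey(key)
	require.NoError(t, err)
	require.Equal(t, tableID, parsed)

	// A key from a different key space (here a DB key) should be rejected,
	// assuming its prefix differs from the auto increment ID prefix.
	_, err = meta.ParseAutoIncrementIDKey(meta.DBkey(1))
	require.Error(t, err)
}
```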