From 2fed7dbb4de1b61f5df3133ff57c86e2ec1a6370 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Mon, 4 Sep 2023 19:06:44 +0800 Subject: [PATCH] This is an automated cherry-pick of #46521 Signed-off-by: ti-chi-bot --- br/pkg/backup/BUILD.bazel | 1 + br/pkg/backup/client.go | 40 ++++++++++++ br/pkg/stream/rewrite_meta_rawkv.go | 29 +++++++++ br/pkg/stream/rewrite_meta_rawkv_test.go | 82 ++++++++++++++++++++++++ br/pkg/task/common.go | 7 ++ br/pkg/version/BUILD.bazel | 11 ++++ br/pkg/version/version.go | 8 +++ br/pkg/version/version_test.go | 12 ++++ meta/meta.go | 21 ++++++ 9 files changed, 211 insertions(+) diff --git a/br/pkg/backup/BUILD.bazel b/br/pkg/backup/BUILD.bazel index 65ff4288987a1..a80396f4fddd2 100644 --- a/br/pkg/backup/BUILD.bazel +++ b/br/pkg/backup/BUILD.bazel @@ -25,6 +25,7 @@ go_library( "//br/pkg/storage", "//br/pkg/summary", "//br/pkg/utils", + "//br/pkg/version", "//ddl", "//distsql", "//kv", diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index 48d4b2656f999..da5c1af1930d3 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -35,6 +35,7 @@ import ( "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/br/pkg/summary" "github.com/pingcap/tidb/br/pkg/utils" + "github.com/pingcap/tidb/br/pkg/version" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/distsql" "github.com/pingcap/tidb/kv" @@ -535,7 +536,46 @@ func BuildBackupRangeAndSchema( continue } +<<<<<<< HEAD tables, err := m.ListTables(dbInfo.ID) +======= + hasTable := false + err = m.IterTables(dbInfo.ID, func(tableInfo *model.TableInfo) error { + if tableInfo.Version > version.CURRENT_BACKUP_SUPPORT_TABLE_INFO_VERSION { + // normally this shouldn't happen in a production env. + // because we had a unit test to avoid table info version update silencly. + // and had version check before run backup. + return errors.Errorf("backup doesn't not support table %s with version %d, maybe try a new version of br", + tableInfo.Name.String(), + tableInfo.Version, + ) + } + if !tableFilter.MatchTable(dbInfo.Name.O, tableInfo.Name.O) { + // Skip tables other than the given table. + return nil + } + + schemasNum += 1 + hasTable = true + if buildRange { + tableRanges, err := BuildTableRanges(tableInfo) + if err != nil { + return errors.Trace(err) + } + for _, r := range tableRanges { + // Add keyspace prefix to BackupRequest + startKey, endKey := storage.GetCodec().EncodeRange(r.StartKey, r.EndKey) + ranges = append(ranges, rtree.Range{ + StartKey: startKey, + EndKey: endKey, + }) + } + } + + return nil + }) + +>>>>>>> e82519e79da (restore: rewrite auto increment id after pitr (#46521)) if err != nil { return nil, nil, nil, errors.Trace(err) } diff --git a/br/pkg/stream/rewrite_meta_rawkv.go b/br/pkg/stream/rewrite_meta_rawkv.go index 7398abdbb2cb9..ef3c8a092d349 100644 --- a/br/pkg/stream/rewrite_meta_rawkv.go +++ b/br/pkg/stream/rewrite_meta_rawkv.go @@ -384,6 +384,20 @@ func (sr *SchemasReplace) rewriteEntryForTable(e *kv.Entry, cf string) (*kv.Entr return &kv.Entry{Key: newKey, Value: result.NewValue}, nil } +func (sr *SchemasReplace) rewriteEntryForAutoIncrementIDKey(e *kv.Entry, cf string) (*kv.Entry, error) { + newKey, err := sr.rewriteKeyForTable( + e.Key, + cf, + meta.ParseAutoIncrementIDKey, + meta.AutoIncrementIDKey, + ) + if err != nil { + return nil, errors.Trace(err) + } + + return &kv.Entry{Key: newKey, Value: e.Value}, nil +} + func (sr *SchemasReplace) rewriteEntryForAutoTableIDKey(e *kv.Entry, cf string) (*kv.Entry, error) { newKey, needWrite, err := sr.rewriteKeyForTable( e.Key, @@ -545,6 +559,21 @@ func (sr *SchemasReplace) RewriteKvEntry(e *kv.Entry, cf string) (*kv.Entry, err } else { return nil, nil } +<<<<<<< HEAD +======= + if meta.IsTableKey(rawKey.Field) { + return sr.rewriteEntryForTable(e, cf) + } else if meta.IsAutoIncrementIDKey(rawKey.Field) { + return sr.rewriteEntryForAutoIncrementIDKey(e, cf) + } else if meta.IsAutoTableIDKey(rawKey.Field) { + return sr.rewriteEntryForAutoTableIDKey(e, cf) + } else if meta.IsSequenceKey(rawKey.Field) { + return sr.rewriteEntryForSequenceKey(e, cf) + } else if meta.IsAutoRandomTableIDKey(rawKey.Field) { + return sr.rewriteEntryForAutoRandomTableIDKey(e, cf) + } + return nil, nil +>>>>>>> e82519e79da (restore: rewrite auto increment id after pitr (#46521)) } func (sr *SchemasReplace) tryToGCJob(job *model.Job) error { diff --git a/br/pkg/stream/rewrite_meta_rawkv_test.go b/br/pkg/stream/rewrite_meta_rawkv_test.go index d2cbe24e8295d..0707cf1c3aa31 100644 --- a/br/pkg/stream/rewrite_meta_rawkv_test.go +++ b/br/pkg/stream/rewrite_meta_rawkv_test.go @@ -82,7 +82,89 @@ func TestRewriteValueForDB(t *testing.T) { require.Equal(t, newId, sr.DbMap[dbID].NewDBID) } +<<<<<<< HEAD func TestRewriteValueForTable(t *testing.T) { +======= +func TestRewriteKeyForTable(t *testing.T) { + var ( + dbID int64 = 1 + tableID int64 = 57 + ts uint64 = 400036290571534337 + ) + cases := []struct { + encodeTableFn func(int64) []byte + decodeTableFn func([]byte) (int64, error) + }{ + { + meta.TableKey, + meta.ParseTableKey, + }, + { + meta.AutoIncrementIDKey, + meta.ParseAutoIncrementIDKey, + }, + { + meta.AutoTableIDKey, + meta.ParseAutoTableIDKey, + }, + { + meta.AutoRandomTableIDKey, + meta.ParseAutoRandomTableIDKey, + }, + { + meta.SequenceKey, + meta.ParseSequenceKey, + }, + } + + for _, ca := range cases { + encodedKey := encodeTxnMetaKey(meta.DBkey(dbID), ca.encodeTableFn(tableID), ts) + // create schemasReplace. + sr := MockEmptySchemasReplace(nil) + + // set preConstruct status and construct map information. + sr.SetPreConstructMapStatus() + newKey, err := sr.rewriteKeyForTable(encodedKey, WriteCF, ca.decodeTableFn, ca.encodeTableFn) + require.Nil(t, err) + require.Nil(t, newKey) + require.Equal(t, len(sr.DbMap), 1) + require.Equal(t, len(sr.DbMap[dbID].TableMap), 1) + downStreamDbID := sr.DbMap[dbID].DbID + downStreamTblID := sr.DbMap[dbID].TableMap[tableID].TableID + + // set restoreKV status and rewrite it. + sr.SetRestoreKVStatus() + newKey, err = sr.rewriteKeyForTable(encodedKey, DefaultCF, ca.decodeTableFn, ca.encodeTableFn) + require.Nil(t, err) + decodedKey, err := ParseTxnMetaKeyFrom(newKey) + require.Nil(t, err) + require.Equal(t, decodedKey.Ts, ts) + + newDbID, err := meta.ParseDBKey(decodedKey.Key) + require.Nil(t, err) + require.Equal(t, newDbID, downStreamDbID) + newTblID, err := ca.decodeTableFn(decodedKey.Field) + require.Nil(t, err) + require.Equal(t, newTblID, downStreamTblID) + + // rewrite it again, and get the same result. + newKey, err = sr.rewriteKeyForTable(encodedKey, WriteCF, ca.decodeTableFn, ca.encodeTableFn) + require.Nil(t, err) + decodedKey, err = ParseTxnMetaKeyFrom(newKey) + require.Nil(t, err) + require.Equal(t, decodedKey.Ts, sr.RewriteTS) + + newDbID, err = meta.ParseDBKey(decodedKey.Key) + require.Nil(t, err) + require.Equal(t, newDbID, downStreamDbID) + newTblID, err = ca.decodeTableFn(decodedKey.Field) + require.Nil(t, err) + require.Equal(t, newTblID, downStreamTblID) + } +} + +func TestRewriteTableInfo(t *testing.T) { +>>>>>>> e82519e79da (restore: rewrite auto increment id after pitr (#46521)) var ( dbId int64 = 40 tableID int64 = 100 diff --git a/br/pkg/task/common.go b/br/pkg/task/common.go index 190ad8b72ed9c..41bdcd2a96d49 100644 --- a/br/pkg/task/common.go +++ b/br/pkg/task/common.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/tidb/br/pkg/metautil" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/br/pkg/utils" + "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/sessionctx/variable" filter "github.com/pingcap/tidb/util/table-filter" "github.com/spf13/cobra" @@ -103,6 +104,12 @@ const ( flagFullBackupType = "type" ) +const ( + // Once TableInfoVersion updated. BR need to check compatibility with + // new TableInfoVersion. both snapshot restore and pitr need to be checked. + CURRENT_BACKUP_SUPPORT_TABLE_INFO_VERSION = model.TableInfoVersion5 +) + // FullBackupType type when doing full backup or restore type FullBackupType string diff --git a/br/pkg/version/BUILD.bazel b/br/pkg/version/BUILD.bazel index 8171081ae5df1..c891492ae8780 100644 --- a/br/pkg/version/BUILD.bazel +++ b/br/pkg/version/BUILD.bazel @@ -10,7 +10,11 @@ go_library( "//br/pkg/logutil", "//br/pkg/utils", "//br/pkg/version/build", +<<<<<<< HEAD "//sessionctx/variable", +======= + "//parser/model", +>>>>>>> e82519e79da (restore: rewrite auto increment id after pitr (#46521)) "//util/engine", "@com_github_coreos_go_semver//semver", "@com_github_pingcap_errors//:errors", @@ -27,9 +31,16 @@ go_test( srcs = ["version_test.go"], embed = [":version"], flaky = True, +<<<<<<< HEAD deps = [ "//br/pkg/version/build", "//sessionctx/variable", +======= + shard_count = 10, + deps = [ + "//br/pkg/version/build", + "//parser/model", +>>>>>>> e82519e79da (restore: rewrite auto increment id after pitr (#46521)) "@com_github_coreos_go_semver//semver", "@com_github_data_dog_go_sqlmock//:go-sqlmock", "@com_github_pingcap_kvproto//pkg/metapb", diff --git a/br/pkg/version/version.go b/br/pkg/version/version.go index c49e3d1ada923..4abde9b368520 100644 --- a/br/pkg/version/version.go +++ b/br/pkg/version/version.go @@ -18,7 +18,11 @@ import ( "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/br/pkg/version/build" +<<<<<<< HEAD "github.com/pingcap/tidb/sessionctx/variable" +======= + "github.com/pingcap/tidb/parser/model" +>>>>>>> e82519e79da (restore: rewrite auto increment id after pitr (#46521)) "github.com/pingcap/tidb/util/engine" pd "github.com/tikv/pd/client" "go.uber.org/zap" @@ -36,6 +40,10 @@ var ( checkpointSupportError error = nil // pitrSupportBatchKVFiles specifies whether TiKV-server supports batch PITR. pitrSupportBatchKVFiles bool = false + + // Once TableInfoVersion updated. BR need to check compatibility with + // new TableInfoVersion. both snapshot restore and pitr need to be checked. + CURRENT_BACKUP_SUPPORT_TABLE_INFO_VERSION = model.TableInfoVersion5 ) // NextMajorVersion returns the next major version. diff --git a/br/pkg/version/version_test.go b/br/pkg/version/version_test.go index 1fc654b3990b6..516597927fd58 100644 --- a/br/pkg/version/version_test.go +++ b/br/pkg/version/version_test.go @@ -13,7 +13,11 @@ import ( "github.com/coreos/go-semver/semver" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/br/pkg/version/build" +<<<<<<< HEAD "github.com/pingcap/tidb/sessionctx/variable" +======= + "github.com/pingcap/tidb/parser/model" +>>>>>>> e82519e79da (restore: rewrite auto increment id after pitr (#46521)) "github.com/stretchr/testify/require" pd "github.com/tikv/pd/client" ) @@ -633,3 +637,11 @@ Check Table Before Drop: false` require.NoError(t, err) require.Equal(t, "5.7.25", versionStr) } + +func TestEnsureSupportVersion(t *testing.T) { + // Once this test failed. please check the compatibility carefully. + // *** Don't change this test simply. *** + require.Equal(t, + CURRENT_BACKUP_SUPPORT_TABLE_INFO_VERSION, + model.CurrLatestTableInfoVersion) +} diff --git a/meta/meta.go b/meta/meta.go index 52905f1f50211..6c79ac23b6659 100644 --- a/meta/meta.go +++ b/meta/meta.go @@ -276,9 +276,30 @@ func ParseAutoTableIDKey(key []byte) (int64, error) { } func (*Meta) autoIncrementIDKey(tableID int64) []byte { + return AutoIncrementIDKey(tableID) +} + +// AutoIncrementIDKey decodes the auto inc table key. +func AutoIncrementIDKey(tableID int64) []byte { return []byte(fmt.Sprintf("%s:%d", mIncIDPrefix, tableID)) } +// IsAutoIncrementIDKey checks whether the key is auto increment key. +func IsAutoIncrementIDKey(key []byte) bool { + return strings.HasPrefix(string(key), mIncIDPrefix+":") +} + +// ParseAutoIncrementIDKey decodes the tableID from the auto tableID key. +func ParseAutoIncrementIDKey(key []byte) (int64, error) { + if !IsAutoIncrementIDKey(key) { + return 0, ErrInvalidString.GenWithStack("fail to parse autoIncrementKey") + } + + tableID := strings.TrimPrefix(string(key), mIncIDPrefix+":") + id, err := strconv.Atoi(tableID) + return int64(id), err +} + func (*Meta) autoRandomTableIDKey(tableID int64) []byte { return AutoRandomTableIDKey(tableID) }