Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#46521
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
3pointer authored and ti-chi-bot committed Sep 4, 2023
1 parent e2ba276 commit 2fed7db
Show file tree
Hide file tree
Showing 9 changed files with 211 additions and 0 deletions.
1 change: 1 addition & 0 deletions br/pkg/backup/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ go_library(
"//br/pkg/storage",
"//br/pkg/summary",
"//br/pkg/utils",
"//br/pkg/version",
"//ddl",
"//distsql",
"//kv",
Expand Down
40 changes: 40 additions & 0 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/summary"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -535,7 +536,46 @@ func BuildBackupRangeAndSchema(
continue
}

<<<<<<< HEAD
tables, err := m.ListTables(dbInfo.ID)
=======
hasTable := false
err = m.IterTables(dbInfo.ID, func(tableInfo *model.TableInfo) error {
if tableInfo.Version > version.CURRENT_BACKUP_SUPPORT_TABLE_INFO_VERSION {
// normally this shouldn't happen in a production env.
// because we had a unit test to avoid table info version update silencly.
// and had version check before run backup.
return errors.Errorf("backup doesn't not support table %s with version %d, maybe try a new version of br",
tableInfo.Name.String(),
tableInfo.Version,
)
}
if !tableFilter.MatchTable(dbInfo.Name.O, tableInfo.Name.O) {
// Skip tables other than the given table.
return nil
}

schemasNum += 1
hasTable = true
if buildRange {
tableRanges, err := BuildTableRanges(tableInfo)
if err != nil {
return errors.Trace(err)
}
for _, r := range tableRanges {
// Add keyspace prefix to BackupRequest
startKey, endKey := storage.GetCodec().EncodeRange(r.StartKey, r.EndKey)
ranges = append(ranges, rtree.Range{
StartKey: startKey,
EndKey: endKey,
})
}
}

return nil
})

>>>>>>> e82519e79da (restore: rewrite auto increment id after pitr (#46521))
if err != nil {
return nil, nil, nil, errors.Trace(err)
}
Expand Down
29 changes: 29 additions & 0 deletions br/pkg/stream/rewrite_meta_rawkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,20 @@ func (sr *SchemasReplace) rewriteEntryForTable(e *kv.Entry, cf string) (*kv.Entr
return &kv.Entry{Key: newKey, Value: result.NewValue}, nil
}

func (sr *SchemasReplace) rewriteEntryForAutoIncrementIDKey(e *kv.Entry, cf string) (*kv.Entry, error) {
newKey, err := sr.rewriteKeyForTable(
e.Key,
cf,
meta.ParseAutoIncrementIDKey,
meta.AutoIncrementIDKey,
)
if err != nil {
return nil, errors.Trace(err)
}

return &kv.Entry{Key: newKey, Value: e.Value}, nil
}

func (sr *SchemasReplace) rewriteEntryForAutoTableIDKey(e *kv.Entry, cf string) (*kv.Entry, error) {
newKey, needWrite, err := sr.rewriteKeyForTable(
e.Key,
Expand Down Expand Up @@ -545,6 +559,21 @@ func (sr *SchemasReplace) RewriteKvEntry(e *kv.Entry, cf string) (*kv.Entry, err
} else {
return nil, nil
}
<<<<<<< HEAD
=======
if meta.IsTableKey(rawKey.Field) {
return sr.rewriteEntryForTable(e, cf)
} else if meta.IsAutoIncrementIDKey(rawKey.Field) {
return sr.rewriteEntryForAutoIncrementIDKey(e, cf)
} else if meta.IsAutoTableIDKey(rawKey.Field) {
return sr.rewriteEntryForAutoTableIDKey(e, cf)
} else if meta.IsSequenceKey(rawKey.Field) {
return sr.rewriteEntryForSequenceKey(e, cf)
} else if meta.IsAutoRandomTableIDKey(rawKey.Field) {
return sr.rewriteEntryForAutoRandomTableIDKey(e, cf)
}
return nil, nil
>>>>>>> e82519e79da (restore: rewrite auto increment id after pitr (#46521))
}

func (sr *SchemasReplace) tryToGCJob(job *model.Job) error {
Expand Down
82 changes: 82 additions & 0 deletions br/pkg/stream/rewrite_meta_rawkv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,89 @@ func TestRewriteValueForDB(t *testing.T) {
require.Equal(t, newId, sr.DbMap[dbID].NewDBID)
}

<<<<<<< HEAD
func TestRewriteValueForTable(t *testing.T) {
=======
func TestRewriteKeyForTable(t *testing.T) {
var (
dbID int64 = 1
tableID int64 = 57
ts uint64 = 400036290571534337
)
cases := []struct {
encodeTableFn func(int64) []byte
decodeTableFn func([]byte) (int64, error)
}{
{
meta.TableKey,
meta.ParseTableKey,
},
{
meta.AutoIncrementIDKey,
meta.ParseAutoIncrementIDKey,
},
{
meta.AutoTableIDKey,
meta.ParseAutoTableIDKey,
},
{
meta.AutoRandomTableIDKey,
meta.ParseAutoRandomTableIDKey,
},
{
meta.SequenceKey,
meta.ParseSequenceKey,
},
}

for _, ca := range cases {
encodedKey := encodeTxnMetaKey(meta.DBkey(dbID), ca.encodeTableFn(tableID), ts)
// create schemasReplace.
sr := MockEmptySchemasReplace(nil)

// set preConstruct status and construct map information.
sr.SetPreConstructMapStatus()
newKey, err := sr.rewriteKeyForTable(encodedKey, WriteCF, ca.decodeTableFn, ca.encodeTableFn)
require.Nil(t, err)
require.Nil(t, newKey)
require.Equal(t, len(sr.DbMap), 1)
require.Equal(t, len(sr.DbMap[dbID].TableMap), 1)
downStreamDbID := sr.DbMap[dbID].DbID
downStreamTblID := sr.DbMap[dbID].TableMap[tableID].TableID

// set restoreKV status and rewrite it.
sr.SetRestoreKVStatus()
newKey, err = sr.rewriteKeyForTable(encodedKey, DefaultCF, ca.decodeTableFn, ca.encodeTableFn)
require.Nil(t, err)
decodedKey, err := ParseTxnMetaKeyFrom(newKey)
require.Nil(t, err)
require.Equal(t, decodedKey.Ts, ts)

newDbID, err := meta.ParseDBKey(decodedKey.Key)
require.Nil(t, err)
require.Equal(t, newDbID, downStreamDbID)
newTblID, err := ca.decodeTableFn(decodedKey.Field)
require.Nil(t, err)
require.Equal(t, newTblID, downStreamTblID)

// rewrite it again, and get the same result.
newKey, err = sr.rewriteKeyForTable(encodedKey, WriteCF, ca.decodeTableFn, ca.encodeTableFn)
require.Nil(t, err)
decodedKey, err = ParseTxnMetaKeyFrom(newKey)
require.Nil(t, err)
require.Equal(t, decodedKey.Ts, sr.RewriteTS)

newDbID, err = meta.ParseDBKey(decodedKey.Key)
require.Nil(t, err)
require.Equal(t, newDbID, downStreamDbID)
newTblID, err = ca.decodeTableFn(decodedKey.Field)
require.Nil(t, err)
require.Equal(t, newTblID, downStreamTblID)
}
}

func TestRewriteTableInfo(t *testing.T) {
>>>>>>> e82519e79da (restore: rewrite auto increment id after pitr (#46521))
var (
dbId int64 = 40
tableID int64 = 100
Expand Down
7 changes: 7 additions & 0 deletions br/pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx/variable"
filter "github.com/pingcap/tidb/util/table-filter"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -103,6 +104,12 @@ const (
flagFullBackupType = "type"
)

const (
// Once TableInfoVersion updated. BR need to check compatibility with
// new TableInfoVersion. both snapshot restore and pitr need to be checked.
CURRENT_BACKUP_SUPPORT_TABLE_INFO_VERSION = model.TableInfoVersion5
)

// FullBackupType type when doing full backup or restore
type FullBackupType string

Expand Down
11 changes: 11 additions & 0 deletions br/pkg/version/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ go_library(
"//br/pkg/logutil",
"//br/pkg/utils",
"//br/pkg/version/build",
<<<<<<< HEAD
"//sessionctx/variable",
=======
"//parser/model",
>>>>>>> e82519e79da (restore: rewrite auto increment id after pitr (#46521))
"//util/engine",
"@com_github_coreos_go_semver//semver",
"@com_github_pingcap_errors//:errors",
Expand All @@ -27,9 +31,16 @@ go_test(
srcs = ["version_test.go"],
embed = [":version"],
flaky = True,
<<<<<<< HEAD
deps = [
"//br/pkg/version/build",
"//sessionctx/variable",
=======
shard_count = 10,
deps = [
"//br/pkg/version/build",
"//parser/model",
>>>>>>> e82519e79da (restore: rewrite auto increment id after pitr (#46521))
"@com_github_coreos_go_semver//semver",
"@com_github_data_dog_go_sqlmock//:go-sqlmock",
"@com_github_pingcap_kvproto//pkg/metapb",
Expand Down
8 changes: 8 additions & 0 deletions br/pkg/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ import (
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/version/build"
<<<<<<< HEAD
"github.com/pingcap/tidb/sessionctx/variable"
=======
"github.com/pingcap/tidb/parser/model"
>>>>>>> e82519e79da (restore: rewrite auto increment id after pitr (#46521))
"github.com/pingcap/tidb/util/engine"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
Expand All @@ -36,6 +40,10 @@ var (
checkpointSupportError error = nil
// pitrSupportBatchKVFiles specifies whether TiKV-server supports batch PITR.
pitrSupportBatchKVFiles bool = false

// Once TableInfoVersion updated. BR need to check compatibility with
// new TableInfoVersion. both snapshot restore and pitr need to be checked.
CURRENT_BACKUP_SUPPORT_TABLE_INFO_VERSION = model.TableInfoVersion5
)

// NextMajorVersion returns the next major version.
Expand Down
12 changes: 12 additions & 0 deletions br/pkg/version/version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ import (
"github.com/coreos/go-semver/semver"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/br/pkg/version/build"
<<<<<<< HEAD
"github.com/pingcap/tidb/sessionctx/variable"
=======
"github.com/pingcap/tidb/parser/model"
>>>>>>> e82519e79da (restore: rewrite auto increment id after pitr (#46521))
"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
)
Expand Down Expand Up @@ -633,3 +637,11 @@ Check Table Before Drop: false`
require.NoError(t, err)
require.Equal(t, "5.7.25", versionStr)
}

func TestEnsureSupportVersion(t *testing.T) {
// Once this test failed. please check the compatibility carefully.
// *** Don't change this test simply. ***
require.Equal(t,
CURRENT_BACKUP_SUPPORT_TABLE_INFO_VERSION,
model.CurrLatestTableInfoVersion)
}
21 changes: 21 additions & 0 deletions meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,30 @@ func ParseAutoTableIDKey(key []byte) (int64, error) {
}

func (*Meta) autoIncrementIDKey(tableID int64) []byte {
return AutoIncrementIDKey(tableID)
}

// AutoIncrementIDKey decodes the auto inc table key.
func AutoIncrementIDKey(tableID int64) []byte {
return []byte(fmt.Sprintf("%s:%d", mIncIDPrefix, tableID))
}

// IsAutoIncrementIDKey checks whether the key is auto increment key.
func IsAutoIncrementIDKey(key []byte) bool {
return strings.HasPrefix(string(key), mIncIDPrefix+":")
}

// ParseAutoIncrementIDKey decodes the tableID from the auto tableID key.
func ParseAutoIncrementIDKey(key []byte) (int64, error) {
if !IsAutoIncrementIDKey(key) {
return 0, ErrInvalidString.GenWithStack("fail to parse autoIncrementKey")
}

tableID := strings.TrimPrefix(string(key), mIncIDPrefix+":")
id, err := strconv.Atoi(tableID)
return int64(id), err
}

func (*Meta) autoRandomTableIDKey(tableID int64) []byte {
return AutoRandomTableIDKey(tableID)
}
Expand Down

0 comments on commit 2fed7db

Please sign in to comment.