Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

restore: rewrite auto increment id after pitr #46521

Merged
merged 9 commits into from
Sep 4, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions br/pkg/backup/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ go_library(
"//br/pkg/storage",
"//br/pkg/summary",
"//br/pkg/utils",
"//br/pkg/version",
"//ddl",
"//distsql",
"//kv",
Expand Down
10 changes: 10 additions & 0 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/summary"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -537,6 +538,15 @@ func BuildBackupRangeAndInitSchema(

hasTable := false
err = m.IterTables(dbInfo.ID, func(tableInfo *model.TableInfo) error {
if tableInfo.Version > version.CURRENT_BACKUP_SUPPORT_TABLE_INFO_VERSION {
// normaly this shouldn't happen in a production env.
// because we had a unit test to avoid table info version update silencly.
// and had version check before run backup.
return errors.Errorf("backup doesn't not support table %s with version %d, maybe try a new version of br",
tableInfo.Name.String(),
tableInfo.Version,
)
}
if !tableFilter.MatchTable(dbInfo.Name.O, tableInfo.Name.O) {
// Skip tables other than the given table.
return nil
Expand Down
16 changes: 16 additions & 0 deletions br/pkg/stream/rewrite_meta_rawkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,20 @@ func (sr *SchemasReplace) rewriteEntryForTable(e *kv.Entry, cf string) (*kv.Entr
return &kv.Entry{Key: newKey, Value: result.NewValue}, nil
}

func (sr *SchemasReplace) rewriteEntryForAutoIncrementIDKey(e *kv.Entry, cf string) (*kv.Entry, error) {
newKey, err := sr.rewriteKeyForTable(
e.Key,
cf,
meta.ParseAutoIncrementIDKey,
meta.AutoIncrementIDKey,
)
if err != nil {
return nil, errors.Trace(err)
}

return &kv.Entry{Key: newKey, Value: e.Value}, nil
}

func (sr *SchemasReplace) rewriteEntryForAutoTableIDKey(e *kv.Entry, cf string) (*kv.Entry, error) {
newKey, err := sr.rewriteKeyForTable(
e.Key,
Expand Down Expand Up @@ -652,6 +666,8 @@ func (sr *SchemasReplace) RewriteKvEntry(e *kv.Entry, cf string) (*kv.Entry, err
}
if meta.IsTableKey(rawKey.Field) {
return sr.rewriteEntryForTable(e, cf)
} else if meta.IsAutoIncrementIDKey(rawKey.Field) {
return sr.rewriteEntryForAutoIncrementIDKey(e, cf)
} else if meta.IsAutoTableIDKey(rawKey.Field) {
return sr.rewriteEntryForAutoTableIDKey(e, cf)
} else if meta.IsSequenceKey(rawKey.Field) {
Expand Down
111 changes: 69 additions & 42 deletions br/pkg/stream/rewrite_meta_rawkv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,49 +208,76 @@ func TestRewriteKeyForTable(t *testing.T) {
tableID int64 = 57
ts uint64 = 400036290571534337
)
encodedKey := encodeTxnMetaKey(meta.DBkey(dbID), meta.TableKey(tableID), ts)

// create schemasReplace.
sr := MockEmptySchemasReplace(nil)

// set preConstruct status and construct map information.
sr.SetPreConstructMapStatus()
newKey, err := sr.rewriteKeyForTable(encodedKey, WriteCF, meta.ParseTableKey, meta.TableKey)
require.Nil(t, err)
require.Nil(t, newKey)
require.Equal(t, len(sr.DbMap), 1)
require.Equal(t, len(sr.DbMap[dbID].TableMap), 1)
downStreamDbID := sr.DbMap[dbID].DbID
downStreamTblID := sr.DbMap[dbID].TableMap[tableID].TableID

// set restoreKV status and rewrite it.
sr.SetRestoreKVStatus()
newKey, err = sr.rewriteKeyForTable(encodedKey, DefaultCF, meta.ParseTableKey, meta.TableKey)
require.Nil(t, err)
decodedKey, err := ParseTxnMetaKeyFrom(newKey)
require.Nil(t, err)
require.Equal(t, decodedKey.Ts, ts)

newDbID, err := meta.ParseDBKey(decodedKey.Key)
require.Nil(t, err)
require.Equal(t, newDbID, downStreamDbID)
newTblID, err := meta.ParseTableKey(decodedKey.Field)
require.Nil(t, err)
require.Equal(t, newTblID, downStreamTblID)

// rewrite it again, and get the same result.
newKey, err = sr.rewriteKeyForTable(encodedKey, WriteCF, meta.ParseTableKey, meta.TableKey)
require.Nil(t, err)
decodedKey, err = ParseTxnMetaKeyFrom(newKey)
require.Nil(t, err)
require.Equal(t, decodedKey.Ts, sr.RewriteTS)
cases := []struct {
encodeTableFn func(int64) []byte
decodeTableFn func([]byte) (int64, error)
}{
{
meta.TableKey,
meta.ParseTableKey,
},
{
meta.AutoIncrementIDKey,
meta.ParseAutoIncrementIDKey,
},
{
meta.AutoTableIDKey,
meta.ParseAutoTableIDKey,
},
{
meta.AutoRandomTableIDKey,
meta.ParseAutoRandomTableIDKey,
},
{
meta.SequenceKey,
meta.ParseSequenceKey,
},
}

newDbID, err = meta.ParseDBKey(decodedKey.Key)
require.Nil(t, err)
require.Equal(t, newDbID, downStreamDbID)
newTblID, err = meta.ParseTableKey(decodedKey.Field)
require.Nil(t, err)
require.Equal(t, newTblID, downStreamTblID)
for _, ca := range cases {
encodedKey := encodeTxnMetaKey(meta.DBkey(dbID), ca.encodeTableFn(tableID), ts)
// create schemasReplace.
sr := MockEmptySchemasReplace(nil)

// set preConstruct status and construct map information.
sr.SetPreConstructMapStatus()
newKey, err := sr.rewriteKeyForTable(encodedKey, WriteCF, ca.decodeTableFn, ca.encodeTableFn)
require.Nil(t, err)
require.Nil(t, newKey)
require.Equal(t, len(sr.DbMap), 1)
require.Equal(t, len(sr.DbMap[dbID].TableMap), 1)
downStreamDbID := sr.DbMap[dbID].DbID
downStreamTblID := sr.DbMap[dbID].TableMap[tableID].TableID

// set restoreKV status and rewrite it.
sr.SetRestoreKVStatus()
newKey, err = sr.rewriteKeyForTable(encodedKey, DefaultCF, ca.decodeTableFn, ca.encodeTableFn)
require.Nil(t, err)
decodedKey, err := ParseTxnMetaKeyFrom(newKey)
require.Nil(t, err)
require.Equal(t, decodedKey.Ts, ts)

newDbID, err := meta.ParseDBKey(decodedKey.Key)
require.Nil(t, err)
require.Equal(t, newDbID, downStreamDbID)
newTblID, err := ca.decodeTableFn(decodedKey.Field)
require.Nil(t, err)
require.Equal(t, newTblID, downStreamTblID)

// rewrite it again, and get the same result.
newKey, err = sr.rewriteKeyForTable(encodedKey, WriteCF, ca.decodeTableFn, ca.encodeTableFn)
require.Nil(t, err)
decodedKey, err = ParseTxnMetaKeyFrom(newKey)
require.Nil(t, err)
require.Equal(t, decodedKey.Ts, sr.RewriteTS)

newDbID, err = meta.ParseDBKey(decodedKey.Key)
require.Nil(t, err)
require.Equal(t, newDbID, downStreamDbID)
newTblID, err = ca.decodeTableFn(decodedKey.Field)
require.Nil(t, err)
require.Equal(t, newTblID, downStreamTblID)
}
}

func TestRewriteTableInfo(t *testing.T) {
Expand Down
7 changes: 7 additions & 0 deletions br/pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx/variable"
filter "github.com/pingcap/tidb/util/table-filter"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -103,6 +104,12 @@ const (
flagFullBackupType = "type"
)

const (
// Once TableInfoVersion updated. BR need to check compatibility with
// new TableInfoVersion. both snapshot restore and pitr need to be checked.
CURRENT_BACKUP_SUPPORT_TABLE_INFO_VERSION = model.TableInfoVersion5
)

// FullBackupType type when doing full backup or restore
type FullBackupType string

Expand Down
4 changes: 3 additions & 1 deletion br/pkg/version/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
"//br/pkg/logutil",
"//br/pkg/utils",
"//br/pkg/version/build",
"//parser/model",
"//util/engine",
"@com_github_coreos_go_semver//semver",
"@com_github_pingcap_errors//:errors",
Expand All @@ -26,9 +27,10 @@ go_test(
srcs = ["version_test.go"],
embed = [":version"],
flaky = True,
shard_count = 9,
shard_count = 10,
deps = [
"//br/pkg/version/build",
"//parser/model",
"@com_github_coreos_go_semver//semver",
"@com_github_data_dog_go_sqlmock//:go-sqlmock",
"@com_github_pingcap_kvproto//pkg/metapb",
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/version/build"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/util/engine"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
Expand All @@ -35,6 +36,10 @@ var (
checkpointSupportError error = nil
// pitrSupportBatchKVFiles specifies whether TiKV-server supports batch PITR.
pitrSupportBatchKVFiles bool = false

// Once TableInfoVersion updated. BR need to check compatibility with
// new TableInfoVersion. both snapshot restore and pitr need to be checked.
CURRENT_BACKUP_SUPPORT_TABLE_INFO_VERSION = model.TableInfoVersion5
)

// NextMajorVersion returns the next major version.
Expand Down
9 changes: 9 additions & 0 deletions br/pkg/version/version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/coreos/go-semver/semver"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/br/pkg/version/build"
"github.com/pingcap/tidb/parser/model"
"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
)
Expand Down Expand Up @@ -622,3 +623,11 @@ Check Table Before Drop: false`
require.NoError(t, err)
require.Equal(t, "5.7.25", versionStr)
}

func TestEnsureSupportVersion(t *testing.T) {
// Once this test failed. please check the compatiblity carefully.
// *** Don't change this test simply. ***
require.Equal(t,
CURRENT_BACKUP_SUPPORT_TABLE_INFO_VERSION,
model.CurrLatestTableInfoVersion)
}
20 changes: 20 additions & 0 deletions meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,26 @@ func (*Meta) autoIncrementIDKey(tableID int64) []byte {
return []byte(fmt.Sprintf("%s:%d", mIncIDPrefix, tableID))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

call the AutoIncrementIDKey instead.

}

// AutoIncrementIDKey decodes the auto inc table key.
func AutoIncrementIDKey(tableID int64) []byte {
return []byte(fmt.Sprintf("%s:%d", mIncIDPrefix, tableID))
}

func IsAutoIncrementIDKey(key []byte) bool {
return strings.HasPrefix(string(key), mIncIDPrefix+":")
}

// ParseAutoIncrementIDKey decodes the tableID from the auto tableID key.
func ParseAutoIncrementIDKey(key []byte) (int64, error) {
if !IsAutoIncrementIDKey(key) {
return 0, ErrInvalidString.GenWithStack("fail to parse autoIncrementKey")
}

tableID := strings.TrimPrefix(string(key), mIncIDPrefix+":")
id, err := strconv.Atoi(tableID)
return int64(id), err
}

func (*Meta) autoRandomTableIDKey(tableID int64) []byte {
return AutoRandomTableIDKey(tableID)
}
Expand Down