Skip to content

Commit

Permalink
br/stream: Added Version Check for Loading Migrations. (#57541)
Browse files Browse the repository at this point in the history
close #57540
  • Loading branch information
YuJuncen authored Nov 21, 2024
1 parent c068b39 commit 91c14a4
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 24 deletions.
12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -5776,13 +5776,13 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sha256 = "d470ef683433f2c5bc7a1e610da44d516908d326a0341c07208af76a30f0d8a6",
strip_prefix = "github.com/pingcap/[email protected]20241113043844-e1fa7ea8c302",
sha256 = "7d3b6f6b755b027ba138d3069238f4a4e91d0d1f573de17cda00985616adc843",
strip_prefix = "github.com/pingcap/[email protected]20241120022153-92b0414aeed8",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241113043844-e1fa7ea8c302.zip",
"http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241113043844-e1fa7ea8c302.zip",
"https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241113043844-e1fa7ea8c302.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241113043844-e1fa7ea8c302.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120022153-92b0414aeed8.zip",
"http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120022153-92b0414aeed8.zip",
"https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120022153-92b0414aeed8.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120022153-92b0414aeed8.zip",
],
)
go_repository(
Expand Down
19 changes: 10 additions & 9 deletions br/pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,16 @@ func IsContextCanceled(err error) bool {

// BR errors.
var (
ErrUnknown = errors.Normalize("internal error", errors.RFCCodeText("BR:Common:ErrUnknown"))
ErrInvalidArgument = errors.Normalize("invalid argument", errors.RFCCodeText("BR:Common:ErrInvalidArgument"))
ErrUndefinedRestoreDbOrTable = errors.Normalize("undefined restore databases or tables", errors.RFCCodeText("BR:Common:ErrUndefinedDbOrTable"))
ErrVersionMismatch = errors.Normalize("version mismatch", errors.RFCCodeText("BR:Common:ErrVersionMismatch"))
ErrFailedToConnect = errors.Normalize("failed to make gRPC channels", errors.RFCCodeText("BR:Common:ErrFailedToConnect"))
ErrInvalidMetaFile = errors.Normalize("invalid metafile: %s", errors.RFCCodeText("BR:Common:ErrInvalidMetaFile"))
ErrEnvNotSpecified = errors.Normalize("environment variable not found", errors.RFCCodeText("BR:Common:ErrEnvNotSpecified"))
ErrUnsupportedOperation = errors.Normalize("the operation is not supported", errors.RFCCodeText("BR:Common:ErrUnsupportedOperation"))
ErrInvalidRange = errors.Normalize("invalid restore range", errors.RFCCodeText("BR:Common:ErrInvalidRange"))
ErrUnknown = errors.Normalize("internal error", errors.RFCCodeText("BR:Common:ErrUnknown"))
ErrInvalidArgument = errors.Normalize("invalid argument", errors.RFCCodeText("BR:Common:ErrInvalidArgument"))
ErrUndefinedRestoreDbOrTable = errors.Normalize("undefined restore databases or tables", errors.RFCCodeText("BR:Common:ErrUndefinedDbOrTable"))
ErrVersionMismatch = errors.Normalize("version mismatch", errors.RFCCodeText("BR:Common:ErrVersionMismatch"))
ErrFailedToConnect = errors.Normalize("failed to make gRPC channels", errors.RFCCodeText("BR:Common:ErrFailedToConnect"))
ErrInvalidMetaFile = errors.Normalize("invalid metafile: %s", errors.RFCCodeText("BR:Common:ErrInvalidMetaFile"))
ErrEnvNotSpecified = errors.Normalize("environment variable not found", errors.RFCCodeText("BR:Common:ErrEnvNotSpecified"))
ErrUnsupportedOperation = errors.Normalize("the operation is not supported", errors.RFCCodeText("BR:Common:ErrUnsupportedOperation"))
ErrInvalidRange = errors.Normalize("invalid restore range", errors.RFCCodeText("BR:Common:ErrInvalidRange"))
ErrMigrationVersionNotSupported = errors.Normalize("the migration version isn't supported", errors.RFCCodeText("BR:Common:ErrMigrationVersionNotSupported"))

ErrPDUpdateFailed = errors.Normalize("failed to update PD", errors.RFCCodeText("BR:PD:ErrPDUpdateFailed"))
ErrPDLeaderNotFound = errors.Normalize("PD leader not found", errors.RFCCodeText("BR:PD:ErrPDLeaderNotFound"))
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/stream/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ go_library(
"//pkg/util/codec",
"//pkg/util/mathutil",
"//pkg/util/table-filter",
"//pkg/util/versioninfo",
"@com_github_docker_go_units//:go-units",
"@com_github_fatih_color//:color",
"@com_github_klauspost_compress//zstd",
Expand Down Expand Up @@ -64,7 +65,7 @@ go_test(
],
embed = [":stream"],
flaky = True,
shard_count = 44,
shard_count = 46,
deps = [
"//br/pkg/storage",
"//br/pkg/streamhelper",
Expand Down
43 changes: 38 additions & 5 deletions br/pkg/stream/stream_metas.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/br/pkg/utils/iter"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/mathutil"
"github.com/pingcap/tidb/pkg/util/versioninfo"
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
Expand All @@ -38,8 +39,17 @@ const (
baseTmp = "BASE_TMP"
metaSuffix = ".meta"
migrationPrefix = "v1/migrations"

SupportedMigVersion = pb.MigrationVersion_M1
)

func NewMigration() *pb.Migration {
return &pb.Migration{
Version: pb.MigrationVersion_M1,
Creator: fmt.Sprintf("br;commit=%s;branch=%s", versioninfo.TiDBGitHash, versioninfo.TiDBGitBranch),
}
}

type StreamMetadataSet struct {
// if set true, the metadata and datafile won't be removed
DryRun bool
Expand Down Expand Up @@ -196,7 +206,7 @@ func (ms *StreamMetadataSet) RemoveDataFilesAndUpdateMetadataInBatch(
hst := ms.hook(st)
est := MigerationExtension(hst)
est.Hooks = updateFnHook{updateFn: updateFn}
res := MigratedTo{NewBase: new(pb.Migration)}
res := MigratedTo{NewBase: NewMigration()}
est.doTruncateLogs(ctx, ms, from, &res)

if bst, ok := hst.ExternalStorage.(*storage.Batched); ok {
Expand Down Expand Up @@ -517,7 +527,7 @@ func MigerationExtension(s storage.ExternalStorage) MigrationExt {
// Merge merges two migrations.
// The merged migration contains all operations from the two arguments.
func MergeMigrations(m1 *pb.Migration, m2 *pb.Migration) *pb.Migration {
out := new(pb.Migration)
out := NewMigration()
out.EditMeta = mergeMetaEdits(m1.GetEditMeta(), m2.GetEditMeta())
out.Compactions = append(out.Compactions, m1.GetCompactions()...)
out.Compactions = append(out.Compactions, m2.GetCompactions()...)
Expand Down Expand Up @@ -563,6 +573,24 @@ type OrderedMigration struct {
Content pb.Migration `json:"content"`
}

func (o *OrderedMigration) unmarshalContent(b []byte) error {
err := o.Content.Unmarshal(b)
if err != nil {
return err
}
if o.Content.Version > SupportedMigVersion {
return errors.Annotatef(
berrors.ErrMigrationVersionNotSupported,
"the migration at %s has version %s(%d), the max version we support is %s(%d)",
o.Path,
o.Content.Version, o.Content.Version,
SupportedMigVersion, SupportedMigVersion,
)
}

return nil
}

// Load loads the current living migrations from the storage.
func (m MigrationExt) Load(ctx context.Context) (Migrations, error) {
opt := &storage.WalkOption{
Expand All @@ -575,6 +603,11 @@ func (m MigrationExt) Load(ctx context.Context) (Migrations, error) {
if err != nil {
return errors.Trace(err)
}
err = t.unmarshalContent(b)
if err != nil {
return err
}

if t.SeqNum == baseMigrationSN {
// NOTE: the legacy truncating isn't implemented by appending a migration.
// We load their checkpoint here to be compatible with them.
Expand All @@ -585,7 +618,7 @@ func (m MigrationExt) Load(ctx context.Context) (Migrations, error) {
}
t.Content.TruncatedTo = max(truncatedTs, t.Content.TruncatedTo)
}
return t.Content.Unmarshal(b)
return nil
})
collected := iter.CollectAll(ctx, items)
if collected.Err != nil {
Expand All @@ -605,7 +638,7 @@ func (m MigrationExt) Load(ctx context.Context) (Migrations, error) {
// The BASE migration isn't persisted.
// This happens when `migrate-to` wasn't run ever.
result = Migrations{
Base: new(pb.Migration),
Base: NewMigration(),
Layers: collected.Item,
}
}
Expand Down Expand Up @@ -818,7 +851,7 @@ func (m MigrationExt) MigrateTo(ctx context.Context, mig *pb.Migration, opts ...
}

result := MigratedTo{
NewBase: new(pb.Migration),
NewBase: NewMigration(),
}
// Fills: EditMeta for new Base.
m.doMetaEdits(ctx, mig, &result)
Expand Down
24 changes: 24 additions & 0 deletions br/pkg/stream/stream_metas_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,12 @@ func mTruncatedTo(to uint64) migOP {
}
}

func mVersion(ver backuppb.MigrationVersion) migOP {
return func(m *backuppb.Migration) {
m.Version = ver
}
}

// tmp creates a temporary storage.
func tmp(t *testing.T) *storage.LocalStorage {
tmpDir := t.TempDir()
Expand Down Expand Up @@ -2830,3 +2836,21 @@ func TestWithSimpleTruncate(t *testing.T) {
}
}
}

func TestUnsupportedVersion(t *testing.T) {
s := tmp(t)
m := mig(mVersion(backuppb.MigrationVersion(65535)))
pmig(s, 1, m)

est := MigerationExtension(s)
ctx := context.Background()
_, err := est.Load(ctx)
require.Error(t, err)
require.ErrorContains(t, err, "ErrMigrationVersionNotSupported")
}

func TestCreator(t *testing.T) {
mig := NewMigration()
require.Contains(t, mig.Creator, "br")
require.Equal(t, mig.Version, SupportedMigVersion)
}
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ error = '''
invalid restore range
'''

["BR:Common:ErrMigrationVersionNotSupported"]
error = '''
the migration version isn't supported
'''

["BR:Common:ErrUndefinedDbOrTable"]
error = '''
undefined restore databases or tables
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ require (
github.com/pingcap/errors v0.11.5-0.20240318064555-6bd07397691f
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86
github.com/pingcap/fn v1.0.0
github.com/pingcap/kvproto v0.0.0-20241113043844-e1fa7ea8c302
github.com/pingcap/kvproto v0.0.0-20241120022153-92b0414aeed8
github.com/pingcap/log v1.1.1-0.20240314023424-862ccc32f18d
github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5
github.com/pingcap/tidb/pkg/parser v0.0.0-20211011031125-9b13dc409c5e
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -672,8 +672,8 @@ github.com/pingcap/fn v1.0.0/go.mod h1:u9WZ1ZiOD1RpNhcI42RucFh/lBuzTu6rw88a+oF2Z
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20241113043844-e1fa7ea8c302 h1:ynwwqr0rLliSOJcx0wHMu4T/NiPXHlK48mk2DCrBKCI=
github.com/pingcap/kvproto v0.0.0-20241113043844-e1fa7ea8c302/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/kvproto v0.0.0-20241120022153-92b0414aeed8 h1:aNNifhc6xCjXKejjiNYtJJLFNMXnoDiXxkJIg1JErQE=
github.com/pingcap/kvproto v0.0.0-20241120022153-92b0414aeed8/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/log v1.1.1-0.20240314023424-862ccc32f18d h1:y3EueKVfVykdpTyfUnQGqft0ud+xVFuCdp1XkVL0X1E=
Expand Down

0 comments on commit 91c14a4

Please sign in to comment.