From 6ca720d87df158004065301f9fd9e63f801e3ea4 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Mon, 10 Jan 2022 15:13:41 +0800 Subject: [PATCH] owner(ticdc): Add bootstrap and try to fix the meta information in it (#3838) (#3865) --- cdc/capture/http_validator.go | 2 +- cdc/model/capture.go | 10 ++ cdc/model/capture_test.go | 17 +++ cdc/model/changefeed.go | 59 ++++++++- cdc/model/changefeed_test.go | 148 ++++++++++++++++++++++- cdc/owner/owner.go | 34 +++++- cdc/owner/owner_test.go | 38 ++++++ pkg/cmd/cli/cli_changefeed_create.go | 2 +- pkg/cmd/util/helper.go | 3 +- pkg/orchestrator/reactor_state.go | 2 +- pkg/version/check.go | 11 +- pkg/version/check_test.go | 43 ++++--- pkg/version/creator_version_gate.go | 61 ++++++++++ pkg/version/creator_version_gate_test.go | 78 ++++++++++++ 14 files changed, 468 insertions(+), 40 deletions(-) create mode 100644 pkg/version/creator_version_gate.go create mode 100644 pkg/version/creator_version_gate_test.go diff --git a/cdc/capture/http_validator.go b/cdc/capture/http_validator.go index 029cfd68368..fefb3807f4b 100644 --- a/cdc/capture/http_validator.go +++ b/cdc/capture/http_validator.go @@ -99,7 +99,7 @@ func verifyCreateChangefeedConfig(ctx context.Context, changefeedConfig model.Ch return nil, err } // set sortEngine and EnableOldValue - cdcClusterVer, err := version.GetTiCDCClusterVersion(captureInfos) + cdcClusterVer, err := version.GetTiCDCClusterVersion(model.ListVersionsFromCaptureInfos(captureInfos)) if err != nil { return nil, err } diff --git a/cdc/model/capture.go b/cdc/model/capture.go index aa2aa76331b..22c95e2a48e 100644 --- a/cdc/model/capture.go +++ b/cdc/model/capture.go @@ -43,3 +43,13 @@ func (c *CaptureInfo) Unmarshal(data []byte) error { return errors.Annotatef(cerror.WrapError(cerror.ErrUnmarshalFailed, err), "unmarshal data: %v", data) } + +// ListVersionsFromCaptureInfos returns the version list of the CaptureInfo list. +func ListVersionsFromCaptureInfos(captureInfos []*CaptureInfo) []string { + var captureVersions []string + for _, ci := range captureInfos { + captureVersions = append(captureVersions, ci.Version) + } + + return captureVersions +} diff --git a/cdc/model/capture_test.go b/cdc/model/capture_test.go index b1ff3e97a63..67c3f7f9366 100644 --- a/cdc/model/capture_test.go +++ b/cdc/model/capture_test.go @@ -36,3 +36,20 @@ func TestMarshalUnmarshal(t *testing.T) { require.Nil(t, err) require.Equal(t, info, decodedInfo) } + +func TestListVersionsFromCaptureInfos(t *testing.T) { + infos := []*CaptureInfo{ + { + ID: "9ff52aca-aea6-4022-8ec4-fbee3f2c7891", + AdvertiseAddr: "127.0.0.1:8300", + Version: "dev", + }, + { + ID: "9ff52aca-aea6-4022-8ec4-fbee3f2c7891", + AdvertiseAddr: "127.0.0.1:8300", + Version: "", + }, + } + + require.ElementsMatch(t, []string{"dev", ""}, ListVersionsFromCaptureInfos(infos)) +} diff --git a/cdc/model/changefeed.go b/cdc/model/changefeed.go index d1806071bc9..5584ffe8a47 100644 --- a/cdc/model/changefeed.go +++ b/cdc/model/changefeed.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/cyclic/mark" cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/version" "github.com/tikv/client-go/v2/oracle" "go.uber.org/zap" ) @@ -48,7 +49,7 @@ const ( StateError FeedState = "error" StateFailed FeedState = "failed" StateStopped FeedState = "stopped" - StateRemoved FeedState = "removed" // deprecated, will be removed in the next version + StateRemoved FeedState = "removed" StateFinished FeedState = "finished" ) @@ -226,10 +227,10 @@ func (info *ChangeFeedInfo) Clone() (*ChangeFeedInfo, error) { return cloned, err } -// VerifyAndFix verifies changefeed info and may fillin some fields. -// If a must field is not provided, return an error. -// If some necessary filed is missing but can use a default value, fillin it. -func (info *ChangeFeedInfo) VerifyAndFix() error { +// VerifyAndComplete verifies changefeed info and may fill in some fields. +// If a required field is not provided, return an error. +// If some necessary filed is missing but can use a default value, fill in it. +func (info *ChangeFeedInfo) VerifyAndComplete() error { defaultConfig := config.GetDefaultReplicaConfig() if info.Engine == "" { info.Engine = SortUnified @@ -255,6 +256,54 @@ func (info *ChangeFeedInfo) VerifyAndFix() error { return nil } +// FixIncompatible fixes incompatible changefeed meta info. +func (info *ChangeFeedInfo) FixIncompatible() { + creatorVersionGate := version.NewCreatorVersionGate(info.CreatorVersion) + if creatorVersionGate.ChangefeedStateFromAdminJob() { + log.Info("Start fixing incompatible changefeed state", zap.Any("changefeed", info)) + info.fixState() + log.Info("Fix incompatibility changefeed state completed", zap.Any("changefeed", info)) + } +} + +// fixState attempts to fix state loss from upgrading the old owner to the new owner. +func (info *ChangeFeedInfo) fixState() { + // Notice: In the old owner we used AdminJobType field to determine if the task was paused or not, + // we need to handle this field in the new owner. + // Otherwise, we will see that the old version of the task is paused and then upgraded, + // and the task is automatically resumed after the upgrade. + state := info.State + // Upgrading from an old owner, we need to deal with cases where the state is normal, + // but actually contains errors and does not match the admin job type. + if state == StateNormal { + switch info.AdminJobType { + // This corresponds to the case of failure or error. + case AdminNone, AdminResume: + if info.Error != nil { + if cerror.ChangefeedFastFailErrorCode(errors.RFCErrorCode(info.Error.Code)) { + state = StateFailed + } else { + state = StateError + } + } + case AdminStop: + state = StateStopped + case AdminFinish: + state = StateFinished + case AdminRemove: + state = StateRemoved + } + } + + if state != info.State { + log.Info("handle old owner inconsistent state", + zap.String("old state", string(info.State)), + zap.String("admin job type", info.AdminJobType.String()), + zap.String("new state", string(state))) + info.State = state + } +} + // CheckErrorHistory checks error history of a changefeed // if having error record older than GC interval, set needSave to true. // if error counts reach threshold, set canInit to false. diff --git a/cdc/model/changefeed_test.go b/cdc/model/changefeed_test.go index 1a2d9103f20..ee1f03618a8 100644 --- a/cdc/model/changefeed_test.go +++ b/cdc/model/changefeed_test.go @@ -163,7 +163,7 @@ func TestFillV1(t *testing.T) { }, cfg) } -func TestVerifyAndFix(t *testing.T) { +func TestVerifyAndComplete(t *testing.T) { t.Parallel() info := &ChangeFeedInfo{ @@ -177,7 +177,7 @@ func TestVerifyAndFix(t *testing.T) { }, } - err := info.VerifyAndFix() + err := info.VerifyAndComplete() require.Nil(t, err) require.Equal(t, SortUnified, info.Engine) @@ -189,6 +189,150 @@ func TestVerifyAndFix(t *testing.T) { require.Equal(t, marshalConfig2, marshalConfig1) } +func TestFixIncompatible(t *testing.T) { + // Test to fix incompatible states. + testCases := []struct { + info *ChangeFeedInfo + expectedState FeedState + }{ + { + info: &ChangeFeedInfo{ + AdminJobType: AdminStop, + State: StateNormal, + Error: nil, + CreatorVersion: "", + }, + expectedState: StateStopped, + }, + { + info: &ChangeFeedInfo{ + AdminJobType: AdminStop, + State: StateNormal, + Error: nil, + CreatorVersion: "4.0.14", + }, + expectedState: StateStopped, + }, + { + info: &ChangeFeedInfo{ + AdminJobType: AdminStop, + State: StateNormal, + Error: nil, + CreatorVersion: "5.0.5", + }, + expectedState: StateStopped, + }, + } + + for _, tc := range testCases { + tc.info.FixIncompatible() + require.Equal(t, tc.expectedState, tc.info.State) + } +} + +func TestFixState(t *testing.T) { + t.Parallel() + + testCases := []struct { + info *ChangeFeedInfo + expectedState FeedState + }{ + { + info: &ChangeFeedInfo{ + AdminJobType: AdminNone, + State: StateNormal, + Error: nil, + }, + expectedState: StateNormal, + }, + { + info: &ChangeFeedInfo{ + AdminJobType: AdminResume, + State: StateNormal, + Error: nil, + }, + expectedState: StateNormal, + }, + { + info: &ChangeFeedInfo{ + AdminJobType: AdminNone, + State: StateNormal, + Error: &RunningError{ + Code: string(cerror.ErrGCTTLExceeded.RFCCode()), + }, + }, + expectedState: StateFailed, + }, + { + info: &ChangeFeedInfo{ + AdminJobType: AdminResume, + State: StateNormal, + Error: &RunningError{ + Code: string(cerror.ErrGCTTLExceeded.RFCCode()), + }, + }, + expectedState: StateFailed, + }, + { + info: &ChangeFeedInfo{ + AdminJobType: AdminNone, + State: StateNormal, + Error: &RunningError{ + Code: string(cerror.ErrClusterIDMismatch.RFCCode()), + }, + }, + expectedState: StateError, + }, + { + info: &ChangeFeedInfo{ + AdminJobType: AdminResume, + State: StateNormal, + Error: &RunningError{ + Code: string(cerror.ErrClusterIDMismatch.RFCCode()), + }, + }, + expectedState: StateError, + }, + { + info: &ChangeFeedInfo{ + AdminJobType: AdminStop, + State: StateNormal, + Error: nil, + }, + expectedState: StateStopped, + }, + { + info: &ChangeFeedInfo{ + AdminJobType: AdminFinish, + State: StateNormal, + Error: nil, + }, + expectedState: StateFinished, + }, + { + info: &ChangeFeedInfo{ + AdminJobType: AdminRemove, + State: StateNormal, + Error: nil, + }, + expectedState: StateRemoved, + }, + { + info: &ChangeFeedInfo{ + AdminJobType: AdminRemove, + State: StateNormal, + Error: nil, + }, + expectedState: StateRemoved, + }, + } + + for _, tc := range testCases { + tc.info.fixState() + require.Equal(t, tc.expectedState, tc.info.State) + } +} + func TestChangeFeedInfoClone(t *testing.T) { t.Parallel() diff --git a/cdc/owner/owner.go b/cdc/owner/owner.go index 8cd40ad1a6e..af6c5c4d7ee 100644 --- a/cdc/owner/owner.go +++ b/cdc/owner/owner.go @@ -86,6 +86,10 @@ type Owner struct { logLimiter *rate.Limiter lastTickTime time.Time closed int32 + // bootstrapped specifies whether the owner has been initialized. + // This will only be done when the owner starts the first Tick. + // NOTICE: Do not use it in a method other than tick unexpectedly, as it is not a thread-safe value. + bootstrapped bool newChangefeed func(id model.ChangeFeedID, gcManager gc.Manager) *changefeed } @@ -108,6 +112,8 @@ func NewOwner4Test( pdClient pd.Client, ) *Owner { o := NewOwner(pdClient) + // Most tests do not need to test bootstrap. + o.bootstrapped = true o.newChangefeed = func(id model.ChangeFeedID, gcManager gc.Manager) *changefeed { return newChangefeed4Test(id, gcManager, newDDLPuller, newSink) } @@ -120,8 +126,15 @@ func (o *Owner) Tick(stdCtx context.Context, rawState orchestrator.ReactorState) failpoint.Return(nil, errors.New("owner run with injected error")) }) failpoint.Inject("sleep-in-owner-tick", nil) - ctx := stdCtx.(cdcContext.Context) state := rawState.(*orchestrator.GlobalReactorState) + // At the first Tick, we need to do a bootstrap operation. + // Fix incompatible or incorrect meta information. + if !o.bootstrapped { + o.Bootstrap(state) + o.bootstrapped = true + return state, nil + } + o.captures = state.Captures o.updateMetrics(state) @@ -143,6 +156,7 @@ func (o *Owner) Tick(stdCtx context.Context, rawState orchestrator.ReactorState) return nil, errors.Trace(err) } + ctx := stdCtx.(cdcContext.Context) for changefeedID, changefeedState := range state.Changefeeds { if changefeedState.Info == nil { o.cleanUpChangefeed(changefeedState) @@ -256,6 +270,24 @@ func (o *Owner) cleanUpChangefeed(state *orchestrator.ChangefeedReactorState) { } } +// Bootstrap checks if the state contains incompatible or incorrect information and tries to fix it. +func (o *Owner) Bootstrap(state *orchestrator.GlobalReactorState) { + log.Info("Start bootstrapping", zap.Any("state", state)) + fixChangefeedInfos(state) +} + +// fixChangefeedInfos attempts to fix incompatible or incorrect meta information in changefeed state. +func fixChangefeedInfos(state *orchestrator.GlobalReactorState) { + for _, changefeedState := range state.Changefeeds { + if changefeedState != nil { + changefeedState.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { + info.FixIncompatible() + return info, true, nil + }) + } + } +} + func (o *Owner) updateMetrics(state *orchestrator.GlobalReactorState) { // Keep the value of prometheus expression `rate(counter)` = 1 // Please also change alert rule in ticdc.rules.yml when change the expression value. diff --git a/cdc/owner/owner_test.go b/cdc/owner/owner_test.go index fe18e2e69a9..21e55ef397d 100644 --- a/cdc/owner/owner_test.go +++ b/cdc/owner/owner_test.go @@ -187,6 +187,44 @@ func (s *ownerSuite) TestStopChangefeed(c *check.C) { c.Assert(state.Changefeeds, check.Not(check.HasKey), changefeedID) } +func (s *ownerSuite) TestFixChangefeedInfos(c *check.C) { + defer testleak.AfterTest(c)() + ctx := cdcContext.NewBackendContext4Test(false) + owner, state, tester := createOwner4Test(ctx, c) + // We need to do bootstrap. + owner.bootstrapped = false + changefeedID := "test-changefeed" + // Mismatched state and admin job. + changefeedInfo := &model.ChangeFeedInfo{ + State: model.StateNormal, + AdminJobType: model.AdminStop, + StartTs: oracle.GoTimeToTS(time.Now()), + Config: config.GetDefaultReplicaConfig(), + CreatorVersion: "4.0.14", + } + changefeedStr, err := changefeedInfo.Marshal() + c.Assert(err, check.IsNil) + cdcKey := etcd.CDCKey{ + Tp: etcd.CDCKeyTypeChangefeedInfo, + ChangefeedID: changefeedID, + } + tester.MustUpdate(cdcKey.String(), []byte(changefeedStr)) + // For the first tick, we do a bootstrap, and it tries to fix the meta information. + _, err = owner.Tick(ctx, state) + tester.MustApplyPatches() + c.Assert(err, check.IsNil) + c.Assert(owner.bootstrapped, check.IsTrue) + c.Assert(owner.changefeeds, check.Not(check.HasKey), changefeedID) + + // Start tick normally. + _, err = owner.Tick(ctx, state) + tester.MustApplyPatches() + c.Assert(err, check.IsNil) + c.Assert(owner.changefeeds, check.HasKey, changefeedID) + // The meta information is fixed correctly. + c.Assert(owner.changefeeds[changefeedID].state.Info.State, check.Equals, model.StateStopped) +} + func (s *ownerSuite) TestCheckClusterVersion(c *check.C) { defer testleak.AfterTest(c)() ctx := cdcContext.NewBackendContext4Test(false) diff --git a/pkg/cmd/cli/cli_changefeed_create.go b/pkg/cmd/cli/cli_changefeed_create.go index ddf4a04229b..9cfc2d101de 100644 --- a/pkg/cmd/cli/cli_changefeed_create.go +++ b/pkg/cmd/cli/cli_changefeed_create.go @@ -181,7 +181,7 @@ func (o *createChangefeedOptions) completeCfg(ctx context.Context, cmd *cobra.Co return err } - cdcClusterVer, err := version.GetTiCDCClusterVersion(captureInfos) + cdcClusterVer, err := version.GetTiCDCClusterVersion(model.ListVersionsFromCaptureInfos(captureInfos)) if err != nil { return errors.Trace(err) } diff --git a/pkg/cmd/util/helper.go b/pkg/cmd/util/helper.go index 94348175270..69b3aa0a861 100644 --- a/pkg/cmd/util/helper.go +++ b/pkg/cmd/util/helper.go @@ -25,6 +25,7 @@ import ( "github.com/BurntSushi/toml" "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/model" cmdconetxt "github.com/pingcap/tiflow/pkg/cmd/context" "github.com/pingcap/tiflow/pkg/etcd" "github.com/pingcap/tiflow/pkg/logutil" @@ -158,7 +159,7 @@ func VerifyAndGetTiCDCClusterVersion( return version.TiCDCClusterVersion{}, err } - cdcClusterVer, err := version.GetTiCDCClusterVersion(captureInfos) + cdcClusterVer, err := version.GetTiCDCClusterVersion(model.ListVersionsFromCaptureInfos(captureInfos)) if err != nil { return version.TiCDCClusterVersion{}, err } diff --git a/pkg/orchestrator/reactor_state.go b/pkg/orchestrator/reactor_state.go index c1d253e2308..0c7ec3f4b42 100644 --- a/pkg/orchestrator/reactor_state.go +++ b/pkg/orchestrator/reactor_state.go @@ -210,7 +210,7 @@ func (s *ChangefeedReactorState) UpdateCDCKey(key *etcd.CDCKey, value []byte) er return errors.Trace(err) } if key.Tp == etcd.CDCKeyTypeChangefeedInfo { - if err := s.Info.VerifyAndFix(); err != nil { + if err := s.Info.VerifyAndComplete(); err != nil { return errors.Trace(err) } } diff --git a/pkg/version/check.go b/pkg/version/check.go index 63e9e29c2b5..1a66957341e 100644 --- a/pkg/version/check.go +++ b/pkg/version/check.go @@ -25,7 +25,6 @@ import ( "github.com/coreos/go-semver/semver" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" - "github.com/pingcap/tiflow/cdc/model" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/httputil" "github.com/pingcap/tiflow/pkg/security" @@ -223,16 +222,16 @@ func (v *TiCDCClusterVersion) ShouldEnableUnifiedSorterByDefault() bool { var TiCDCClusterVersionUnknown = TiCDCClusterVersion{} // GetTiCDCClusterVersion returns the version of ticdc cluster -func GetTiCDCClusterVersion(captureInfos []*model.CaptureInfo) (TiCDCClusterVersion, error) { - if len(captureInfos) == 0 { +func GetTiCDCClusterVersion(captureVersion []string) (TiCDCClusterVersion, error) { + if len(captureVersion) == 0 { return TiCDCClusterVersionUnknown, nil } var minVer *semver.Version - for _, captureInfo := range captureInfos { + for _, versionStr := range captureVersion { var ver *semver.Version var err error - if captureInfo.Version != "" { - ver, err = semver.NewVersion(removeVAndHash(captureInfo.Version)) + if versionStr != "" { + ver, err = semver.NewVersion(removeVAndHash(versionStr)) } else { ver = defaultTiCDCVersion } diff --git a/pkg/version/check_test.go b/pkg/version/check_test.go index bd9c54404e0..47dac43a360 100644 --- a/pkg/version/check_test.go +++ b/pkg/version/check_test.go @@ -23,7 +23,6 @@ import ( "github.com/coreos/go-semver/semver" "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/tiflow/cdc/model" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" pd "github.com/tikv/pd/client" @@ -188,56 +187,56 @@ func TestReleaseSemver(t *testing.T) { func TestGetTiCDCClusterVersion(t *testing.T) { t.Parallel() testCases := []struct { - captureInfos []*model.CaptureInfo - expected TiCDCClusterVersion + captureVersions []string + expected TiCDCClusterVersion }{ { - captureInfos: []*model.CaptureInfo{}, - expected: TiCDCClusterVersionUnknown, + captureVersions: []string{}, + expected: TiCDCClusterVersionUnknown, }, { - captureInfos: []*model.CaptureInfo{ - {ID: "capture1", Version: ""}, - {ID: "capture2", Version: ""}, - {ID: "capture3", Version: ""}, + captureVersions: []string{ + "", + "", + "", }, expected: TiCDCClusterVersion{defaultTiCDCVersion}, }, { - captureInfos: []*model.CaptureInfo{ - {ID: "capture1", Version: "5.0.1"}, - {ID: "capture2", Version: "4.0.7"}, - {ID: "capture3", Version: "5.0.0-rc"}, + captureVersions: []string{ + "5.0.1", + "4.0.7", + "5.0.0-rc", }, expected: TiCDCClusterVersion{semver.New("4.0.7")}, }, { - captureInfos: []*model.CaptureInfo{ - {ID: "capture1", Version: "5.0.0-rc"}, + captureVersions: []string{ + "5.0.0-rc", }, expected: TiCDCClusterVersion{semver.New("5.0.0-rc")}, }, { - captureInfos: []*model.CaptureInfo{ - {ID: "capture1", Version: "5.0.0"}, + captureVersions: []string{ + "5.0.0", }, expected: TiCDCClusterVersion{semver.New("5.0.0")}, }, { - captureInfos: []*model.CaptureInfo{ - {ID: "capture1", Version: "4.1.0"}, + captureVersions: []string{ + "4.1.0", }, expected: TiCDCClusterVersion{semver.New("4.1.0")}, }, { - captureInfos: []*model.CaptureInfo{ - {ID: "capture1", Version: "4.0.10"}, + captureVersions: []string{ + "4.0.10", }, expected: TiCDCClusterVersion{semver.New("4.0.10")}, }, } for _, tc := range testCases { - ver, err := GetTiCDCClusterVersion(tc.captureInfos) + ver, err := GetTiCDCClusterVersion(tc.captureVersions) require.Nil(t, err) require.Equal(t, ver, tc.expected) } diff --git a/pkg/version/creator_version_gate.go b/pkg/version/creator_version_gate.go new file mode 100644 index 00000000000..30102682e06 --- /dev/null +++ b/pkg/version/creator_version_gate.go @@ -0,0 +1,61 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package version + +import ( + "github.com/coreos/go-semver/semver" +) + +// CreatorVersionGate determines the introduced version and compatibility +// of some features based on the creator's version value. +type CreatorVersionGate struct { + version string +} + +// changefeedStateFromAdminJobVersions specifies the version before +// which we use the admin job type to control the state of the changefeed. +var changefeedStateFromAdminJobVersions = []semver.Version{ + // Introduced in https://github.com/pingcap/ticdc/pull/3014. + *semver.New("4.0.16"), + // Introduced in https://github.com/pingcap/ticdc/pull/2946. + *semver.New("5.0.6"), +} + +// NewCreatorVersionGate creates the creator version gate. +func NewCreatorVersionGate(version string) *CreatorVersionGate { + return &CreatorVersionGate{ + version: version, + } +} + +// ChangefeedStateFromAdminJob determines if admin job is the state +// of changefeed based on the version of the creator. +func (f *CreatorVersionGate) ChangefeedStateFromAdminJob() bool { + // Introduced in https://github.com/pingcap/ticdc/pull/1341. + // The changefeed before it was introduced was using the old owner. + if f.version == "" { + return true + } + + creatorVersion := semver.New(removeVAndHash(f.version)) + for _, version := range changefeedStateFromAdminJobVersions { + // NOTICE: To compare against the same major version. + if creatorVersion.Major == version.Major && + creatorVersion.LessThan(version) { + return true + } + } + + return false +} diff --git a/pkg/version/creator_version_gate_test.go b/pkg/version/creator_version_gate_test.go new file mode 100644 index 00000000000..5e79859e7fa --- /dev/null +++ b/pkg/version/creator_version_gate_test.go @@ -0,0 +1,78 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package version + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestChangefeedStateFromAdminJob(t *testing.T) { + t.Parallel() + + testCases := []struct { + creatorVersion string + expected bool + }{ + { + creatorVersion: "", + expected: true, + }, + { + creatorVersion: "4.0.12", + expected: true, + }, + { + creatorVersion: "4.0.14", + expected: true, + }, + { + creatorVersion: "4.0.15", + expected: true, + }, + { + creatorVersion: "4.0.16", + }, + { + creatorVersion: "5.0.0", + expected: true, + }, + { + creatorVersion: "5.0.1", + expected: true, + }, + { + creatorVersion: "5.0.6", + expected: false, + }, + { + creatorVersion: "5.1.0", + expected: false, + }, + { + creatorVersion: "5.2.0", + expected: false, + }, + { + creatorVersion: "5.3.0", + expected: false, + }, + } + + for _, tc := range testCases { + creatorVersionGate := CreatorVersionGate{version: tc.creatorVersion} + require.Equal(t, tc.expected, creatorVersionGate.ChangefeedStateFromAdminJob()) + } +}