From dee031500d8ccf04c567afeb431a85ecf8bd0747 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Fri, 14 Jan 2022 13:45:43 +0800 Subject: [PATCH] owner(ticdc): Add bootstrap and try to fix the meta information in it (#3838) (#3863) --- cdc/http_api_handler.go | 21 +++- cdc/model/capture.go | 10 ++ cdc/model/capture_test.go | 18 +++ cdc/model/changefeed.go | 59 ++++++++- cdc/model/changefeed_test.go | 149 ++++++++++++++++++++++- cdc/model/reactor_state.go | 2 +- cdc/owner/owner.go | 34 +++++- cdc/owner/owner_test.go | 38 ++++++ cdc/sink/manager.go | 2 +- cmd/client_changefeed.go | 2 +- cmd/util.go | 2 +- pkg/httputil/httputil.go | 19 --- pkg/version/check.go | 12 +- pkg/version/check_test.go | 43 ++++--- pkg/version/creator_version_gate.go | 61 ++++++++++ pkg/version/creator_version_gate_test.go | 79 ++++++++++++ 16 files changed, 489 insertions(+), 62 deletions(-) create mode 100644 pkg/version/creator_version_gate.go create mode 100644 pkg/version/creator_version_gate_test.go diff --git a/cdc/http_api_handler.go b/cdc/http_api_handler.go index 2bbd90bdc76..34b4edcffe2 100644 --- a/cdc/http_api_handler.go +++ b/cdc/http_api_handler.go @@ -24,7 +24,6 @@ import ( "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tiflow/cdc/model" cerror "github.com/pingcap/tiflow/pkg/errors" - "github.com/pingcap/tiflow/pkg/httputil" "go.uber.org/zap" ) @@ -99,7 +98,7 @@ func (s *Server) handleChangefeedsList(w http.ResponseWriter, req *http.Request) writeInternalServerErrorJSON(w, err) return } - if !httputil.IsFiltered(state, cfInfo.State) { + if !isFiltered(state, cfInfo.State) { continue } cfStatus, _, err := s.etcdClient.GetChangeFeedStatus(req.Context(), changefeedID) @@ -144,3 +143,21 @@ func writeErrorJSON(w http.ResponseWriter, statusCode int, cerr errors.Error) { log.Error("fail to write data", zap.Error(err)) } } + +// isFiltered return true if the given feedState matches the whiteList. +func isFiltered(whiteList string, feedState model.FeedState) bool { + if whiteList == "all" { + return true + } + if whiteList == "" { + switch feedState { + case model.StateNormal: + return true + case model.StateStopped: + return true + case model.StateFailed: + return true + } + } + return whiteList == string(feedState) +} diff --git a/cdc/model/capture.go b/cdc/model/capture.go index aa2aa76331b..22c95e2a48e 100644 --- a/cdc/model/capture.go +++ b/cdc/model/capture.go @@ -43,3 +43,13 @@ func (c *CaptureInfo) Unmarshal(data []byte) error { return errors.Annotatef(cerror.WrapError(cerror.ErrUnmarshalFailed, err), "unmarshal data: %v", data) } + +// ListVersionsFromCaptureInfos returns the version list of the CaptureInfo list. +func ListVersionsFromCaptureInfos(captureInfos []*CaptureInfo) []string { + var captureVersions []string + for _, ci := range captureInfos { + captureVersions = append(captureVersions, ci.Version) + } + + return captureVersions +} diff --git a/cdc/model/capture_test.go b/cdc/model/capture_test.go index f5358c301bb..8d7aa60477f 100644 --- a/cdc/model/capture_test.go +++ b/cdc/model/capture_test.go @@ -38,3 +38,21 @@ func (s *captureSuite) TestMarshalUnmarshal(c *check.C) { c.Assert(err, check.IsNil) c.Assert(decodedInfo, check.DeepEquals, info) } + +func (s *captureSuite) TestListVersionsFromCaptureInfos(c *check.C) { + defer testleak.AfterTest(c)() + infos := []*CaptureInfo{ + { + ID: "9ff52aca-aea6-4022-8ec4-fbee3f2c7891", + AdvertiseAddr: "127.0.0.1:8300", + Version: "dev", + }, + { + ID: "9ff52aca-aea6-4022-8ec4-fbee3f2c7891", + AdvertiseAddr: "127.0.0.1:8300", + Version: "", + }, + } + + c.Assert(ListVersionsFromCaptureInfos(infos), check.DeepEquals, []string{"dev", ""}) +} diff --git a/cdc/model/changefeed.go b/cdc/model/changefeed.go index ebc129382f2..87158608f41 100644 --- a/cdc/model/changefeed.go +++ b/cdc/model/changefeed.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/cyclic/mark" cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/version" "go.uber.org/zap" ) @@ -48,7 +49,7 @@ const ( StateError FeedState = "error" StateFailed FeedState = "failed" StateStopped FeedState = "stopped" - StateRemoved FeedState = "removed" // deprecated, will be removed in the next version + StateRemoved FeedState = "removed" StateFinished FeedState = "finished" ) @@ -208,10 +209,10 @@ func (info *ChangeFeedInfo) Clone() (*ChangeFeedInfo, error) { return cloned, err } -// VerifyAndFix verifies changefeed info and may fillin some fields. -// If a must field is not provided, return an error. -// If some necessary filed is missing but can use a default value, fillin it. -func (info *ChangeFeedInfo) VerifyAndFix() error { +// VerifyAndComplete verifies changefeed info and may fill in some fields. +// If a required field is not provided, return an error. +// If some necessary filed is missing but can use a default value, fill in it. +func (info *ChangeFeedInfo) VerifyAndComplete() error { defaultConfig := config.GetDefaultReplicaConfig() if info.Engine == "" { info.Engine = SortUnified @@ -234,6 +235,54 @@ func (info *ChangeFeedInfo) VerifyAndFix() error { return nil } +// FixIncompatible fixes incompatible changefeed meta info. +func (info *ChangeFeedInfo) FixIncompatible() { + creatorVersionGate := version.NewCreatorVersionGate(info.CreatorVersion) + if creatorVersionGate.ChangefeedStateFromAdminJob() { + log.Info("Start fixing incompatible changefeed state", zap.Any("changefeed", info)) + info.fixState() + log.Info("Fix incompatibility changefeed state completed", zap.Any("changefeed", info)) + } +} + +// fixState attempts to fix state loss from upgrading the old owner to the new owner. +func (info *ChangeFeedInfo) fixState() { + // Notice: In the old owner we used AdminJobType field to determine if the task was paused or not, + // we need to handle this field in the new owner. + // Otherwise, we will see that the old version of the task is paused and then upgraded, + // and the task is automatically resumed after the upgrade. + state := info.State + // Upgrading from an old owner, we need to deal with cases where the state is normal, + // but actually contains errors and does not match the admin job type. + if state == StateNormal { + switch info.AdminJobType { + // This corresponds to the case of failure or error. + case AdminNone, AdminResume: + if info.Error != nil { + if cerror.ChangefeedFastFailErrorCode(errors.RFCErrorCode(info.Error.Code)) { + state = StateFailed + } else { + state = StateError + } + } + case AdminStop: + state = StateStopped + case AdminFinish: + state = StateFinished + case AdminRemove: + state = StateRemoved + } + } + + if state != info.State { + log.Info("handle old owner inconsistent state", + zap.String("old state", string(info.State)), + zap.String("admin job type", info.AdminJobType.String()), + zap.String("new state", string(state))) + info.State = state + } +} + // CheckErrorHistory checks error history of a changefeed // if having error record older than GC interval, set needSave to true. // if error counts reach threshold, set canInit to false. diff --git a/cdc/model/changefeed_test.go b/cdc/model/changefeed_test.go index 1bcd869c5c5..43ea14afcae 100644 --- a/cdc/model/changefeed_test.go +++ b/cdc/model/changefeed_test.go @@ -165,7 +165,7 @@ func (s *configSuite) TestFillV1(c *check.C) { }) } -func (s *configSuite) TestVerifyAndFix(c *check.C) { +func (s *configSuite) TestVerifyAndComplete(c *check.C) { defer testleak.AfterTest(c)() info := &ChangeFeedInfo{ SinkURI: "blackhole://", @@ -178,7 +178,7 @@ func (s *configSuite) TestVerifyAndFix(c *check.C) { }, } - err := info.VerifyAndFix() + err := info.VerifyAndComplete() c.Assert(err, check.IsNil) c.Assert(info.Engine, check.Equals, SortUnified) @@ -358,3 +358,148 @@ func (s *changefeedSuite) TestGetTs(c *check.C) { status := &ChangeFeedStatus{CheckpointTs: checkpointTs} c.Assert(info.GetCheckpointTs(status), check.Equals, checkpointTs) } + +func (s *changefeedSuite) TestFixIncompatible(c *check.C) { + defer testleak.AfterTest(c)() + // Test to fix incompatible states. + testCases := []struct { + info *ChangeFeedInfo + expectedState FeedState + }{ + { + info: &ChangeFeedInfo{ + AdminJobType: AdminStop, + State: StateNormal, + Error: nil, + CreatorVersion: "", + }, + expectedState: StateStopped, + }, + { + info: &ChangeFeedInfo{ + AdminJobType: AdminStop, + State: StateNormal, + Error: nil, + CreatorVersion: "4.0.14", + }, + expectedState: StateStopped, + }, + { + info: &ChangeFeedInfo{ + AdminJobType: AdminStop, + State: StateNormal, + Error: nil, + CreatorVersion: "5.0.5", + }, + expectedState: StateStopped, + }, + } + + for _, tc := range testCases { + tc.info.FixIncompatible() + c.Assert(tc.info.State, check.Equals, tc.expectedState) + } +} + +func (s *changefeedSuite) TestFixState(c *check.C) { + defer testleak.AfterTest(c)() + + testCases := []struct { + info *ChangeFeedInfo + expectedState FeedState + }{ + { + info: &ChangeFeedInfo{ + AdminJobType: AdminNone, + State: StateNormal, + Error: nil, + }, + expectedState: StateNormal, + }, + { + info: &ChangeFeedInfo{ + AdminJobType: AdminResume, + State: StateNormal, + Error: nil, + }, + expectedState: StateNormal, + }, + { + info: &ChangeFeedInfo{ + AdminJobType: AdminNone, + State: StateNormal, + Error: &RunningError{ + Code: string(cerror.ErrGCTTLExceeded.RFCCode()), + }, + }, + expectedState: StateFailed, + }, + { + info: &ChangeFeedInfo{ + AdminJobType: AdminResume, + State: StateNormal, + Error: &RunningError{ + Code: string(cerror.ErrGCTTLExceeded.RFCCode()), + }, + }, + expectedState: StateFailed, + }, + { + info: &ChangeFeedInfo{ + AdminJobType: AdminNone, + State: StateNormal, + Error: &RunningError{ + Code: string(cerror.ErrUnknownSortEngine.RFCCode()), + }, + }, + expectedState: StateError, + }, + { + info: &ChangeFeedInfo{ + AdminJobType: AdminResume, + State: StateNormal, + Error: &RunningError{ + Code: string(cerror.ErrUnknownSortEngine.RFCCode()), + }, + }, + expectedState: StateError, + }, + { + info: &ChangeFeedInfo{ + AdminJobType: AdminStop, + State: StateNormal, + Error: nil, + }, + expectedState: StateStopped, + }, + { + info: &ChangeFeedInfo{ + AdminJobType: AdminFinish, + State: StateNormal, + Error: nil, + }, + expectedState: StateFinished, + }, + { + info: &ChangeFeedInfo{ + AdminJobType: AdminRemove, + State: StateNormal, + Error: nil, + }, + expectedState: StateRemoved, + }, + { + info: &ChangeFeedInfo{ + AdminJobType: AdminRemove, + State: StateNormal, + Error: nil, + }, + expectedState: StateRemoved, + }, + } + + for _, tc := range testCases { + tc.info.fixState() + c.Assert(tc.info.State, check.Equals, tc.expectedState) + } +} diff --git a/cdc/model/reactor_state.go b/cdc/model/reactor_state.go index 05a64f145bb..d91bac874cc 100644 --- a/cdc/model/reactor_state.go +++ b/cdc/model/reactor_state.go @@ -209,7 +209,7 @@ func (s *ChangefeedReactorState) UpdateCDCKey(key *etcd.CDCKey, value []byte) er return errors.Trace(err) } if key.Tp == etcd.CDCKeyTypeChangefeedInfo { - if err := s.Info.VerifyAndFix(); err != nil { + if err := s.Info.VerifyAndComplete(); err != nil { return errors.Trace(err) } } diff --git a/cdc/owner/owner.go b/cdc/owner/owner.go index 17fb2d6a0b3..53ba3bbf62e 100644 --- a/cdc/owner/owner.go +++ b/cdc/owner/owner.go @@ -76,6 +76,10 @@ type Owner struct { lastTickTime time.Time closed int32 + // bootstrapped specifies whether the owner has been initialized. + // This will only be done when the owner starts the first Tick. + // NOTICE: Do not use it in a method other than tick unexpectedly, as it is not a thread-safe value. + bootstrapped bool newChangefeed func(id model.ChangeFeedID, gcManager gc.Manager) *changefeed } @@ -97,6 +101,8 @@ func NewOwner4Test( pdClient pd.Client, ) *Owner { o := NewOwner(pdClient) + // Most tests do not need to test bootstrap. + o.bootstrapped = true o.newChangefeed = func(id model.ChangeFeedID, gcManager gc.Manager) *changefeed { return newChangefeed4Test(id, gcManager, newDDLPuller, newSink) } @@ -109,8 +115,15 @@ func (o *Owner) Tick(stdCtx context.Context, rawState orchestrator.ReactorState) failpoint.Return(nil, errors.New("owner run with injected error")) }) failpoint.Inject("sleep-in-owner-tick", nil) - ctx := stdCtx.(cdcContext.Context) state := rawState.(*model.GlobalReactorState) + // At the first Tick, we need to do a bootstrap operation. + // Fix incompatible or incorrect meta information. + if !o.bootstrapped { + o.Bootstrap(state) + o.bootstrapped = true + return state, nil + } + o.updateMetrics(state) if !o.clusterVersionConsistent(state.Captures) { // sleep one second to avoid printing too much log @@ -128,6 +141,7 @@ func (o *Owner) Tick(stdCtx context.Context, rawState orchestrator.ReactorState) } o.handleJobs() + ctx := stdCtx.(cdcContext.Context) for changefeedID, changefeedState := range state.Changefeeds { if changefeedState.Info == nil { o.cleanUpChangefeed(changefeedState) @@ -238,6 +252,24 @@ func (o *Owner) cleanUpChangefeed(state *model.ChangefeedReactorState) { } } +// Bootstrap checks if the state contains incompatible or incorrect information and tries to fix it. +func (o *Owner) Bootstrap(state *model.GlobalReactorState) { + log.Info("Start bootstrapping", zap.Any("state", state)) + fixChangefeedInfos(state) +} + +// fixChangefeedInfos attempts to fix incompatible or incorrect meta information in changefeed state. +func fixChangefeedInfos(state *model.GlobalReactorState) { + for _, changefeedState := range state.Changefeeds { + if changefeedState != nil { + changefeedState.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { + info.FixIncompatible() + return info, true, nil + }) + } + } +} + func (o *Owner) updateMetrics(state *model.GlobalReactorState) { // Keep the value of prometheus expression `rate(counter)` = 1 // Please also change alert rule in ticdc.rules.yml when change the expression value. diff --git a/cdc/owner/owner_test.go b/cdc/owner/owner_test.go index 2086e6aa0bb..98cf891b0db 100644 --- a/cdc/owner/owner_test.go +++ b/cdc/owner/owner_test.go @@ -187,6 +187,44 @@ func (s *ownerSuite) TestStopChangefeed(c *check.C) { c.Assert(state.Changefeeds, check.Not(check.HasKey), changefeedID) } +func (s *ownerSuite) TestFixChangefeedInfos(c *check.C) { + defer testleak.AfterTest(c)() + ctx := cdcContext.NewBackendContext4Test(false) + owner, state, tester := createOwner4Test(ctx, c) + // We need to do bootstrap. + owner.bootstrapped = false + changefeedID := "test-changefeed" + // Mismatched state and admin job. + changefeedInfo := &model.ChangeFeedInfo{ + State: model.StateNormal, + AdminJobType: model.AdminStop, + StartTs: oracle.ComposeTS(oracle.GetPhysical(time.Now()), 0), + Config: config.GetDefaultReplicaConfig(), + CreatorVersion: "4.0.14", + } + changefeedStr, err := changefeedInfo.Marshal() + c.Assert(err, check.IsNil) + cdcKey := etcd.CDCKey{ + Tp: etcd.CDCKeyTypeChangefeedInfo, + ChangefeedID: changefeedID, + } + tester.MustUpdate(cdcKey.String(), []byte(changefeedStr)) + // For the first tick, we do a bootstrap, and it tries to fix the meta information. + _, err = owner.Tick(ctx, state) + tester.MustApplyPatches() + c.Assert(err, check.IsNil) + c.Assert(owner.bootstrapped, check.IsTrue) + c.Assert(owner.changefeeds, check.Not(check.HasKey), changefeedID) + + // Start tick normally. + _, err = owner.Tick(ctx, state) + tester.MustApplyPatches() + c.Assert(err, check.IsNil) + c.Assert(owner.changefeeds, check.HasKey, changefeedID) + // The meta information is fixed correctly. + c.Assert(owner.changefeeds[changefeedID].state.Info.State, check.Equals, model.StateStopped) +} + func (s *ownerSuite) TestCheckClusterVersion(c *check.C) { defer testleak.AfterTest(c)() ctx := cdcContext.NewBackendContext4Test(false) diff --git a/cdc/sink/manager.go b/cdc/sink/manager.go index 1697a0b8cba..78959047142 100644 --- a/cdc/sink/manager.go +++ b/cdc/sink/manager.go @@ -162,7 +162,7 @@ func (m *Manager) getCheckpointTs(tableID model.TableID) uint64 { return atomic.LoadUint64(&m.changeFeedCheckpointTs) } -// UpdateChangeFeedCheckpointTs update the changeFeedCheckpointTs every processor tick +// UpdateChangeFeedCheckpointTs updates changefeed and backend sink checkpoint ts. func (m *Manager) UpdateChangeFeedCheckpointTs(checkpointTs uint64) { atomic.StoreUint64(&m.changeFeedCheckpointTs, checkpointTs) if m.backendSink != nil { diff --git a/cmd/client_changefeed.go b/cmd/client_changefeed.go index 83096636c26..ada78c85810 100644 --- a/cmd/client_changefeed.go +++ b/cmd/client_changefeed.go @@ -484,7 +484,7 @@ func newCreateChangefeedCommand() *cobra.Command { if err != nil { return err } - cdcClusterVer, err := version.GetTiCDCClusterVersion(captureInfos) + cdcClusterVer, err := version.GetTiCDCClusterVersion(model.ListVersionsFromCaptureInfos(captureInfos)) if err != nil { return err } diff --git a/cmd/util.go b/cmd/util.go index 806ced3b54c..1738885be9c 100644 --- a/cmd/util.go +++ b/cmd/util.go @@ -427,7 +427,7 @@ func verifyAndGetTiCDCClusterVersion( if err != nil { return version.TiCDCClusterVersion{}, err } - cdcClusterVer, err := version.GetTiCDCClusterVersion(captureInfos) + cdcClusterVer, err := version.GetTiCDCClusterVersion(model.ListVersionsFromCaptureInfos(captureInfos)) if err != nil { return version.TiCDCClusterVersion{}, err } diff --git a/pkg/httputil/httputil.go b/pkg/httputil/httputil.go index a212b7a6802..012a84dc4d6 100644 --- a/pkg/httputil/httputil.go +++ b/pkg/httputil/httputil.go @@ -16,7 +16,6 @@ package httputil import ( "net/http" - "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/security" ) @@ -44,21 +43,3 @@ func NewClient(credential *security.Credential) (*Client, error) { Client: http.Client{Transport: transport}, }, nil } - -// IsFiltered return true if the given feedState matches the whiteList. -func IsFiltered(whiteList string, feedState model.FeedState) bool { - if whiteList == "all" { - return true - } - if whiteList == "" { - switch feedState { - case model.StateNormal: - return true - case model.StateStopped: - return true - case model.StateFailed: - return true - } - } - return whiteList == string(feedState) -} diff --git a/pkg/version/check.go b/pkg/version/check.go index 0593082c5e0..3074870e7fc 100644 --- a/pkg/version/check.go +++ b/pkg/version/check.go @@ -22,8 +22,6 @@ import ( "regexp" "strings" - "github.com/pingcap/tiflow/cdc/model" - "github.com/coreos/go-semver/semver" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" @@ -211,16 +209,16 @@ func (v *TiCDCClusterVersion) ShouldEnableUnifiedSorterByDefault() bool { var TiCDCClusterVersionUnknown = TiCDCClusterVersion{} // GetTiCDCClusterVersion returns the version of ticdc cluster -func GetTiCDCClusterVersion(captureInfos []*model.CaptureInfo) (TiCDCClusterVersion, error) { - if len(captureInfos) == 0 { +func GetTiCDCClusterVersion(captureVersion []string) (TiCDCClusterVersion, error) { + if len(captureVersion) == 0 { return TiCDCClusterVersionUnknown, nil } var minVer *semver.Version - for _, captureInfo := range captureInfos { + for _, versionStr := range captureVersion { var ver *semver.Version var err error - if captureInfo.Version != "" { - ver, err = semver.NewVersion(removeVAndHash(captureInfo.Version)) + if versionStr != "" { + ver, err = semver.NewVersion(removeVAndHash(versionStr)) } else { ver = defaultTiCDCVersion } diff --git a/pkg/version/check_test.go b/pkg/version/check_test.go index 87f64fd76bc..e09c9ec3d64 100644 --- a/pkg/version/check_test.go +++ b/pkg/version/check_test.go @@ -24,7 +24,6 @@ import ( "github.com/coreos/go-semver/semver" "github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/util/testleak" pd "github.com/tikv/pd/client" "github.com/tikv/pd/pkg/tempurl" @@ -196,56 +195,56 @@ func (s *checkSuite) TestReleaseSemver(c *check.C) { func (s *checkSuite) TestGetTiCDCClusterVersion(c *check.C) { defer testleak.AfterTest(c)() testCases := []struct { - captureInfos []*model.CaptureInfo - expected TiCDCClusterVersion + captureVersions []string + expected TiCDCClusterVersion }{ { - captureInfos: []*model.CaptureInfo{}, - expected: TiCDCClusterVersionUnknown, + captureVersions: []string{}, + expected: TiCDCClusterVersionUnknown, }, { - captureInfos: []*model.CaptureInfo{ - {ID: "capture1", Version: ""}, - {ID: "capture2", Version: ""}, - {ID: "capture3", Version: ""}, + captureVersions: []string{ + "", + "", + "", }, expected: TiCDCClusterVersion{defaultTiCDCVersion}, }, { - captureInfos: []*model.CaptureInfo{ - {ID: "capture1", Version: "5.0.1"}, - {ID: "capture2", Version: "4.0.7"}, - {ID: "capture3", Version: "5.0.0-rc"}, + captureVersions: []string{ + "5.0.1", + "4.0.7", + "5.0.0-rc", }, expected: TiCDCClusterVersion{semver.New("4.0.7")}, }, { - captureInfos: []*model.CaptureInfo{ - {ID: "capture1", Version: "5.0.0-rc"}, + captureVersions: []string{ + "5.0.0-rc", }, expected: TiCDCClusterVersion{semver.New("5.0.0-rc")}, }, { - captureInfos: []*model.CaptureInfo{ - {ID: "capture1", Version: "5.0.0"}, + captureVersions: []string{ + "5.0.0", }, expected: TiCDCClusterVersion{semver.New("5.0.0")}, }, { - captureInfos: []*model.CaptureInfo{ - {ID: "capture1", Version: "4.1.0"}, + captureVersions: []string{ + "4.1.0", }, expected: TiCDCClusterVersion{semver.New("4.1.0")}, }, { - captureInfos: []*model.CaptureInfo{ - {ID: "capture1", Version: "4.0.10"}, + captureVersions: []string{ + "4.0.10", }, expected: TiCDCClusterVersion{semver.New("4.0.10")}, }, } for _, tc := range testCases { - ver, err := GetTiCDCClusterVersion(tc.captureInfos) + ver, err := GetTiCDCClusterVersion(tc.captureVersions) c.Assert(err, check.IsNil) c.Assert(ver, check.DeepEquals, tc.expected) } diff --git a/pkg/version/creator_version_gate.go b/pkg/version/creator_version_gate.go new file mode 100644 index 00000000000..f75a204bf25 --- /dev/null +++ b/pkg/version/creator_version_gate.go @@ -0,0 +1,61 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package version + +import ( + "github.com/coreos/go-semver/semver" +) + +// CreatorVersionGate determines the introduced version and compatibility +// of some features based on the creator's version value. +type CreatorVersionGate struct { + version string +} + +// changefeedStateFromAdminJobVersions specifies the version before +// which we use the admin job type to control the state of the changefeed. +var changefeedStateFromAdminJobVersions = []semver.Version{ + // Introduced in https://github.com/pingcap/ticdc/pull/3014. + *semver.New("4.0.16"), + // Introduced in https://github.com/pingcap/ticdc/pull/2946. + *semver.New("5.0.6"), +} + +// NewCreatorVersionGate creates the creator version gate. +func NewCreatorVersionGate(version string) *CreatorVersionGate { + return &CreatorVersionGate{ + version: version, + } +} + +// ChangefeedStateFromAdminJob determines if admin job is the state +// of changefeed based on the version of the creator. +func (f *CreatorVersionGate) ChangefeedStateFromAdminJob() bool { + // Introduced in https://github.com/pingcap/ticdc/pull/1341. + // The changefeed before it was introduced was using the old owner. + if f.version == "" { + return true + } + + creatorVersion := semver.New(removeVAndHash(f.version)) + for _, version := range changefeedStateFromAdminJobVersions { + // NOTICE: To compare against the same major version. + if creatorVersion.Major == version.Major && + creatorVersion.LessThan(version) { + return true + } + } + + return false +} diff --git a/pkg/version/creator_version_gate_test.go b/pkg/version/creator_version_gate_test.go new file mode 100644 index 00000000000..4b28c2bc4ca --- /dev/null +++ b/pkg/version/creator_version_gate_test.go @@ -0,0 +1,79 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package version + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestChangefeedStateFromAdminJob(t *testing.T) { + t.Parallel() + + testCases := []struct { + creatorVersion string + expected bool + }{ + { + creatorVersion: "", + expected: true, + }, + { + creatorVersion: "4.0.12", + expected: true, + }, + { + creatorVersion: "4.0.14", + expected: true, + }, + { + creatorVersion: "4.0.15", + expected: true, + }, + { + creatorVersion: "4.0.16", + expected: false, + }, + { + creatorVersion: "5.0.0", + expected: true, + }, + { + creatorVersion: "5.0.1", + expected: true, + }, + { + creatorVersion: "5.0.6", + expected: false, + }, + { + creatorVersion: "5.1.0", + expected: false, + }, + { + creatorVersion: "5.2.0", + expected: false, + }, + { + creatorVersion: "5.3.0", + expected: false, + }, + } + + for _, tc := range testCases { + creatorVersionGate := CreatorVersionGate{version: tc.creatorVersion} + require.Equal(t, tc.expected, creatorVersionGate.ChangefeedStateFromAdminJob()) + } +}