Skip to content

Commit

Permalink
owner(ticdc): Add bootstrap and try to fix the meta information in it (
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Jan 14, 2022
1 parent 2a587ec commit 433fdb9
Show file tree
Hide file tree
Showing 14 changed files with 471 additions and 41 deletions.
2 changes: 1 addition & 1 deletion cdc/capture/http_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func verifyCreateChangefeedConfig(ctx context.Context, changefeedConfig model.Ch
return nil, err
}
// set sortEngine and EnableOldValue
cdcClusterVer, err := version.GetTiCDCClusterVersion(captureInfos)
cdcClusterVer, err := version.GetTiCDCClusterVersion(model.ListVersionsFromCaptureInfos(captureInfos))
if err != nil {
return nil, err
}
Expand Down
10 changes: 10 additions & 0 deletions cdc/model/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,13 @@ func (c *CaptureInfo) Unmarshal(data []byte) error {
return errors.Annotatef(cerror.WrapError(cerror.ErrUnmarshalFailed, err),
"unmarshal data: %v", data)
}

// ListVersionsFromCaptureInfos returns the Version field of every entry in
// captureInfos, in the same order as the input.
//
// A nil slice is returned when captureInfos is empty, which is what callers
// have always observed for empty input. Every entry must be non-nil.
func ListVersionsFromCaptureInfos(captureInfos []*CaptureInfo) []string {
	if len(captureInfos) == 0 {
		return nil
	}

	// The output length is known up front, so allocate once instead of
	// growing the slice through repeated appends.
	captureVersions := make([]string, 0, len(captureInfos))
	for _, ci := range captureInfos {
		captureVersions = append(captureVersions, ci.Version)
	}

	return captureVersions
}
17 changes: 17 additions & 0 deletions cdc/model/capture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,20 @@ func (s *captureSuite) TestMarshalUnmarshal(c *check.C) {
c.Assert(err, check.IsNil)
c.Assert(decodedInfo, check.DeepEquals, info)
}

// TestListVersionsFromCaptureInfos checks that versions are extracted in input
// order and that an empty version string is kept rather than dropped.
func (s *captureSuite) TestListVersionsFromCaptureInfos(c *check.C) {
	defer testleak.AfterTest(c)()

	const (
		captureID = "9ff52aca-aea6-4022-8ec4-fbee3f2c7891"
		addr      = "127.0.0.1:8300"
	)
	captures := []*CaptureInfo{
		{ID: captureID, AdvertiseAddr: addr, Version: "dev"},
		{ID: captureID, AdvertiseAddr: addr, Version: ""},
	}
	expected := []string{"dev", ""}

	c.Assert(ListVersionsFromCaptureInfos(captures), check.DeepEquals, expected)
}
59 changes: 54 additions & 5 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/cyclic/mark"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/version"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
)
Expand All @@ -48,7 +49,7 @@ const (
StateError FeedState = "error"
StateFailed FeedState = "failed"
StateStopped FeedState = "stopped"
StateRemoved FeedState = "removed" // deprecated, will be removed in the next version
StateRemoved FeedState = "removed"
StateFinished FeedState = "finished"
)

Expand Down Expand Up @@ -226,10 +227,10 @@ func (info *ChangeFeedInfo) Clone() (*ChangeFeedInfo, error) {
return cloned, err
}

// VerifyAndFix verifies changefeed info and may fillin some fields.
// If a must field is not provided, return an error.
// If some necessary filed is missing but can use a default value, fillin it.
func (info *ChangeFeedInfo) VerifyAndFix() error {
// VerifyAndComplete verifies changefeed info and may fill in some fields.
// If a required field is not provided, return an error.
// If a necessary field is missing but has a usable default value, fill it in.
func (info *ChangeFeedInfo) VerifyAndComplete() error {
defaultConfig := config.GetDefaultReplicaConfig()
if info.Engine == "" {
info.Engine = SortUnified
Expand All @@ -252,6 +253,54 @@ func (info *ChangeFeedInfo) VerifyAndFix() error {
return nil
}

// FixIncompatible fixes incompatible changefeed meta info.
//
// The fix is gated on the version that created the changefeed: the state is
// repaired through fixState only when the creator-version gate reports
// ChangefeedStateFromAdminJob. The info is logged before and after the repair.
func (info *ChangeFeedInfo) FixIncompatible() {
	gate := version.NewCreatorVersionGate(info.CreatorVersion)
	if !gate.ChangefeedStateFromAdminJob() {
		// Nothing to repair for this creator version.
		return
	}

	log.Info("Start fixing incompatible changefeed state", zap.Any("changefeed", info))
	info.fixState()
	log.Info("Fix incompatibility changefeed state completed", zap.Any("changefeed", info))
}

// fixState attempts to fix state loss from upgrading the old owner to the new owner.
//
// The old owner used the AdminJobType field to decide whether a task was
// paused, so that field has to be translated into State here. Otherwise a task
// paused before the upgrade would be resumed automatically after it.
//
// Only a changefeed whose State is StateNormal is touched:
//   - AdminStop, AdminFinish and AdminRemove map to StateStopped, StateFinished
//     and StateRemoved respectively.
//   - AdminNone and AdminResume keep StateNormal unless Error is set, in which
//     case the state becomes StateFailed for a fast-fail error code and
//     StateError for any other code.
func (info *ChangeFeedInfo) fixState() {
	// A state that is already not normal is left untouched.
	if info.State != StateNormal {
		return
	}

	fixed := info.State
	switch info.AdminJobType {
	case AdminStop:
		fixed = StateStopped
	case AdminFinish:
		fixed = StateFinished
	case AdminRemove:
		fixed = StateRemoved
	case AdminNone, AdminResume:
		// The state is normal but an error was recorded: this corresponds to
		// the case of failure or error.
		if info.Error != nil {
			fixed = StateError
			if cerror.ChangefeedFastFailErrorCode(errors.RFCErrorCode(info.Error.Code)) {
				fixed = StateFailed
			}
		}
	}

	if fixed == info.State {
		return
	}
	log.Info("handle old owner inconsistent state",
		zap.String("old state", string(info.State)),
		zap.String("admin job type", info.AdminJobType.String()),
		zap.String("new state", string(fixed)))
	info.State = fixed
}

// CheckErrorHistory checks error history of a changefeed
// if having error record older than GC interval, set needSave to true.
// if error counts reach threshold, set canInit to false.
Expand Down
150 changes: 148 additions & 2 deletions cdc/model/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (s *configSuite) TestFillV1(c *check.C) {
})
}

func (s *configSuite) TestVerifyAndFix(c *check.C) {
func (s *configSuite) TestVerifyAndComplete(c *check.C) {
defer testleak.AfterTest(c)()
info := &ChangeFeedInfo{
SinkURI: "blackhole://",
Expand All @@ -178,7 +178,7 @@ func (s *configSuite) TestVerifyAndFix(c *check.C) {
},
}

err := info.VerifyAndFix()
err := info.VerifyAndComplete()
c.Assert(err, check.IsNil)
c.Assert(info.Engine, check.Equals, SortUnified)

Expand All @@ -190,6 +190,152 @@ func (s *configSuite) TestVerifyAndFix(c *check.C) {
c.Assert(marshalConfig1, check.Equals, marshalConfig2)
}

// TestFixIncompatible verifies that FixIncompatible repairs the state of a
// changefeed that was stopped through an admin job but still reports
// StateNormal, for each of the creator versions listed below.
func (s *configSuite) TestFixIncompatible(c *check.C) {
	defer testleak.AfterTest(c)()

	// Test to fix incompatible states.
	cases := []struct {
		creatorVersion string
		expectedState  FeedState
	}{
		{creatorVersion: "", expectedState: StateStopped},
		{creatorVersion: "4.0.14", expectedState: StateStopped},
		{creatorVersion: "5.0.5", expectedState: StateStopped},
	}

	for _, tc := range cases {
		info := &ChangeFeedInfo{
			AdminJobType:   AdminStop,
			State:          StateNormal,
			Error:          nil,
			CreatorVersion: tc.creatorVersion,
		}
		info.FixIncompatible()
		c.Assert(info.State, check.Equals, tc.expectedState)
	}
}

// TestFixState exercises every branch of fixState: each admin job type
// combined with StateNormal, with and without a recorded error, plus a
// changefeed whose state is already not normal and must be left untouched.
func (s *configSuite) TestFixState(c *check.C) {
	defer testleak.AfterTest(c)()

	testCases := []struct {
		info          *ChangeFeedInfo
		expectedState FeedState
	}{
		// No admin job and no error: the state stays normal.
		{
			info: &ChangeFeedInfo{
				AdminJobType: AdminNone,
				State:        StateNormal,
				Error:        nil,
			},
			expectedState: StateNormal,
		},
		{
			info: &ChangeFeedInfo{
				AdminJobType: AdminResume,
				State:        StateNormal,
				Error:        nil,
			},
			expectedState: StateNormal,
		},
		// A fast-fail error code turns the state into failed.
		{
			info: &ChangeFeedInfo{
				AdminJobType: AdminNone,
				State:        StateNormal,
				Error: &RunningError{
					Code: string(cerror.ErrGCTTLExceeded.RFCCode()),
				},
			},
			expectedState: StateFailed,
		},
		{
			info: &ChangeFeedInfo{
				AdminJobType: AdminResume,
				State:        StateNormal,
				Error: &RunningError{
					Code: string(cerror.ErrGCTTLExceeded.RFCCode()),
				},
			},
			expectedState: StateFailed,
		},
		// Any other error code turns the state into error.
		{
			info: &ChangeFeedInfo{
				AdminJobType: AdminNone,
				State:        StateNormal,
				Error: &RunningError{
					Code: string(cerror.ErrKafkaNewSaramaProducer.RFCCode()),
				},
			},
			expectedState: StateError,
		},
		{
			info: &ChangeFeedInfo{
				AdminJobType: AdminResume,
				State:        StateNormal,
				Error: &RunningError{
					Code: string(cerror.ErrKafkaNewSaramaProducer.RFCCode()),
				},
			},
			expectedState: StateError,
		},
		// Admin jobs map directly onto their corresponding state.
		{
			info: &ChangeFeedInfo{
				AdminJobType: AdminStop,
				State:        StateNormal,
				Error:        nil,
			},
			expectedState: StateStopped,
		},
		{
			info: &ChangeFeedInfo{
				AdminJobType: AdminFinish,
				State:        StateNormal,
				Error:        nil,
			},
			expectedState: StateFinished,
		},
		{
			info: &ChangeFeedInfo{
				AdminJobType: AdminRemove,
				State:        StateNormal,
				Error:        nil,
			},
			expectedState: StateRemoved,
		},
		// A state that is already not normal must not be overwritten, even
		// when the admin job type disagrees with it.
		{
			info: &ChangeFeedInfo{
				AdminJobType: AdminStop,
				State:        StateFailed,
				Error:        nil,
			},
			expectedState: StateFailed,
		},
	}

	for i, tc := range testCases {
		tc.info.fixState()
		c.Assert(tc.info.State, check.Equals, tc.expectedState,
			check.Commentf("test case #%d", i))
	}
}

func (s *configSuite) TestChangeFeedInfoClone(c *check.C) {
defer testleak.AfterTest(c)()
info := &ChangeFeedInfo{
Expand Down
2 changes: 1 addition & 1 deletion cdc/model/reactor_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (s *ChangefeedReactorState) UpdateCDCKey(key *etcd.CDCKey, value []byte) er
return errors.Trace(err)
}
if key.Tp == etcd.CDCKeyTypeChangefeedInfo {
if err := s.Info.VerifyAndFix(); err != nil {
if err := s.Info.VerifyAndComplete(); err != nil {
return errors.Trace(err)
}
}
Expand Down
35 changes: 34 additions & 1 deletion cdc/owner/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ type Owner struct {

lastTickTime time.Time

// bootstrapped specifies whether the owner has been initialized.
// This will only be done when the owner starts the first Tick.
// NOTICE: Do not access it from any method other than Tick, as it is not thread-safe.
bootstrapped bool

closed int32

newChangefeed func(id model.ChangeFeedID, gcManager gc.Manager) *changefeed
Expand All @@ -97,6 +102,8 @@ func NewOwner4Test(
pdClient pd.Client,
) *Owner {
o := NewOwner(pdClient)
// Most tests do not need to test bootstrap.
o.bootstrapped = true
o.newChangefeed = func(id model.ChangeFeedID, gcManager gc.Manager) *changefeed {
return newChangefeed4Test(id, gcManager, newDDLPuller, newSink)
}
Expand All @@ -109,8 +116,15 @@ func (o *Owner) Tick(stdCtx context.Context, rawState orchestrator.ReactorState)
failpoint.Return(nil, errors.New("owner run with injected error"))
})
failpoint.Inject("sleep-in-owner-tick", nil)
ctx := stdCtx.(cdcContext.Context)
state := rawState.(*model.GlobalReactorState)
// At the first Tick, we need to do a bootstrap operation.
// Fix incompatible or incorrect meta information.
if !o.bootstrapped {
o.Bootstrap(state)
o.bootstrapped = true
return state, nil
}

o.updateMetrics(state)
if !o.clusterVersionConsistent(state.Captures) {
// sleep one second to avoid printing too much log
Expand All @@ -128,6 +142,7 @@ func (o *Owner) Tick(stdCtx context.Context, rawState orchestrator.ReactorState)
}

o.handleJobs()
ctx := stdCtx.(cdcContext.Context)
for changefeedID, changefeedState := range state.Changefeeds {
if changefeedState.Info == nil {
o.cleanUpChangefeed(changefeedState)
Expand Down Expand Up @@ -238,6 +253,24 @@ func (o *Owner) cleanUpChangefeed(state *model.ChangefeedReactorState) {
}
}

// Bootstrap checks if the state contains incompatible or incorrect information and tries to fix it.
// It logs the state it was given and then runs fixChangefeedInfos on it.
func (o *Owner) Bootstrap(state *model.GlobalReactorState) {
	stateField := zap.Any("state", state)
	log.Info("Start bootstrapping", stateField)

	fixChangefeedInfos(state)
}

// fixChangefeedInfos attempts to fix incompatible or incorrect meta information in changefeed state.
//
// For every non-nil changefeed state it registers an info patch that runs
// FixIncompatible on the changefeed info. Changefeeds without an info are
// skipped, so the patch never dereferences a nil info.
func fixChangefeedInfos(state *model.GlobalReactorState) {
	for _, changefeedState := range state.Changefeeds {
		if changefeedState == nil {
			continue
		}
		changefeedState.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
			// The info of a changefeed can be nil; there is nothing to fix
			// then, and calling FixIncompatible would panic on a nil receiver.
			if info == nil {
				return nil, false, nil
			}
			info.FixIncompatible()
			return info, true, nil
		})
	}
}

func (o *Owner) updateMetrics(state *model.GlobalReactorState) {
// Keep the value of prometheus expression `rate(counter)` = 1
// Please also change alert rule in ticdc.rules.yml when change the expression value.
Expand Down
Loading

0 comments on commit 433fdb9

Please sign in to comment.