Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

owner(ticdc): Add bootstrap and try to fix the meta information in it (#3838) #3861

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions cdc/http_api_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/httputil"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -99,7 +98,7 @@ func (s *Server) handleChangefeedsList(w http.ResponseWriter, req *http.Request)
writeInternalServerErrorJSON(w, err)
return
}
if !httputil.IsFiltered(state, cfInfo.State) {
if !isFiltered(state, cfInfo.State) {
continue
}
cfStatus, _, err := s.etcdClient.GetChangeFeedStatus(req.Context(), changefeedID)
Expand Down Expand Up @@ -144,3 +143,21 @@ func writeErrorJSON(w http.ResponseWriter, statusCode int, cerr errors.Error) {
log.Error("fail to write data", zap.Error(err))
}
}

// isFiltered return true if the given feedState matches the whiteList.
func isFiltered(whiteList string, feedState model.FeedState) bool {
if whiteList == "all" {
return true
}
if whiteList == "" {
switch feedState {
case model.StateNormal:
return true
case model.StateStopped:
return true
case model.StateFailed:
return true
}
}
return whiteList == string(feedState)
}
55 changes: 52 additions & 3 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/cyclic/mark"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/version"
"go.uber.org/zap"
)

Expand All @@ -48,7 +49,7 @@ const (
StateError FeedState = "error"
StateFailed FeedState = "failed"
StateStopped FeedState = "stopped"
StateRemoved FeedState = "removed" // deprecated, will be removed in the next version
StateRemoved FeedState = "removed"
StateFinished FeedState = "finished"
)

Expand Down Expand Up @@ -208,10 +209,10 @@ func (info *ChangeFeedInfo) Clone() (*ChangeFeedInfo, error) {
return cloned, err
}

// VerifyAndFix verifies changefeed info and may fillin some fields.
// VerifyAndComplete verifies changefeed info and may fillin some fields.
// If a must field is not provided, return an error.
// If some necessary filed is missing but can use a default value, fillin it.
func (info *ChangeFeedInfo) VerifyAndFix() error {
func (info *ChangeFeedInfo) VerifyAndComplete() error {
defaultConfig := config.GetDefaultReplicaConfig()
if info.Engine == "" {
info.Engine = SortUnified
Expand All @@ -234,6 +235,54 @@ func (info *ChangeFeedInfo) VerifyAndFix() error {
return nil
}

// FixIncompatible fixes incompatible changefeed meta info.
func (info *ChangeFeedInfo) FixIncompatible() {
creatorVersionGate := version.NewCreatorVersionGate(info.CreatorVersion)
if creatorVersionGate.ChangefeedStateFromAdminJob() {
log.Info("Start fixing incompatible changefeed state", zap.Any("changefeed", info))
info.fixState()
log.Info("Fix incompatibility changefeed state completed", zap.Any("changefeed", info))
}
}

// fixState attempts to fix state loss from upgrading the old owner to the new owner.
func (info *ChangeFeedInfo) fixState() {
// Notice: In the old owner we used AdminJobType field to determine if the task was paused or not,
// we need to handle this field in the new owner.
// Otherwise, we will see that the old version of the task is paused and then upgraded,
// and the task is automatically resumed after the upgrade.
state := info.State
// Upgrading from an old owner, we need to deal with cases where the state is normal,
// but actually contains errors and does not match the admin job type.
if state == StateNormal {
switch info.AdminJobType {
// This corresponds to the case of failure or error.
case AdminNone, AdminResume:
if info.Error != nil {
if cerror.ChangefeedFastFailErrorCode(errors.RFCErrorCode(info.Error.Code)) {
state = StateFailed
} else {
state = StateError
}
}
case AdminStop:
state = StateStopped
case AdminFinish:
state = StateFinished
case AdminRemove:
state = StateRemoved
}
}

if state != info.State {
log.Info("handle old owner inconsistent state",
zap.String("old state", string(info.State)),
zap.String("admin job type", info.AdminJobType.String()),
zap.String("new state", string(state)))
info.State = state
}
}

// CheckErrorHistory checks error history of a changefeed
// if having error record older than GC interval, set needSave to true.
// if error counts reach threshold, set canInit to false.
Expand Down
149 changes: 147 additions & 2 deletions cdc/model/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (s *configSuite) TestFillV1(c *check.C) {
})
}

func (s *configSuite) TestVerifyAndFix(c *check.C) {
func (s *configSuite) TestVerifyAndComplete(c *check.C) {
defer testleak.AfterTest(c)()
info := &ChangeFeedInfo{
SinkURI: "blackhole://",
Expand All @@ -177,7 +177,7 @@ func (s *configSuite) TestVerifyAndFix(c *check.C) {
},
}

err := info.VerifyAndFix()
err := info.VerifyAndComplete()
c.Assert(err, check.IsNil)
c.Assert(info.Engine, check.Equals, SortUnified)

Expand Down Expand Up @@ -357,3 +357,148 @@ func (s *changefeedSuite) TestGetTs(c *check.C) {
status := &ChangeFeedStatus{CheckpointTs: checkpointTs}
c.Assert(info.GetCheckpointTs(status), check.Equals, checkpointTs)
}

func (s *changefeedSuite) TestFixIncompatible(c *check.C) {
defer testleak.AfterTest(c)()
// Test to fix incompatible states.
testCases := []struct {
info *ChangeFeedInfo
expectedState FeedState
}{
{
info: &ChangeFeedInfo{
AdminJobType: AdminStop,
State: StateNormal,
Error: nil,
CreatorVersion: "",
},
expectedState: StateStopped,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminStop,
State: StateNormal,
Error: nil,
CreatorVersion: "4.0.14",
},
expectedState: StateStopped,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminStop,
State: StateNormal,
Error: nil,
CreatorVersion: "5.0.5",
},
expectedState: StateStopped,
},
}

for _, tc := range testCases {
tc.info.FixIncompatible()
c.Assert(tc.info.State, check.Equals, tc.expectedState)
}
}

func (s *changefeedSuite) TestFixState(c *check.C) {
defer testleak.AfterTest(c)()

testCases := []struct {
info *ChangeFeedInfo
expectedState FeedState
}{
{
info: &ChangeFeedInfo{
AdminJobType: AdminNone,
State: StateNormal,
Error: nil,
},
expectedState: StateNormal,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminResume,
State: StateNormal,
Error: nil,
},
expectedState: StateNormal,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminNone,
State: StateNormal,
Error: &RunningError{
Code: string(cerror.ErrGCTTLExceeded.RFCCode()),
},
},
expectedState: StateFailed,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminResume,
State: StateNormal,
Error: &RunningError{
Code: string(cerror.ErrGCTTLExceeded.RFCCode()),
},
},
expectedState: StateFailed,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminNone,
State: StateNormal,
Error: &RunningError{
Code: string(cerror.ErrUnknownSortEngine.RFCCode()),
},
},
expectedState: StateError,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminResume,
State: StateNormal,
Error: &RunningError{
Code: string(cerror.ErrUnknownSortEngine.RFCCode()),
},
},
expectedState: StateError,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminStop,
State: StateNormal,
Error: nil,
},
expectedState: StateStopped,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminFinish,
State: StateNormal,
Error: nil,
},
expectedState: StateFinished,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminRemove,
State: StateNormal,
Error: nil,
},
expectedState: StateRemoved,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminRemove,
State: StateNormal,
Error: nil,
},
expectedState: StateRemoved,
},
}

for _, tc := range testCases {
tc.info.fixState()
c.Assert(tc.info.State, check.Equals, tc.expectedState)
}
}
2 changes: 1 addition & 1 deletion cdc/model/reactor_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (s *ChangefeedReactorState) UpdateCDCKey(key *etcd.CDCKey, value []byte) er
return errors.Trace(err)
}
if key.Tp == etcd.CDCKeyTypeChangefeedInfo {
if err := s.Info.VerifyAndFix(); err != nil {
if err := s.Info.VerifyAndComplete(); err != nil {
return errors.Trace(err)
}
}
Expand Down
34 changes: 33 additions & 1 deletion cdc/owner/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ type Owner struct {
lastTickTime time.Time

closed int32
// bootstrapped specifies whether the owner has been initialized.
// This will only be done when the owner starts the first Tick.
// NOTICE: Do not use it in a method other than tick unexpectedly, as it is not a thread-safe value.
bootstrapped bool

newChangefeed func(id model.ChangeFeedID, gcManager gc.Manager) *changefeed
}
Expand All @@ -97,6 +101,8 @@ func NewOwner4Test(
pdClient pd.Client,
) *Owner {
o := NewOwner(pdClient)
// Most tests do not need to test bootstrap.
o.bootstrapped = true
o.newChangefeed = func(id model.ChangeFeedID, gcManager gc.Manager) *changefeed {
return newChangefeed4Test(id, gcManager, newDDLPuller, newSink)
}
Expand All @@ -109,8 +115,15 @@ func (o *Owner) Tick(stdCtx context.Context, rawState orchestrator.ReactorState)
failpoint.Return(nil, errors.New("owner run with injected error"))
})
failpoint.Inject("sleep-in-owner-tick", nil)
ctx := stdCtx.(cdcContext.Context)
state := rawState.(*model.GlobalReactorState)
// At the first Tick, we need to do a bootstrap operation.
// Fix incompatible or incorrect meta information.
if !o.bootstrapped {
o.Bootstrap(state)
o.bootstrapped = true
return state, nil
}

o.updateMetrics(state)
if !o.clusterVersionConsistent(state.Captures) {
// sleep one second to avoid printing too much log
Expand All @@ -128,6 +141,7 @@ func (o *Owner) Tick(stdCtx context.Context, rawState orchestrator.ReactorState)
}

o.handleJobs()
ctx := stdCtx.(cdcContext.Context)
for changefeedID, changefeedState := range state.Changefeeds {
if changefeedState.Info == nil {
o.cleanUpChangefeed(changefeedState)
Expand Down Expand Up @@ -238,6 +252,24 @@ func (o *Owner) cleanUpChangefeed(state *model.ChangefeedReactorState) {
}
}

// Bootstrap checks if the state contains incompatible or incorrect information and tries to fix it.
func (o *Owner) Bootstrap(state *model.GlobalReactorState) {
log.Info("Start bootstrapping", zap.Any("state", state))
fixChangefeedInfos(state)
}

// fixChangefeedInfos attempts to fix incompatible or incorrect meta information in changefeed state.
func fixChangefeedInfos(state *model.GlobalReactorState) {
for _, changefeedState := range state.Changefeeds {
if changefeedState != nil {
changefeedState.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
info.FixIncompatible()
return info, true, nil
})
}
}
}

func (o *Owner) updateMetrics(state *model.GlobalReactorState) {
// Keep the value of prometheus expression `rate(counter)` = 1
// Please also change alert rule in ticdc.rules.yml when change the expression value.
Expand Down
Loading