From 99eaa82012266fd5dc8bc7d17fb38d579d08240e Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Mon, 20 Dec 2021 19:17:46 +0800 Subject: [PATCH] owner: fix owner tick block http request (#3490) (#3530) --- cdc/owner/owner.go | 27 +++++++---- cdc/owner/owner_test.go | 101 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 118 insertions(+), 10 deletions(-) diff --git a/cdc/owner/owner.go b/cdc/owner/owner.go index c830f5081f7..8cd40ad1a6e 100644 --- a/cdc/owner/owner.go +++ b/cdc/owner/owner.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/tiflow/pkg/version" pd "github.com/tikv/pd/client" "go.uber.org/zap" + "golang.org/x/time/rate" ) type ownerJobType int @@ -46,6 +47,10 @@ const ( ownerJobTypeQuery ) +// versionInconsistentLogRate represents the rate of log output when there are +// captures with versions different from that of the owner +const versionInconsistentLogRate = 1 + type ownerJob struct { tp ownerJobType changefeedID model.ChangeFeedID @@ -77,10 +82,10 @@ type Owner struct { ownerJobQueueMu sync.Mutex ownerJobQueue []*ownerJob - + // logLimiter controls cluster version check log output rate + logLimiter *rate.Limiter lastTickTime time.Time - - closed int32 + closed int32 newChangefeed func(id model.ChangeFeedID, gcManager gc.Manager) *changefeed } @@ -92,6 +97,7 @@ func NewOwner(pdClient pd.Client) *Owner { gcManager: gc.NewManager(pdClient), lastTickTime: time.Now(), newChangefeed: newChangefeed, + logLimiter: rate.NewLimiter(versionInconsistentLogRate, versionInconsistentLogRate), } } @@ -118,12 +124,16 @@ func (o *Owner) Tick(stdCtx context.Context, rawState orchestrator.ReactorState) state := rawState.(*orchestrator.GlobalReactorState) o.captures = state.Captures o.updateMetrics(state) + + // handleJobs() should be called before clusterVersionConsistent(), because + // when there are different versions of cdc nodes in the cluster, + // the admin job may not be processed all the time. And http api relies on + // admin job, which will cause all http api unavailable. + o.handleJobs() + if !o.clusterVersionConsistent(state.Captures) { - // sleep one second to avoid printing too much log - time.Sleep(1 * time.Second) return state, nil } - // Owner should update GC safepoint before initializing changefeed, so // changefeed can remove its "ticdc-creating" service GC safepoint during // initializing. @@ -133,7 +143,6 @@ func (o *Owner) Tick(stdCtx context.Context, rawState orchestrator.ReactorState) return nil, errors.Trace(err) } - o.handleJobs() for changefeedID, changefeedState := range state.Changefeeds { if changefeedState.Info == nil { o.cleanUpChangefeed(changefeedState) @@ -275,7 +284,9 @@ func (o *Owner) clusterVersionConsistent(captures map[model.CaptureID]*model.Cap myVersion := version.ReleaseVersion for _, capture := range captures { if myVersion != capture.Version { - log.Warn("the capture version is different with the owner", zap.Reflect("capture", capture), zap.String("my-version", myVersion)) + if o.logLimiter.Allow() { + log.Warn("the capture version is different with the owner", zap.Reflect("capture", capture), zap.String("owner-version", myVersion)) + } return false } } diff --git a/cdc/owner/owner_test.go b/cdc/owner/owner_test.go index 21fdaa43937..fe18e2e69a9 100644 --- a/cdc/owner/owner_test.go +++ b/cdc/owner/owner_test.go @@ -54,7 +54,7 @@ func createOwner4Test(ctx cdcContext.Context, c *check.C) (*Owner, *orchestrator return safePoint, nil }, } - cf := NewOwner4Test(func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error) { + owner := NewOwner4Test(func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error) { return &mockDDLPuller{resolvedTs: startTs - 1}, nil }, func(ctx cdcContext.Context) (AsyncSink, error) { return &mockAsyncSink{}, nil @@ -72,13 +72,14 @@ func createOwner4Test(ctx cdcContext.Context, c *check.C) (*Owner, *orchestrator captureBytes, err := ctx.GlobalVars().CaptureInfo.Marshal() c.Assert(err, check.IsNil) tester.MustUpdate(cdcKey.String(), captureBytes) - return cf, state, tester + return owner, state, tester } func (s *ownerSuite) TestCreateRemoveChangefeed(c *check.C) { defer testleak.AfterTest(c)() ctx := cdcContext.NewBackendContext4Test(false) owner, state, tester := createOwner4Test(ctx, c) + changefeedID := "test-changefeed" changefeedInfo := &model.ChangeFeedInfo{ StartTs: oracle.GoTimeToTS(time.Now()), @@ -362,3 +363,99 @@ func (s *ownerSuite) TestUpdateGCSafePoint(c *check.C) { case <-ch: } } + +// make sure handleJobs works well even if there is two different +// version of captures in the cluster +func (s *ownerSuite) TestHandleJobsDontBlock(c *check.C) { + defer testleak.AfterTest(c)() + ctx := cdcContext.NewBackendContext4Test(false) + ctx, cancel := cdcContext.WithCancel(ctx) + defer cancel() + owner, state, tester := createOwner4Test(ctx, c) + + statusProvider := owner.StatusProvider() + // work well + cf1 := "test-changefeed" + cfInfo1 := &model.ChangeFeedInfo{ + StartTs: oracle.GoTimeToTS(time.Now()), + Config: config.GetDefaultReplicaConfig(), + State: model.StateNormal, + } + changefeedStr, err := cfInfo1.Marshal() + c.Assert(err, check.IsNil) + cdcKey := etcd.CDCKey{ + Tp: etcd.CDCKeyTypeChangefeedInfo, + ChangefeedID: cf1, + } + tester.MustUpdate(cdcKey.String(), []byte(changefeedStr)) + _, err = owner.Tick(ctx, state) + tester.MustApplyPatches() + c.Assert(err, check.IsNil) + + c.Assert(owner.changefeeds, check.HasKey, cf1) + + // add an non-consistent version capture + captureInfo := &model.CaptureInfo{ + ID: "capture-id-owner-test", + AdvertiseAddr: "127.0.0.1:0000", + Version: " v0.0.1-test-only", + } + cdcKey = etcd.CDCKey{ + Tp: etcd.CDCKeyTypeCapture, + CaptureID: captureInfo.ID, + } + v, err := captureInfo.Marshal() + c.Assert(err, check.IsNil) + tester.MustUpdate(cdcKey.String(), v) + + // try to add another changefeed + cf2 := "test-changefeed1" + cfInfo2 := &model.ChangeFeedInfo{ + StartTs: oracle.GoTimeToTS(time.Now()), + Config: config.GetDefaultReplicaConfig(), + State: model.StateNormal, + } + changefeedStr1, err := cfInfo2.Marshal() + c.Assert(err, check.IsNil) + cdcKey = etcd.CDCKey{ + Tp: etcd.CDCKeyTypeChangefeedInfo, + ChangefeedID: cf2, + } + tester.MustUpdate(cdcKey.String(), []byte(changefeedStr1)) + _, err = owner.Tick(ctx, state) + tester.MustApplyPatches() + c.Assert(err, check.IsNil) + // make sure this changefeed add failed, which means that owner are return + // in clusterVersionConsistent check + c.Assert(owner.changefeeds[cf2], check.IsNil) + + // make sure statusProvider works well + ctx1, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + var errIn error + var infos map[model.ChangeFeedID]*model.ChangeFeedInfo + done := make(chan struct{}) + go func() { + infos, errIn = statusProvider.GetAllChangeFeedInfo(ctx1) + done <- struct{}{} + }() + + ticker := time.NewTicker(20 * time.Millisecond) + defer ticker.Stop() +WorkLoop: + for { + select { + case <-done: + break WorkLoop + case <-ctx1.Done(): + c.Fatal(ctx1.Err()) + case <-ticker.C: + _, err = owner.Tick(ctx, state) + c.Assert(err, check.IsNil) + } + } + c.Assert(errIn, check.IsNil) + c.Assert(infos[cf1], check.NotNil) + c.Assert(infos[cf2], check.IsNil) +}