diff --git a/cdc/owner/owner.go b/cdc/owner/owner.go index f2a1c9a9172..8cd40ad1a6e 100644 --- a/cdc/owner/owner.go +++ b/cdc/owner/owner.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/tiflow/pkg/version" pd "github.com/tikv/pd/client" "go.uber.org/zap" + "golang.org/x/time/rate" ) type ownerJobType int @@ -46,6 +47,10 @@ const ( ownerJobTypeQuery ) +// versionInconsistentLogRate represents the rate of log output when there are +// captures with versions different from that of the owner +const versionInconsistentLogRate = 1 + type ownerJob struct { tp ownerJobType changefeedID model.ChangeFeedID @@ -77,10 +82,10 @@ type Owner struct { ownerJobQueueMu sync.Mutex ownerJobQueue []*ownerJob - + // logLimiter controls cluster version check log output rate + logLimiter *rate.Limiter lastTickTime time.Time - - closed int32 + closed int32 newChangefeed func(id model.ChangeFeedID, gcManager gc.Manager) *changefeed } @@ -92,6 +97,7 @@ func NewOwner(pdClient pd.Client) *Owner { gcManager: gc.NewManager(pdClient), lastTickTime: time.Now(), newChangefeed: newChangefeed, + logLimiter: rate.NewLimiter(versionInconsistentLogRate, versionInconsistentLogRate), } } @@ -126,8 +132,6 @@ func (o *Owner) Tick(stdCtx context.Context, rawState orchestrator.ReactorState) o.handleJobs() if !o.clusterVersionConsistent(state.Captures) { - // sleep one second to avoid printing too much log - time.Sleep(1 * time.Second) return state, nil } // Owner should update GC safepoint before initializing changefeed, so @@ -280,7 +284,9 @@ func (o *Owner) clusterVersionConsistent(captures map[model.CaptureID]*model.Cap myVersion := version.ReleaseVersion for _, capture := range captures { if myVersion != capture.Version { - log.Warn("the capture version is different with the owner", zap.Reflect("capture", capture), zap.String("my-version", myVersion)) + if o.logLimiter.Allow() { + log.Warn("the capture version is different with the owner", zap.Reflect("capture", capture), zap.String("owner-version", myVersion)) + } return false } } diff --git a/cdc/owner/owner_test.go b/cdc/owner/owner_test.go index ceeaeb2fc3b..fe18e2e69a9 100644 --- a/cdc/owner/owner_test.go +++ b/cdc/owner/owner_test.go @@ -54,7 +54,7 @@ func createOwner4Test(ctx cdcContext.Context, c *check.C) (*Owner, *orchestrator return safePoint, nil }, } - cf := NewOwner4Test(func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error) { + owner := NewOwner4Test(func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error) { return &mockDDLPuller{resolvedTs: startTs - 1}, nil }, func(ctx cdcContext.Context) (AsyncSink, error) { return &mockAsyncSink{}, nil @@ -72,7 +72,7 @@ func createOwner4Test(ctx cdcContext.Context, c *check.C) (*Owner, *orchestrator captureBytes, err := ctx.GlobalVars().CaptureInfo.Marshal() c.Assert(err, check.IsNil) tester.MustUpdate(cdcKey.String(), captureBytes) - return cf, state, tester + return owner, state, tester } func (s *ownerSuite) TestCreateRemoveChangefeed(c *check.C) {