Skip to content

Commit

Permalink
owner_test (ticdc): fix unstable case TestHandleJobsDontBlock (#3601)
Browse files Browse the repository at this point in the history
(cherry picked from commit f3d7d9a)
  • Loading branch information
asddongmen committed Dec 20, 2021
1 parent 31c8182 commit c444765
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 8 deletions.
18 changes: 12 additions & 6 deletions cdc/owner/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tiflow/pkg/version"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
"golang.org/x/time/rate"
)

type ownerJobType int
Expand All @@ -46,6 +47,10 @@ const (
ownerJobTypeQuery
)

// versionInconsistentLogRate represents the rate of log output when there are
// captures with versions different from that of the owner
const versionInconsistentLogRate = 1

type ownerJob struct {
tp ownerJobType
changefeedID model.ChangeFeedID
Expand Down Expand Up @@ -77,10 +82,10 @@ type Owner struct {

ownerJobQueueMu sync.Mutex
ownerJobQueue []*ownerJob

// logLimiter controls cluster version check log output rate
logLimiter *rate.Limiter
lastTickTime time.Time

closed int32
closed int32

newChangefeed func(id model.ChangeFeedID, gcManager gc.Manager) *changefeed
}
Expand All @@ -92,6 +97,7 @@ func NewOwner(pdClient pd.Client) *Owner {
gcManager: gc.NewManager(pdClient),
lastTickTime: time.Now(),
newChangefeed: newChangefeed,
logLimiter: rate.NewLimiter(versionInconsistentLogRate, versionInconsistentLogRate),
}
}

Expand Down Expand Up @@ -126,8 +132,6 @@ func (o *Owner) Tick(stdCtx context.Context, rawState orchestrator.ReactorState)
o.handleJobs()

if !o.clusterVersionConsistent(state.Captures) {
// sleep one second to avoid printing too much log
time.Sleep(1 * time.Second)
return state, nil
}
// Owner should update GC safepoint before initializing changefeed, so
Expand Down Expand Up @@ -280,7 +284,9 @@ func (o *Owner) clusterVersionConsistent(captures map[model.CaptureID]*model.Cap
myVersion := version.ReleaseVersion
for _, capture := range captures {
if myVersion != capture.Version {
log.Warn("the capture version is different with the owner", zap.Reflect("capture", capture), zap.String("my-version", myVersion))
if o.logLimiter.Allow() {
log.Warn("the capture version is different with the owner", zap.Reflect("capture", capture), zap.String("owner-version", myVersion))
}
return false
}
}
Expand Down
4 changes: 2 additions & 2 deletions cdc/owner/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func createOwner4Test(ctx cdcContext.Context, c *check.C) (*Owner, *orchestrator
return safePoint, nil
},
}
cf := NewOwner4Test(func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error) {
owner := NewOwner4Test(func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error) {
return &mockDDLPuller{resolvedTs: startTs - 1}, nil
}, func(ctx cdcContext.Context) (AsyncSink, error) {
return &mockAsyncSink{}, nil
Expand All @@ -72,7 +72,7 @@ func createOwner4Test(ctx cdcContext.Context, c *check.C) (*Owner, *orchestrator
captureBytes, err := ctx.GlobalVars().CaptureInfo.Marshal()
c.Assert(err, check.IsNil)
tester.MustUpdate(cdcKey.String(), captureBytes)
return cf, state, tester
return owner, state, tester
}

func (s *ownerSuite) TestCreateRemoveChangefeed(c *check.C) {
Expand Down

0 comments on commit c444765

Please sign in to comment.