Skip to content

Commit

Permalink
owner: fix owner tick block http request (#3490) (#3530)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Dec 20, 2021
1 parent 2b0f9b7 commit 81af769
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 10 deletions.
27 changes: 19 additions & 8 deletions cdc/owner/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tiflow/pkg/version"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
"golang.org/x/time/rate"
)

type ownerJobType int
Expand All @@ -46,6 +47,10 @@ const (
ownerJobTypeQuery
)

// versionInconsistentLogRate represents the rate of log output when there are
// captures with versions different from that of the owner
const versionInconsistentLogRate = 1

type ownerJob struct {
tp ownerJobType
changefeedID model.ChangeFeedID
Expand Down Expand Up @@ -77,10 +82,10 @@ type Owner struct {

ownerJobQueueMu sync.Mutex
ownerJobQueue []*ownerJob

// logLimiter controls cluster version check log output rate
logLimiter *rate.Limiter
lastTickTime time.Time

closed int32
closed int32

newChangefeed func(id model.ChangeFeedID, gcManager gc.Manager) *changefeed
}
Expand All @@ -92,6 +97,7 @@ func NewOwner(pdClient pd.Client) *Owner {
gcManager: gc.NewManager(pdClient),
lastTickTime: time.Now(),
newChangefeed: newChangefeed,
logLimiter: rate.NewLimiter(versionInconsistentLogRate, versionInconsistentLogRate),
}
}

Expand All @@ -118,12 +124,16 @@ func (o *Owner) Tick(stdCtx context.Context, rawState orchestrator.ReactorState)
state := rawState.(*orchestrator.GlobalReactorState)
o.captures = state.Captures
o.updateMetrics(state)

// handleJobs() should be called before clusterVersionConsistent(), because
// when there are different versions of cdc nodes in the cluster,
// the admin job may not be processed all the time. And http api relies on
// admin job, which will cause all http api unavailable.
o.handleJobs()

if !o.clusterVersionConsistent(state.Captures) {
// sleep one second to avoid printing too much log
time.Sleep(1 * time.Second)
return state, nil
}

// Owner should update GC safepoint before initializing changefeed, so
// changefeed can remove its "ticdc-creating" service GC safepoint during
// initializing.
Expand All @@ -133,7 +143,6 @@ func (o *Owner) Tick(stdCtx context.Context, rawState orchestrator.ReactorState)
return nil, errors.Trace(err)
}

o.handleJobs()
for changefeedID, changefeedState := range state.Changefeeds {
if changefeedState.Info == nil {
o.cleanUpChangefeed(changefeedState)
Expand Down Expand Up @@ -275,7 +284,9 @@ func (o *Owner) clusterVersionConsistent(captures map[model.CaptureID]*model.Cap
myVersion := version.ReleaseVersion
for _, capture := range captures {
if myVersion != capture.Version {
log.Warn("the capture version is different with the owner", zap.Reflect("capture", capture), zap.String("my-version", myVersion))
if o.logLimiter.Allow() {
log.Warn("the capture version is different with the owner", zap.Reflect("capture", capture), zap.String("owner-version", myVersion))
}
return false
}
}
Expand Down
101 changes: 99 additions & 2 deletions cdc/owner/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func createOwner4Test(ctx cdcContext.Context, c *check.C) (*Owner, *orchestrator
return safePoint, nil
},
}
cf := NewOwner4Test(func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error) {
owner := NewOwner4Test(func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error) {
return &mockDDLPuller{resolvedTs: startTs - 1}, nil
}, func(ctx cdcContext.Context) (AsyncSink, error) {
return &mockAsyncSink{}, nil
Expand All @@ -72,13 +72,14 @@ func createOwner4Test(ctx cdcContext.Context, c *check.C) (*Owner, *orchestrator
captureBytes, err := ctx.GlobalVars().CaptureInfo.Marshal()
c.Assert(err, check.IsNil)
tester.MustUpdate(cdcKey.String(), captureBytes)
return cf, state, tester
return owner, state, tester
}

func (s *ownerSuite) TestCreateRemoveChangefeed(c *check.C) {
defer testleak.AfterTest(c)()
ctx := cdcContext.NewBackendContext4Test(false)
owner, state, tester := createOwner4Test(ctx, c)

changefeedID := "test-changefeed"
changefeedInfo := &model.ChangeFeedInfo{
StartTs: oracle.GoTimeToTS(time.Now()),
Expand Down Expand Up @@ -362,3 +363,99 @@ func (s *ownerSuite) TestUpdateGCSafePoint(c *check.C) {
case <-ch:
}
}

// make sure handleJobs works well even if there is two different
// version of captures in the cluster
func (s *ownerSuite) TestHandleJobsDontBlock(c *check.C) {
defer testleak.AfterTest(c)()
ctx := cdcContext.NewBackendContext4Test(false)
ctx, cancel := cdcContext.WithCancel(ctx)
defer cancel()
owner, state, tester := createOwner4Test(ctx, c)

statusProvider := owner.StatusProvider()
// work well
cf1 := "test-changefeed"
cfInfo1 := &model.ChangeFeedInfo{
StartTs: oracle.GoTimeToTS(time.Now()),
Config: config.GetDefaultReplicaConfig(),
State: model.StateNormal,
}
changefeedStr, err := cfInfo1.Marshal()
c.Assert(err, check.IsNil)
cdcKey := etcd.CDCKey{
Tp: etcd.CDCKeyTypeChangefeedInfo,
ChangefeedID: cf1,
}
tester.MustUpdate(cdcKey.String(), []byte(changefeedStr))
_, err = owner.Tick(ctx, state)
tester.MustApplyPatches()
c.Assert(err, check.IsNil)

c.Assert(owner.changefeeds, check.HasKey, cf1)

// add an non-consistent version capture
captureInfo := &model.CaptureInfo{
ID: "capture-id-owner-test",
AdvertiseAddr: "127.0.0.1:0000",
Version: " v0.0.1-test-only",
}
cdcKey = etcd.CDCKey{
Tp: etcd.CDCKeyTypeCapture,
CaptureID: captureInfo.ID,
}
v, err := captureInfo.Marshal()
c.Assert(err, check.IsNil)
tester.MustUpdate(cdcKey.String(), v)

// try to add another changefeed
cf2 := "test-changefeed1"
cfInfo2 := &model.ChangeFeedInfo{
StartTs: oracle.GoTimeToTS(time.Now()),
Config: config.GetDefaultReplicaConfig(),
State: model.StateNormal,
}
changefeedStr1, err := cfInfo2.Marshal()
c.Assert(err, check.IsNil)
cdcKey = etcd.CDCKey{
Tp: etcd.CDCKeyTypeChangefeedInfo,
ChangefeedID: cf2,
}
tester.MustUpdate(cdcKey.String(), []byte(changefeedStr1))
_, err = owner.Tick(ctx, state)
tester.MustApplyPatches()
c.Assert(err, check.IsNil)
// make sure this changefeed add failed, which means that owner are return
// in clusterVersionConsistent check
c.Assert(owner.changefeeds[cf2], check.IsNil)

// make sure statusProvider works well
ctx1, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

var errIn error
var infos map[model.ChangeFeedID]*model.ChangeFeedInfo
done := make(chan struct{})
go func() {
infos, errIn = statusProvider.GetAllChangeFeedInfo(ctx1)
done <- struct{}{}
}()

ticker := time.NewTicker(20 * time.Millisecond)
defer ticker.Stop()
WorkLoop:
for {
select {
case <-done:
break WorkLoop
case <-ctx1.Done():
c.Fatal(ctx1.Err())
case <-ticker.C:
_, err = owner.Tick(ctx, state)
c.Assert(err, check.IsNil)
}
}
c.Assert(errIn, check.IsNil)
c.Assert(infos[cf1], check.NotNil)
c.Assert(infos[cf2], check.IsNil)
}

0 comments on commit 81af769

Please sign in to comment.