From aefa4ebbb179d30769862ab3c11863372a96fb98 Mon Sep 17 00:00:00 2001 From: asddongmen <414110582@qq.com> Date: Wed, 24 Nov 2021 12:15:02 +0800 Subject: [PATCH 1/8] owner_test: fix case TestHandleJobsDontBlock --- cdc/owner/owner_test.go | 44 +++++++++++++++++++++++++---------------- 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/cdc/owner/owner_test.go b/cdc/owner/owner_test.go index c500eff2787..6491f08f16e 100644 --- a/cdc/owner/owner_test.go +++ b/cdc/owner/owner_test.go @@ -55,7 +55,7 @@ func createOwner4Test(ctx cdcContext.Context, c *check.C) (*Owner, *orchestrator return safePoint, nil }, } - cf := NewOwner4Test(func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error) { + owner := NewOwner4Test(func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error) { return &mockDDLPuller{resolvedTs: startTs - 1}, nil }, func(ctx cdcContext.Context) (AsyncSink, error) { return &mockAsyncSink{}, nil @@ -73,7 +73,7 @@ func createOwner4Test(ctx cdcContext.Context, c *check.C) (*Owner, *orchestrator captureBytes, err := ctx.GlobalVars().CaptureInfo.Marshal() c.Assert(err, check.IsNil) tester.MustUpdate(cdcKey.String(), captureBytes) - return cf, state, tester + return owner, state, tester } func (s *ownerSuite) TestCreateRemoveChangefeed(c *check.C) { @@ -373,22 +373,24 @@ func (s *ownerSuite) TestHandleJobsDontBlock(c *check.C) { owner, state, tester := createOwner4Test(ctx, c) statusProvider := owner.StatusProvider() // work well - changefeedID := "test-changefeed" - changefeedInfo := &model.ChangeFeedInfo{ + cf1 := "test-changefeed" + cfInfo1 := &model.ChangeFeedInfo{ StartTs: oracle.GoTimeToTS(time.Now()), Config: config.GetDefaultReplicaConfig(), + State: model.StateNormal, } - changefeedStr, err := changefeedInfo.Marshal() + changefeedStr, err := cfInfo1.Marshal() c.Assert(err, check.IsNil) cdcKey := etcd.CDCKey{ Tp: etcd.CDCKeyTypeChangefeedInfo, - ChangefeedID: changefeedID, + ChangefeedID: cf1, } tester.MustUpdate(cdcKey.String(), []byte(changefeedStr)) _, err = owner.Tick(ctx, state) tester.MustApplyPatches() c.Assert(err, check.IsNil) - c.Assert(owner.changefeeds, check.HasKey, changefeedID) + + c.Assert(owner.changefeeds, check.HasKey, cf1) // add an non-consistent version capture captureInfo := &model.CaptureInfo{ @@ -405,16 +407,17 @@ func (s *ownerSuite) TestHandleJobsDontBlock(c *check.C) { tester.MustUpdate(cdcKey.String(), v) // try to add another changefeed - changefeedID1 := "test-changefeed1" - changefeedInfo1 := &model.ChangeFeedInfo{ + cf2 := "test-changefeed1" + cfInfo2 := &model.ChangeFeedInfo{ StartTs: oracle.GoTimeToTS(time.Now()), Config: config.GetDefaultReplicaConfig(), + State: model.StateNormal, } - changefeedStr1, err := changefeedInfo1.Marshal() + changefeedStr1, err := cfInfo2.Marshal() c.Assert(err, check.IsNil) cdcKey = etcd.CDCKey{ Tp: etcd.CDCKeyTypeChangefeedInfo, - ChangefeedID: changefeedID, + ChangefeedID: cf2, } tester.MustUpdate(cdcKey.String(), []byte(changefeedStr1)) _, err = owner.Tick(ctx, state) @@ -422,20 +425,27 @@ func (s *ownerSuite) TestHandleJobsDontBlock(c *check.C) { c.Assert(err, check.IsNil) // make sure this changefeed add failed, which means that owner are return // in clusterVersionConsistent check - c.Assert(owner.changefeeds[changefeedID1], check.IsNil) + c.Assert(owner.changefeeds[cf2], check.IsNil) // make sure statusProvider works well ctx1, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - wg := sync.WaitGroup{} + + var errIn error + var infos map[model.ChangeFeedID]*model.ChangeFeedInfo + var wg sync.WaitGroup wg.Add(1) go func() { - infos, err := statusProvider.GetAllChangeFeedInfo(ctx1) - c.Assert(err, check.IsNil) - c.Assert(infos[changefeedID], check.NotNil) - c.Assert(infos[changefeedID1], check.IsNil) + infos, errIn = statusProvider.GetAllChangeFeedInfo(ctx1) wg.Done() }() + + time.Sleep(1 * time.Second) _, err = owner.Tick(ctx, state) c.Assert(err, check.IsNil) + + wg.Wait() + c.Assert(errIn, check.IsNil) + c.Assert(infos[cf1], check.NotNil) + c.Assert(infos[cf2], check.IsNil) } From c3bc4d62485ed7dc23a9bf63f4dc308fc5e788bb Mon Sep 17 00:00:00 2001 From: asddongmen <414110582@qq.com> Date: Wed, 24 Nov 2021 12:44:24 +0800 Subject: [PATCH 2/8] owner_test: resolves comment --- cdc/owner/owner_test.go | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/cdc/owner/owner_test.go b/cdc/owner/owner_test.go index 6491f08f16e..0219facc628 100644 --- a/cdc/owner/owner_test.go +++ b/cdc/owner/owner_test.go @@ -18,7 +18,6 @@ import ( "context" "fmt" "math" - "sync" "time" "github.com/pingcap/check" @@ -433,18 +432,26 @@ func (s *ownerSuite) TestHandleJobsDontBlock(c *check.C) { var errIn error var infos map[model.ChangeFeedID]*model.ChangeFeedInfo - var wg sync.WaitGroup - wg.Add(1) + done := make(chan struct{}) go func() { infos, errIn = statusProvider.GetAllChangeFeedInfo(ctx1) - wg.Done() + done <- struct{}{} }() - time.Sleep(1 * time.Second) - _, err = owner.Tick(ctx, state) - c.Assert(err, check.IsNil) - - wg.Wait() + ticker := time.NewTicker(100 * time.Millisecond) +workloop: + for { + select { + case <-ctx1.Done(): + break workloop + case <-ticker.C: + _, err = owner.Tick(ctx, state) + c.Assert(err, check.IsNil) + case <-done: + break workloop + } + } + ticker.Stop() c.Assert(errIn, check.IsNil) c.Assert(infos[cf1], check.NotNil) c.Assert(infos[cf2], check.IsNil) From d249af4166d10a177e1551721ca477be2b81cf86 Mon Sep 17 00:00:00 2001 From: asddongmen <414110582@qq.com> Date: Wed, 24 Nov 2021 13:00:09 +0800 Subject: [PATCH 3/8] owner_test: resolves comment --- cdc/owner/owner_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cdc/owner/owner_test.go b/cdc/owner/owner_test.go index 0219facc628..22d7e8f2278 100644 --- a/cdc/owner/owner_test.go +++ b/cdc/owner/owner_test.go @@ -438,7 +438,8 @@ func (s *ownerSuite) TestHandleJobsDontBlock(c *check.C) { done <- struct{}{} }() - ticker := time.NewTicker(100 * time.Millisecond) + ticker := time.NewTicker(20 * time.Millisecond) + defer ticker.Stop() workloop: for { select { @@ -451,7 +452,6 @@ workloop: break workloop } } - ticker.Stop() c.Assert(errIn, check.IsNil) c.Assert(infos[cf1], check.NotNil) c.Assert(infos[cf2], check.IsNil) From 51e524767992feca318e2222a1ea8c378e529b7d Mon Sep 17 00:00:00 2001 From: asddongmen <414110582@qq.com> Date: Wed, 24 Nov 2021 13:52:08 +0800 Subject: [PATCH 4/8] owner_test: resolves comment --- cdc/owner/owner_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cdc/owner/owner_test.go b/cdc/owner/owner_test.go index 22d7e8f2278..f572900e9a8 100644 --- a/cdc/owner/owner_test.go +++ b/cdc/owner/owner_test.go @@ -440,16 +440,16 @@ func (s *ownerSuite) TestHandleJobsDontBlock(c *check.C) { ticker := time.NewTicker(20 * time.Millisecond) defer ticker.Stop() -workloop: +WorkLoop: for { select { case <-ctx1.Done(): - break workloop + c.Fatal(ctx1.Err()) case <-ticker.C: _, err = owner.Tick(ctx, state) c.Assert(err, check.IsNil) case <-done: - break workloop + break WorkLoop } } c.Assert(errIn, check.IsNil) From d43bcefa2a561ea7df43998ff759e81b36165afe Mon Sep 17 00:00:00 2001 From: asddongmen <414110582@qq.com> Date: Wed, 24 Nov 2021 14:38:18 +0800 Subject: [PATCH 5/8] owner: add a logLimiter field for Owner to limit log output --- cdc/owner/owner.go | 11 +++++++---- cdc/owner/owner_test.go | 5 +++-- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/cdc/owner/owner.go b/cdc/owner/owner.go index fcc88329c0c..1e66e5644d6 100644 --- a/cdc/owner/owner.go +++ b/cdc/owner/owner.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/ticdc/pkg/version" pd "github.com/tikv/pd/client" "go.uber.org/zap" + "golang.org/x/time/rate" ) type ownerJobType int @@ -78,9 +79,9 @@ type Owner struct { ownerJobQueueMu sync.Mutex ownerJobQueue []*ownerJob + logLimiter *rate.Limiter lastTickTime time.Time - - closed int32 + closed int32 newChangefeed func(id model.ChangeFeedID, gcManager gc.Manager) *changefeed } @@ -92,6 +93,7 @@ func NewOwner(pdClient pd.Client) *Owner { gcManager: gc.NewManager(pdClient), lastTickTime: time.Now(), newChangefeed: newChangefeed, + logLimiter: rate.NewLimiter(1, 1), } } @@ -127,7 +129,6 @@ func (o *Owner) Tick(stdCtx context.Context, rawState orchestrator.ReactorState) if !o.clusterVersionConsistent(state.Captures) { // sleep one second to avoid printing too much log - time.Sleep(1 * time.Second) return state, nil } // Owner should update GC safepoint before initializing changefeed, so @@ -280,7 +281,9 @@ func (o *Owner) clusterVersionConsistent(captures map[model.CaptureID]*model.Cap myVersion := version.ReleaseVersion for _, capture := range captures { if myVersion != capture.Version { - log.Warn("the capture version is different with the owner", zap.Reflect("capture", capture), zap.String("my-version", myVersion)) + if o.logLimiter.Allow() { + log.Warn("the capture version is different with the owner", zap.Reflect("capture", capture), zap.String("my-version", myVersion)) + } return false } } diff --git a/cdc/owner/owner_test.go b/cdc/owner/owner_test.go index f572900e9a8..c9e23311199 100644 --- a/cdc/owner/owner_test.go +++ b/cdc/owner/owner_test.go @@ -370,6 +370,7 @@ func (s *ownerSuite) TestHandleJobsDontBlock(c *check.C) { defer testleak.AfterTest(c)() ctx := cdcContext.NewBackendContext4Test(false) owner, state, tester := createOwner4Test(ctx, c) + statusProvider := owner.StatusProvider() // work well cf1 := "test-changefeed" @@ -443,13 +444,13 @@ func (s *ownerSuite) TestHandleJobsDontBlock(c *check.C) { WorkLoop: for { select { + case <-done: + break WorkLoop case <-ctx1.Done(): c.Fatal(ctx1.Err()) case <-ticker.C: _, err = owner.Tick(ctx, state) c.Assert(err, check.IsNil) - case <-done: - break WorkLoop } } c.Assert(errIn, check.IsNil) From b143b54898749ab73bf78349a1da6c7ead5ebf58 Mon Sep 17 00:00:00 2001 From: asddongmen <414110582@qq.com> Date: Wed, 24 Nov 2021 14:45:11 +0800 Subject: [PATCH 6/8] owner: resolves comment --- cdc/owner/owner.go | 1 - 1 file changed, 1 deletion(-) diff --git a/cdc/owner/owner.go b/cdc/owner/owner.go index 1e66e5644d6..e14fb019f60 100644 --- a/cdc/owner/owner.go +++ b/cdc/owner/owner.go @@ -128,7 +128,6 @@ func (o *Owner) Tick(stdCtx context.Context, rawState orchestrator.ReactorState) o.handleJobs() if !o.clusterVersionConsistent(state.Captures) { - // sleep one second to avoid printing too much log return state, nil } // Owner should update GC safepoint before initializing changefeed, so From 53f8d8cb2018d7a0289e515c04398f4da00ef159 Mon Sep 17 00:00:00 2001 From: asddongmen <414110582@qq.com> Date: Wed, 24 Nov 2021 14:57:35 +0800 Subject: [PATCH 7/8] owner: resolves comment --- cdc/owner/owner.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/cdc/owner/owner.go b/cdc/owner/owner.go index e14fb019f60..c14278d6348 100644 --- a/cdc/owner/owner.go +++ b/cdc/owner/owner.go @@ -47,6 +47,10 @@ const ( ownerJobTypeQuery ) +// versionInconsistentLogRate represents the rate of log output when there are +// captures version is different with the owner version in the cluster +const versionInconsistentLogRate = 1 + type ownerJob struct { tp ownerJobType changefeedID model.ChangeFeedID @@ -78,7 +82,7 @@ type Owner struct { ownerJobQueueMu sync.Mutex ownerJobQueue []*ownerJob - + // logLimiter controls cluster version check log output rate logLimiter *rate.Limiter lastTickTime time.Time closed int32 @@ -93,7 +97,7 @@ func NewOwner(pdClient pd.Client) *Owner { gcManager: gc.NewManager(pdClient), lastTickTime: time.Now(), newChangefeed: newChangefeed, - logLimiter: rate.NewLimiter(1, 1), + logLimiter: rate.NewLimiter(versionInconsistentLogRate, versionInconsistentLogRate), } } From b8da81ea26946919ff300c1369c7ad81c50907c6 Mon Sep 17 00:00:00 2001 From: dongmen <20351731+asddongmen@users.noreply.github.com> Date: Wed, 24 Nov 2021 16:07:25 +0800 Subject: [PATCH 8/8] resolves comments Co-authored-by: Zixiong Liu --- cdc/owner/owner.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cdc/owner/owner.go b/cdc/owner/owner.go index c14278d6348..a7943b748bd 100644 --- a/cdc/owner/owner.go +++ b/cdc/owner/owner.go @@ -48,7 +48,7 @@ const ( ) // versionInconsistentLogRate represents the rate of log output when there are -// captures version is different with the owner version in the cluster +// captures with versions different from that of the owner const versionInconsistentLogRate = 1 type ownerJob struct { @@ -285,7 +285,7 @@ func (o *Owner) clusterVersionConsistent(captures map[model.CaptureID]*model.Cap for _, capture := range captures { if myVersion != capture.Version { if o.logLimiter.Allow() { - log.Warn("the capture version is different with the owner", zap.Reflect("capture", capture), zap.String("my-version", myVersion)) + log.Warn("the capture version is different with the owner", zap.Reflect("capture", capture), zap.String("owner-version", myVersion)) } return false }