From aa9733328a4e4c2c09ed046ee8bb0d8675bc11dc Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Fri, 14 Jul 2023 10:36:44 +0800 Subject: [PATCH] pdutil (ticdc): refine pd clock interface (#9298) (#9302) close pingcap/tiflow#9294 --- cdc/kv/region_worker.go | 11 ++--------- cdc/owner/changefeed.go | 5 +---- cdc/owner/feed_state_manager.go | 1 + cdc/processor/pipeline/table_actor.go | 2 +- cdc/processor/processor.go | 2 +- cdc/scheduler/internal/v3/coordinator.go | 5 +---- pkg/pdutil/clock.go | 23 +++++++++-------------- pkg/pdutil/clock_test.go | 8 +++----- pkg/txnutil/gc/gc_manager.go | 15 +++------------ 9 files changed, 22 insertions(+), 50 deletions(-) diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index 0f1e5eb63b9..597f60d8448 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -266,14 +266,7 @@ func (w *regionWorker) resolveLock(ctx context.Context) error { w.rtsManager.Upsert(regionID, resolvedTs, eventTime) } case <-advanceCheckTicker.C: - currentTimeFromPD, err := w.session.client.pdClock.CurrentTime() - if err != nil { - log.Warn("failed to get current time from PD", - zap.Error(err), - zap.String("namespace", w.session.client.changefeed.Namespace), - zap.String("changefeed", w.session.client.changefeed.ID)) - continue - } + currentTimeFromPD := w.session.client.pdClock.CurrentTime() expired := make([]*regionTsInfo, 0) for w.rtsManager.Len() > 0 { item := w.rtsManager.Pop() @@ -335,7 +328,7 @@ func (w *regionWorker) resolveLock(ctx context.Context) error { zap.Uint64("resolvedTs", lastResolvedTs), ) } - err = w.session.lockResolver.Resolve(ctx, rts.regionID, maxVersion) + err := w.session.lockResolver.Resolve(ctx, rts.regionID, maxVersion) if err != nil { log.Warn("failed to resolve lock", zap.Uint64("regionID", rts.regionID), diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index ae90b5e7079..1409265cb73 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -370,10 +370,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]* return errors.Trace(err) } - pdTime, err := c.upstream.PDClock.CurrentTime() - if err != nil { - return errors.Trace(err) - } + pdTime := c.upstream.PDClock.CurrentTime() currentTs := oracle.GetPhysical(pdTime) // CheckpointCannotProceed implies that not all tables are being replicated normally, diff --git a/cdc/owner/feed_state_manager.go b/cdc/owner/feed_state_manager.go index 5836c089934..21d16b22ae4 100644 --- a/cdc/owner/feed_state_manager.go +++ b/cdc/owner/feed_state_manager.go @@ -441,6 +441,7 @@ func (m *feedStateManager) warningsReportedByProcessors() []*model.RunningError if position == nil { return nil, false, nil } + // set Warning to nil after it has been handled position.Warning = nil return position, true, nil }) diff --git a/cdc/processor/pipeline/table_actor.go b/cdc/processor/pipeline/table_actor.go index bb3b4614865..f30cc74a652 100644 --- a/cdc/processor/pipeline/table_actor.go +++ b/cdc/processor/pipeline/table_actor.go @@ -464,7 +464,7 @@ func (t *tableActor) AsyncStop() bool { func (t *tableActor) Stats() tablepb.Stats { pullerStats := t.pullerNode.plr.Stats() sinkStats := t.sinkNode.Stats() - now, _ := t.upstream.PDClock.CurrentTime() + now := t.upstream.PDClock.CurrentTime() stats := tablepb.Stats{ RegionCount: pullerStats.RegionCount, diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 8289c0579f1..2624d7db9c5 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -463,7 +463,7 @@ func (p *processor) GetTableStatus(tableID model.TableID, collectStat bool) tabl func (p *processor) getStatsFromSourceManagerAndSinkManager(tableID model.TableID, sinkStats sinkmanager.TableStats) tablepb.Stats { pullerStats := p.sourceManager.GetTablePullerStats(tableID) - now, _ := p.upstream.PDClock.CurrentTime() + now := p.upstream.PDClock.CurrentTime() stats := tablepb.Stats{ RegionCount: pullerStats.RegionCount, diff --git a/cdc/scheduler/internal/v3/coordinator.go b/cdc/scheduler/internal/v3/coordinator.go index 83a75719c34..b61fdf3d498 100644 --- a/cdc/scheduler/internal/v3/coordinator.go +++ b/cdc/scheduler/internal/v3/coordinator.go @@ -294,10 +294,7 @@ func (c *coordinator) poll( pdTime := time.Now() // only nil in unit test if c.pdClock != nil { - pdTime, err = c.pdClock.CurrentTime() - if err != nil { - log.Warn("schedulerv3: failed to get pd time", zap.Error(err)) - } + pdTime = c.pdClock.CurrentTime() } if !c.captureM.CheckAllCaptureInitialized() { diff --git a/pkg/pdutil/clock.go b/pkg/pdutil/clock.go index ee7edec660b..fa79ec4074b 100644 --- a/pkg/pdutil/clock.go +++ b/pkg/pdutil/clock.go @@ -31,7 +31,7 @@ const pdTimeUpdateInterval = 10 * time.Millisecond // Clock is a time source of PD cluster. type Clock interface { // CurrentTime returns approximate current time from pd. - CurrentTime() (time.Time, error) + CurrentTime() time.Time Run(ctx context.Context) Stop() } @@ -45,7 +45,6 @@ type clock struct { tsEventTime time.Time // The time we receive PD ts. tsProcessingTime time.Time - err error } updateInterval time.Duration cancel context.CancelFunc @@ -91,30 +90,26 @@ func (c *clock) Run(ctx context.Context) { c.mu.Lock() c.mu.tsEventTime = oracle.GetTimeFromTS(oracle.ComposeTS(physical, 0)) c.mu.tsProcessingTime = time.Now() - c.mu.err = nil c.mu.Unlock() return nil }, retry.WithBackoffBaseDelay(200), retry.WithMaxTries(10)) if err != nil { - log.Warn("get time from pd failed, will use local time as pd time") - c.mu.Lock() - now := time.Now() - c.mu.tsEventTime = now - c.mu.tsProcessingTime = now - c.mu.err = err - c.mu.Unlock() + log.Warn("get time from pd failed, do not update time cache", + zap.Time("cachedTime", c.mu.tsEventTime), + zap.Time("processingTime", c.mu.tsProcessingTime), + zap.Error(err)) } } } } // CurrentTime returns approximate current time from pd. -func (c *clock) CurrentTime() (time.Time, error) { +func (c *clock) CurrentTime() time.Time { c.mu.RLock() defer c.mu.RUnlock() tsEventTime := c.mu.tsEventTime current := tsEventTime.Add(time.Since(c.mu.tsProcessingTime)) - return current, errors.Trace(c.mu.err) + return current } // Stop clock. @@ -132,8 +127,8 @@ func NewClock4Test() Clock { return &clock4Test{} } -func (c *clock4Test) CurrentTime() (time.Time, error) { - return time.Now(), nil +func (c *clock4Test) CurrentTime() time.Time { + return time.Now() } func (c *clock4Test) Run(ctx context.Context) { diff --git a/pkg/pdutil/clock_test.go b/pkg/pdutil/clock_test.go index e96f0477e1b..57f7c56360b 100644 --- a/pkg/pdutil/clock_test.go +++ b/pkg/pdutil/clock_test.go @@ -43,13 +43,11 @@ func TestTimeFromPD(t *testing.T) { defer clock.Stop() time.Sleep(1 * time.Second) - t1, err := clock.CurrentTime() - require.Nil(t, err) + t1 := clock.CurrentTime() time.Sleep(400 * time.Millisecond) // assume that the gc safe point updated one hour ago - t2, err := clock.CurrentTime() - require.Nil(t, err) + t2 := clock.CurrentTime() // should return new time require.NotEqual(t, t1, t2) } @@ -70,7 +68,7 @@ func TestEventTimeAndProcessingTime(t *testing.T) { sleep := time.Second time.Sleep(sleep) - t1, err := clock.CurrentTime() + t1 := clock.CurrentTime() now := time.Now() require.Nil(t, err) require.Less(t, now.Sub(t1), sleep/2) diff --git a/pkg/txnutil/gc/gc_manager.go b/pkg/txnutil/gc/gc_manager.go index 639aba6e2be..bf0232951a9 100644 --- a/pkg/txnutil/gc/gc_manager.go +++ b/pkg/txnutil/gc/gc_manager.go @@ -113,10 +113,7 @@ func (m *gcManager) CheckStaleCheckpointTs( ) error { gcSafepointUpperBound := checkpointTs - 1 if m.isTiCDCBlockGC { - pdTime, err := m.pdClock.CurrentTime() - if err != nil { - return err - } + pdTime := m.pdClock.CurrentTime() if pdTime.Sub( oracle.GetTimeFromTS(gcSafepointUpperBound), ) > time.Duration(m.gcTTL)*time.Second { @@ -143,14 +140,8 @@ func (m *gcManager) CheckStaleCheckpointTs( func (m *gcManager) IgnoreFailedChangeFeed( checkpointTs uint64, ) bool { - pdTime, err := m.pdClock.CurrentTime() - if err != nil { - log.Warn("failed to get ts", - zap.String("GcManagerID", m.gcServiceID), - zap.Error(err), - ) - return false - } + pdTime := m.pdClock.CurrentTime() + // ignore the changefeed if its current checkpoint TS is earlier // than the (currentPDTso - failedFeedDataRetentionTime). gcSafepointUpperBound := checkpointTs - 1