Skip to content

Commit

Permalink
pdutil (ticdc): refine pd clock interface (pingcap#9298) (pingcap#9302)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Jul 14, 2023
1 parent 0124d11 commit aa97333
Show file tree
Hide file tree
Showing 9 changed files with 22 additions and 50 deletions.
11 changes: 2 additions & 9 deletions cdc/kv/region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,14 +266,7 @@ func (w *regionWorker) resolveLock(ctx context.Context) error {
w.rtsManager.Upsert(regionID, resolvedTs, eventTime)
}
case <-advanceCheckTicker.C:
currentTimeFromPD, err := w.session.client.pdClock.CurrentTime()
if err != nil {
log.Warn("failed to get current time from PD",
zap.Error(err),
zap.String("namespace", w.session.client.changefeed.Namespace),
zap.String("changefeed", w.session.client.changefeed.ID))
continue
}
currentTimeFromPD := w.session.client.pdClock.CurrentTime()
expired := make([]*regionTsInfo, 0)
for w.rtsManager.Len() > 0 {
item := w.rtsManager.Pop()
Expand Down Expand Up @@ -335,7 +328,7 @@ func (w *regionWorker) resolveLock(ctx context.Context) error {
zap.Uint64("resolvedTs", lastResolvedTs),
)
}
err = w.session.lockResolver.Resolve(ctx, rts.regionID, maxVersion)
err := w.session.lockResolver.Resolve(ctx, rts.regionID, maxVersion)
if err != nil {
log.Warn("failed to resolve lock",
zap.Uint64("regionID", rts.regionID),
Expand Down
5 changes: 1 addition & 4 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,10 +370,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
return errors.Trace(err)
}

pdTime, err := c.upstream.PDClock.CurrentTime()
if err != nil {
return errors.Trace(err)
}
pdTime := c.upstream.PDClock.CurrentTime()
currentTs := oracle.GetPhysical(pdTime)

// CheckpointCannotProceed implies that not all tables are being replicated normally,
Expand Down
1 change: 1 addition & 0 deletions cdc/owner/feed_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ func (m *feedStateManager) warningsReportedByProcessors() []*model.RunningError
if position == nil {
return nil, false, nil
}
// set Warning to nil after it has been handled
position.Warning = nil
return position, true, nil
})
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/pipeline/table_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ func (t *tableActor) AsyncStop() bool {
func (t *tableActor) Stats() tablepb.Stats {
pullerStats := t.pullerNode.plr.Stats()
sinkStats := t.sinkNode.Stats()
now, _ := t.upstream.PDClock.CurrentTime()
now := t.upstream.PDClock.CurrentTime()

stats := tablepb.Stats{
RegionCount: pullerStats.RegionCount,
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ func (p *processor) GetTableStatus(tableID model.TableID, collectStat bool) tabl

func (p *processor) getStatsFromSourceManagerAndSinkManager(tableID model.TableID, sinkStats sinkmanager.TableStats) tablepb.Stats {
pullerStats := p.sourceManager.GetTablePullerStats(tableID)
now, _ := p.upstream.PDClock.CurrentTime()
now := p.upstream.PDClock.CurrentTime()

stats := tablepb.Stats{
RegionCount: pullerStats.RegionCount,
Expand Down
5 changes: 1 addition & 4 deletions cdc/scheduler/internal/v3/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,10 +294,7 @@ func (c *coordinator) poll(
pdTime := time.Now()
// only nil in unit test
if c.pdClock != nil {
pdTime, err = c.pdClock.CurrentTime()
if err != nil {
log.Warn("schedulerv3: failed to get pd time", zap.Error(err))
}
pdTime = c.pdClock.CurrentTime()
}

if !c.captureM.CheckAllCaptureInitialized() {
Expand Down
23 changes: 9 additions & 14 deletions pkg/pdutil/clock.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const pdTimeUpdateInterval = 10 * time.Millisecond
// Clock is a time source of PD cluster.
type Clock interface {
// CurrentTime returns approximate current time from pd.
CurrentTime() (time.Time, error)
CurrentTime() time.Time
Run(ctx context.Context)
Stop()
}
Expand All @@ -45,7 +45,6 @@ type clock struct {
tsEventTime time.Time
// The local time at which the PD ts was received.
tsProcessingTime time.Time
err error
}
updateInterval time.Duration
cancel context.CancelFunc
Expand Down Expand Up @@ -91,30 +90,26 @@ func (c *clock) Run(ctx context.Context) {
c.mu.Lock()
c.mu.tsEventTime = oracle.GetTimeFromTS(oracle.ComposeTS(physical, 0))
c.mu.tsProcessingTime = time.Now()
c.mu.err = nil
c.mu.Unlock()
return nil
}, retry.WithBackoffBaseDelay(200), retry.WithMaxTries(10))
if err != nil {
log.Warn("get time from pd failed, will use local time as pd time")
c.mu.Lock()
now := time.Now()
c.mu.tsEventTime = now
c.mu.tsProcessingTime = now
c.mu.err = err
c.mu.Unlock()
log.Warn("get time from pd failed, do not update time cache",
zap.Time("cachedTime", c.mu.tsEventTime),
zap.Time("processingTime", c.mu.tsProcessingTime),
zap.Error(err))
}
}
}
}

// CurrentTime returns the approximate current time of PD: the cached PD
// timestamp advanced by the local time elapsed since it was cached.
func (c *clock) CurrentTime() (time.Time, error) {
func (c *clock) CurrentTime() time.Time {
c.mu.RLock()
defer c.mu.RUnlock()
tsEventTime := c.mu.tsEventTime
current := tsEventTime.Add(time.Since(c.mu.tsProcessingTime))
return current, errors.Trace(c.mu.err)
return current
}

// Stop clock.
Expand All @@ -132,8 +127,8 @@ func NewClock4Test() Clock {
return &clock4Test{}
}

func (c *clock4Test) CurrentTime() (time.Time, error) {
return time.Now(), nil
func (c *clock4Test) CurrentTime() time.Time {
return time.Now()
}

func (c *clock4Test) Run(ctx context.Context) {
Expand Down
8 changes: 3 additions & 5 deletions pkg/pdutil/clock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,11 @@ func TestTimeFromPD(t *testing.T) {
defer clock.Stop()
time.Sleep(1 * time.Second)

t1, err := clock.CurrentTime()
require.Nil(t, err)
t1 := clock.CurrentTime()

time.Sleep(400 * time.Millisecond)
// NOTE(review): this test does not appear to involve a gc safe point; the
// sleep above only gives the clock time to advance before the second read.
t2, err := clock.CurrentTime()
require.Nil(t, err)
t2 := clock.CurrentTime()
// should return new time
require.NotEqual(t, t1, t2)
}
Expand All @@ -70,7 +68,7 @@ func TestEventTimeAndProcessingTime(t *testing.T) {

sleep := time.Second
time.Sleep(sleep)
t1, err := clock.CurrentTime()
t1 := clock.CurrentTime()
now := time.Now()
require.Nil(t, err)
require.Less(t, now.Sub(t1), sleep/2)
Expand Down
15 changes: 3 additions & 12 deletions pkg/txnutil/gc/gc_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,7 @@ func (m *gcManager) CheckStaleCheckpointTs(
) error {
gcSafepointUpperBound := checkpointTs - 1
if m.isTiCDCBlockGC {
pdTime, err := m.pdClock.CurrentTime()
if err != nil {
return err
}
pdTime := m.pdClock.CurrentTime()
if pdTime.Sub(
oracle.GetTimeFromTS(gcSafepointUpperBound),
) > time.Duration(m.gcTTL)*time.Second {
Expand All @@ -143,14 +140,8 @@ func (m *gcManager) CheckStaleCheckpointTs(
func (m *gcManager) IgnoreFailedChangeFeed(
checkpointTs uint64,
) bool {
pdTime, err := m.pdClock.CurrentTime()
if err != nil {
log.Warn("failed to get ts",
zap.String("GcManagerID", m.gcServiceID),
zap.Error(err),
)
return false
}
pdTime := m.pdClock.CurrentTime()

// ignore the changefeed if its current checkpoint TS is earlier
// than the (currentPDTso - failedFeedDataRetentionTime).
gcSafepointUpperBound := checkpointTs - 1
Expand Down

0 comments on commit aa97333

Please sign in to comment.