Skip to content

Commit

Permalink
Merge branch 'master' into disksing/metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
disksing authored Sep 25, 2023
2 parents 8ede4a5 + 7e6cd0a commit 179676d
Showing 1 changed file with 34 additions and 10 deletions.
44 changes: 34 additions & 10 deletions pkg/tso/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/tikv/pd/pkg/election"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/pkg/utils/tsoutil"
"github.com/tikv/pd/pkg/utils/typeutil"
Expand Down Expand Up @@ -156,6 +157,7 @@ func (t *timestampOracle) GetTimestampPath() string {

// SyncTimestamp is used to synchronize the timestamp.
func (t *timestampOracle) SyncTimestamp(leadership *election.Leadership) error {
log.Info("start to sync timestamp", logutil.CondUint32("keyspace-group-id", t.keyspaceGroupID, t.keyspaceGroupID > 0))
t.metrics.syncEvent.Inc()

failpoint.Inject("delaySyncTimestamp", func() {
Expand All @@ -167,9 +169,20 @@ func (t *timestampOracle) SyncTimestamp(leadership *election.Leadership) error {
return err
}
lastSavedTime := t.getLastSavedTime()
// If `lastSavedTime` is not zero, it means that the `timestampOracle` has already been initialized
// before, so we could safely skip the sync if `lastSavedTime` is equal to `last`.
if lastSavedTime != typeutil.ZeroTime && typeutil.SubRealTimeByWallClock(lastSavedTime, last) == 0 {
// We could skip the synchronization if the following conditions are met:
// 1. The timestamp in memory has been initialized.
// 2. The last saved timestamp in etcd is not zero.
// 3. The last saved timestamp in memory is not zero.
// 4. The last saved timestamp in etcd is equal to the last saved timestamp in memory.
// 1 is to ensure the timestamp in memory could always be initialized. 2-4 are to ensure
// the synchronization could be skipped safely.
if t.isInitialized() &&
last != typeutil.ZeroTime &&
lastSavedTime != typeutil.ZeroTime &&
typeutil.SubRealTimeByWallClock(last, lastSavedTime) == 0 {
log.Info("skip sync timestamp",
logutil.CondUint32("keyspace-group-id", t.keyspaceGroupID, t.keyspaceGroupID > 0),
zap.Time("last", last), zap.Time("last-saved", lastSavedTime))
t.metrics.skipSyncEvent.Inc()
return nil
}
Expand All @@ -185,7 +198,10 @@ func (t *timestampOracle) SyncTimestamp(leadership *election.Leadership) error {
// the timestamp allocation will start from the saved etcd timestamp temporarily.
if typeutil.SubRealTimeByWallClock(next, last) < UpdateTimestampGuard {
log.Warn("system time may be incorrect",
zap.Time("last", last), zap.Time("next", next), errs.ZapError(errs.ErrIncorrectSystemTime))
logutil.CondUint32("keyspace-group-id", t.keyspaceGroupID, t.keyspaceGroupID > 0),
zap.Time("last", last), zap.Time("last-saved", lastSavedTime),
zap.Time("next", next),
errs.ZapError(errs.ErrIncorrectSystemTime))
next = last.Add(UpdateTimestampGuard)
}
failpoint.Inject("failedToSaveTimestamp", func() {
Expand All @@ -197,11 +213,14 @@ func (t *timestampOracle) SyncTimestamp(leadership *election.Leadership) error {
t.metrics.errSaveSyncTSEvent.Inc()
return err
}
t.metrics.syncSaveDuration.Observe(time.Since(start).Seconds())
t.lastSavedTime.Store(save)
t.metrics.syncSaveDuration.Observe(time.Since(start).Seconds())

t.metrics.syncOKEvent.Inc()
log.Info("sync and save timestamp", zap.Time("last", last), zap.Time("save", save), zap.Time("next", next))
log.Info("sync and save timestamp",
logutil.CondUint32("keyspace-group-id", t.keyspaceGroupID, t.keyspaceGroupID > 0),
zap.Time("last", last), zap.Time("last-saved", lastSavedTime),
zap.Time("save", save), zap.Time("next", next))
// save into memory
t.setTSOPhysical(next, true)
return nil
Expand Down Expand Up @@ -267,8 +286,8 @@ func (t *timestampOracle) resetUserTimestampInner(leadership *election.Leadershi
t.metrics.errSaveResetTSEvent.Inc()
return err
}
t.metrics.resetSaveDuration.Observe(time.Since(start).Seconds())
t.lastSavedTime.Store(save)
t.metrics.resetSaveDuration.Observe(time.Since(start).Seconds())
}
// save into memory only if nextPhysical or nextLogical is greater.
t.tsoMux.physical = nextPhysical
Expand Down Expand Up @@ -313,6 +332,7 @@ func (t *timestampOracle) UpdateTimestamp(leadership *election.Leadership) error
jetLag := typeutil.SubRealTimeByWallClock(now, prevPhysical)
if jetLag > 3*t.updatePhysicalInterval && jetLag > jetLagWarningThreshold {
log.Warn("clock offset",
logutil.CondUint32("keyspace-group-id", t.keyspaceGroupID, t.keyspaceGroupID > 0),
zap.Duration("jet-lag", jetLag),
zap.Time("prev-physical", prevPhysical),
zap.Time("now", now),
Expand All @@ -331,7 +351,9 @@ func (t *timestampOracle) UpdateTimestamp(leadership *election.Leadership) error
} else if prevLogical > maxLogical/2 {
// The reason choosing maxLogical/2 here is that it's big enough for common cases.
// Because there is enough timestamp can be allocated before next update.
log.Warn("the logical time may be not enough", zap.Int64("prev-logical", prevLogical))
log.Warn("the logical time may be not enough",
logutil.CondUint32("keyspace-group-id", t.keyspaceGroupID, t.keyspaceGroupID > 0),
zap.Int64("prev-logical", prevLogical))
next = prevPhysical.Add(time.Millisecond)
} else {
// It will still use the previous physical time to alloc the timestamp.
Expand All @@ -346,14 +368,15 @@ func (t *timestampOracle) UpdateTimestamp(leadership *election.Leadership) error
start := time.Now()
if err := t.storage.SaveTimestamp(t.GetTimestampPath(), save); err != nil {
log.Warn("save timestamp failed",
logutil.CondUint32("keyspace-group-id", t.keyspaceGroupID, t.keyspaceGroupID > 0),
zap.String("dc-location", t.dcLocation),
zap.String("timestamp-path", t.GetTimestampPath()),
zap.Error(err))
t.metrics.errSaveUpdateTSEvent.Inc()
return err
}
t.metrics.updateSaveDuration.Observe(time.Since(start).Seconds())
t.lastSavedTime.Store(save)
t.metrics.updateSaveDuration.Observe(time.Since(start).Seconds())
}
// save into memory
t.setTSOPhysical(next, false)
Expand Down Expand Up @@ -388,6 +411,7 @@ func (t *timestampOracle) getTS(ctx context.Context, leadership *election.Leader
}
if resp.GetLogical() >= maxLogical {
log.Warn("logical part outside of max logical interval, please check ntp time, or adjust config item `tso-update-physical-interval`",
logutil.CondUint32("keyspace-group-id", t.keyspaceGroupID, t.keyspaceGroupID > 0),
zap.Reflect("response", resp),
zap.Int("retry-count", i), errs.ZapError(errs.ErrLogicOverflow))
t.metrics.logicalOverflowEvent.Inc()
Expand All @@ -409,7 +433,7 @@ func (t *timestampOracle) getTS(ctx context.Context, leadership *election.Leader
func (t *timestampOracle) ResetTimestamp() {
t.tsoMux.Lock()
defer t.tsoMux.Unlock()
log.Info("reset the timestamp in memory")
log.Info("reset the timestamp in memory", logutil.CondUint32("keyspace-group-id", t.keyspaceGroupID, t.keyspaceGroupID > 0))
t.tsoMux.physical = typeutil.ZeroTime
t.tsoMux.logical = 0
t.tsoMux.updateTime = typeutil.ZeroTime
Expand Down

0 comments on commit 179676d

Please sign in to comment.