Skip to content

Commit

Permalink
refactor updateTimestamp according to the comment
Browse files Browse the repository at this point in the history
  • Loading branch information
rleungx committed Aug 3, 2018
1 parent e9e5df7 commit fa214e9
Showing 1 changed file with 25 additions and 73 deletions.
98 changes: 25 additions & 73 deletions server/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,30 +82,24 @@ func (s *Server) syncTimestamp() error {
return errors.Trace(err)
}

now := time.Now()
next := time.Now()
// If the current system time minus the saved etcd timestamp is less than updateTimestampGuard,
// the timestamp allocation will start from the saved etcd timestamp temporarily.
if subTimeByWallClock(now, last) < updateTimestampGuard {
log.Errorf("system time may be incorrect, last: %v, now: %v", last, now)
last = last.Add(updateTimestampGuard)
save := last.Add(s.cfg.TsoSaveInterval.Duration)
if err = s.saveTimestamp(save); err != nil {
return errors.Trace(err)
}
s.ts.Store(&atomicObject{physical: last})
return nil
if subTimeByWallClock(next, last) < updateTimestampGuard {
log.Errorf("system time may be incorrect: last: %v next %v", last, next)
next = next.Add(updateTimestampGuard)
}

save := now.Add(s.cfg.TsoSaveInterval.Duration)
save := next.Add(s.cfg.TsoSaveInterval.Duration)
if err = s.saveTimestamp(save); err != nil {
return errors.Trace(err)
}

tsoCounter.WithLabelValues("sync_ok").Inc()
log.Infof("sync and save timestamp: last %v save %v", last, save)
log.Infof("sync and save timestamp: last %v save %v next %v", last, save, next)

current := &atomicObject{
physical: now,
physical: next,
}
s.ts.Store(current)

Expand All @@ -118,76 +112,35 @@ func (s *Server) updateTimestamp() error {

tsoCounter.WithLabelValues("save").Inc()

since := subTimeByWallClock(now, prev.physical)
if since > 3*updateTimestampStep {
log.Warnf("clock offset: %v, prev: %v, now: %v", since, prev.physical, now)
jetLag := subTimeByWallClock(now, prev.physical)
if jetLag > 3*updateTimestampStep {
log.Warnf("clock offset: %v, prev: %v, now: %v", jetLag, prev.physical, now)
tsoCounter.WithLabelValues("slow_save").Inc()
}

// Check if the logical is still enough.
notEnough := atomic.LoadInt64(&prev.logical) > maxLogical/2

saved := s.lastSavedTime
// This is a normal situation.
if since > 0 {
// Avoid the same physical timestamp.
if since <= updateTimestampGuard {
tsoCounter.WithLabelValues("skip_save").Inc()
log.Warnf("invalid physical timestamp, prev: %v, now: %v, re-update later", prev.physical, now)
return nil
}
// If the time window is no more than updateTimestampGuard,
// it will adjust the time window.
if subTimeByWallClock(saved, now) <= updateTimestampGuard {
last := saved
save := now.Add(s.cfg.TsoSaveInterval.Duration)
if err := s.saveTimestamp(save); err != nil {
return errors.Trace(err)
}

tsoCounter.WithLabelValues("save_ok").Inc()
log.Debugf("save timestamp ok: prev %v last %v save %v", prev.physical, last, save)
} else {
if notEnough {
last := &atomicObject{
physical: now.Add(time.Millisecond),
logical: 0,
}
s.ts.Store(last)
return nil
}
}
var next time.Time
if jetLag > updateTimestampGuard {
next = now
} else if atomic.LoadInt64(&prev.logical) > maxLogical/2 {
next = prev.physical.Add(time.Millisecond)
} else {
if subTimeByWallClock(saved, prev.physical) <= updateTimestampGuard {
last := saved
save := prev.physical.Add(s.cfg.TsoSaveInterval.Duration)
if err := s.saveTimestamp(save); err != nil {
return errors.Trace(err)
}

tsoCounter.WithLabelValues("save_incorrect").Inc()
log.Debugf("save timestamp incorrect: now %v prev %v last %v save %v", now, prev.physical, last, save)
} else {
if notEnough {
last := &atomicObject{
physical: prev.physical.Add(time.Millisecond),
logical: 0,
}
s.ts.Store(last)
return nil
}
}
// Since the physical timestamp is greater than the current system time,
// the physical timestamp will be used to alloc timestamp temporarily.
return nil
}

if subTimeByWallClock(s.lastSavedTime, next) <= updateTimestampGuard {
save := next.Add(s.cfg.TsoSaveInterval.Duration)
if err := s.saveTimestamp(save); err != nil {
return errors.Trace(err)
}
}

current := &atomicObject{
physical: now,
physical: next,
logical: 0,
}

s.ts.Store(current)
metadataGauge.WithLabelValues("tso").Set(float64(current.physical.Unix()))
metadataGauge.WithLabelValues("tso").Set(float64(next.Unix()))

return nil
}
Expand All @@ -206,7 +159,6 @@ func (s *Server) getRespTS(count uint32) (pdpb.Timestamp, error) {

resp.Physical = current.physical.UnixNano() / int64(time.Millisecond)
resp.Logical = atomic.AddInt64(&current.logical, int64(count))

if resp.Logical >= maxLogical {
log.Errorf("logical part outside of max logical interval %v, please check ntp time, retry count %d", resp, i)
time.Sleep(updateTimestampStep)
Expand Down

0 comments on commit fa214e9

Please sign in to comment.