server: alloc timestamp after recover (#1173)
rleungx authored and disksing committed Aug 9, 2018
1 parent 9ab5f9f commit aea8e1c
Showing 2 changed files with 53 additions and 33 deletions.
server/grpc_service.go (1 addition & 1 deletion)

@@ -564,7 +564,7 @@ func (s *Server) UpdateGCSafePoint(ctx context.Context, request *pdpb.UpdateGCSa
 		}
 		log.Infof("updated gc safe point to %d", newSafePoint)
 	} else if newSafePoint < oldSafePoint {
-		log.Warn("trying to update gc safe point from %d to %d", oldSafePoint, newSafePoint)
+		log.Warnf("trying to update gc safe point from %d to %d", oldSafePoint, newSafePoint)
 		newSafePoint = oldSafePoint
 	}

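The one-line fix above is not cosmetic. With a logrus-style logger (PD appears to have used logrus at the time; treat that as an assumption), log.Warn formats its arguments Sprint-style, so the %d verbs are printed literally. A minimal sketch of the difference:

    package main

    import (
    	log "github.com/sirupsen/logrus"
    )

    func main() {
    	oldSafePoint, newSafePoint := uint64(100), uint64(42)

    	// Sprint-style: the verbs are not interpolated; this prints roughly
    	// "trying to update gc safe point from %d to %d100 42"
    	log.Warn("trying to update gc safe point from %d to %d", oldSafePoint, newSafePoint)

    	// Printf-style: this prints "trying to update gc safe point from 100 to 42"
    	log.Warnf("trying to update gc safe point from %d to %d", oldSafePoint, newSafePoint)
    }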
server/tso.go (52 additions & 32 deletions)

@@ -57,8 +57,8 @@ func (s *Server) loadTimestamp() (time.Time, error) {

 // save timestamp, if lastTs is 0, we think the timestamp doesn't exist, so create it,
 // otherwise, update it.
-func (s *Server) saveTimestamp(now time.Time) error {
-	data := uint64ToBytes(uint64(now.UnixNano()))
+func (s *Server) saveTimestamp(ts time.Time) error {
+	data := uint64ToBytes(uint64(ts.UnixNano()))
 	key := s.getTimestampPath()
 
 	resp, err := s.leaderTxn().Then(clientv3.OpPut(key, string(data))).Commit()
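For orientation: saveTimestamp persists the wall-clock time to etcd as a fixed-width integer. A self-contained sketch of what the uint64ToBytes helper plausibly does (big-endian encoding is an assumption; PD's real helper lives in its util code), together with the inverse decoding that loadTimestamp needs on recovery:

    package sketch

    import (
    	"encoding/binary"
    	"time"
    )

    // uint64ToBytes encodes v as 8 big-endian bytes (assumed layout).
    func uint64ToBytes(v uint64) []byte {
    	b := make([]byte, 8)
    	binary.BigEndian.PutUint64(b, v)
    	return b
    }

    // encodeTimestamp mirrors what saveTimestamp writes to etcd.
    func encodeTimestamp(ts time.Time) []byte {
    	return uint64ToBytes(uint64(ts.UnixNano()))
    }

    // decodeTimestamp mirrors what loadTimestamp must do on recovery.
    func decodeTimestamp(data []byte) time.Time {
    	return time.Unix(0, int64(binary.BigEndian.Uint64(data)))
    }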
@@ -69,7 +69,7 @@ func (s *Server) saveTimestamp(now time.Time) error {
 		return errors.New("save timestamp failed, maybe we lost leader")
 	}
 
-	s.lastSavedTime = now
+	s.lastSavedTime = ts
 
 	return nil
 }
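The resp.Succeeded check above is what ties the write to leadership. Below is a sketch of the pattern behind s.leaderTxn(), not PD's actual code: the leader-key comparison is an assumption (PD builds its comparison internally), and the older github.com/coreos/etcd import path matches this commit's era. The Put commits only while this server still owns the leader key, so a deposed leader cannot clobber a successor's saved timestamp.

    package sketch

    import (
    	"context"
    	"errors"

    	"github.com/coreos/etcd/clientv3"
    )

    // saveWithLeaderGuard is illustrative: the Put is wrapped in an etcd
    // transaction that succeeds only if the leader key still holds our value.
    func saveWithLeaderGuard(ctx context.Context, cli *clientv3.Client,
    	leaderKey, leaderValue, tsKey string, tsData []byte) error {
    	resp, err := cli.Txn(ctx).
    		If(clientv3.Compare(clientv3.Value(leaderKey), "=", leaderValue)).
    		Then(clientv3.OpPut(tsKey, string(tsData))).
    		Commit()
    	if err != nil {
    		return err
    	}
    	if !resp.Succeeded {
    		// Another member won the election between our check and the write.
    		return errors.New("save timestamp failed, maybe we lost leader")
    	}
    	return nil
    }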
@@ -82,69 +82,88 @@ func (s *Server) syncTimestamp() error {
 		return errors.Trace(err)
 	}
 
-	var now time.Time
-
-	for {
-		now = time.Now()
-		if wait := subTimeByWallClock(last, now) + updateTimestampGuard; wait > 0 {
-			tsoCounter.WithLabelValues("sync_wait").Inc()
-			log.Warnf("wait %v to guarantee valid generated timestamp", wait)
-			time.Sleep(wait)
-			continue
-		}
-		break
+	next := time.Now()
+	// If the current system time minus the saved etcd timestamp is less than `updateTimestampGuard`,
+	// the timestamp allocation will start from the saved etcd timestamp temporarily.
+	if subTimeByWallClock(next, last) < updateTimestampGuard {
+		log.Errorf("system time may be incorrect: last: %v next %v", last, next)
+		next = last.Add(updateTimestampGuard)
 	}
 
-	save := now.Add(s.cfg.TsoSaveInterval.Duration)
+	save := next.Add(s.cfg.TsoSaveInterval.Duration)
 	if err = s.saveTimestamp(save); err != nil {
 		return errors.Trace(err)
 	}
 
 	tsoCounter.WithLabelValues("sync_ok").Inc()
-	log.Infof("sync and save timestamp: last %v save %v", last, save)
+	log.Infof("sync and save timestamp: last %v save %v next %v", last, save, next)
 
 	current := &atomicObject{
-		physical: now,
+		physical: next,
 	}
 	s.ts.Store(current)
 
 	return nil
 }
 
+// This function will do two things:
+// 1. When the logical time is going to be used up, the current physical time needs to increase.
+// 2. If the time window is not enough, which means the saved etcd time minus the next physical time
+//    is less than or equal to `updateTimestampGuard`, it will need to be updated and save the
+//    next physical time plus `TsoSaveInterval` into etcd.
+//
+// Here are some constraints that this function must satisfy:
+// 1. The physical time is monotonically increasing.
+// 2. The saved time is monotonically increasing.
+// 3. The physical time is always less than the saved timestamp.
 func (s *Server) updateTimestamp() error {
-	prev := s.ts.Load().(*atomicObject).physical
+	prev := s.ts.Load().(*atomicObject)
 	now := time.Now()
 
 	tsoCounter.WithLabelValues("save").Inc()
 
-	since := subTimeByWallClock(now, prev)
-	if since > 3*updateTimestampStep {
-		log.Warnf("clock offset: %v, prev: %v, now: %v", since, prev, now)
+	jetLag := subTimeByWallClock(now, prev.physical)
+	if jetLag > 3*updateTimestampStep {
+		log.Warnf("clock offset: %v, prev: %v, now: %v", jetLag, prev.physical, now)
 		tsoCounter.WithLabelValues("slow_save").Inc()
 	}
-	// Avoid the same physical time stamp
-	if since <= updateTimestampGuard {
+
+	if jetLag < 0 {
+		tsoCounter.WithLabelValues("system_time_slow").Inc()
+	}
+
+	var next time.Time
+	prevLogical := atomic.LoadInt64(&prev.logical)
+	// If the system time is greater, it will be synchronized with the system time.
+	if jetLag > updateTimestampGuard {
+		next = now
+	} else if prevLogical > maxLogical/2 {
+		// The reason for choosing maxLogical/2 here is that it's big enough for common cases,
+		// because enough timestamps can be allocated before the next update.
+		log.Warnf("the logical time may be not enough, prevLogical: %v", prevLogical)
+		next = prev.physical.Add(time.Millisecond)
+	} else {
+		// It will still use the previous physical time to alloc the timestamp.
 		tsoCounter.WithLabelValues("skip_save").Inc()
-		log.Warnf("invalid physical timestamp, prev: %v, now: %v, re-update later", prev, now)
 		return nil
 	}
 
-	if subTimeByWallClock(now, s.lastSavedTime) >= 0 {
-		last := s.lastSavedTime
-		save := now.Add(s.cfg.TsoSaveInterval.Duration)
+	// It is not safe to increase the physical time to `next`.
+	// The time window needs to be updated and saved to etcd.
+	if subTimeByWallClock(s.lastSavedTime, next) <= updateTimestampGuard {
+		save := next.Add(s.cfg.TsoSaveInterval.Duration)
 		if err := s.saveTimestamp(save); err != nil {
 			return errors.Trace(err)
 		}
-
-		tsoCounter.WithLabelValues("save_ok").Inc()
-		log.Debugf("save timestamp ok: prev %v last %v save %v", prev, last, save)
 	}
 
 	current := &atomicObject{
-		physical: now,
+		physical: next,
 		logical:  0,
 	}
 
 	s.ts.Store(current)
-	metadataGauge.WithLabelValues("tso").Set(float64(now.Unix()))
+	metadataGauge.WithLabelValues("tso").Set(float64(next.Unix()))
 
 	return nil
 }
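To summarize the two functions changed above: syncTimestamp now starts allocation strictly after the timestamp saved in etcd even when the local clock lags it (instead of sleeping until the clock catches up), and updateTimestamp picks the next physical time from three cases. A condensed restatement as pure functions (a sketch: time.Sub stands in for PD's subTimeByWallClock, and the updateTimestampGuard value is assumed):

    package sketch

    import "time"

    const (
    	updateTimestampGuard = time.Millisecond // assumed; see tso.go for the real value
    	maxLogical           = int64(1 << 18)
    )

    // recoverBase mirrors syncTimestamp: after an election, allocation must
    // resume strictly after the persisted timestamp, even if the local clock
    // is behind it.
    func recoverBase(lastSaved, now time.Time) time.Time {
    	if now.Sub(lastSaved) < updateTimestampGuard {
    		// System time may be incorrect; start from the saved point.
    		return lastSaved.Add(updateTimestampGuard)
    	}
    	return now
    }

    // nextPhysical mirrors updateTimestamp's three-way decision. ok=false is
    // the "skip_save" branch: keep the previous physical time and retry later.
    func nextPhysical(prevPhysical time.Time, prevLogical int64, now time.Time) (next time.Time, ok bool) {
    	jetLag := now.Sub(prevPhysical)
    	switch {
    	case jetLag > updateTimestampGuard:
    		// Wall time has moved comfortably ahead: follow it.
    		return now, true
    	case prevLogical > maxLogical/2:
    		// Logical space is half used but the clock has not advanced:
    		// nudge the physical part forward by one millisecond.
    		return prevPhysical.Add(time.Millisecond), true
    	default:
    		return time.Time{}, false
    	}
    }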
@@ -165,6 +184,7 @@ func (s *Server) getRespTS(count uint32) (pdpb.Timestamp, error) {
 		resp.Logical = atomic.AddInt64(&current.logical, int64(count))
 		if resp.Logical >= maxLogical {
 			log.Errorf("logical part outside of max logical interval %v, please check ntp time, retry count %d", resp, i)
+			tsoCounter.WithLabelValues("logical_overflow").Inc()
 			time.Sleep(updateTimestampStep)
 			continue
 		}
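Finally, the new "logical_overflow" counter instruments the allocation fast path shown in this last hunk. A sketch of that path (atomicObject mirrors the pair stored in s.ts; the millisecond conversion of the physical part is an assumption about PD's encoding):

    package sketch

    import (
    	"sync/atomic"
    	"time"
    )

    const maxLogical = int64(1 << 18)

    // atomicObject mirrors the pair PD stores in s.ts.
    type atomicObject struct {
    	physical time.Time
    	logical  int64
    }

    // allocate reserves count logical ticks under the current physical time.
    // ok=false corresponds to the overflow branch above: the caller must wait
    // for updateTimestamp to advance the physical part, then retry.
    func allocate(current *atomicObject, count uint32) (physical, logical int64, ok bool) {
    	physical = current.physical.UnixNano() / int64(time.Millisecond)
    	logical = atomic.AddInt64(&current.logical, int64(count))
    	if logical >= maxLogical {
    		return 0, 0, false
    	}
    	return physical, logical, true
    }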
