diff --git a/server/grpc_service.go b/server/grpc_service.go
index ac8b85ac99c..3d1d4b727c4 100644
--- a/server/grpc_service.go
+++ b/server/grpc_service.go
@@ -564,7 +564,7 @@ func (s *Server) UpdateGCSafePoint(ctx context.Context, request *pdpb.UpdateGCSa
 		}
 		log.Infof("updated gc safe point to %d", newSafePoint)
 	} else if newSafePoint < oldSafePoint {
-		log.Warn("trying to update gc safe point from %d to %d", oldSafePoint, newSafePoint)
+		log.Warnf("trying to update gc safe point from %d to %d", oldSafePoint, newSafePoint)
 		newSafePoint = oldSafePoint
 	}
 
diff --git a/server/tso.go b/server/tso.go
index fe978b858cd..872d9315691 100644
--- a/server/tso.go
+++ b/server/tso.go
@@ -57,8 +57,8 @@ func (s *Server) loadTimestamp() (time.Time, error) {
 
 // save timestamp, if lastTs is 0, we think the timestamp doesn't exist, so create it,
 // otherwise, update it.
-func (s *Server) saveTimestamp(now time.Time) error {
-	data := uint64ToBytes(uint64(now.UnixNano()))
+func (s *Server) saveTimestamp(ts time.Time) error {
+	data := uint64ToBytes(uint64(ts.UnixNano()))
 	key := s.getTimestampPath()
 
 	resp, err := s.leaderTxn().Then(clientv3.OpPut(key, string(data))).Commit()
@@ -69,7 +69,7 @@ func (s *Server) saveTimestamp(now time.Time) error {
 		return errors.New("save timestamp failed, maybe we lost leader")
 	}
 
-	s.lastSavedTime = now
+	s.lastSavedTime = ts
 
 	return nil
 }
@@ -82,69 +82,88 @@ func (s *Server) syncTimestamp() error {
 		return errors.Trace(err)
 	}
 
-	var now time.Time
-
-	for {
-		now = time.Now()
-		if wait := subTimeByWallClock(last, now) + updateTimestampGuard; wait > 0 {
-			tsoCounter.WithLabelValues("sync_wait").Inc()
-			log.Warnf("wait %v to guarantee valid generated timestamp", wait)
-			time.Sleep(wait)
-			continue
-		}
-		break
+	next := time.Now()
+	// If the current system time minus the saved etcd timestamp is less than `updateTimestampGuard`,
+	// the timestamp allocation will start from the saved etcd timestamp temporarily.
+	if subTimeByWallClock(next, last) < updateTimestampGuard {
+		log.Errorf("system time may be incorrect: last: %v next %v", last, next)
+		next = last.Add(updateTimestampGuard)
 	}
 
-	save := now.Add(s.cfg.TsoSaveInterval.Duration)
+	save := next.Add(s.cfg.TsoSaveInterval.Duration)
 	if err = s.saveTimestamp(save); err != nil {
 		return errors.Trace(err)
 	}
 
 	tsoCounter.WithLabelValues("sync_ok").Inc()
-	log.Infof("sync and save timestamp: last %v save %v", last, save)
+	log.Infof("sync and save timestamp: last %v save %v next %v", last, save, next)
 
 	current := &atomicObject{
-		physical: now,
+		physical: next,
 	}
 	s.ts.Store(current)
 
 	return nil
 }
 
+// This function will do two things:
+// 1. When the logical time is about to be used up, the current physical time needs to increase.
+// 2. If the time window is not enough, which means the saved etcd time minus the next physical time
+//    is less than or equal to `updateTimestampGuard`, the time window needs to be updated by saving
+//    the next physical time plus `TsoSaveInterval` into etcd.
+//
+// Here are some constraints that this function must satisfy:
+// 1. The physical time is monotonically increasing.
+// 2. The saved time is monotonically increasing.
+// 3. The physical time is always less than the saved timestamp.
 func (s *Server) updateTimestamp() error {
-	prev := s.ts.Load().(*atomicObject).physical
+	prev := s.ts.Load().(*atomicObject)
 	now := time.Now()
 
 	tsoCounter.WithLabelValues("save").Inc()
 
-	since := subTimeByWallClock(now, prev)
-	if since > 3*updateTimestampStep {
-		log.Warnf("clock offset: %v, prev: %v, now: %v", since, prev, now)
+	jetLag := subTimeByWallClock(now, prev.physical)
+	if jetLag > 3*updateTimestampStep {
+		log.Warnf("clock offset: %v, prev: %v, now: %v", jetLag, prev.physical, now)
 		tsoCounter.WithLabelValues("slow_save").Inc()
 	}
-	// Avoid the same physical time stamp
-	if since <= updateTimestampGuard {
+
+	if jetLag < 0 {
+		tsoCounter.WithLabelValues("system_time_slow").Inc()
+	}
+
+	var next time.Time
+	prevLogical := atomic.LoadInt64(&prev.logical)
+	// If the system time is ahead of the previous physical time, synchronize with the system time.
+	if jetLag > updateTimestampGuard {
+		next = now
+	} else if prevLogical > maxLogical/2 {
+		// The reason for choosing maxLogical/2 here is that it's big enough for common cases,
+		// because there are still enough timestamps to allocate before the next update.
+		log.Warnf("the logical time may not be enough, prevLogical: %v", prevLogical)
+		next = prev.physical.Add(time.Millisecond)
+	} else {
+		// Keep using the previous physical time to allocate timestamps.
 		tsoCounter.WithLabelValues("skip_save").Inc()
-		log.Warnf("invalid physical timestamp, prev: %v, now: %v, re-update later", prev, now)
 		return nil
 	}
 
-	if subTimeByWallClock(now, s.lastSavedTime) >= 0 {
-		last := s.lastSavedTime
-		save := now.Add(s.cfg.TsoSaveInterval.Duration)
+	// It is not safe to increase the physical time to `next` while the saved time window is
+	// too small, so the time window needs to be updated and saved to etcd first.
+	if subTimeByWallClock(s.lastSavedTime, next) <= updateTimestampGuard {
+		save := next.Add(s.cfg.TsoSaveInterval.Duration)
 		if err := s.saveTimestamp(save); err != nil {
 			return errors.Trace(err)
 		}
-
-		tsoCounter.WithLabelValues("save_ok").Inc()
-		log.Debugf("save timestamp ok: prev %v last %v save %v", prev, last, save)
 	}
 
 	current := &atomicObject{
-		physical: now,
+		physical: next,
+		logical:  0,
 	}
+
 	s.ts.Store(current)
-	metadataGauge.WithLabelValues("tso").Set(float64(now.Unix()))
+	metadataGauge.WithLabelValues("tso").Set(float64(next.Unix()))
 
 	return nil
 }
 
@@ -165,6 +184,7 @@ func (s *Server) getRespTS(count uint32) (pdpb.Timestamp, error) {
 		resp.Logical = atomic.AddInt64(&current.logical, int64(count))
 		if resp.Logical >= maxLogical {
 			log.Errorf("logical part outside of max logical interval %v, please check ntp time, retry count %d", resp, i)
+			tsoCounter.WithLabelValues("logical_overflow").Inc()
 			time.Sleep(updateTimestampStep)
 			continue
 		}
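// Reviewer-side sketch, not part of this patch: a minimal standalone illustration of the
// branch logic in the new updateTimestamp above. The constant values and the decideNext
// helper are invented here for illustration only; the real code uses subTimeByWallClock,
// reads the logical counter atomically, and also saves the extended time window to etcd,
// all of which are omitted in this sketch.
package main

import (
	"fmt"
	"time"
)

const (
	updateTimestampGuard = time.Millisecond // assumed value, mirrors the guard in tso.go
	maxLogical           = int64(1 << 18)   // assumed value, mirrors maxLogical in tso.go
)

// decideNext is a hypothetical pure helper: given the previous physical time, the previous
// logical counter, and the current wall clock, it returns the next physical time and whether
// an update is needed at all.
func decideNext(prevPhysical time.Time, prevLogical int64, now time.Time) (time.Time, bool) {
	jetLag := now.Sub(prevPhysical)
	switch {
	case jetLag > updateTimestampGuard:
		// The system time is ahead: follow it.
		return now, true
	case prevLogical > maxLogical/2:
		// The logical part is more than half used up: bump the physical time by one
		// millisecond so allocation can continue even if the clock is stuck.
		return prevPhysical.Add(time.Millisecond), true
	default:
		// Keep using the previous physical time; nothing to do.
		return time.Time{}, false
	}
}

func main() {
	prev := time.Now()

	// Case 1: the wall clock has moved well past the previous physical time.
	next, ok := decideNext(prev, 0, prev.Add(50*time.Millisecond))
	fmt.Println(ok, next.Sub(prev)) // true 50ms

	// Case 2: the clock is stuck but the logical counter is nearly exhausted.
	next, ok = decideNext(prev, maxLogical/2+1, prev)
	fmt.Println(ok, next.Sub(prev)) // true 1ms

	// Case 3: neither condition holds, so the update is skipped.
	_, ok = decideNext(prev, 0, prev)
	fmt.Println(ok) // false
}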