From 154689e591c2d163fa26b54d78a1c88c1d03ffbb Mon Sep 17 00:00:00 2001 From: rleungx Date: Thu, 2 Aug 2018 21:27:24 +0800 Subject: [PATCH] refactor updateTimestamp according to the comment --- server/tso.go | 97 +++++++++++++-------------------------------------- 1 file changed, 25 insertions(+), 72 deletions(-) diff --git a/server/tso.go b/server/tso.go index 571b0c5d3a40..56015d08cba5 100644 --- a/server/tso.go +++ b/server/tso.go @@ -82,30 +82,24 @@ func (s *Server) syncTimestamp() error { return errors.Trace(err) } - now := time.Now() + next := time.Now() // If the current system time minus the saved etcd timestamp is less than updateTimestampGuard, // the timestamp allocation will start from the saved etcd timestamp temporarily. - if subTimeByWallClock(now, last) < updateTimestampGuard { - log.Errorf("system time may be incorrect, last: %v, now: %v", last, now) - last = last.Add(updateTimestampGuard) - save := last.Add(s.cfg.TsoSaveInterval.Duration) - if err = s.saveTimestamp(save); err != nil { - return errors.Trace(err) - } - s.ts.Store(&atomicObject{physical: last}) - return nil + if subTimeByWallClock(next, last) < updateTimestampGuard { + log.Errorf("system time may be incorrect: last: %v next %v", last, next) + next = next.Add(updateTimestampGuard) } - save := now.Add(s.cfg.TsoSaveInterval.Duration) + save := next.Add(s.cfg.TsoSaveInterval.Duration) if err = s.saveTimestamp(save); err != nil { return errors.Trace(err) } tsoCounter.WithLabelValues("sync_ok").Inc() - log.Infof("sync and save timestamp: last %v save %v", last, save) + log.Infof("sync and save timestamp: last %v save %v next %v", last, save, next) current := &atomicObject{ - physical: now, + physical: next, } s.ts.Store(current) @@ -118,76 +112,36 @@ func (s *Server) updateTimestamp() error { tsoCounter.WithLabelValues("save").Inc() - since := subTimeByWallClock(now, prev.physical) - if since > 3*updateTimestampStep { - log.Warnf("clock offset: %v, prev: %v, now: %v", since, prev.physical, now) + jetLag := subTimeByWallClock(now, prev.physical) + if jetLag > 3*updateTimestampStep { + log.Warnf("clock offset: %v, prev: %v, now: %v", jetLag, prev.physical, now) tsoCounter.WithLabelValues("slow_save").Inc() } - // Check if the logical is still enough. - notEnough := atomic.LoadInt64(&prev.logical) > maxLogical/2 + var next time.Time + if jetLag > updateTimestampGuard { + next = now + } else if atomic.LoadInt64(&prev.logical) > maxLogical/2 { + next = prev.physical.Add(time.Millisecond) + } else { + return nil + } saved := s.lastSavedTime - // This is a normal situation. - if since > 0 { - // Avoid the same physical timestamp. - if since <= updateTimestampGuard { - tsoCounter.WithLabelValues("skip_save").Inc() - log.Warnf("invalid physical timestamp, prev: %v, now: %v, re-update later", prev.physical, now) - return nil - } - // If the time window is no more than updateTimestampGuard, - // it will adjust the time window. - if subTimeByWallClock(saved, now) <= updateTimestampGuard { - last := saved - save := now.Add(s.cfg.TsoSaveInterval.Duration) - if err := s.saveTimestamp(save); err != nil { - return errors.Trace(err) - } - - tsoCounter.WithLabelValues("save_ok").Inc() - log.Debugf("save timestamp ok: prev %v last %v save %v", prev.physical, last, save) - } else { - if notEnough { - last := &atomicObject{ - physical: now.Add(time.Millisecond), - logical: 0, - } - s.ts.Store(last) - return nil - } - } - } else { - if subTimeByWallClock(saved, prev.physical) <= updateTimestampGuard { - last := saved - save := prev.physical.Add(s.cfg.TsoSaveInterval.Duration) - if err := s.saveTimestamp(save); err != nil { - return errors.Trace(err) - } - - tsoCounter.WithLabelValues("save_incorrect").Inc() - log.Debugf("save timestamp incorrect: now %v prev %v last %v save %v", now, prev.physical, last, save) - } else { - if notEnough { - last := &atomicObject{ - physical: prev.physical.Add(time.Millisecond), - logical: 0, - } - s.ts.Store(last) - return nil - } + if subTimeByWallClock(saved, next) <= updateTimestampGuard { + save := next.Add(s.cfg.TsoSaveInterval.Duration) + if err := s.saveTimestamp(save); err != nil { + return errors.Trace(err) } - // Since the physical timestamp is greater than the current system time, - // the physical timestamp will be used to alloc timestamp temporarily. - return nil } current := &atomicObject{ - physical: now, + physical: next, + logical: 0, } s.ts.Store(current) - metadataGauge.WithLabelValues("tso").Set(float64(current.physical.Unix())) + metadataGauge.WithLabelValues("tso").Set(float64(next.Unix())) return nil } @@ -206,7 +160,6 @@ func (s *Server) getRespTS(count uint32) (pdpb.Timestamp, error) { resp.Physical = current.physical.UnixNano() / int64(time.Millisecond) resp.Logical = atomic.AddInt64(¤t.logical, int64(count)) - if resp.Logical >= maxLogical { log.Errorf("logical part outside of max logical interval %v, please check ntp time, retry count %d", resp, i) time.Sleep(updateTimestampStep)