Skip to content

Commit

Permalink
add the metrics and the test
Browse files Browse the repository at this point in the history
  • Loading branch information
rleungx committed Aug 6, 2018
1 parent 97086a6 commit 8ae950c
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 11 deletions.
6 changes: 6 additions & 0 deletions server/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func (s *Server) syncTimestamp() error {
// Here is some constraints that this function must satisfy:
// 1. The physical time is monotonically increasing.
// 2. The saved time is monotonically increasing.
// 3. The physical time is always less than the saved timestamp.
func (s *Server) updateTimestamp() error {
prev := s.ts.Load().(*atomicObject)
now := time.Now()
Expand All @@ -127,6 +128,10 @@ func (s *Server) updateTimestamp() error {
tsoCounter.WithLabelValues("slow_save").Inc()
}

if jetLag < 0 {
tsoCounter.WithLabelValues("system_time_slow").Inc()
}

var next time.Time
prevLogical := atomic.LoadInt64(&prev.logical)
// If the system time is greater, it will be synchronized with the system time.
Expand Down Expand Up @@ -179,6 +184,7 @@ func (s *Server) getRespTS(count uint32) (pdpb.Timestamp, error) {
resp.Logical = atomic.AddInt64(&current.logical, int64(count))
if resp.Logical >= maxLogical {
log.Errorf("logical part outside of max logical interval %v, please check ntp time, retry count %d", resp, i)
tsoCounter.WithLabelValues("logical_overflow").Inc()
time.Sleep(updateTimestampStep)
continue
}
Expand Down
97 changes: 86 additions & 11 deletions server/tso_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,21 +64,82 @@ func (s *testTsoSuite) testGetTimestamp(c *C, n int) *pdpb.Timestamp {
return res
}

func mustGetLeader(c *C, client *clientv3.Client, leaderPath string) *pdpb.Member {
for i := 0; i < 20; i++ {
leader, err := getLeader(client, leaderPath)
c.Assert(err, IsNil)
if leader != nil {
return leader
}
time.Sleep(500 * time.Millisecond)
func (s *testTsoSuite) TestTso(c *C) {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()

last := &pdpb.Timestamp{
Physical: 0,
Logical: 0,
}

for j := 0; j < 50; j++ {
ts := s.testGetTimestamp(c, 10)
c.Assert(ts.GetPhysical(), Not(Less), last.GetPhysical())
if ts.GetPhysical() == last.GetPhysical() {
c.Assert(ts.GetLogical(), Greater, last.GetLogical())
}
last = ts
time.Sleep(10 * time.Millisecond)
}
}()
}

c.Fatal("get leader error")
return nil
wg.Wait()
}

func (s *testTsoSuite) TestTso(c *C) {
var _ = Suite(&testTimeFallBackSuite{})

type testTimeFallBackSuite struct {
client *clientv3.Client
svr *Server
cleanup cleanupFunc
grpcPDClient pdpb.PDClient
}

func (s *testTimeFallBackSuite) SetUpSuite(c *C) {
s.svr, s.cleanup = mustRunTestServer(c)
s.client = s.svr.client
mustWaitLeader(c, []*Server{s.svr})
s.grpcPDClient = mustNewGrpcClient(c, s.svr.GetAddr())
err := s.svr.saveTimestamp(time.Now().Add(time.Hour))
c.Assert(err, IsNil)
s.svr.Close()
err = s.svr.Run(context.TODO())
c.Assert(err, IsNil)
mustWaitLeader(c, []*Server{s.svr})
}

func (s *testTimeFallBackSuite) TearDownSuite(c *C) {
s.cleanup()
}

func (s *testTimeFallBackSuite) testGetTimestamp(c *C, n int) *pdpb.Timestamp {
req := &pdpb.TsoRequest{
Header: newRequestHeader(s.svr.clusterID),
Count: uint32(n),
}

tsoClient, err := s.grpcPDClient.Tso(context.Background())
c.Assert(err, IsNil)
defer tsoClient.CloseSend()
err = tsoClient.Send(req)
c.Assert(err, IsNil)
resp, err := tsoClient.Recv()
c.Assert(err, IsNil)
c.Assert(resp.GetCount(), Equals, uint32(n))

res := resp.GetTimestamp()
c.Assert(res.GetLogical(), Greater, int64(0))
c.Assert(res.GetPhysical(), Greater, time.Now().UnixNano()/int64(time.Millisecond))

return res
}

func (s *testTimeFallBackSuite) TestTimeFallBack(c *C) {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
Expand All @@ -104,3 +165,17 @@ func (s *testTsoSuite) TestTso(c *C) {

wg.Wait()
}

func mustGetLeader(c *C, client *clientv3.Client, leaderPath string) *pdpb.Member {
for i := 0; i < 20; i++ {
leader, err := getLeader(client, leaderPath)
c.Assert(err, IsNil)
if leader != nil {
return leader
}
time.Sleep(500 * time.Millisecond)
}

c.Fatal("get leader error")
return nil
}

0 comments on commit 8ae950c

Please sign in to comment.