Skip to content

Commit

Permalink
Add test
Browse files Browse the repository at this point in the history
Signed-off-by: Bin Shi <[email protected]>
  • Loading branch information
binshi-bing committed May 10, 2023
1 parent c020b28 commit 88a3be6
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 24 deletions.
23 changes: 12 additions & 11 deletions client/tso_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,29 +320,30 @@ func (c *tsoClient) getMinTS(ctx context.Context) (physical, logical int64, err
minTS *pdpb.Timestamp
keyspaceGroupsAsked uint32
)
emptyTS := &pdpb.Timestamp{}

if len(resps) != len(addrs) {
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("failed to get min ts from all tso servers/pods")
}
for i := 0; i < len(resps); i++ {
if resps[i].KeyspaceGroupsTotal == 0 {
emptyTS := &pdpb.Timestamp{}
keyspaceGroupsTotal := resps[0].KeyspaceGroupsTotal
for _, resp := range resps {
if resp.KeyspaceGroupsTotal == 0 {
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("the tso service has no keyspace group")
}
if i > 0 && resps[i-1].KeyspaceGroupsTotal != resps[i].KeyspaceGroupsTotal {
if resp.KeyspaceGroupsTotal != keyspaceGroupsTotal {
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs(
"the tso service has inconsistent keyspace group total count")
}
keyspaceGroupsAsked += resps[i].KeyspaceGroupsTotal
if tsoutil.CompareTimestamp(resps[i].Timestamp, emptyTS) > 0 &&
(minTS == nil || tsoutil.CompareTimestamp(resps[i].Timestamp, minTS) < 0) {
minTS = resps[i].Timestamp
keyspaceGroupsAsked += resp.KeyspaceGroupsServing
if tsoutil.CompareTimestamp(resp.Timestamp, emptyTS) > 0 &&
(minTS == nil || tsoutil.CompareTimestamp(resp.Timestamp, minTS) < 0) {
minTS = resp.Timestamp
}
}

if keyspaceGroupsAsked != resps[0].KeyspaceGroupsTotal {
if keyspaceGroupsAsked != keyspaceGroupsTotal {
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs(
"can't query all the tso keyspace groups")
fmt.Sprintf("can't query all the tso keyspace groups. Asked %d, expected %d",
keyspaceGroupsAsked, keyspaceGroupsTotal))
}

if minTS == nil {
Expand Down
10 changes: 7 additions & 3 deletions client/tso_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,13 @@ func (t *tsoServerDiscovery) getTSOServer(sd ServiceDiscovery) (string, error) {
return tsoServerAddr, nil
}

func (t *tsoServerDiscovery) countFailure() {
// countFailure increases the failure count by 1 and return true if the failure count
// reaches the number of TSO servers.
func (t *tsoServerDiscovery) countFailure() bool {
t.Lock()
defer t.Unlock()
t.failureCount++
return t.failureCount >= len(t.addrs)
}

func (t *tsoServerDiscovery) resetFailure() {
Expand Down Expand Up @@ -437,8 +440,9 @@ func (c *tsoServiceDiscovery) updateMember() error {
}
keyspaceGroup, err := c.findGroupByKeyspaceID(c.keyspaceID, tsoServerAddr, updateMemberTimeout)
if err != nil {
c.tsoServerDiscovery.countFailure()
log.Error("[tso] failed to find the keyspace group", errs.ZapError(err))
if c.tsoServerDiscovery.countFailure() {
log.Error("[tso] failed to find the keyspace group", errs.ZapError(err))
}
return err
}
c.tsoServerDiscovery.resetFailure()
Expand Down
20 changes: 10 additions & 10 deletions pkg/mcs/tso/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,17 +158,17 @@ func (s *Service) Tso(stream tsopb.TSO_TsoServer) error {
func (s *Service) FindGroupByKeyspaceID(
ctx context.Context, request *tsopb.FindGroupByKeyspaceIDRequest,
) (*tsopb.FindGroupByKeyspaceIDResponse, error) {
// if s.IsClosed() || s.keyspaceGroupManager == nil {
// return &tsopb.FindGroupByKeyspaceIDResponse{
// Header: s.wrapErrorToHeader(tsopb.ErrorType_NOT_BOOTSTRAPPED, ErrNotStarted.Error(), 0),
// }, nil
// }
if s.IsClosed() || s.keyspaceGroupManager == nil {
return &tsopb.FindGroupByKeyspaceIDResponse{
Header: s.wrapErrorToHeader(tsopb.ErrorType_NOT_BOOTSTRAPPED, ErrNotStarted.Error(), 0),
}, nil
}

// if request.GetHeader().GetClusterId() != s.clusterID {
// return &tsopb.FindGroupByKeyspaceIDResponse{
// Header: s.wrapErrorToHeader(tsopb.ErrorType_CLUSTER_MISMATCHED, ErrClusterMismatched.Error(), 0),
// }, nil
// }
if request.GetHeader().GetClusterId() != s.clusterID {
return &tsopb.FindGroupByKeyspaceIDResponse{
Header: s.wrapErrorToHeader(tsopb.ErrorType_CLUSTER_MISMATCHED, ErrClusterMismatched.Error(), 0),
}, nil
}

keyspaceID := request.GetKeyspaceId()
am, keyspaceGroup, keyspaceGroupID, err := s.keyspaceGroupManager.FindGroupByKeyspaceID(keyspaceID)
Expand Down
2 changes: 2 additions & 0 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,8 @@ func (kgm *KeyspaceGroupManager) GetMinTS(
return pdpb.Timestamp{}, kgAskedCount, kgTotalCount, nil
}

log.Info("get min ts!!!!!!!!!!!!!!!!!!!!", zap.String("dc-location", dcLocation), zap.Int64("min-ts-physical", minTS.Physical), zap.Int64("min-ts-logical", minTS.Logical))

return *minTS, kgAskedCount, kgTotalCount, nil
}

Expand Down
21 changes: 21 additions & 0 deletions tests/integrations/tso/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,27 @@ func (suite *tsoClientTestSuite) TestGetTSAsync() {
wg.Wait()
}

func (suite *tsoClientTestSuite) TestGetMinTS() {
var wg sync.WaitGroup
wg.Add(tsoRequestConcurrencyNumber * len(suite.clients))
for i := 0; i < tsoRequestConcurrencyNumber; i++ {
for _, client := range suite.clients {
go func(client pd.Client) {
defer wg.Done()
var lastTS uint64
for j := 0; j < tsoRequestRound; j++ {
physical, logical, err := client.GetMinTS(suite.ctx)
suite.NoError(err)
ts := tsoutil.ComposeTS(physical, logical)
suite.Less(lastTS, ts)
lastTS = ts
}
}(client)
}
}
wg.Wait()
}

// More details can be found in this issue: https://github.com/tikv/pd/issues/4884
func (suite *tsoClientTestSuite) TestUpdateAfterResetTSO() {
re := suite.Require()
Expand Down

0 comments on commit 88a3be6

Please sign in to comment.