Skip to content

Commit

Permalink
mcs: use getClusterInfo to check whether api service is ready (tikv#6422
Browse files Browse the repository at this point in the history
)

ref tikv#5836

Signed-off-by: lhy1024 <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
2 people authored and rleungx committed Aug 2, 2023
1 parent cce54ad commit b1114b1
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 6 deletions.
62 changes: 62 additions & 0 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,13 @@ import (
"time"

grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/diagnosticspb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/tsopb"
"github.com/pingcap/log"
"github.com/pingcap/sysutil"
"github.com/pkg/errors"
"github.com/soheilhy/cmux"
"github.com/spf13/cobra"
bs "github.com/tikv/pd/pkg/basicserver"
Expand Down Expand Up @@ -66,6 +69,11 @@ const (
// tsoSvcRootPathFormat defines the root path for all etcd paths used for different purposes.
// format: "/ms/{cluster_id}/tso".
tsoSvcRootPathFormat = msServiceRootPath + "/%d/" + mcsutils.TSOServiceName

// maxRetryTimesWaitAPIService is the max retry times for initializing the cluster ID.
maxRetryTimesWaitAPIService = 60
// retryIntervalWaitAPIService is the interval to retry.
retryIntervalWaitAPIService = 3 * time.Second
)

var _ bs.Server = (*Server)(nil)
Expand Down Expand Up @@ -147,6 +155,15 @@ func (s *Server) GetAddr() string {

// Run runs the TSO server.
func (s *Server) Run() error {
skipWaitAPIServiceReady := false
failpoint.Inject("skipWaitAPIServiceReady", func() {
skipWaitAPIServiceReady = true
})
if !skipWaitAPIServiceReady {
if err := s.waitAPIServiceReady(); err != nil {
return err
}
}
go systimemon.StartMonitor(s.ctx, time.Now, func() {
log.Error("system time jumps backward", errs.ZapError(errs.ErrIncorrectSystemTime))
timeJumpBackCounter.Inc()
Expand Down Expand Up @@ -517,6 +534,51 @@ func (s *Server) startServer() (err error) {
return nil
}

func (s *Server) waitAPIServiceReady() error {
for i := 0; i < maxRetryTimesWaitAPIService; i++ {
ready, err := s.isAPIServiceReady()
if err != nil {
log.Warn("failed to check api server ready", errs.ZapError(err))
}
if ready {
return nil
}
select {
case <-s.ctx.Done():
return errors.New("context canceled while waiting api server ready")
case <-time.After(retryIntervalWaitAPIService):
log.Debug("api server is not ready, retrying")
}
}
return errors.Errorf("failed to wait api server ready after retrying %d times", maxRetryTimesWaitAPIService)
}

func (s *Server) isAPIServiceReady() (bool, error) {
urls := strings.Split(s.cfg.BackendEndpoints, ",")
if len(urls) == 0 {
return false, errors.New("no backend endpoints")
}
cc, err := s.GetDelegateClient(s.ctx, urls[0])
if err != nil {
return false, err
}
clusterInfo, err := pdpb.NewPDClient(cc).GetClusterInfo(s.ctx, &pdpb.GetClusterInfoRequest{})
if err != nil {
return false, err
}
if clusterInfo.GetHeader().GetError() != nil {
return false, errors.Errorf(clusterInfo.GetHeader().GetError().String())
}
modes := clusterInfo.ServiceModes
if len(modes) == 0 {
return false, errors.New("no service mode")
}
if modes[0] == pdpb.ServiceMode_API_SVC_MODE {
return true, nil
}
return false, nil
}

// CreateServer creates the Server
func CreateServer(ctx context.Context, cfg *Config) *Server {
svr := &Server{
Expand Down
11 changes: 7 additions & 4 deletions tests/integrations/mcs/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,19 +85,22 @@ func StartSingleResourceManagerTestServer(ctx context.Context, re *require.Asser
return s, cleanup
}

// StartSingleTSOTestServer creates and starts a tso server with default config for testing.
func StartSingleTSOTestServer(ctx context.Context, re *require.Assertions, backendEndpoints, listenAddrs string) (*tso.Server, func()) {
// StartSingleTSOTestServerWithoutCheck creates and starts a tso server with default config for testing.
func StartSingleTSOTestServerWithoutCheck(ctx context.Context, re *require.Assertions, backendEndpoints, listenAddrs string) (*tso.Server, func(), error) {
cfg := tso.NewConfig()
cfg.BackendEndpoints = backendEndpoints
cfg.ListenAddr = listenAddrs
cfg, err := tso.GenerateConfig(cfg)
re.NoError(err)

// Setup the logger.
err = InitLogger(cfg)
re.NoError(err)
return NewTSOTestServer(ctx, cfg)
}

s, cleanup, err := NewTSOTestServer(ctx, cfg)
// StartSingleTSOTestServer creates and starts a tso server with default config for testing.
func StartSingleTSOTestServer(ctx context.Context, re *require.Assertions, backendEndpoints, listenAddrs string) (*tso.Server, func()) {
s, cleanup, err := StartSingleTSOTestServerWithoutCheck(ctx, re, backendEndpoints, listenAddrs)
re.NoError(err)
testutil.Eventually(re, func() bool {
return !s.IsClosed()
Expand Down
52 changes: 52 additions & 0 deletions tests/integrations/mcs/tso/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,9 @@ func (suite *tsoServerTestSuite) TestParticipantStartWithAdvertiseListenAddr() {
func TestTSOPath(t *testing.T) {
re := require.New(t)
checkTSOPath(re, true /*isAPIServiceMode*/)
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/tso/server/skipWaitAPIServiceReady", "return(true)"))
checkTSOPath(re, false /*isAPIServiceMode*/)
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/tso/server/skipWaitAPIServiceReady"))
}

func checkTSOPath(re *require.Assertions, isAPIServiceMode bool) {
Expand Down Expand Up @@ -210,6 +212,56 @@ func getEtcdTimestampKeyNum(re *require.Assertions, client *clientv3.Client) int
return count
}

func TestWaitAPIServiceReady(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

startCluster := func(isAPIServiceMode bool) (cluster *tests.TestCluster, backendEndpoints string) {
var err error
if isAPIServiceMode {
cluster, err = tests.NewTestAPICluster(ctx, 1)
} else {
cluster, err = tests.NewTestCluster(ctx, 1)
}
re.NoError(err)
err = cluster.RunInitialServers()
re.NoError(err)
leaderName := cluster.WaitLeader()
pdLeader := cluster.GetServer(leaderName)
return cluster, pdLeader.GetAddr()
}

// tso server cannot be started because the pd server is not ready as api service.
cluster, backendEndpoints := startCluster(false /*isAPIServiceMode*/)
sctx, scancel := context.WithTimeout(ctx, time.Second*10)
defer scancel()
s, _, err := mcs.StartSingleTSOTestServerWithoutCheck(sctx, re, backendEndpoints, tempurl.Alloc())
re.Error(err)
re.Nil(s)
cluster.Destroy()

// tso server can be started because the pd server is ready as api service.
cluster, backendEndpoints = startCluster(true /*isAPIServiceMode*/)
sctx, scancel = context.WithTimeout(ctx, time.Second*10)
defer scancel()
s, cleanup, err := mcs.StartSingleTSOTestServerWithoutCheck(sctx, re, backendEndpoints, tempurl.Alloc())
re.NoError(err)
defer cluster.Destroy()
defer cleanup()

for i := 0; i < 12; i++ {
select {
case <-time.After(time.Second):
case <-sctx.Done():
return
}
if s != nil && s.IsServing() {
break
}
}
}

type APIServerForwardTestSuite struct {
suite.Suite
ctx context.Context
Expand Down
8 changes: 6 additions & 2 deletions tests/integrations/tso/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,9 +307,13 @@ func TestMixedTSODeployment(t *testing.T) {
re := require.New(t)

re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)"))
defer re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval"))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/skipUpdateServiceMode", "return(true)"))
defer re.NoError(failpoint.Disable("github.com/tikv/pd/client/skipUpdateServiceMode"))
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/tso/server/skipWaitAPIServiceReady", "return(true)"))
defer func() {
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/skipUpdateServiceMode"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/tso/server/skipWaitAPIServiceReady"))
}()

ctx, cancel := context.WithCancel(context.Background())
cluster, err := tests.NewTestCluster(ctx, 1)
Expand Down

0 comments on commit b1114b1

Please sign in to comment.