Skip to content

Commit

Permalink
etcd (ticdc): add grpc keepalive params and add timeout for check pd …
Browse files Browse the repository at this point in the history
…version ctx. (#9106)

close #8808
  • Loading branch information
asddongmen authored Jun 1, 2023
1 parent 156d1d5 commit e1826b3
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 6 deletions.
1 change: 1 addition & 0 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ func (c *captureImpl) reset(ctx context.Context) error {
if err != nil {
return errors.Trace(err)
}
log.Info("reset session successfully", zap.Any("session", sess))

c.captureMu.Lock()
defer c.captureMu.Unlock()
Expand Down
18 changes: 15 additions & 3 deletions cdc/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/keepalive"
)

const (
Expand Down Expand Up @@ -138,6 +139,7 @@ func (s *server) prepare(ctx context.Context) error {
logConfig := logutil.DefaultZapLoggerConfig
logConfig.Level = zap.NewAtomicLevelAt(zapcore.ErrorLevel)

log.Info("create etcdCli", zap.Strings("endpoints", s.pdEndpoints))
// We do not pass a `context` to the etcd client,
// to prevent it from being cancelled when the server is closing.
// For example, when the non-owner node goes offline,
Expand All @@ -164,6 +166,10 @@ func (s *server) prepare(ctx context.Context) error {
},
MinConnectTimeout: 3 * time.Second,
}),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: 20 * time.Second,
}),
},
})
if err != nil {
Expand Down Expand Up @@ -289,9 +295,6 @@ func (s *server) startStatusHTTP(serverCtx context.Context, lis net.Listener) er
}

func (s *server) etcdHealthChecker(ctx context.Context) error {
ticker := time.NewTicker(time.Second * 3)
defer ticker.Stop()

conf := config.GetGlobalServerConfig()
grpcClient, err := pd.NewClientWithContext(ctx, s.pdEndpoints, conf.Security.PDSecurityOption())
if err != nil {
Expand All @@ -303,6 +306,9 @@ func (s *server) etcdHealthChecker(ctx context.Context) error {
}
defer pc.Close()

ticker := time.NewTicker(time.Second * 3)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
Expand All @@ -324,6 +330,12 @@ func (s *server) etcdHealthChecker(ctx context.Context) error {
Observe(time.Since(start).Seconds())
cancel()
}
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
_, err = s.etcdClient.GetEtcdClient().Unwrap().MemberList(ctx)
cancel()
if err != nil {
log.Warn("etcd health check error, fail to list etcd members", zap.Error(err))
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/etcd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ var (
)

// set to var instead of const so that tests can mock the value to speed up
var maxTries uint64 = 8
var maxTries uint64 = 12

// Client is a simple wrapper that adds retry to etcd RPC
type Client struct {
Expand Down
7 changes: 6 additions & 1 deletion pkg/version/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ func SanitizeVersion(v string) string {
return strings.TrimPrefix(v, "v")
}

// checkClusterVersionRetryTimes is the maximum number of tries passed to
// retry.WithMaxTries when CheckClusterVersion checks the PD version.
var checkClusterVersionRetryTimes = 10

// CheckClusterVersion checks the TiKV and PD versions.
// It needs only one PD to be alive and to match the cdc version.
func CheckClusterVersion(
Expand All @@ -92,7 +94,8 @@ func CheckClusterVersion(
return checkPDVersion(ctx, pdAddr, credential)
}, retry.WithBackoffBaseDelay(time.Millisecond.Milliseconds()*10),
retry.WithBackoffMaxDelay(time.Second.Milliseconds()),
retry.WithMaxTries(5))
retry.WithMaxTries(uint64(checkClusterVersionRetryTimes)),
retry.WithIsRetryableErr(cerror.IsRetryableError))
if err == nil {
break
}
Expand Down Expand Up @@ -145,6 +148,8 @@ func checkPDVersion(ctx context.Context, pdAddr string, credential *security.Cre
return err
}

ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
resp, err := httpClient.Get(ctx, fmt.Sprintf("%s/pd/api/v1/version", pdAddr))
if err != nil {
return cerror.ErrCheckClusterVersionFromPD.GenWithStackByArgs(err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/version/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func TestCheckClusterVersion(t *testing.T) {
{
retryTimes := 0
mock.getStatusCode = func() int {
if retryTimes < 5 {
if retryTimes < checkClusterVersionRetryTimes {
retryTimes++
return http.StatusBadRequest
}
Expand Down

0 comments on commit e1826b3

Please sign in to comment.