From f827a89ca7bca98e82c8b0bc620cbe4733652ccf Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Fri, 2 Jun 2023 11:04:42 +0800 Subject: [PATCH] etcd (ticdc): add grpc keepalive params and add timeout for check pd version ctx. (#9106) (#9118) close pingcap/tiflow#8808 --- cdc/capture/capture.go | 1 + cdc/server/server.go | 18 +++++++++++++++--- pkg/etcd/client.go | 2 +- pkg/version/check.go | 7 ++++++- pkg/version/check_test.go | 2 +- 5 files changed, 24 insertions(+), 6 deletions(-) diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go index 0cc89ffe4d2..7c412f30bd7 100644 --- a/cdc/capture/capture.go +++ b/cdc/capture/capture.go @@ -203,6 +203,7 @@ func (c *captureImpl) reset(ctx context.Context) error { if err != nil { return errors.Trace(err) } + log.Info("reset session successfully", zap.Any("session", sess)) c.captureMu.Lock() defer c.captureMu.Unlock() diff --git a/cdc/server/server.go b/cdc/server/server.go index ee8ffe22f99..738204bbfd0 100644 --- a/cdc/server/server.go +++ b/cdc/server/server.go @@ -54,6 +54,7 @@ import ( "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/backoff" + "google.golang.org/grpc/keepalive" ) const ( @@ -152,6 +153,7 @@ func (s *server) prepare(ctx context.Context) error { logConfig := logutil.DefaultZapLoggerConfig logConfig.Level = zap.NewAtomicLevelAt(zapcore.ErrorLevel) + log.Info("create etcdCli", zap.Strings("endpoints", s.pdEndpoints)) // we do not pass a `context` to the etcd client, // to prevent it's cancelled when the server is closing. // For example, when the non-owner node goes offline, @@ -178,6 +180,10 @@ func (s *server) prepare(ctx context.Context) error { }, MinConnectTimeout: 3 * time.Second, }), + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: 10 * time.Second, + Timeout: 20 * time.Second, + }), }, }) if err != nil { @@ -335,9 +341,6 @@ func (s *server) startStatusHTTP(serverCtx context.Context, lis net.Listener) er } func (s *server) etcdHealthChecker(ctx context.Context) error { - ticker := time.NewTicker(time.Second * 3) - defer ticker.Stop() - conf := config.GetGlobalServerConfig() grpcClient, err := pd.NewClientWithContext(ctx, s.pdEndpoints, conf.Security.PDSecurityOption()) if err != nil { @@ -349,6 +352,9 @@ func (s *server) etcdHealthChecker(ctx context.Context) error { } defer pc.Close() + ticker := time.NewTicker(time.Second * 3) + defer ticker.Stop() + for { select { case <-ctx.Done(): @@ -370,6 +376,12 @@ func (s *server) etcdHealthChecker(ctx context.Context) error { Observe(time.Since(start).Seconds()) cancel() } + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + _, err = s.etcdClient.GetEtcdClient().Unwrap().MemberList(ctx) + cancel() + if err != nil { + log.Warn("etcd health check error, fail to list etcd members", zap.Error(err)) + } } } } diff --git a/pkg/etcd/client.go b/pkg/etcd/client.go index a1c82c07fc1..1e96c1546be 100644 --- a/pkg/etcd/client.go +++ b/pkg/etcd/client.go @@ -65,7 +65,7 @@ var ( ) // set to var instead of const for mocking the value to speedup test -var maxTries uint64 = 8 +var maxTries uint64 = 12 // Client is a simple wrapper that adds retry to etcd RPC type Client struct { diff --git a/pkg/version/check.go b/pkg/version/check.go index 3217187f292..2be6566f2a3 100644 --- a/pkg/version/check.go +++ b/pkg/version/check.go @@ -71,6 +71,8 @@ func SanitizeVersion(v string) string { return strings.TrimPrefix(v, "v") } +var checkClusterVersionRetryTimes = 10 + // CheckClusterVersion check TiKV and PD version. // need only one PD alive and match the cdc version. func CheckClusterVersion( @@ -92,7 +94,8 @@ func CheckClusterVersion( return checkPDVersion(ctx, pdAddr, credential) }, retry.WithBackoffBaseDelay(time.Millisecond.Milliseconds()*10), retry.WithBackoffMaxDelay(time.Second.Milliseconds()), - retry.WithMaxTries(5)) + retry.WithMaxTries(uint64(checkClusterVersionRetryTimes)), + retry.WithIsRetryableErr(cerror.IsRetryableError)) if err == nil { break } @@ -145,6 +148,8 @@ func checkPDVersion(ctx context.Context, pdAddr string, credential *security.Cre return err } + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() resp, err := httpClient.Get(ctx, fmt.Sprintf("%s/pd/api/v1/version", pdAddr)) if err != nil { return cerror.ErrCheckClusterVersionFromPD.GenWithStackByArgs(err) diff --git a/pkg/version/check_test.go b/pkg/version/check_test.go index f67cb3fb1db..fb13b356bf8 100644 --- a/pkg/version/check_test.go +++ b/pkg/version/check_test.go @@ -237,7 +237,7 @@ func TestCheckClusterVersion(t *testing.T) { { retryTimes := 0 mock.getStatusCode = func() int { - if retryTimes < 5 { + if retryTimes < checkClusterVersionRetryTimes { retryTimes++ return http.StatusBadRequest }