Skip to content

Commit

Permalink
etcd, pd (ticdc): refine pdClient and etcdClient initialization (#9661)…
Browse files Browse the repository at this point in the history
… (#9708)

close #9565
  • Loading branch information
ti-chi-bot authored Sep 11, 2023
1 parent 2af501a commit 4a31086
Show file tree
Hide file tree
Showing 12 changed files with 487 additions and 104 deletions.
16 changes: 9 additions & 7 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
"github.com/pingcap/tiflow/pkg/version"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/client/v3/concurrency"
"go.etcd.io/etcd/server/v3/mvcc"
"go.uber.org/zap"
Expand Down Expand Up @@ -81,6 +82,7 @@ type captureImpl struct {
liveness model.Liveness
config *config.ServerConfig

pdClient pd.Client
pdEndpoints []string
ownerMu sync.Mutex
owner owner.Owner
Expand Down Expand Up @@ -133,6 +135,7 @@ func NewCapture(pdEndpoints []string,
tableActorSystem *system.System,
sortEngineMangerFactory *factory.SortEngineFactory,
sorterSystem *ssystem.System,
pdClient pd.Client,
) Capture {
conf := config.GetGlobalServerConfig()
return &captureImpl{
Expand All @@ -146,12 +149,11 @@ func NewCapture(pdEndpoints []string,
newProcessorManager: processor.NewManager,
newOwner: owner.NewOwner,
info: &model.CaptureInfo{},

useSortEngine: sortEngineMangerFactory != nil,
sortEngineFactory: sortEngineMangerFactory,
sorterSystem: sorterSystem,

migrator: migrate.NewMigrator(etcdClient, pdEndpoints, conf),
useSortEngine: sortEngineMangerFactory != nil,
sortEngineFactory: sortEngineMangerFactory,
sorterSystem: sorterSystem,
migrator: migrate.NewMigrator(etcdClient, pdEndpoints, conf),
pdClient: pdClient,
}
}

Expand Down Expand Up @@ -217,7 +219,7 @@ func (c *captureImpl) reset(ctx context.Context) error {
c.upstreamManager.Close()
}
c.upstreamManager = upstream.NewManager(ctx, c.EtcdClient.GetGCServiceID())
_, err = c.upstreamManager.AddDefaultUpstream(c.pdEndpoints, c.config.Security)
_, err = c.upstreamManager.AddDefaultUpstream(c.pdEndpoints, c.config.Security, c.pdClient)
if err != nil {
return errors.Trace(err)
}
Expand Down
110 changes: 54 additions & 56 deletions cdc/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,11 @@ import (
"github.com/pingcap/tiflow/pkg/util"
p2pProto "github.com/pingcap/tiflow/proto/p2p"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/client/pkg/v3/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/net/netutil"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/keepalive"
)

const (
Expand Down Expand Up @@ -82,13 +78,14 @@ type Server interface {
// TODO: we need to make server more unit testable and add more test cases.
// Especially we need to decouple the HTTPServer out of server.
type server struct {
capture capture.Capture
tcpServer tcpserver.TCPServer
grpcService *p2p.ServerWrapper
statusServer *http.Server
etcdClient etcd.CDCEtcdClient
pdEndpoints []string

capture capture.Capture
tcpServer tcpserver.TCPServer
grpcService *p2p.ServerWrapper
statusServer *http.Server
etcdClient etcd.CDCEtcdClient
pdEndpoints []string
pdClient pd.Client
pdAPIClient *pdutil.PDAPIClient
tableActorSystem *system.System

// If it's true sortEngineManager will be used, otherwise sorterSystem will be used.
Expand Down Expand Up @@ -140,35 +137,21 @@ func New(pdEndpoints []string) (*server, error) {
func (s *server) prepare(ctx context.Context) error {
conf := config.GetGlobalServerConfig()

grpcTLSOption, err := conf.Security.ToGRPCDialOption()
tlsConfig, err := conf.Security.ToTLSConfig()
if err != nil {
return errors.Trace(err)
}

tlsConfig, err := conf.Security.ToTLSConfig()
grpcTLSOption, err := conf.Security.ToGRPCDialOption()
if err != nil {
return errors.Trace(err)
}

logConfig := logutil.DefaultZapLoggerConfig
logConfig.Level = zap.NewAtomicLevelAt(zapcore.ErrorLevel)

log.Info("create etcdCli", zap.Strings("endpoints", s.pdEndpoints))
// we do not pass a `context` to the etcd client,
// to prevent it's cancelled when the server is closing.
// For example, when the non-owner node goes offline,
// it would resign the campaign key which was put by call `campaign`,
// if this is not done due to the passed context cancelled,
// the key will be kept for the lease TTL, which is 10 seconds,
// then cause the new owner cannot be elected immediately after the old owner offline.
// see https://github.com/etcd-io/etcd/blob/525d53bd41/client/v3/concurrency/election.go#L98
etcdCli, err := clientv3.New(clientv3.Config{
Endpoints: s.pdEndpoints,
TLS: tlsConfig,
LogConfig: &logConfig,
DialTimeout: 5 * time.Second,
AutoSyncInterval: 30 * time.Second,
DialOptions: []grpc.DialOption{
log.Info("create pd client", zap.Strings("endpoints", s.pdEndpoints))
s.pdClient, err = pd.NewClientWithContext(
ctx, s.pdEndpoints, conf.Security.PDSecurityOption(),
// the default `timeout` is 3s, maybe too small if the pd is busy,
// set to 10s to avoid frequent timeout.
pd.WithCustomTimeoutOption(10*time.Second),
pd.WithGRPCDialOptions(
grpcTLSOption,
grpc.WithBlock(),
grpc.WithConnectParams(grpc.ConnectParams{
Expand All @@ -180,12 +163,24 @@ func (s *server) prepare(ctx context.Context) error {
},
MinConnectTimeout: 3 * time.Second,
}),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: 20 * time.Second,
}),
},
})
))
if err != nil {
return errors.Trace(err)
}
s.pdAPIClient, err = pdutil.NewPDAPIClient(s.pdClient, conf.Security)
if err != nil {
return errors.Trace(err)
}
log.Info("create etcdCli", zap.Strings("endpoints", s.pdEndpoints))
// we do not pass a `context` to create a the etcd client,
// to prevent it's cancelled when the server is closing.
// For example, when the non-owner node goes offline,
// it would resign the campaign key which was put by call `campaign`,
// if this is not done due to the passed context cancelled,
// the key will be kept for the lease TTL, which is 10 seconds,
// then cause the new owner cannot be elected immediately after the old owner offline.
// see https://github.com/etcd-io/etcd/blob/525d53bd41/client/v3/concurrency/election.go#L98
etcdCli, err := etcd.CreateRawEtcdClient(tlsConfig, grpcTLSOption, s.pdEndpoints...)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -196,6 +191,15 @@ func (s *server) prepare(ctx context.Context) error {
}
s.etcdClient = cdcEtcdClient

// Collect all endpoints from pd here to make the server more robust.
// Because in some scenarios, the deployer may only provide one pd endpoint,
// this will cause the TiCDC server to fail to restart when some pd node is down.
allPDEndpoints, err := s.pdAPIClient.CollectMemberEndpoints(ctx)
if err != nil {
return errors.Trace(err)
}
s.pdEndpoints = append(s.pdEndpoints, allPDEndpoints...)

err = s.initDir(ctx)
if err != nil {
return errors.Trace(err)
Expand All @@ -211,7 +215,7 @@ func (s *server) prepare(ctx context.Context) error {

s.capture = capture.NewCapture(
s.pdEndpoints, cdcEtcdClient, s.grpcService,
s.tableActorSystem, s.sortEngineFactory, s.sorterSystem)
s.tableActorSystem, s.sortEngineFactory, s.sorterSystem, s.pdClient)

return nil
}
Expand Down Expand Up @@ -340,18 +344,7 @@ func (s *server) startStatusHTTP(serverCtx context.Context, lis net.Listener) er
return nil
}

func (s *server) etcdHealthChecker(ctx context.Context) error {
conf := config.GetGlobalServerConfig()
grpcClient, err := pd.NewClientWithContext(ctx, s.pdEndpoints, conf.Security.PDSecurityOption())
if err != nil {
return errors.Trace(err)
}
pc, err := pdutil.NewPDAPIClient(grpcClient, conf.Security)
if err != nil {
return errors.Trace(err)
}
defer pc.Close()

func (s *server) upstreamPDHealthChecker(ctx context.Context) error {
ticker := time.NewTicker(time.Second * 3)
defer ticker.Stop()

Expand All @@ -360,15 +353,15 @@ func (s *server) etcdHealthChecker(ctx context.Context) error {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
endpoints, err := pc.CollectMemberEndpoints(ctx)
endpoints, err := s.pdAPIClient.CollectMemberEndpoints(ctx)
if err != nil {
log.Warn("etcd health check: cannot collect all members", zap.Error(err))
continue
}
for _, endpoint := range endpoints {
start := time.Now()
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
if err := pc.Healthy(ctx, endpoint); err != nil {
if err := s.pdAPIClient.Healthy(ctx, endpoint); err != nil {
log.Warn("etcd health check error",
zap.String("endpoint", endpoint), zap.Error(err))
}
Expand All @@ -389,6 +382,7 @@ func (s *server) etcdHealthChecker(ctx context.Context) error {
func (s *server) run(ctx context.Context) (err error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
defer s.pdAPIClient.Close()

wg, cctx := errgroup.WithContext(ctx)

Expand All @@ -397,7 +391,7 @@ func (s *server) run(ctx context.Context) (err error) {
})

wg.Go(func() error {
return s.etcdHealthChecker(cctx)
return s.upstreamPDHealthChecker(cctx)
})

wg.Go(func() error {
Expand Down Expand Up @@ -460,6 +454,10 @@ func (s *server) Close() {
}
s.tcpServer = nil
}

if s.pdClient != nil {
s.pdClient.Close()
}
}

func (s *server) stopActorSystems() {
Expand Down
Loading

0 comments on commit 4a31086

Please sign in to comment.