Skip to content

Commit

Permalink
*: use another etcd client for election (#6409)
Browse files Browse the repository at this point in the history
ref #6403

Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx authored May 12, 2023
1 parent e8de93d commit d2e73d1
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 20 deletions.
16 changes: 3 additions & 13 deletions pkg/utils/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,19 +196,9 @@ func EtcdKVPutWithTTL(ctx context.Context, c *clientv3.Client, key string, value
return kv.Put(ctx, key, value, clientv3.WithLease(grantResp.ID))
}

// CreateClientsWithMultiEndpoint creates etcd v3 client and http client.
func CreateClientsWithMultiEndpoint(tlsConfig *tls.Config, acUrls []url.URL) (*clientv3.Client, *http.Client, error) {
client, err := createEtcdClientWithMultiEndpoint(tlsConfig, acUrls)
if err != nil {
return nil, nil, errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause()
}
httpClient := createHTTPClient(tlsConfig)
return client, httpClient, nil
}

// CreateClients creates etcd v3 client and http client.
func CreateClients(tlsConfig *tls.Config, acUrls url.URL) (*clientv3.Client, *http.Client, error) {
client, err := createEtcdClient(tlsConfig, acUrls)
client, err := CreateEtcdClient(tlsConfig, acUrls)
if err != nil {
return nil, nil, errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause()
}
Expand Down Expand Up @@ -255,9 +245,9 @@ func createEtcdClientWithMultiEndpoint(tlsConfig *tls.Config, acUrls []url.URL)
return client, err
}

// createEtcdClient creates etcd v3 client.
// CreateEtcdClient creates etcd v3 client.
// Note: it will be used by legacy pd-server, and only connect to leader only.
func createEtcdClient(tlsConfig *tls.Config, acURL url.URL) (*clientv3.Client, error) {
func CreateEtcdClient(tlsConfig *tls.Config, acURL url.URL) (*clientv3.Client, error) {
lgc := zap.NewProductionConfig()
lgc.Encoding = log.ZapEncodingName
client, err := clientv3.New(clientv3.Config{
Expand Down
12 changes: 6 additions & 6 deletions pkg/utils/etcdutil/etcdutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,12 +305,12 @@ func TestEtcdScaleInAndOutWithoutMultiPoint(t *testing.T) {
// Create two etcd clients with etcd1 as endpoint.
urls, err := types.NewURLs([]string{ep1})
re.NoError(err)
client1, err := createEtcdClient(nil, urls[0]) // execute member change operation with this client
client1, err := CreateEtcdClient(nil, urls[0]) // execute member change operation with this client
defer func() {
client1.Close()
}()
re.NoError(err)
client2, err := createEtcdClient(nil, urls[0]) // check member change with this client
client2, err := CreateEtcdClient(nil, urls[0]) // check member change with this client
defer func() {
client2.Close()
}()
Expand Down Expand Up @@ -482,7 +482,7 @@ func (suite *loopWatcherTestSuite) SetupSuite() {
ep1 := suite.config.LCUrls[0].String()
urls, err := types.NewURLs([]string{ep1})
suite.NoError(err)
suite.client, err = createEtcdClient(nil, urls[0])
suite.client, err = CreateEtcdClient(nil, urls[0])
suite.NoError(err)
suite.cleans = append(suite.cleans, func() {
suite.client.Close()
Expand Down Expand Up @@ -685,23 +685,23 @@ func (suite *loopWatcherTestSuite) TestWatcherBreak() {

// Case2: close the etcd client and put a new value after watcher restarts
suite.client.Close()
suite.client, err = createEtcdClient(nil, suite.config.LCUrls[0])
suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls[0])
suite.NoError(err)
watcher.updateClientCh <- suite.client
suite.put("TestWatcherBreak", "2")
checkCache("2")

// Case3: close the etcd client and put a new value before watcher restarts
suite.client.Close()
suite.client, err = createEtcdClient(nil, suite.config.LCUrls[0])
suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls[0])
suite.NoError(err)
suite.put("TestWatcherBreak", "3")
watcher.updateClientCh <- suite.client
checkCache("3")

// Case4: close the etcd client and put a new value with compact
suite.client.Close()
suite.client, err = createEtcdClient(nil, suite.config.LCUrls[0])
suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls[0])
suite.NoError(err)
suite.put("TestWatcherBreak", "4")
resp, err := EtcdKVGet(suite.client, "TestWatcherBreak")
Expand Down
27 changes: 26 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ type Server struct {
member *member.EmbeddedEtcdMember
// etcd client
client *clientv3.Client
// electionClient is used for leader election.
electionClient *clientv3.Client
// http client
httpClient *http.Client
clusterID uint64 // pd cluster id.
Expand Down Expand Up @@ -335,6 +337,11 @@ func (s *Server) startEtcd(ctx context.Context) error {
return err
}

s.electionClient, err = startElectionClient(s.cfg)
if err != nil {
return err
}

// update advertise peer urls.
etcdMembers, err := etcdutil.ListEtcdMembers(s.client)
if err != nil {
Expand All @@ -353,7 +360,7 @@ func (s *Server) startEtcd(ctx context.Context) error {
failpoint.Inject("memberNil", func() {
time.Sleep(1500 * time.Millisecond)
})
s.member = member.NewMember(etcd, s.client, etcdServerID)
s.member = member.NewMember(etcd, s.electionClient, etcdServerID)
return nil
}

Expand All @@ -369,6 +376,19 @@ func startClient(cfg *config.Config) (*clientv3.Client, *http.Client, error) {
return etcdutil.CreateClients(tlsConfig, etcdCfg.ACUrls[0])
}

func startElectionClient(cfg *config.Config) (*clientv3.Client, error) {
tlsConfig, err := cfg.Security.ToTLSConfig()
if err != nil {
return nil, err
}
etcdCfg, err := cfg.GenEmbedEtcdConfig()
if err != nil {
return nil, err
}

return etcdutil.CreateEtcdClient(tlsConfig, etcdCfg.ACUrls[0])
}

// AddStartCallback adds a callback in the startServer phase.
func (s *Server) AddStartCallback(callbacks ...func()) {
s.startCallbacks = append(s.startCallbacks, callbacks...)
Expand Down Expand Up @@ -484,6 +504,11 @@ func (s *Server) Close() {
log.Error("close etcd client meet error", errs.ZapError(errs.ErrCloseEtcdClient, err))
}
}
if s.electionClient != nil {
if err := s.electionClient.Close(); err != nil {
log.Error("close election client meet error", errs.ZapError(errs.ErrCloseEtcdClient, err))
}
}

if s.httpClient != nil {
s.httpClient.CloseIdleConnections()
Expand Down

0 comments on commit d2e73d1

Please sign in to comment.