diff --git a/pkg/utils/etcdutil/etcdutil.go b/pkg/utils/etcdutil/etcdutil.go index b65f8b901a4..cba08c7bb99 100644 --- a/pkg/utils/etcdutil/etcdutil.go +++ b/pkg/utils/etcdutil/etcdutil.go @@ -205,7 +205,7 @@ func CreateClientsWithMultiEndpoint(tlsConfig *tls.Config, acUrls []url.URL) (*c // CreateClients creates etcd v3 client and http client. func CreateClients(tlsConfig *tls.Config, acUrls url.URL) (*clientv3.Client, *http.Client, error) { - client, err := createEtcdClient(tlsConfig, acUrls) + client, err := CreateEtcdClient(tlsConfig, acUrls) if err != nil { return nil, nil, errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause() } @@ -252,9 +252,9 @@ func createEtcdClientWithMultiEndpoint(tlsConfig *tls.Config, acUrls []url.URL) return client, err } -// createEtcdClient creates etcd v3 client. +// CreateEtcdClient creates etcd v3 client. // Note: it will be used by legacy pd-server, and only connect to leader only. -func createEtcdClient(tlsConfig *tls.Config, acURL url.URL) (*clientv3.Client, error) { +func CreateEtcdClient(tlsConfig *tls.Config, acURL url.URL) (*clientv3.Client, error) { lgc := zap.NewProductionConfig() lgc.Encoding = log.ZapEncodingName client, err := clientv3.New(clientv3.Config{ diff --git a/pkg/utils/etcdutil/etcdutil_test.go b/pkg/utils/etcdutil/etcdutil_test.go index ad0b277c8f0..63c7d2276bb 100644 --- a/pkg/utils/etcdutil/etcdutil_test.go +++ b/pkg/utils/etcdutil/etcdutil_test.go @@ -257,7 +257,7 @@ func TestEtcdWithHangLeaderEnableCheck(t *testing.T) { re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/closeKeepAliveCheck", "return(true)")) err = checkEtcdWithHangLeader(t) re.Error(err) - require.NoError(t, failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/closeKeepAliveCheck")) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/closeKeepAliveCheck")) } func checkEtcdWithHangLeader(t *testing.T) error { diff --git a/server/server.go b/server/server.go index e1dcadf2d7e..36a3cefd0af 100644 --- a/server/server.go +++ b/server/server.go @@ -150,6 +150,8 @@ type Server struct { member *member.EmbeddedEtcdMember // etcd client client *clientv3.Client + // electionClient is used for leader election. + electionClient *clientv3.Client // http client httpClient *http.Client clusterID uint64 // pd cluster id. @@ -342,6 +344,11 @@ func (s *Server) startEtcd(ctx context.Context) error { return err } + s.electionClient, err = startElectionClient(s.cfg) + if err != nil { + return err + } + // update advertise peer urls. etcdMembers, err := etcdutil.ListEtcdMembers(s.client) if err != nil { @@ -360,7 +367,7 @@ func (s *Server) startEtcd(ctx context.Context) error { failpoint.Inject("memberNil", func() { time.Sleep(1500 * time.Millisecond) }) - s.member = member.NewMember(etcd, s.client, etcdServerID) + s.member = member.NewMember(etcd, s.electionClient, etcdServerID) return nil } @@ -376,6 +383,19 @@ func startClient(cfg *config.Config) (*clientv3.Client, *http.Client, error) { return etcdutil.CreateClients(tlsConfig, etcdCfg.ACUrls[0]) } +func startElectionClient(cfg *config.Config) (*clientv3.Client, error) { + tlsConfig, err := cfg.Security.ToTLSConfig() + if err != nil { + return nil, err + } + etcdCfg, err := cfg.GenEmbedEtcdConfig() + if err != nil { + return nil, err + } + + return etcdutil.CreateEtcdClient(tlsConfig, etcdCfg.ACUrls[0]) +} + // AddStartCallback adds a callback in the startServer phase. func (s *Server) AddStartCallback(callbacks ...func()) { s.startCallbacks = append(s.startCallbacks, callbacks...) @@ -491,6 +511,11 @@ func (s *Server) Close() { log.Error("close etcd client meet error", errs.ZapError(errs.ErrCloseEtcdClient, err)) } } + if s.electionClient != nil { + if err := s.electionClient.Close(); err != nil { + log.Error("close election client meet error", errs.ZapError(errs.ErrCloseEtcdClient, err)) + } + } if s.httpClient != nil { s.httpClient.CloseIdleConnections()