diff --git a/CHANGELOG/CHANGELOG-3.6.md b/CHANGELOG/CHANGELOG-3.6.md index 7b8d95e2fa39..31a75177ef96 100644 --- a/CHANGELOG/CHANGELOG-3.6.md +++ b/CHANGELOG/CHANGELOG-3.6.md @@ -37,6 +37,7 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.0...v3.6.0). - Add [`etcd --experimental-enable-lease-checkpoint-persist`](https://github.com/etcd-io/etcd/pull/13508) flag to handle upgrade from v3.5.2 clusters with this feature enabled. - Add [`etcdctl make-mirror --rev`](https://github.com/etcd-io/etcd/pull/13519) flag to support incremental mirror. - Add [`etcd --experimental-wait-cluster-ready-timeout`](https://github.com/etcd-io/etcd/pull/13525) flag to wait for cluster to be ready before serving client requests. +- Add [v3 discovery](https://github.com/etcd-io/etcd/pull/13635) to bootstrap a new etcd cluster. - Fix [non mutating requests pass through quotaKVServer when NOSPACE](https://github.com/etcd-io/etcd/pull/13435) - Fix [exclude the same alarm type activated by multiple peers](https://github.com/etcd-io/etcd/pull/13467). - Fix [Provide a better liveness probe for when etcd runs as a Kubernetes pod](https://github.com/etcd-io/etcd/pull/13399) diff --git a/server/config/config.go b/server/config/config.go index 15cb526f469c..8f901444211d 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -25,6 +25,7 @@ import ( "go.etcd.io/etcd/client/pkg/v3/transport" "go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/pkg/v3/netutil" + "go.etcd.io/etcd/server/v3/etcdserver/api/v3discovery" "go.etcd.io/etcd/server/v3/storage/datadir" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" @@ -34,12 +35,16 @@ import ( // ServerConfig holds the configuration of etcd as taken from the command line or discovery. 
type ServerConfig struct { - Name string - DiscoveryURL string - DiscoveryProxy string - ClientURLs types.URLs - PeerURLs types.URLs - DataDir string + Name string + + EnableV2Discovery bool + DiscoveryURL string + DiscoveryProxy string + DiscoveryCfg v3discovery.DiscoveryConfig + + ClientURLs types.URLs + PeerURLs types.URLs + DataDir string // DedicatedWALDir config will make the etcd to write the WAL to the WALDir // rather than the dataDir/member/wal. DedicatedWALDir string diff --git a/server/embed/config.go b/server/embed/config.go index c63a9f971285..778d2b1cbc26 100644 --- a/server/embed/config.go +++ b/server/embed/config.go @@ -15,6 +15,7 @@ package embed import ( + "errors" "fmt" "net" "net/http" @@ -36,6 +37,7 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver" "go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/etcdserver/api/v3compactor" + "go.etcd.io/etcd/server/v3/etcdserver/api/v3discovery" bolt "go.etcd.io/bbolt" "go.uber.org/multierr" @@ -62,6 +64,11 @@ const ( DefaultDowngradeCheckTime = 5 * time.Second DefaultWaitClusterReadyTimeout = 5 * time.Second + DefaultDiscoveryDialTimeout = 2 * time.Second + DefaultDiscoveryRequestTimeOut = 5 * time.Second + DefaultDiscoveryKeepAliveTime = 2 * time.Second + DefaultDiscoveryKeepAliveTimeOut = 6 * time.Second + DefaultListenPeerURLs = "http://localhost:2380" DefaultListenClientURLs = "http://localhost:2379" @@ -88,6 +95,8 @@ const ( // It's enabled by default. DefaultStrictReconfigCheck = true + DefaultEnableV2Discovery = true + // maxElectionMs specifies the maximum value of election timeout. // More details are listed in ../Documentation/tuning.md#time-parameters. maxElectionMs = 50000 @@ -213,11 +222,15 @@ type Config struct { // Note that cipher suites are prioritized in the given order. 
CipherSuites []string `json:"cipher-suites"` - ClusterState string `json:"initial-cluster-state"` - DNSCluster string `json:"discovery-srv"` - DNSClusterServiceName string `json:"discovery-srv-name"` - Dproxy string `json:"discovery-proxy"` - Durl string `json:"discovery"` + ClusterState string `json:"initial-cluster-state"` + DNSCluster string `json:"discovery-srv"` + DNSClusterServiceName string `json:"discovery-srv-name"` + Dproxy string `json:"discovery-proxy"` + + EnableV2Discovery bool `json:"enable-v2-discovery"` + Durl string `json:"discovery"` + DiscoveryCfg v3discovery.DiscoveryConfig `json:"discovery-config"` + InitialCluster string `json:"initial-cluster"` InitialClusterToken string `json:"initial-cluster-token"` StrictReconfigCheck bool `json:"strict-reconfig-check"` @@ -504,6 +517,14 @@ func NewConfig() *Config { ExperimentalMaxLearners: membership.DefaultMaxLearners, V2Deprecation: config.V2_DEPR_DEFAULT, + + EnableV2Discovery: DefaultEnableV2Discovery, + DiscoveryCfg: v3discovery.DiscoveryConfig{ + DialTimeout: DefaultDiscoveryDialTimeout, + RequestTimeOut: DefaultDiscoveryRequestTimeOut, + KeepAliveTime: DefaultDiscoveryKeepAliveTime, + KeepAliveTimeout: DefaultDiscoveryKeepAliveTimeOut, + }, } cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name) return cfg @@ -667,6 +688,22 @@ func (cfg *Config) Validate() error { return ErrConflictBootstrapFlags } + // Check if both v2 discovery and v3 discovery flags are passed. 
+ v2discoveryFlagsExist := cfg.Dproxy != "" + v3discoveryFlagsExist := cfg.DiscoveryCfg.CertFile != "" || + cfg.DiscoveryCfg.KeyFile != "" || + cfg.DiscoveryCfg.TrustedCAFile != "" || + cfg.DiscoveryCfg.User != "" || + cfg.DiscoveryCfg.Password != "" + if cfg.EnableV2Discovery && v3discoveryFlagsExist { + return errors.New("v2 discovery is enabled, but some v3 discovery " + + "settings (discovery-cert, discovery-key, discovery-cacert, " + + "discovery-user, discovery-password) are set") + } + if !cfg.EnableV2Discovery && v2discoveryFlagsExist { + return errors.New("v3 discovery is enabled, but --discovery-proxy is set") + } + if cfg.TickMs == 0 { return fmt.Errorf("--heartbeat-interval must be >0 (set to %dms)", cfg.TickMs) } diff --git a/server/embed/etcd.go b/server/embed/etcd.go index 825c56869ad0..46069b63900d 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -175,8 +175,10 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { MaxWALFiles: cfg.MaxWalFiles, InitialPeerURLsMap: urlsmap, InitialClusterToken: token, + EnableV2Discovery: cfg.EnableV2Discovery, DiscoveryURL: cfg.Durl, DiscoveryProxy: cfg.Dproxy, + DiscoveryCfg: cfg.DiscoveryCfg, NewCluster: cfg.IsNewCluster(), PeerTLSInfo: cfg.PeerTLSInfo, TickMs: cfg.TickMs, @@ -345,6 +347,18 @@ func print(lg *zap.Logger, ec Config, sc config.ServerConfig, memberInitialized zap.String("auto-compaction-interval", sc.AutoCompactionRetention.String()), zap.String("discovery-url", sc.DiscoveryURL), zap.String("discovery-proxy", sc.DiscoveryProxy), + + zap.String("discovery-dial-timeout", sc.DiscoveryCfg.DialTimeout.String()), + zap.String("discovery-request-timeout", sc.DiscoveryCfg.RequestTimeOut.String()), + zap.String("discovery-keepalive-time", sc.DiscoveryCfg.KeepAliveTime.String()), + zap.String("discovery-keepalive-timeout", sc.DiscoveryCfg.KeepAliveTimeout.String()), + zap.Bool("discovery-insecure-transport", sc.DiscoveryCfg.InsecureTransport), + zap.Bool("discovery-insecure-skip-tls-verify", 
sc.DiscoveryCfg.InsecureSkipVerify), + zap.String("discovery-cert", sc.DiscoveryCfg.CertFile), + zap.String("discovery-key", sc.DiscoveryCfg.KeyFile), + zap.String("discovery-cacert", sc.DiscoveryCfg.TrustedCAFile), + zap.String("discovery-user", sc.DiscoveryCfg.User), + zap.String("downgrade-check-interval", sc.DowngradeCheckTime.String()), zap.Int("max-learners", sc.ExperimentalMaxLearners), ) diff --git a/server/etcdmain/config.go b/server/etcdmain/config.go index efb9368a14a5..ec2a2f6bb099 100644 --- a/server/etcdmain/config.go +++ b/server/etcdmain/config.go @@ -186,10 +186,23 @@ func newConfig() *config { "advertise-client-urls", "List of this member's client URLs to advertise to the public.", ) + fs.BoolVar(&cfg.ec.EnableV2Discovery, "enable-v2-discovery", cfg.ec.EnableV2Discovery, "Enable to bootstrap the cluster using v2 discovery. Will be deprecated in v3.7, and be decommissioned in v3.8.") fs.StringVar(&cfg.ec.Durl, "discovery", cfg.ec.Durl, "Discovery URL used to bootstrap the cluster.") fs.Var(cfg.cf.fallback, "discovery-fallback", fmt.Sprintf("Valid values include %q", cfg.cf.fallback.Valids())) - fs.StringVar(&cfg.ec.Dproxy, "discovery-proxy", cfg.ec.Dproxy, "HTTP proxy to use for traffic to discovery service.") + fs.DurationVar(&cfg.ec.DiscoveryCfg.DialTimeout, "discovery-dial-timeout", cfg.ec.DiscoveryCfg.DialTimeout, "V3 discovery: dial timeout for client connections.") + fs.DurationVar(&cfg.ec.DiscoveryCfg.RequestTimeOut, "discovery-request-timeout", cfg.ec.DiscoveryCfg.RequestTimeOut, "V3 discovery: timeout for discovery requests (excluding dial timeout).") + fs.DurationVar(&cfg.ec.DiscoveryCfg.KeepAliveTime, "discovery-keepalive-time", cfg.ec.DiscoveryCfg.KeepAliveTime, "V3 discovery: keepalive time for client connections.") + fs.DurationVar(&cfg.ec.DiscoveryCfg.KeepAliveTimeout, "discovery-keepalive-timeout", cfg.ec.DiscoveryCfg.KeepAliveTimeout, "V3 discovery: keepalive timeout for client connections.") + 
fs.BoolVar(&cfg.ec.DiscoveryCfg.InsecureTransport, "discovery-insecure-transport", true, "V3 discovery: disable transport security for client connections.") + fs.BoolVar(&cfg.ec.DiscoveryCfg.InsecureSkipVerify, "discovery-insecure-skip-tls-verify", false, "V3 discovery: skip server certificate verification (CAUTION: this option should be enabled only for testing purposes).") + fs.StringVar(&cfg.ec.DiscoveryCfg.CertFile, "discovery-cert", "", "V3 discovery: identify secure client using this TLS certificate file.") + fs.StringVar(&cfg.ec.DiscoveryCfg.KeyFile, "discovery-key", "", "V3 discovery: identify secure client using this TLS key file.") + fs.StringVar(&cfg.ec.DiscoveryCfg.TrustedCAFile, "discovery-cacert", "", "V3 discovery: verify certificates of TLS-enabled secure servers using this CA bundle.") + fs.StringVar(&cfg.ec.DiscoveryCfg.User, "discovery-user", "", "V3 discovery: username[:password] for authentication (prompt if password is not supplied).") + fs.StringVar(&cfg.ec.DiscoveryCfg.Password, "discovery-password", "", "V3 discovery: password for authentication (if this option is used, --user option shouldn't include password).") + + fs.StringVar(&cfg.ec.Dproxy, "discovery-proxy", cfg.ec.Dproxy, "HTTP proxy to use for traffic to discovery service. 
Will be deprecated in v3.7, and be decommissioned in v3.8.") fs.StringVar(&cfg.ec.DNSCluster, "discovery-srv", cfg.ec.DNSCluster, "DNS domain used to bootstrap initial cluster.") fs.StringVar(&cfg.ec.DNSClusterServiceName, "discovery-srv-name", cfg.ec.DNSClusterServiceName, "Service name to query when using DNS discovery.") fs.StringVar(&cfg.ec.InitialCluster, "initial-cluster", cfg.ec.InitialCluster, "Initial cluster configuration for bootstrapping.") diff --git a/server/etcdmain/etcd.go b/server/etcdmain/etcd.go index 190364e2781b..255e5f42e4ec 100644 --- a/server/etcdmain/etcd.go +++ b/server/etcdmain/etcd.go @@ -17,6 +17,7 @@ package etcdmain import ( "encoding/json" "fmt" + "net/http" "os" "path/filepath" @@ -34,6 +35,7 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver" "go.etcd.io/etcd/server/v3/etcdserver/api/etcdhttp" "go.etcd.io/etcd/server/v3/etcdserver/api/v2discovery" + "go.etcd.io/etcd/server/v3/etcdserver/api/v3discovery" "go.etcd.io/etcd/server/v3/proxy/httpproxy" "go.uber.org/zap" @@ -318,7 +320,11 @@ func startProxy(cfg *config) error { if cfg.ec.Durl != "" { var s string - s, err = v2discovery.GetCluster(lg, cfg.ec.Durl, cfg.ec.Dproxy) + if cfg.ec.EnableV2Discovery { + s, err = v2discovery.GetCluster(lg, cfg.ec.Durl, cfg.ec.Dproxy) + } else { + s, err = v3discovery.GetCluster(lg, cfg.ec.Durl, &cfg.ec.DiscoveryCfg) + } if err != nil { return err } diff --git a/server/etcdmain/help.go b/server/etcdmain/help.go index fdd62cb094d6..84b683b0dc7e 100644 --- a/server/etcdmain/help.go +++ b/server/etcdmain/help.go @@ -104,13 +104,37 @@ Clustering: --advertise-client-urls 'http://localhost:2379' List of this member's client URLs to advertise to the public. The client URLs advertised should be accessible to machines that talk to etcd cluster. etcd client libraries parse these URLs to connect to the cluster. + --enable-v2-discovery 'true' + Enable to bootstrap the cluster using v2 discovery. Will be deprecated in v3.7, and be decommissioned in v3.8. 
--discovery '' Discovery URL used to bootstrap the cluster. + --discovery-dial-timeout '2s' + V3 discovery: dial timeout for client connections. + --discovery-request-timeout '5s' + V3 discovery: timeout for discovery requests (excluding dial timeout). + --discovery-keepalive-time '2s' + V3 discovery: keepalive time for client connections. + --discovery-keepalive-timeout '6s' + V3 discovery: keepalive timeout for client connections. + --discovery-insecure-transport 'true' + V3 discovery: disable transport security for client connections. + --discovery-insecure-skip-tls-verify 'false' + V3 discovery: skip server certificate verification (CAUTION: this option should be enabled only for testing purposes). + --discovery-cert '' + V3 discovery: identify secure client using this TLS certificate file. + --discovery-key '' + V3 discovery: identify secure client using this TLS key file. + --discovery-cacert '' + V3 discovery: verify certificates of TLS-enabled secure servers using this CA bundle. + --discovery-user '' + V3 discovery: username[:password] for authentication (prompt if password is not supplied). + --discovery-password '' + V3 discovery: password for authentication (if this option is used, --user option shouldn't include password). --discovery-fallback 'proxy' Expected behavior ('exit' or 'proxy') when discovery services fails. "proxy" supports v2 API only. --discovery-proxy '' - HTTP proxy to use for traffic to discovery service. + HTTP proxy to use for traffic to discovery service. Will be deprecated in v3.7, and be decommissioned in v3.8. --discovery-srv '' DNS srv domain used to bootstrap the cluster. 
--discovery-srv-name '' diff --git a/server/etcdserver/api/v3discovery/discovery.go b/server/etcdserver/api/v3discovery/discovery.go new file mode 100644 index 000000000000..1ed12c59c80b --- /dev/null +++ b/server/etcdserver/api/v3discovery/discovery.go @@ -0,0 +1,580 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package v3discovery provides an implementation of the cluster discovery that +// is used by etcd with v3 client. +package v3discovery + +import ( + "context" + "crypto/tls" + "errors" + + "math" + "net/url" + "path" + "sort" + "strconv" + "strings" + "time" + + "go.etcd.io/etcd/client/pkg/v3/transport" + "go.etcd.io/etcd/client/pkg/v3/types" + "go.etcd.io/etcd/client/v3" + + "github.com/jonboulle/clockwork" + "go.uber.org/zap" +) + +const ( + discoveryPrefix = "/_etcd/registry" +) + +var ( + ErrInvalidURL = errors.New("discovery: invalid peer URL") + ErrBadSizeKey = errors.New("discovery: size key is bad") + ErrSizeNotFound = errors.New("discovery: size key not found") + ErrFullCluster = errors.New("discovery: cluster is full") + ErrTooManyRetries = errors.New("discovery: too many retries") +) + +var ( + // Number of retries discovery will attempt before giving up and error out. 
+ nRetries = uint(math.MaxUint32) + maxExponentialRetries = uint(8) +) + +type DiscoveryConfig struct { + Url string `json:"discovery"` + + DialTimeout time.Duration `json:"discovery-dial-timeout"` + RequestTimeOut time.Duration `json:"discovery-request-timeout"` + KeepAliveTime time.Duration `json:"discovery-keepalive-time"` + KeepAliveTimeout time.Duration `json:"discovery-keepalive-timeout"` + + InsecureTransport bool `json:"discovery-insecure-transport"` + InsecureSkipVerify bool `json:"discovery-insecure-skip-tls-verify"` + CertFile string `json:"discovery-cert"` + KeyFile string `json:"discovery-key"` + TrustedCAFile string `json:"discovery-cacert"` + + User string `json:"discovery-user"` + Password string `json:"discovery-password"` +} + +type memberInfo struct { + // peerRegKey is the key used by the member when registering in the + // discovery service. + // Format: "/_etcd/registry//members/". + peerRegKey string + // peerURLsMap format: "peerName=peerURLs", i.e., "member1=http://127.0.0.1:2380". + peerURLsMap string + // createRev is the member's CreateRevision in the etcd cluster backing + // the discovery service. + createRev int64 +} + +type clusterInfo struct { + clusterToken string + members []memberInfo +} + +// key prefix for each cluster: "/_etcd/registry/". +func geClusterKeyPrefix(cluster string) string { + return path.Join(discoveryPrefix, cluster) +} + +// key format for cluster size: "/_etcd/registry//_config/size". +func geClusterSizeKey(cluster string) string { + return path.Join(geClusterKeyPrefix(cluster), "_config/size") +} + +// key prefix for each member: "/_etcd/registry//members". +func getMemberKeyPrefix(clusterToken string) string { + return path.Join(geClusterKeyPrefix(clusterToken), "members") +} + +// key format for each member: "/_etcd/registry//members/". 
+func getMemberKey(cluster, memberId string) string { + return path.Join(getMemberKeyPrefix(cluster), memberId) +} + +// GetCluster will connect to the discovery service at the given url and +// retrieve a string describing the cluster +func GetCluster(lg *zap.Logger, dUrl string, cfg *DiscoveryConfig) (cs string, rerr error) { + d, err := newDiscovery(lg, dUrl, cfg, 0) + if err != nil { + return "", err + } + + defer d.close() + defer func() { + if rerr != nil { + d.lg.Error( + "discovery failed to get cluster", + zap.String("cluster", cs), + zap.Error(rerr), + ) + } else { + d.lg.Info( + "discovery got cluster successfully", + zap.String("cluster", cs), + ) + } + }() + + return d.getCluster() +} + +// JoinCluster will connect to the discovery service at the given url, and +// register the server represented by the given id and config to the cluster. +// The parameter `config` is supposed to be in the format "memberName=peerURLs", +// such as "member1=http://127.0.0.1:2380". +// +// The final returned string has the same format as "--initial-cluster", such as +// "infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380". 
+func JoinCluster(lg *zap.Logger, durl string, cfg *DiscoveryConfig, id types.ID, config string) (cs string, rerr error) { + d, err := newDiscovery(lg, durl, cfg, id) + if err != nil { + return "", err + } + + defer d.close() + defer func() { + if rerr != nil { + d.lg.Error( + "discovery failed to join cluster", + zap.String("cluster", cs), + zap.Error(rerr), + ) + } else { + d.lg.Info( + "discovery joined cluster successfully", + zap.String("cluster", cs), + ) + } + }() + + return d.joinCluster(config) +} + +type discovery struct { + lg *zap.Logger + clusterToken string + memberId types.ID + c *clientv3.Client + retries uint + durl string + + cfg *DiscoveryConfig + + clock clockwork.Clock +} + +func newDiscovery(lg *zap.Logger, durl string, dcfg *DiscoveryConfig, id types.ID) (*discovery, error) { + if lg == nil { + lg = zap.NewNop() + } + u, err := url.Parse(durl) + if err != nil { + return nil, err + } + token := u.Path + u.Path = "" + + lg = lg.With(zap.String("discovery-url", durl)) + cfg, err := newClientCfg(dcfg, u.String(), lg) + if err != nil { + return nil, err + } + + c, err := clientv3.New(*cfg) + if err != nil { + return nil, err + } + return &discovery{ + lg: lg, + clusterToken: token, + memberId: id, + c: c, + durl: u.String(), + cfg: dcfg, + clock: clockwork.NewRealClock(), + }, nil +} + +// The following function follows the same logic as etcdctl, refer to +// https://github.com/etcd-io/etcd/blob/f9a8c49c695b098d66a07948666664ea10d01a82/etcdctl/ctlv3/command/global.go#L191-L250 +func newClientCfg(dcfg *DiscoveryConfig, dUrl string, lg *zap.Logger) (*clientv3.Config, error) { + var cfgtls *transport.TLSInfo + + if dcfg.CertFile != "" || dcfg.KeyFile != "" || dcfg.TrustedCAFile != "" { + cfgtls = &transport.TLSInfo{ + CertFile: dcfg.CertFile, + KeyFile: dcfg.KeyFile, + TrustedCAFile: dcfg.TrustedCAFile, + Logger: lg, + } + } + + cfg := &clientv3.Config{ + Endpoints: []string{dUrl}, + DialTimeout: dcfg.DialTimeout, + DialKeepAliveTime: 
dcfg.KeepAliveTime, + DialKeepAliveTimeout: dcfg.KeepAliveTimeout, + Username: dcfg.User, + Password: dcfg.Password, + } + + if cfgtls != nil { + if clientTLS, err := cfgtls.ClientConfig(); err == nil { + cfg.TLS = clientTLS + } else { + return nil, err + } + } + + // if key/cert is not given but user wants secure connection, we + // should still setup an empty tls configuration for gRPC to setup + // secure connection. + if cfg.TLS == nil && !dcfg.InsecureTransport { + cfg.TLS = &tls.Config{} + } + + // If the user wants to skip TLS verification then we should set + // the InsecureSkipVerify flag in tls configuration. + if cfg.TLS != nil && dcfg.InsecureSkipVerify { + cfg.TLS.InsecureSkipVerify = true + } + + return cfg, nil +} + +func (d *discovery) getCluster() (string, error) { + cls, clusterSize, rev, err := d.checkCluster() + if err != nil { + if err == ErrFullCluster { + return cls.getInitClusterStr(clusterSize) + } + return "", err + } + + for cls.Len() < clusterSize { + d.waitPeers(cls, clusterSize, rev) + } + + return cls.getInitClusterStr(clusterSize) +} + +func (d *discovery) joinCluster(config string) (string, error) { + _, _, _, err := d.checkCluster() + if err != nil { + return "", err + } + + if err := d.registerSelf(config); err != nil { + return "", err + } + + cls, clusterSize, rev, err := d.checkCluster() + if err != nil { + return "", err + } + + for cls.Len() < clusterSize { + d.waitPeers(cls, clusterSize, rev) + } + + return cls.getInitClusterStr(clusterSize) +} + +func (d *discovery) getClusterSize() (int, error) { + configKey := geClusterSizeKey(d.clusterToken) + ctx, cancel := context.WithTimeout(context.Background(), d.cfg.RequestTimeOut) + defer cancel() + + resp, err := d.c.Get(ctx, configKey) + if err != nil { + d.lg.Warn( + "failed to get cluster size from discovery service", + zap.String("clusterSizeKey", configKey), + zap.Error(err), + ) + return 0, err + } + + if len(resp.Kvs) == 0 { + return 0, ErrSizeNotFound + } + + clusterSize, 
err := strconv.ParseInt(string(resp.Kvs[0].Value), 10, 0) + if err != nil || clusterSize <= 0 { + return 0, ErrBadSizeKey + } + + return int(clusterSize), nil +} + +func (d *discovery) getClusterMembers() (*clusterInfo, int64, error) { + membersKeyPrefix := getMemberKeyPrefix(d.clusterToken) + ctx, cancel := context.WithTimeout(context.Background(), d.cfg.RequestTimeOut) + defer cancel() + + resp, err := d.c.Get(ctx, membersKeyPrefix, clientv3.WithPrefix()) + if err != nil { + d.lg.Warn( + "failed to get cluster members from discovery service", + zap.String("membersKeyPrefix", membersKeyPrefix), + zap.Error(err), + ) + return nil, 0, err + } + + cls := &clusterInfo{clusterToken: d.clusterToken} + for _, kv := range resp.Kvs { + mKey := strings.TrimSpace(string(kv.Key)) + mValue := strings.TrimSpace(string(kv.Value)) + + if err := cls.add(mKey, mValue, kv.CreateRevision); err != nil { + d.lg.Warn( + err.Error(), + zap.String("memberKey", mKey), + zap.String("memberInfo", mValue), + ) + } else { + d.lg.Info( + "found peer from discovery service", + zap.String("memberKey", mKey), + zap.String("memberInfo", mValue), + ) + } + } + + return cls, resp.Header.Revision, nil +} + +func (d *discovery) checkClusterRetry() (*clusterInfo, int, int64, error) { + if d.retries < nRetries { + d.logAndBackoffForRetry("cluster status check") + return d.checkCluster() + } + return nil, 0, 0, ErrTooManyRetries +} + +func (d *discovery) checkCluster() (*clusterInfo, int, int64, error) { + clusterSize, err := d.getClusterSize() + if err != nil { + if err == ErrSizeNotFound || err == ErrBadSizeKey { + return nil, 0, 0, err + } + + return d.checkClusterRetry() + } + + cls, rev, err := d.getClusterMembers() + if err != nil { + return d.checkClusterRetry() + } + d.retries = 0 + + // find self position + memberSelfId := getMemberKey(d.clusterToken, d.memberId.String()) + idx := 0 + for _, m := range cls.members { + if m.peerRegKey == memberSelfId { + break + } + if idx >= clusterSize-1 { + 
return cls, clusterSize, rev, ErrFullCluster + } + idx++ + } + return cls, clusterSize, rev, nil +} + +func (d *discovery) registerSelfRetry(contents string) error { + if d.retries < nRetries { + d.logAndBackoffForRetry("register member itself") + return d.registerSelf(contents) + } + return ErrTooManyRetries +} + +func (d *discovery) registerSelf(contents string) error { + ctx, cancel := context.WithTimeout(context.Background(), d.cfg.RequestTimeOut) + memberKey := getMemberKey(d.clusterToken, d.memberId.String()) + _, err := d.c.Put(ctx, memberKey, contents) + cancel() + + if err != nil { + d.lg.Warn( + "failed to register members itself to the discovery service", + zap.String("memberKey", memberKey), + zap.Error(err), + ) + return d.registerSelfRetry(contents) + } + d.retries = 0 + + d.lg.Info( + "register member itself successfully", + zap.String("memberKey", memberKey), + zap.String("memberInfo", contents), + ) + + return nil +} + +func (d *discovery) waitPeers(cls *clusterInfo, clusterSize int, rev int64) { + // watch from the next revision + membersKeyPrefix := getMemberKeyPrefix(d.clusterToken) + w := d.c.Watch(context.Background(), membersKeyPrefix, clientv3.WithPrefix(), clientv3.WithRev(rev+1)) + + d.lg.Info( + "waiting for peers from discovery service", + zap.Int("clusterSize", clusterSize), + zap.Int("found-peers", cls.Len()), + ) + + // waiting for peers until all needed peers are returned + for wresp := range w { + for _, ev := range wresp.Events { + mKey := strings.TrimSpace(string(ev.Kv.Key)) + mValue := strings.TrimSpace(string(ev.Kv.Value)) + + if err := cls.add(mKey, mValue, ev.Kv.CreateRevision); err != nil { + d.lg.Warn( + err.Error(), + zap.String("memberKey", mKey), + zap.String("memberInfo", mValue), + ) + } else { + d.lg.Info( + "found peer from discovery service", + zap.String("memberKey", mKey), + zap.String("memberInfo", mValue), + ) + } + } + + if cls.Len() >= clusterSize { + break + } + } + + d.lg.Info( + "found all needed peers from 
discovery service", + zap.Int("clusterSize", clusterSize), + zap.Int("found-peers", cls.Len()), + ) +} + +func (d *discovery) logAndBackoffForRetry(step string) { + d.retries++ + // logAndBackoffForRetry stops exponential backoff when the retries are + // more than maxExpoentialRetries and is set to a constant backoff afterward. + retries := d.retries + if retries > maxExponentialRetries { + retries = maxExponentialRetries + } + retryTimeInSecond := time.Duration(0x1< clusterSize { + peerURLs = peerURLs[:clusterSize] + } + + us := strings.Join(peerURLs, ",") + _, err := types.NewURLsMap(us) + if err != nil { + return us, ErrInvalidURL + } + + return us, nil +} + +func (cls *clusterInfo) getPeerURLs() []string { + var peerURLs []string + for _, peer := range cls.members { + peerURLs = append(peerURLs, peer.peerURLsMap) + } + return peerURLs +} diff --git a/server/etcdserver/api/v3discovery/discovery_test.go b/server/etcdserver/api/v3discovery/discovery_test.go new file mode 100644 index 000000000000..529bb3984623 --- /dev/null +++ b/server/etcdserver/api/v3discovery/discovery_test.go @@ -0,0 +1,783 @@ +package v3discovery + +import ( + "context" + "errors" + "fmt" + "testing" + + "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/api/v3/mvccpb" + "go.etcd.io/etcd/client/pkg/v3/types" + "go.etcd.io/etcd/client/v3" + + "github.com/jonboulle/clockwork" + "go.uber.org/zap" +) + +// fakeKVForClusterSize is used to test getClusterSize. +type fakeKVForClusterSize struct { + *fakeBaseKV + clusterSizeStr string +} + +// We only need to overwrite the method `Get`. +func (fkv *fakeKVForClusterSize) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) { + if fkv.clusterSizeStr == "" { + // cluster size isn't configured in this case. 
+ return &clientv3.GetResponse{}, nil + } + + return &clientv3.GetResponse{ + Kvs: []*mvccpb.KeyValue{ + { + Value: []byte(fkv.clusterSizeStr), + }, + }, + }, nil +} + +func TestGetClusterSize(t *testing.T) { + cases := []struct { + name string + clusterSizeStr string + expectedErr error + expectedSize int + }{ + { + name: "cluster size not defined", + clusterSizeStr: "", + expectedErr: ErrSizeNotFound, + }, + { + name: "invalid cluster size", + clusterSizeStr: "invalidSize", + expectedErr: ErrBadSizeKey, + }, + { + name: "valid cluster size", + clusterSizeStr: "3", + expectedErr: nil, + expectedSize: 3, + }, + } + + lg, err := zap.NewProduction() + if err != nil { + t.Errorf("Failed to create a logger, error: %v", err) + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + d := &discovery{ + lg: lg, + c: &clientv3.Client{ + KV: &fakeKVForClusterSize{ + fakeBaseKV: &fakeBaseKV{}, + clusterSizeStr: tc.clusterSizeStr, + }, + }, + cfg: &DiscoveryConfig{}, + clusterToken: "fakeToken", + } + + if cs, err := d.getClusterSize(); err != tc.expectedErr { + t.Errorf("Unexpected error, expected: %v got: %v", tc.expectedErr, err) + } else { + if err == nil && cs != tc.expectedSize { + t.Errorf("Unexpected cluster size, expected: %d got: %d", tc.expectedSize, cs) + } + } + }) + } +} + +// fakeKVForClusterMembers is used to test getClusterMembers. +type fakeKVForClusterMembers struct { + *fakeBaseKV + members []memberInfo +} + +// We only need to overwrite method `Get`. 
+func (fkv *fakeKVForClusterMembers) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) { + kvs := memberInfoToKeyValues(fkv.members) + + return &clientv3.GetResponse{ + Header: &etcdserverpb.ResponseHeader{ + Revision: 10, + }, + Kvs: kvs, + }, nil +} + +func memberInfoToKeyValues(members []memberInfo) []*mvccpb.KeyValue { + kvs := make([]*mvccpb.KeyValue, 0) + for _, mi := range members { + kvs = append(kvs, &mvccpb.KeyValue{ + Key: []byte(mi.peerRegKey), + Value: []byte(mi.peerURLsMap), + CreateRevision: mi.createRev, + }) + } + + return kvs +} + +func TestGetClusterMembers(t *testing.T) { + actualMemberInfo := []memberInfo{ + { + peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(101).String(), + peerURLsMap: "infra1=http://192.168.0.100:2380", + createRev: 8, + }, + { + // invalid peer registry key + peerRegKey: "/invalidPrefix/fakeToken/members/" + types.ID(102).String(), + peerURLsMap: "infra2=http://192.168.0.102:2380", + createRev: 6, + }, + { + peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(102).String(), + peerURLsMap: "infra2=http://192.168.0.102:2380", + createRev: 6, + }, + { + // invalid peer info format + peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(102).String(), + peerURLsMap: "http://192.168.0.102:2380", + createRev: 6, + }, + { + peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(103).String(), + peerURLsMap: "infra3=http://192.168.0.103:2380", + createRev: 7, + }, + { + // duplicate peer + peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(101).String(), + peerURLsMap: "infra1=http://192.168.0.100:2380", + createRev: 2, + }, + } + + // sort by CreateRevision + expectedMemberInfo := []memberInfo{ + { + peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(102).String(), + peerURLsMap: "infra2=http://192.168.0.102:2380", + createRev: 6, + }, + { + peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(103).String(), + peerURLsMap: 
"infra3=http://192.168.0.103:2380", + createRev: 7, + }, + { + peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(101).String(), + peerURLsMap: "infra1=http://192.168.0.100:2380", + createRev: 8, + }, + } + + lg, err := zap.NewProduction() + if err != nil { + t.Errorf("Failed to create a logger, error: %v", err) + } + + d := &discovery{ + lg: lg, + c: &clientv3.Client{ + KV: &fakeKVForClusterMembers{ + fakeBaseKV: &fakeBaseKV{}, + members: actualMemberInfo, + }, + }, + cfg: &DiscoveryConfig{}, + clusterToken: "fakeToken", + } + + clsInfo, _, err := d.getClusterMembers() + if err != nil { + t.Errorf("Failed to get cluster members, error: %v", err) + } + + if clsInfo.Len() != len(expectedMemberInfo) { + t.Errorf("unexpected member count, expected: %d, got: %d", len(expectedMemberInfo), clsInfo.Len()) + } + + for i, m := range clsInfo.members { + if m != expectedMemberInfo[i] { + t.Errorf("unexpected member[%d], expected: %v, got: %v", i, expectedMemberInfo[i], m) + } + } +} + +// fakeKVForCheckCluster is used to test checkCluster. +type fakeKVForCheckCluster struct { + *fakeBaseKV + t *testing.T + token string + clusterSizeStr string + members []memberInfo + getSizeRetries int + getMembersRetries int +} + +// We only need to overwrite method `Get`. +func (fkv *fakeKVForCheckCluster) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) { + clusterSizeKey := fmt.Sprintf("/_etcd/registry/%s/_config/size", fkv.token) + clusterMembersKey := fmt.Sprintf("/_etcd/registry/%s/members", fkv.token) + + if key == clusterSizeKey { + if fkv.getSizeRetries > 0 { + fkv.getSizeRetries-- + // discovery client should retry on error. 
+ return nil, errors.New("get cluster size failed") + } + return &clientv3.GetResponse{ + Kvs: []*mvccpb.KeyValue{ + { + Value: []byte(fkv.clusterSizeStr), + }, + }, + }, nil + + } else if key == clusterMembersKey { + if fkv.getMembersRetries > 0 { + fkv.getMembersRetries-- + // discovery client should retry on error. + return nil, errors.New("get cluster members failed") + } + kvs := memberInfoToKeyValues(fkv.members) + + return &clientv3.GetResponse{ + Header: &etcdserverpb.ResponseHeader{ + Revision: 10, + }, + Kvs: kvs, + }, nil + } else { + fkv.t.Errorf("unexpected key: %s", key) + return nil, fmt.Errorf("unexpected key: %s", key) + } +} + +func TestCheckCluster(t *testing.T) { + actualMemberInfo := []memberInfo{ + { + peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(101).String(), + peerURLsMap: "infra1=http://192.168.0.100:2380", + createRev: 8, + }, + { + // invalid peer registry key + peerRegKey: "/invalidPrefix/fakeToken/members/" + types.ID(102).String(), + peerURLsMap: "infra2=http://192.168.0.102:2380", + createRev: 6, + }, + { + peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(102).String(), + peerURLsMap: "infra2=http://192.168.0.102:2380", + createRev: 6, + }, + { + // invalid peer info format + peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(102).String(), + peerURLsMap: "http://192.168.0.102:2380", + createRev: 6, + }, + { + peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(103).String(), + peerURLsMap: "infra3=http://192.168.0.103:2380", + createRev: 7, + }, + { + // duplicate peer + peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(101).String(), + peerURLsMap: "infra1=http://192.168.0.100:2380", + createRev: 2, + }, + } + + // sort by CreateRevision + expectedMemberInfo := []memberInfo{ + { + peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(102).String(), + peerURLsMap: "infra2=http://192.168.0.102:2380", + createRev: 6, + }, + { + peerRegKey: "/_etcd/registry/fakeToken/members/" 
+ types.ID(103).String(), + peerURLsMap: "infra3=http://192.168.0.103:2380", + createRev: 7, + }, + { + peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(101).String(), + peerURLsMap: "infra1=http://192.168.0.100:2380", + createRev: 8, + }, + } + + cases := []struct { + name string + memberId types.ID + getSizeRetries int + getMembersRetries int + expectedError error + }{ + { + name: "no retries", + memberId: 101, + getSizeRetries: 0, + getMembersRetries: 0, + expectedError: nil, + }, + { + name: "2 retries for getClusterSize", + memberId: 102, + getSizeRetries: 2, + getMembersRetries: 0, + expectedError: nil, + }, + { + name: "2 retries for getClusterMembers", + memberId: 103, + getSizeRetries: 0, + getMembersRetries: 2, + expectedError: nil, + }, + { + name: "error due to cluster full", + memberId: 104, + getSizeRetries: 0, + getMembersRetries: 0, + expectedError: ErrFullCluster, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + lg, err := zap.NewProduction() + if err != nil { + t.Errorf("Failed to create a logger, error: %v", err) + } + + fkv := &fakeKVForCheckCluster{ + fakeBaseKV: &fakeBaseKV{}, + t: t, + token: "fakeToken", + clusterSizeStr: "3", + members: actualMemberInfo, + getSizeRetries: tc.getSizeRetries, + getMembersRetries: tc.getMembersRetries, + } + + d := &discovery{ + lg: lg, + c: &clientv3.Client{ + KV: fkv, + }, + cfg: &DiscoveryConfig{}, + clusterToken: "fakeToken", + memberId: tc.memberId, + clock: clockwork.NewRealClock(), + } + + clsInfo, _, _, err := d.checkCluster() + if err != tc.expectedError { + t.Errorf("Unexpected error, expected: %v, got: %v", tc.expectedError, err) + } + + if err == nil { + if fkv.getSizeRetries != 0 || fkv.getMembersRetries != 0 { + t.Errorf("Discovery client did not retry checking cluster on error, remaining retries: (%d, %d)", fkv.getSizeRetries, fkv.getMembersRetries) + } + + if clsInfo.Len() != len(expectedMemberInfo) { + t.Errorf("Unexpected member count, expected: %d, 
got: %d", len(expectedMemberInfo), clsInfo.Len()) + } + + for mIdx, m := range clsInfo.members { + if m != expectedMemberInfo[mIdx] { + t.Errorf("Unexpected member[%d], expected: %v, got: %v", mIdx, expectedMemberInfo[mIdx], m) + } + } + } + }) + } +} + +// fakeKVForRegisterSelf is used to test registerSelf. +type fakeKVForRegisterSelf struct { + *fakeBaseKV + t *testing.T + expectedRegKey string + expectedRegValue string + retries int +} + +// We only need to overwrite method `Put`. +func (fkv *fakeKVForRegisterSelf) Put(ctx context.Context, key string, val string, opts ...clientv3.OpOption) (*clientv3.PutResponse, error) { + if key != fkv.expectedRegKey { + fkv.t.Errorf("unexpected register key, expected: %s, got: %s", fkv.expectedRegKey, key) + } + + if val != fkv.expectedRegValue { + fkv.t.Errorf("unexpected register value, expected: %s, got: %s", fkv.expectedRegValue, val) + } + + if fkv.retries > 0 { + fkv.retries-- + // discovery client should retry on error. + return nil, errors.New("register self failed") + } + + return nil, nil +} + +func TestRegisterSelf(t *testing.T) { + cases := []struct { + name string + token string + memberId types.ID + expectedRegKey string + expectedRegValue string + retries int // when retries > 0, then return an error on Put request. 
+ }{ + { + name: "no retry with token1", + token: "token1", + memberId: 101, + expectedRegKey: "/_etcd/registry/token1/members/" + types.ID(101).String(), + expectedRegValue: "infra=http://127.0.0.1:2380", + retries: 0, + }, + { + name: "no retry with token2", + token: "token2", + memberId: 102, + expectedRegKey: "/_etcd/registry/token2/members/" + types.ID(102).String(), + expectedRegValue: "infra=http://127.0.0.1:2380", + retries: 0, + }, + { + name: "2 retries", + token: "token3", + memberId: 103, + expectedRegKey: "/_etcd/registry/token3/members/" + types.ID(103).String(), + expectedRegValue: "infra=http://127.0.0.1:2380", + retries: 2, + }, + } + + lg, err := zap.NewProduction() + if err != nil { + t.Errorf("Failed to create a logger, error: %v", err) + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + fkv := &fakeKVForRegisterSelf{ + fakeBaseKV: &fakeBaseKV{}, + t: t, + expectedRegKey: tc.expectedRegKey, + expectedRegValue: tc.expectedRegValue, + retries: tc.retries, + } + + d := &discovery{ + lg: lg, + clusterToken: tc.token, + memberId: tc.memberId, + cfg: &DiscoveryConfig{}, + c: &clientv3.Client{ + KV: fkv, + }, + clock: clockwork.NewRealClock(), + } + + if err := d.registerSelf(tc.expectedRegValue); err != nil { + t.Errorf("Error occurring on register member self: %v", err) + } + + if fkv.retries != 0 { + t.Errorf("Discovery client did not retry registering itself on error, remaining retries: %d", fkv.retries) + } + }) + } +} + +// fakeWatcherForWaitPeers is used to test waitPeers. +type fakeWatcherForWaitPeers struct { + *fakeBaseWatcher + t *testing.T + token string + members []memberInfo +} + +// We only need to overwrite method `Watch`. 
+func (fw *fakeWatcherForWaitPeers) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan { + expectedWatchKey := fmt.Sprintf("/_etcd/registry/%s/members", fw.token) + if key != expectedWatchKey { + fw.t.Errorf("unexpected watch key, expected: %s, got: %s", expectedWatchKey, key) + } + + ch := make(chan clientv3.WatchResponse, 1) + go func() { + for _, mi := range fw.members { + ch <- clientv3.WatchResponse{ + Events: []*clientv3.Event{ + { + Kv: &mvccpb.KeyValue{ + Key: []byte(mi.peerRegKey), + Value: []byte(mi.peerURLsMap), + CreateRevision: mi.createRev, + }, + }, + }, + } + } + close(ch) + }() + return ch +} + +func TestWaitPeers(t *testing.T) { + actualMemberInfo := []memberInfo{ + { + peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(101).String(), + peerURLsMap: "infra1=http://192.168.0.100:2380", + createRev: 8, + }, + { + // invalid peer registry key + peerRegKey: "/invalidPrefix/fakeToken/members/" + types.ID(102).String(), + peerURLsMap: "infra2=http://192.168.0.102:2380", + createRev: 6, + }, + { + peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(102).String(), + peerURLsMap: "infra2=http://192.168.0.102:2380", + createRev: 6, + }, + { + // invalid peer info format + peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(102).String(), + peerURLsMap: "http://192.168.0.102:2380", + createRev: 6, + }, + { + peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(103).String(), + peerURLsMap: "infra3=http://192.168.0.103:2380", + createRev: 7, + }, + { + // duplicate peer + peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(101).String(), + peerURLsMap: "infra1=http://192.168.0.100:2380", + createRev: 2, + }, + } + + // sort by CreateRevision + expectedMemberInfo := []memberInfo{ + { + peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(102).String(), + peerURLsMap: "infra2=http://192.168.0.102:2380", + createRev: 6, + }, + { + peerRegKey: "/_etcd/registry/fakeToken/members/" + 
types.ID(103).String(), + peerURLsMap: "infra3=http://192.168.0.103:2380", + createRev: 7, + }, + { + peerRegKey: "/_etcd/registry/fakeToken/members/" + types.ID(101).String(), + peerURLsMap: "infra1=http://192.168.0.100:2380", + createRev: 8, + }, + } + + lg, err := zap.NewProduction() + if err != nil { + t.Errorf("Failed to create a logger, error: %v", err) + } + + d := &discovery{ + lg: lg, + c: &clientv3.Client{ + KV: &fakeBaseKV{}, + Watcher: &fakeWatcherForWaitPeers{ + fakeBaseWatcher: &fakeBaseWatcher{}, + t: t, + token: "fakeToken", + members: actualMemberInfo, + }, + }, + cfg: &DiscoveryConfig{}, + clusterToken: "fakeToken", + } + + cls := clusterInfo{ + clusterToken: "fakeToken", + } + + d.waitPeers(&cls, 3, 0) + + if cls.Len() != len(expectedMemberInfo) { + t.Errorf("unexpected member number returned by watch, expected: %d, got: %d", len(expectedMemberInfo), cls.Len()) + } + + for i, m := range cls.members { + if m != expectedMemberInfo[i] { + t.Errorf("unexpected member[%d] returned by watch, expected: %v, got: %v", i, expectedMemberInfo[i], m) + } + } +} + +func TestGetInitClusterStr(t *testing.T) { + cases := []struct { + name string + members []memberInfo + clusterSize int + expectedResult string + expectedError error + }{ + { + name: "1 member", + members: []memberInfo{ + { + peerURLsMap: "infra2=http://192.168.0.102:2380", + }, + }, + clusterSize: 1, + expectedResult: "infra2=http://192.168.0.102:2380", + expectedError: nil, + }, + { + name: "2 members", + members: []memberInfo{ + { + peerURLsMap: "infra2=http://192.168.0.102:2380", + }, + { + peerURLsMap: "infra3=http://192.168.0.103:2380", + }, + }, + clusterSize: 2, + expectedResult: "infra2=http://192.168.0.102:2380,infra3=http://192.168.0.103:2380", + expectedError: nil, + }, + { + name: "3 members", + members: []memberInfo{ + { + peerURLsMap: "infra2=http://192.168.0.102:2380", + }, + { + peerURLsMap: "infra3=http://192.168.0.103:2380", + }, + { + peerURLsMap: 
"infra1=http://192.168.0.100:2380", + }, + }, + clusterSize: 3, + expectedResult: "infra2=http://192.168.0.102:2380,infra3=http://192.168.0.103:2380,infra1=http://192.168.0.100:2380", + expectedError: nil, + }, + { + name: "should ignore redundant member", + members: []memberInfo{ + { + peerURLsMap: "infra2=http://192.168.0.102:2380", + }, + { + peerURLsMap: "infra3=http://192.168.0.103:2380", + }, + { + peerURLsMap: "infra1=http://192.168.0.100:2380", + }, + { + peerURLsMap: "infra4=http://192.168.0.104:2380", + }, + }, + clusterSize: 3, + expectedResult: "infra2=http://192.168.0.102:2380,infra3=http://192.168.0.103:2380,infra1=http://192.168.0.100:2380", + expectedError: nil, + }, + { + name: "invalid_peer_url", + members: []memberInfo{ + { + peerURLsMap: "infra2=http://192.168.0.102:2380", + }, + { + peerURLsMap: "infra3=http://192.168.0.103", //not host:port + }, + }, + clusterSize: 2, + expectedResult: "infra2=http://192.168.0.102:2380,infra3=http://192.168.0.103:2380", + expectedError: ErrInvalidURL, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + clsInfo := &clusterInfo{ + members: tc.members, + } + + retStr, err := clsInfo.getInitClusterStr(tc.clusterSize) + if err != tc.expectedError { + t.Errorf("Unexpected error, expected: %v, got: %v", tc.expectedError, err) + } + + if err == nil { + if retStr != tc.expectedResult { + t.Errorf("Unexpected result, expected: %s, got: %s", tc.expectedResult, retStr) + } + } + }) + } +} + +// fakeBaseKV is the base struct implementing the interface `clientv3.KV`. 
+type fakeBaseKV struct{} + +func (fkv *fakeBaseKV) Put(ctx context.Context, key string, val string, opts ...clientv3.OpOption) (*clientv3.PutResponse, error) { + return nil, nil +} + +func (fkv *fakeBaseKV) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) { + return nil, nil +} + +func (fkv *fakeBaseKV) Delete(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.DeleteResponse, error) { + return nil, nil +} + +func (fkv *fakeBaseKV) Compact(ctx context.Context, rev int64, opts ...clientv3.CompactOption) (*clientv3.CompactResponse, error) { + return nil, nil +} + +func (fkv *fakeBaseKV) Do(ctx context.Context, op clientv3.Op) (clientv3.OpResponse, error) { + return clientv3.OpResponse{}, nil +} + +func (fkv *fakeBaseKV) Txn(ctx context.Context) clientv3.Txn { + return nil +} + +// fakeBaseWatcher is the base struct implementing the interface `clientv3.Watcher`. +type fakeBaseWatcher struct{} + +func (fw *fakeBaseWatcher) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan { + return nil +} + +func (fw *fakeBaseWatcher) RequestProgress(ctx context.Context) error { + return nil +} + +func (fw *fakeBaseWatcher) Close() error { + return nil +} diff --git a/server/etcdserver/bootstrap.go b/server/etcdserver/bootstrap.go index 0c1297afee50..76b888ec3e76 100644 --- a/server/etcdserver/bootstrap.go +++ b/server/etcdserver/bootstrap.go @@ -40,6 +40,7 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver/api/snap" "go.etcd.io/etcd/server/v3/etcdserver/api/v2discovery" "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" + "go.etcd.io/etcd/server/v3/etcdserver/api/v3discovery" "go.etcd.io/etcd/server/v3/etcdserver/cindex" serverstorage "go.etcd.io/etcd/server/v3/storage" "go.etcd.io/etcd/server/v3/storage/backend" @@ -328,7 +329,11 @@ func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper) (* } if cfg.ShouldDiscover() { var str string - str, err = 
v2discovery.JoinCluster(cfg.Logger, cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.InitialPeerURLsMap.String()) + if cfg.EnableV2Discovery { + str, err = v2discovery.JoinCluster(cfg.Logger, cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.InitialPeerURLsMap.String()) + } else { + str, err = v3discovery.JoinCluster(cfg.Logger, cfg.DiscoveryURL, &cfg.DiscoveryCfg, m.ID, cfg.InitialPeerURLsMap.String()) + } if err != nil { return nil, &DiscoveryError{Op: "join", Err: err} } diff --git a/tests/e2e/discovery_v3_test.go b/tests/e2e/discovery_v3_test.go new file mode 100644 index 000000000000..efc50656340f --- /dev/null +++ b/tests/e2e/discovery_v3_test.go @@ -0,0 +1,112 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package e2e + +import ( + "fmt" + "strconv" + "strings" + "testing" + + "go.etcd.io/etcd/tests/v3/framework/e2e" +) + +func TestClusterOf1UsingV3Discovery(t *testing.T) { + testClusterUsingV3Discovery(t, 1, e2e.ClientNonTLS, false) +} +func TestClusterOf3UsingV3Discovery(t *testing.T) { + testClusterUsingV3Discovery(t, 3, e2e.ClientTLS, true) +} +func TestTLSClusterOf5UsingV3Discovery(t *testing.T) { + testClusterUsingV3Discovery(t, 5, e2e.ClientTLS, false) +} + +func testClusterUsingV3Discovery(t *testing.T, clusterSize int, clientTlsType e2e.ClientConnType, isClientAutoTls bool) { + e2e.BeforeTest(t) + + // step 1: start the discovery service + ds, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{ + InitialToken: "new", + BasePort: 2000, + ClusterSize: 1, + ClientTLS: clientTlsType, + IsClientAutoTLS: isClientAutoTls, + }) + if err != nil { + t.Fatalf("could not start discovery etcd cluster (%v)", err) + } + defer ds.Close() + + // step 2: configure the cluster size + clusterToken := "8A591FAB-1D72-41FA-BDF2-A27162FDA1E0" + configSizeKey := fmt.Sprintf("/_etcd/registry/%s/_config/size", clusterToken) + configSizeValStr := strconv.Itoa(clusterSize) + if err := ctlV3Put(ctlCtx{epc: ds}, configSizeKey, configSizeValStr, ""); err != nil { + t.Errorf("failed to configure cluster size to discovery service, error: %v", err) + } + + // step 3: start the etcd cluster + epc, err := bootstrapEtcdClusterUsingV3Discovery(t, ds.EndpointsV3()[0], clusterToken, clusterSize, clientTlsType, isClientAutoTls) + if err != nil { + t.Fatalf("could not start etcd process cluster (%v)", err) + } + defer epc.Close() + + // step 4: sanity test on the etcd cluster + etcdctl := []string{e2e.CtlBinPath, "--endpoints", strings.Join(epc.EndpointsV3(), ",")} + if err := e2e.SpawnWithExpect(append(etcdctl, "put", "key", "value"), "OK"); err != nil { + t.Fatal(err) + } + if err := e2e.SpawnWithExpect(append(etcdctl, "get", "key"), "value"); err != nil { + t.Fatal(err) + } +} + 
+func bootstrapEtcdClusterUsingV3Discovery(t *testing.T, durl string, clusterToken string, clusterSize int, clientTlsType e2e.ClientConnType, isClientAutoTls bool) (*e2e.EtcdProcessCluster, error) { + // cluster configuration + cfg := &e2e.EtcdProcessClusterConfig{ + BasePort: 3000, + ClusterSize: clusterSize, + IsPeerTLS: true, + IsPeerAutoTLS: true, + Discovery: fmt.Sprintf("%s/%s", durl, clusterToken), + } + + // initialize the cluster + epc, err := e2e.InitEtcdProcessCluster(t, cfg) + if err != nil { + return epc, err + } + + // populate discovery related security configuration + for _, ep := range epc.Procs { + epCfg := ep.Config() + epCfg.Args = append(epCfg.Args, "--enable-v2-discovery=false") + + if clientTlsType == e2e.ClientTLS { + if isClientAutoTls { + epCfg.Args = append(epCfg.Args, "--discovery-insecure-transport=false") + epCfg.Args = append(epCfg.Args, "--discovery-insecure-skip-tls-verify=true") + } else { + epCfg.Args = append(epCfg.Args, "--discovery-cacert="+e2e.CaPath) + epCfg.Args = append(epCfg.Args, "--discovery-cert="+e2e.CertPath) + epCfg.Args = append(epCfg.Args, "--discovery-key="+e2e.PrivateKeyPath) + } + } + } + + // start the cluster + return e2e.StartEtcdProcessCluster(epc, cfg) +} diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index 242ebe0d6a7d..e1de1951047f 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -181,16 +181,7 @@ func NewEtcdProcessCluster(t testing.TB, cfg *EtcdProcessClusterConfig) (*EtcdPr return nil, err } - if cfg.RollingStart { - if err := epc.RollingStart(); err != nil { - return nil, fmt.Errorf("Cannot rolling-start: %v", err) - } - } else { - if err := epc.Start(); err != nil { - return nil, fmt.Errorf("Cannot start: %v", err) - } - } - return epc, nil + return StartEtcdProcessCluster(epc, cfg) } // InitEtcdProcessCluster initializes a new cluster based on the given config. 
@@ -218,6 +209,21 @@ func InitEtcdProcessCluster(t testing.TB, cfg *EtcdProcessClusterConfig) (*EtcdP return epc, nil } +// StartEtcdProcessCluster launches a new cluster from etcd processes. +func StartEtcdProcessCluster(epc *EtcdProcessCluster, cfg *EtcdProcessClusterConfig) (*EtcdProcessCluster, error) { + if cfg.RollingStart { + if err := epc.RollingStart(); err != nil { + return nil, fmt.Errorf("cannot rolling-start: %v", err) + } + } else { + if err := epc.Start(); err != nil { + return nil, fmt.Errorf("cannot start: %v", err) + } + } + + return epc, nil +} + func (cfg *EtcdProcessClusterConfig) ClientScheme() string { if cfg.ClientTLS == ClientTLS { return "https" diff --git a/tests/framework/integration/cluster.go b/tests/framework/integration/cluster.go index 4600e092d84d..09f0728af49f 100644 --- a/tests/framework/integration/cluster.go +++ b/tests/framework/integration/cluster.go @@ -585,6 +585,8 @@ func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member { peerScheme := SchemeFromTLSInfo(mcfg.PeerTLS) clientScheme := SchemeFromTLSInfo(mcfg.ClientTLS) + m.EnableV2Discovery = embed.DefaultEnableV2Discovery + pln := newLocalListener(t) m.PeerListeners = []net.Listener{pln} m.PeerURLs, err = types.NewURLs([]string{peerScheme + "://" + pln.Addr().String()})