diff --git a/controlplane/kubeadm/controllers/alias.go b/controlplane/kubeadm/controllers/alias.go index 2677c5ca5888..f5584c0dd86e 100644 --- a/controlplane/kubeadm/controllers/alias.go +++ b/controlplane/kubeadm/controllers/alias.go @@ -35,6 +35,7 @@ type KubeadmControlPlaneReconciler struct { Tracker *remote.ClusterCacheTracker EtcdDialTimeout time.Duration + EtcdCallTimeout time.Duration // WatchFilterValue is the label value used to filter events prior to reconciliation. WatchFilterValue string @@ -47,6 +48,7 @@ func (r *KubeadmControlPlaneReconciler) SetupWithManager(ctx context.Context, mg APIReader: r.APIReader, Tracker: r.Tracker, EtcdDialTimeout: r.EtcdDialTimeout, + EtcdCallTimeout: r.EtcdCallTimeout, WatchFilterValue: r.WatchFilterValue, }).SetupWithManager(ctx, mgr, options) } diff --git a/controlplane/kubeadm/internal/cluster.go b/controlplane/kubeadm/internal/cluster.go index 28176a37e920..47c4e419b946 100644 --- a/controlplane/kubeadm/internal/cluster.go +++ b/controlplane/kubeadm/internal/cluster.go @@ -53,6 +53,7 @@ type Management struct { Client client.Reader Tracker *remote.ClusterCacheTracker EtcdDialTimeout time.Duration + EtcdCallTimeout time.Duration } // RemoteClusterConnectionError represents a failure to connect to a remote cluster. @@ -163,7 +164,7 @@ func (m *Management) GetWorkloadCluster(ctx context.Context, clusterKey client.O restConfig: restConfig, Client: c, CoreDNSMigrator: &CoreDNSMigrator{}, - etcdClientGenerator: NewEtcdClientGenerator(restConfig, tlsConfig, m.EtcdDialTimeout), + etcdClientGenerator: NewEtcdClientGenerator(restConfig, tlsConfig, m.EtcdDialTimeout, m.EtcdCallTimeout), }, nil } diff --git a/controlplane/kubeadm/internal/controllers/controller.go b/controlplane/kubeadm/internal/controllers/controller.go index d8fdeeda57ec..c29b46fb3ccd 100644 --- a/controlplane/kubeadm/internal/controllers/controller.go +++ b/controlplane/kubeadm/internal/controllers/controller.go @@ -70,6 +70,7 @@ type KubeadmControlPlaneReconciler struct { recorder record.EventRecorder Tracker *remote.ClusterCacheTracker EtcdDialTimeout time.Duration + EtcdCallTimeout time.Duration // WatchFilterValue is the label value used to filter events prior to reconciliation. WatchFilterValue string @@ -112,6 +113,7 @@ func (r *KubeadmControlPlaneReconciler) SetupWithManager(ctx context.Context, mg Client: r.Client, Tracker: r.Tracker, EtcdDialTimeout: r.EtcdDialTimeout, + EtcdCallTimeout: r.EtcdCallTimeout, } } diff --git a/controlplane/kubeadm/internal/etcd/etcd.go b/controlplane/kubeadm/internal/etcd/etcd.go index 860d218776e3..70464cfb6a56 100644 --- a/controlplane/kubeadm/internal/etcd/etcd.go +++ b/controlplane/kubeadm/internal/etcd/etcd.go @@ -49,10 +49,11 @@ type etcd interface { // Client wraps an etcd client formatting its output to something more consumable. type Client struct { - EtcdClient etcd - Endpoint string - LeaderID uint64 - Errors []string + EtcdClient etcd + Endpoint string + LeaderID uint64 + Errors []string + CallTimeout time.Duration } // MemberAlarm represents an alarm type association with a cluster member. @@ -78,6 +79,10 @@ const ( AlarmCorrupt ) +// DefaultCallTimeout represents the duration that the etcd client waits at most +// for read and write operations to etcd. +const DefaultCallTimeout = 15 * time.Second + // AlarmTypeName provides a text translation for AlarmType codes. var AlarmTypeName = map[AlarmType]string{ AlarmOK: "NONE", @@ -130,6 +135,7 @@ type ClientConfiguration struct { Proxy proxy.Proxy TLSConfig *tls.Config DialTimeout time.Duration + CallTimeout time.Duration } // NewClient creates a new etcd client with the given configuration. @@ -152,7 +158,12 @@ func NewClient(ctx context.Context, config ClientConfiguration) (*Client, error) return nil, errors.Wrap(err, "unable to create etcd client") } - client, err := newEtcdClient(ctx, etcdClient) + callTimeout := config.CallTimeout + if callTimeout == 0 { + callTimeout = DefaultCallTimeout + } + + client, err := newEtcdClient(ctx, etcdClient, callTimeout) if err != nil { closeErr := etcdClient.Close() return nil, errors.Wrap(kerrors.NewAggregate([]error{err, closeErr}), "unable to create etcd client") @@ -160,22 +171,27 @@ func NewClient(ctx context.Context, config ClientConfiguration) (*Client, error) return client, nil } -func newEtcdClient(ctx context.Context, etcdClient etcd) (*Client, error) { +func newEtcdClient(ctx context.Context, etcdClient etcd, callTimeout time.Duration) (*Client, error) { + endpoints := etcdClient.Endpoints() if len(endpoints) == 0 { return nil, errors.New("etcd client was not configured with any endpoints") } + ctx, cancel := context.WithTimeout(ctx, callTimeout) + defer cancel() + status, err := etcdClient.Status(ctx, endpoints[0]) if err != nil { return nil, errors.Wrap(err, "failed to get etcd status") } return &Client{ - Endpoint: endpoints[0], - EtcdClient: etcdClient, - LeaderID: status.Leader, - Errors: status.Errors, + Endpoint: endpoints[0], + EtcdClient: etcdClient, + LeaderID: status.Leader, + Errors: status.Errors, + CallTimeout: callTimeout, }, nil } @@ -186,6 +202,9 @@ func (c *Client) Close() error { // Members retrieves a list of etcd members. func (c *Client) Members(ctx context.Context) ([]*Member, error) { + ctx, cancel := context.WithTimeout(ctx, c.CallTimeout) + defer cancel() + response, err := c.EtcdClient.MemberList(ctx) if err != nil { return nil, errors.Wrap(err, "failed to get list of members for etcd cluster") @@ -214,18 +233,27 @@ func (c *Client) Members(ctx context.Context) ([]*Member, error) { // MoveLeader moves the leader to the provided member ID. func (c *Client) MoveLeader(ctx context.Context, newLeaderID uint64) error { + ctx, cancel := context.WithTimeout(ctx, c.CallTimeout) + defer cancel() + _, err := c.EtcdClient.MoveLeader(ctx, newLeaderID) return errors.Wrapf(err, "failed to move etcd leader: %v", newLeaderID) } // RemoveMember removes a given member. func (c *Client) RemoveMember(ctx context.Context, id uint64) error { + ctx, cancel := context.WithTimeout(ctx, c.CallTimeout) + defer cancel() + _, err := c.EtcdClient.MemberRemove(ctx, id) return errors.Wrapf(err, "failed to remove member: %v", id) } // UpdateMemberPeerURLs updates the list of peer URLs. func (c *Client) UpdateMemberPeerURLs(ctx context.Context, id uint64, peerURLs []string) ([]*Member, error) { + ctx, cancel := context.WithTimeout(ctx, c.CallTimeout) + defer cancel() + response, err := c.EtcdClient.MemberUpdate(ctx, id, peerURLs) if err != nil { return nil, errors.Wrapf(err, "failed to update etcd member %v's peer list to %+v", id, peerURLs) @@ -241,6 +269,9 @@ func (c *Client) UpdateMemberPeerURLs(ctx context.Context, id uint64, peerURLs [ // Alarms retrieves all alarms on a cluster. func (c *Client) Alarms(ctx context.Context) ([]MemberAlarm, error) { + ctx, cancel := context.WithTimeout(ctx, c.CallTimeout) + defer cancel() + alarmResponse, err := c.EtcdClient.AlarmList(ctx) if err != nil { return nil, errors.Wrap(err, "failed to get alarms for etcd cluster") diff --git a/controlplane/kubeadm/internal/etcd/etcd_test.go b/controlplane/kubeadm/internal/etcd/etcd_test.go index 84028d357513..80ac5254199a 100644 --- a/controlplane/kubeadm/internal/etcd/etcd_test.go +++ b/controlplane/kubeadm/internal/etcd/etcd_test.go @@ -49,7 +49,7 @@ func TestEtcdMembers_WithErrors(t *testing.T) { ErrorResponse: errors.New("something went wrong"), } - client, err := newEtcdClient(ctx, fakeEtcdClient) + client, err := newEtcdClient(ctx, fakeEtcdClient, DefaultCallTimeout) g.Expect(err).NotTo(HaveOccurred()) members, err := client.Members(ctx) @@ -86,7 +86,7 @@ func TestEtcdMembers_WithSuccess(t *testing.T) { StatusResponse: &clientv3.StatusResponse{}, } - client, err := newEtcdClient(ctx, fakeEtcdClient) + client, err := newEtcdClient(ctx, fakeEtcdClient, DefaultCallTimeout) g.Expect(err).NotTo(HaveOccurred()) members, err := client.Members(ctx) diff --git a/controlplane/kubeadm/internal/etcd_client_generator.go b/controlplane/kubeadm/internal/etcd_client_generator.go index 8c6d6745ab0b..bf5486aed539 100644 --- a/controlplane/kubeadm/internal/etcd_client_generator.go +++ b/controlplane/kubeadm/internal/etcd_client_generator.go @@ -43,7 +43,7 @@ type clientCreator func(ctx context.Context, endpoints []string) (*etcd.Client, var errEtcdNodeConnection = errors.New("failed to connect to etcd node") // NewEtcdClientGenerator returns a new etcdClientGenerator instance. -func NewEtcdClientGenerator(restConfig *rest.Config, tlsConfig *tls.Config, etcdDialTimeout time.Duration) *EtcdClientGenerator { +func NewEtcdClientGenerator(restConfig *rest.Config, tlsConfig *tls.Config, etcdDialTimeout, etcdCallTimeout time.Duration) *EtcdClientGenerator { ecg := &EtcdClientGenerator{restConfig: restConfig, tlsConfig: tlsConfig} ecg.createClient = func(ctx context.Context, endpoints []string) (*etcd.Client, error) { @@ -58,6 +58,7 @@ func NewEtcdClientGenerator(restConfig *rest.Config, tlsConfig *tls.Config, etcd Proxy: p, TLSConfig: tlsConfig, DialTimeout: etcdDialTimeout, + CallTimeout: etcdCallTimeout, }) } diff --git a/controlplane/kubeadm/internal/etcd_client_generator_test.go b/controlplane/kubeadm/internal/etcd_client_generator_test.go index eadbcf15b0c8..eff3879ec2d6 100644 --- a/controlplane/kubeadm/internal/etcd_client_generator_test.go +++ b/controlplane/kubeadm/internal/etcd_client_generator_test.go @@ -38,7 +38,7 @@ var ( func TestNewEtcdClientGenerator(t *testing.T) { g := NewWithT(t) - subject = NewEtcdClientGenerator(&rest.Config{}, &tls.Config{MinVersion: tls.VersionTLS12}, 0) + subject = NewEtcdClientGenerator(&rest.Config{}, &tls.Config{MinVersion: tls.VersionTLS12}, 0, 0) g.Expect(subject.createClient).To(Not(BeNil())) } @@ -90,7 +90,7 @@ func TestFirstAvailableNode(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { g := NewWithT(t) - subject = NewEtcdClientGenerator(&rest.Config{}, &tls.Config{MinVersion: tls.VersionTLS12}, 0) + subject = NewEtcdClientGenerator(&rest.Config{}, &tls.Config{MinVersion: tls.VersionTLS12}, 0, 0) subject.createClient = tt.cc client, err := subject.forFirstAvailableNode(ctx, tt.nodes) @@ -212,7 +212,7 @@ func TestForLeader(t *testing.T) { t.Run(tt.name, func(t *testing.T) { g := NewWithT(t) - subject = NewEtcdClientGenerator(&rest.Config{}, &tls.Config{MinVersion: tls.VersionTLS12}, 0) + subject = NewEtcdClientGenerator(&rest.Config{}, &tls.Config{MinVersion: tls.VersionTLS12}, 0, 0) subject.createClient = tt.cc client, err := subject.forLeader(ctx, tt.nodes) diff --git a/controlplane/kubeadm/main.go b/controlplane/kubeadm/main.go index 7d3f36080cdf..5efa78a1236a 100644 --- a/controlplane/kubeadm/main.go +++ b/controlplane/kubeadm/main.go @@ -51,6 +51,7 @@ import ( controlplanev1alpha4 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1alpha4" controlplanev1 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1beta1" kubeadmcontrolplanecontrollers "sigs.k8s.io/cluster-api/controlplane/kubeadm/controllers" + "sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/etcd" kcpwebhooks "sigs.k8s.io/cluster-api/controlplane/kubeadm/webhooks" "sigs.k8s.io/cluster-api/feature" "sigs.k8s.io/cluster-api/util/flags" @@ -90,6 +91,7 @@ var ( webhookCertDir string healthAddr string etcdDialTimeout time.Duration + etcdCallTimeout time.Duration tlsOptions = flags.TLSOptions{} logOptions = logs.NewOptions() ) @@ -141,6 +143,9 @@ func InitFlags(fs *pflag.FlagSet) { fs.DurationVar(&etcdDialTimeout, "etcd-dial-timeout-duration", 10*time.Second, "Duration that the etcd client waits at most to establish a connection with etcd") + fs.DurationVar(&etcdCallTimeout, "etcd-call-timeout-duration", etcd.DefaultCallTimeout, + "Duration that the etcd client waits at most for read and write operations to etcd.") + flags.AddTLSOptions(fs, &tlsOptions) feature.MutableGates.AddFlag(fs) @@ -266,6 +271,7 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) { Tracker: tracker, WatchFilterValue: watchFilterValue, EtcdDialTimeout: etcdDialTimeout, + EtcdCallTimeout: etcdCallTimeout, }).SetupWithManager(ctx, mgr, concurrency(kubeadmControlPlaneConcurrency)); err != nil { setupLog.Error(err, "unable to create controller", "controller", "KubeadmControlPlane") os.Exit(1)