Skip to content

Commit

Permalink
Add configurable etcd call timeout
Browse files Browse the repository at this point in the history
Add configurable call timeouts to the etcd client through context
deadlines. This is different than the dial timeout, which is only used
to establish the connection from the client to the server.

This avoids a stuck reconciliation loop when there is an unresponsive
call to the server.
  • Loading branch information
Jelmer Snoeck committed Jan 17, 2023
1 parent 7a0ee7c commit 878c8f6
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 17 deletions.
2 changes: 2 additions & 0 deletions controlplane/kubeadm/controllers/alias.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type KubeadmControlPlaneReconciler struct {
Tracker *remote.ClusterCacheTracker

EtcdDialTimeout time.Duration
EtcdCallTimeout time.Duration

// WatchFilterValue is the label value used to filter events prior to reconciliation.
WatchFilterValue string
Expand All @@ -47,6 +48,7 @@ func (r *KubeadmControlPlaneReconciler) SetupWithManager(ctx context.Context, mg
APIReader: r.APIReader,
Tracker: r.Tracker,
EtcdDialTimeout: r.EtcdDialTimeout,
EtcdCallTimeout: r.EtcdCallTimeout,
WatchFilterValue: r.WatchFilterValue,
}).SetupWithManager(ctx, mgr, options)
}
3 changes: 2 additions & 1 deletion controlplane/kubeadm/internal/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type Management struct {
Client client.Reader
Tracker *remote.ClusterCacheTracker
EtcdDialTimeout time.Duration
EtcdCallTimeout time.Duration
}

// RemoteClusterConnectionError represents a failure to connect to a remote cluster.
Expand Down Expand Up @@ -163,7 +164,7 @@ func (m *Management) GetWorkloadCluster(ctx context.Context, clusterKey client.O
restConfig: restConfig,
Client: c,
CoreDNSMigrator: &CoreDNSMigrator{},
etcdClientGenerator: NewEtcdClientGenerator(restConfig, tlsConfig, m.EtcdDialTimeout),
etcdClientGenerator: NewEtcdClientGenerator(restConfig, tlsConfig, m.EtcdDialTimeout, m.EtcdCallTimeout),
}, nil
}

Expand Down
2 changes: 2 additions & 0 deletions controlplane/kubeadm/internal/controllers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type KubeadmControlPlaneReconciler struct {
recorder record.EventRecorder
Tracker *remote.ClusterCacheTracker
EtcdDialTimeout time.Duration
EtcdCallTimeout time.Duration

// WatchFilterValue is the label value used to filter events prior to reconciliation.
WatchFilterValue string
Expand Down Expand Up @@ -112,6 +113,7 @@ func (r *KubeadmControlPlaneReconciler) SetupWithManager(ctx context.Context, mg
Client: r.Client,
Tracker: r.Tracker,
EtcdDialTimeout: r.EtcdDialTimeout,
EtcdCallTimeout: r.EtcdCallTimeout,
}
}

Expand Down
51 changes: 41 additions & 10 deletions controlplane/kubeadm/internal/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,11 @@ type etcd interface {

// Client wraps an etcd client formatting its output to something more consumable.
type Client struct {
EtcdClient etcd
Endpoint string
LeaderID uint64
Errors []string
EtcdClient etcd
Endpoint string
LeaderID uint64
Errors []string
CallTimeout time.Duration
}

// MemberAlarm represents an alarm type association with a cluster member.
Expand All @@ -78,6 +79,10 @@ const (
AlarmCorrupt
)

// DefaultCallTimeout represents the duration that the etcd client waits at most
// for read and write operations to etcd.
const DefaultCallTimeout = 15 * time.Second

// AlarmTypeName provides a text translation for AlarmType codes.
var AlarmTypeName = map[AlarmType]string{
AlarmOK: "NONE",
Expand Down Expand Up @@ -130,6 +135,7 @@ type ClientConfiguration struct {
Proxy proxy.Proxy
TLSConfig *tls.Config
DialTimeout time.Duration
CallTimeout time.Duration
}

// NewClient creates a new etcd client with the given configuration.
Expand All @@ -152,30 +158,40 @@ func NewClient(ctx context.Context, config ClientConfiguration) (*Client, error)
return nil, errors.Wrap(err, "unable to create etcd client")
}

client, err := newEtcdClient(ctx, etcdClient)
callTimeout := config.CallTimeout
if callTimeout == 0 {
callTimeout = DefaultCallTimeout
}

client, err := newEtcdClient(ctx, etcdClient, callTimeout)
if err != nil {
closeErr := etcdClient.Close()
return nil, errors.Wrap(kerrors.NewAggregate([]error{err, closeErr}), "unable to create etcd client")
}
return client, nil
}

func newEtcdClient(ctx context.Context, etcdClient etcd) (*Client, error) {
func newEtcdClient(ctx context.Context, etcdClient etcd, callTimeout time.Duration) (*Client, error) {

endpoints := etcdClient.Endpoints()
if len(endpoints) == 0 {
return nil, errors.New("etcd client was not configured with any endpoints")
}

ctx, cancel := context.WithTimeout(ctx, callTimeout)
defer cancel()

status, err := etcdClient.Status(ctx, endpoints[0])
if err != nil {
return nil, errors.Wrap(err, "failed to get etcd status")
}

return &Client{
Endpoint: endpoints[0],
EtcdClient: etcdClient,
LeaderID: status.Leader,
Errors: status.Errors,
Endpoint: endpoints[0],
EtcdClient: etcdClient,
LeaderID: status.Leader,
Errors: status.Errors,
CallTimeout: callTimeout,
}, nil
}

Expand All @@ -186,6 +202,9 @@ func (c *Client) Close() error {

// Members retrieves a list of etcd members.
func (c *Client) Members(ctx context.Context) ([]*Member, error) {
ctx, cancel := context.WithTimeout(ctx, c.CallTimeout)
defer cancel()

response, err := c.EtcdClient.MemberList(ctx)
if err != nil {
return nil, errors.Wrap(err, "failed to get list of members for etcd cluster")
Expand Down Expand Up @@ -214,18 +233,27 @@ func (c *Client) Members(ctx context.Context) ([]*Member, error) {

// MoveLeader moves the leader to the provided member ID.
func (c *Client) MoveLeader(ctx context.Context, newLeaderID uint64) error {
ctx, cancel := context.WithTimeout(ctx, c.CallTimeout)
defer cancel()

_, err := c.EtcdClient.MoveLeader(ctx, newLeaderID)
return errors.Wrapf(err, "failed to move etcd leader: %v", newLeaderID)
}

// RemoveMember removes a given member.
func (c *Client) RemoveMember(ctx context.Context, id uint64) error {
ctx, cancel := context.WithTimeout(ctx, c.CallTimeout)
defer cancel()

_, err := c.EtcdClient.MemberRemove(ctx, id)
return errors.Wrapf(err, "failed to remove member: %v", id)
}

// UpdateMemberPeerURLs updates the list of peer URLs.
func (c *Client) UpdateMemberPeerURLs(ctx context.Context, id uint64, peerURLs []string) ([]*Member, error) {
ctx, cancel := context.WithTimeout(ctx, c.CallTimeout)
defer cancel()

response, err := c.EtcdClient.MemberUpdate(ctx, id, peerURLs)
if err != nil {
return nil, errors.Wrapf(err, "failed to update etcd member %v's peer list to %+v", id, peerURLs)
Expand All @@ -241,6 +269,9 @@ func (c *Client) UpdateMemberPeerURLs(ctx context.Context, id uint64, peerURLs [

// Alarms retrieves all alarms on a cluster.
func (c *Client) Alarms(ctx context.Context) ([]MemberAlarm, error) {
ctx, cancel := context.WithTimeout(ctx, c.CallTimeout)
defer cancel()

alarmResponse, err := c.EtcdClient.AlarmList(ctx)
if err != nil {
return nil, errors.Wrap(err, "failed to get alarms for etcd cluster")
Expand Down
4 changes: 2 additions & 2 deletions controlplane/kubeadm/internal/etcd/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestEtcdMembers_WithErrors(t *testing.T) {
ErrorResponse: errors.New("something went wrong"),
}

client, err := newEtcdClient(ctx, fakeEtcdClient)
client, err := newEtcdClient(ctx, fakeEtcdClient, DefaultCallTimeout)
g.Expect(err).NotTo(HaveOccurred())

members, err := client.Members(ctx)
Expand Down Expand Up @@ -86,7 +86,7 @@ func TestEtcdMembers_WithSuccess(t *testing.T) {
StatusResponse: &clientv3.StatusResponse{},
}

client, err := newEtcdClient(ctx, fakeEtcdClient)
client, err := newEtcdClient(ctx, fakeEtcdClient, DefaultCallTimeout)
g.Expect(err).NotTo(HaveOccurred())

members, err := client.Members(ctx)
Expand Down
3 changes: 2 additions & 1 deletion controlplane/kubeadm/internal/etcd_client_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type clientCreator func(ctx context.Context, endpoints []string) (*etcd.Client,
var errEtcdNodeConnection = errors.New("failed to connect to etcd node")

// NewEtcdClientGenerator returns a new etcdClientGenerator instance.
func NewEtcdClientGenerator(restConfig *rest.Config, tlsConfig *tls.Config, etcdDialTimeout time.Duration) *EtcdClientGenerator {
func NewEtcdClientGenerator(restConfig *rest.Config, tlsConfig *tls.Config, etcdDialTimeout, etcdCallTimeout time.Duration) *EtcdClientGenerator {
ecg := &EtcdClientGenerator{restConfig: restConfig, tlsConfig: tlsConfig}

ecg.createClient = func(ctx context.Context, endpoints []string) (*etcd.Client, error) {
Expand All @@ -58,6 +58,7 @@ func NewEtcdClientGenerator(restConfig *rest.Config, tlsConfig *tls.Config, etcd
Proxy: p,
TLSConfig: tlsConfig,
DialTimeout: etcdDialTimeout,
CallTimeout: etcdCallTimeout,
})
}

Expand Down
6 changes: 3 additions & 3 deletions controlplane/kubeadm/internal/etcd_client_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ var (

func TestNewEtcdClientGenerator(t *testing.T) {
g := NewWithT(t)
subject = NewEtcdClientGenerator(&rest.Config{}, &tls.Config{MinVersion: tls.VersionTLS12}, 0)
subject = NewEtcdClientGenerator(&rest.Config{}, &tls.Config{MinVersion: tls.VersionTLS12}, 0, 0)
g.Expect(subject.createClient).To(Not(BeNil()))
}

Expand Down Expand Up @@ -90,7 +90,7 @@ func TestFirstAvailableNode(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
subject = NewEtcdClientGenerator(&rest.Config{}, &tls.Config{MinVersion: tls.VersionTLS12}, 0)
subject = NewEtcdClientGenerator(&rest.Config{}, &tls.Config{MinVersion: tls.VersionTLS12}, 0, 0)
subject.createClient = tt.cc

client, err := subject.forFirstAvailableNode(ctx, tt.nodes)
Expand Down Expand Up @@ -212,7 +212,7 @@ func TestForLeader(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)

subject = NewEtcdClientGenerator(&rest.Config{}, &tls.Config{MinVersion: tls.VersionTLS12}, 0)
subject = NewEtcdClientGenerator(&rest.Config{}, &tls.Config{MinVersion: tls.VersionTLS12}, 0, 0)
subject.createClient = tt.cc

client, err := subject.forLeader(ctx, tt.nodes)
Expand Down
6 changes: 6 additions & 0 deletions controlplane/kubeadm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
controlplanev1alpha4 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1alpha4"
controlplanev1 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1beta1"
kubeadmcontrolplanecontrollers "sigs.k8s.io/cluster-api/controlplane/kubeadm/controllers"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/etcd"
kcpwebhooks "sigs.k8s.io/cluster-api/controlplane/kubeadm/webhooks"
"sigs.k8s.io/cluster-api/feature"
"sigs.k8s.io/cluster-api/util/flags"
Expand Down Expand Up @@ -90,6 +91,7 @@ var (
webhookCertDir string
healthAddr string
etcdDialTimeout time.Duration
etcdCallTimeout time.Duration
tlsOptions = flags.TLSOptions{}
logOptions = logs.NewOptions()
)
Expand Down Expand Up @@ -141,6 +143,9 @@ func InitFlags(fs *pflag.FlagSet) {
fs.DurationVar(&etcdDialTimeout, "etcd-dial-timeout-duration", 10*time.Second,
"Duration that the etcd client waits at most to establish a connection with etcd")

fs.DurationVar(&etcdCallTimeout, "etcd-call-timeout-duration", etcd.DefaultCallTimeout,
"Duration that the etcd client waits at most for read and write operations to etcd.")

flags.AddTLSOptions(fs, &tlsOptions)

feature.MutableGates.AddFlag(fs)
Expand Down Expand Up @@ -266,6 +271,7 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) {
Tracker: tracker,
WatchFilterValue: watchFilterValue,
EtcdDialTimeout: etcdDialTimeout,
EtcdCallTimeout: etcdCallTimeout,
}).SetupWithManager(ctx, mgr, concurrency(kubeadmControlPlaneConcurrency)); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "KubeadmControlPlane")
os.Exit(1)
Expand Down

0 comments on commit 878c8f6

Please sign in to comment.