Skip to content

Commit

Permalink
Merge pull request #7841 from jelmersnoeck/configure-etcd-call-timeout
Browse files Browse the repository at this point in the history
🌱 Add configurable etcd call timeout
  • Loading branch information
k8s-ci-robot authored Jan 18, 2023
2 parents 9dc69be + d9dbca5 commit e274f44
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 17 deletions.
2 changes: 2 additions & 0 deletions controlplane/kubeadm/controllers/alias.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type KubeadmControlPlaneReconciler struct {
Tracker *remote.ClusterCacheTracker

EtcdDialTimeout time.Duration
EtcdCallTimeout time.Duration

// WatchFilterValue is the label value used to filter events prior to reconciliation.
WatchFilterValue string
Expand All @@ -47,6 +48,7 @@ func (r *KubeadmControlPlaneReconciler) SetupWithManager(ctx context.Context, mg
APIReader: r.APIReader,
Tracker: r.Tracker,
EtcdDialTimeout: r.EtcdDialTimeout,
EtcdCallTimeout: r.EtcdCallTimeout,
WatchFilterValue: r.WatchFilterValue,
}).SetupWithManager(ctx, mgr, options)
}
3 changes: 2 additions & 1 deletion controlplane/kubeadm/internal/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type Management struct {
Client client.Reader
Tracker *remote.ClusterCacheTracker
EtcdDialTimeout time.Duration
EtcdCallTimeout time.Duration
}

// RemoteClusterConnectionError represents a failure to connect to a remote cluster.
Expand Down Expand Up @@ -163,7 +164,7 @@ func (m *Management) GetWorkloadCluster(ctx context.Context, clusterKey client.O
restConfig: restConfig,
Client: c,
CoreDNSMigrator: &CoreDNSMigrator{},
etcdClientGenerator: NewEtcdClientGenerator(restConfig, tlsConfig, m.EtcdDialTimeout),
etcdClientGenerator: NewEtcdClientGenerator(restConfig, tlsConfig, m.EtcdDialTimeout, m.EtcdCallTimeout),
}, nil
}

Expand Down
2 changes: 2 additions & 0 deletions controlplane/kubeadm/internal/controllers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type KubeadmControlPlaneReconciler struct {
recorder record.EventRecorder
Tracker *remote.ClusterCacheTracker
EtcdDialTimeout time.Duration
EtcdCallTimeout time.Duration

// WatchFilterValue is the label value used to filter events prior to reconciliation.
WatchFilterValue string
Expand Down Expand Up @@ -112,6 +113,7 @@ func (r *KubeadmControlPlaneReconciler) SetupWithManager(ctx context.Context, mg
Client: r.Client,
Tracker: r.Tracker,
EtcdDialTimeout: r.EtcdDialTimeout,
EtcdCallTimeout: r.EtcdCallTimeout,
}
}

Expand Down
50 changes: 40 additions & 10 deletions controlplane/kubeadm/internal/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,11 @@ type etcd interface {

// Client wraps an etcd client formatting its output to something more consumable.
type Client struct {
EtcdClient etcd
Endpoint string
LeaderID uint64
Errors []string
EtcdClient etcd
Endpoint string
LeaderID uint64
Errors []string
CallTimeout time.Duration
}

// MemberAlarm represents an alarm type association with a cluster member.
Expand All @@ -78,6 +79,10 @@ const (
AlarmCorrupt
)

// DefaultCallTimeout represents the duration that the etcd client waits at most
// for read and write operations to etcd.
const DefaultCallTimeout = 15 * time.Second

// AlarmTypeName provides a text translation for AlarmType codes.
var AlarmTypeName = map[AlarmType]string{
AlarmOK: "NONE",
Expand Down Expand Up @@ -130,6 +135,7 @@ type ClientConfiguration struct {
Proxy proxy.Proxy
TLSConfig *tls.Config
DialTimeout time.Duration
CallTimeout time.Duration
}

// NewClient creates a new etcd client with the given configuration.
Expand All @@ -152,30 +158,39 @@ func NewClient(ctx context.Context, config ClientConfiguration) (*Client, error)
return nil, errors.Wrap(err, "unable to create etcd client")
}

client, err := newEtcdClient(ctx, etcdClient)
callTimeout := config.CallTimeout
if callTimeout == 0 {
callTimeout = DefaultCallTimeout
}

client, err := newEtcdClient(ctx, etcdClient, callTimeout)
if err != nil {
closeErr := etcdClient.Close()
return nil, errors.Wrap(kerrors.NewAggregate([]error{err, closeErr}), "unable to create etcd client")
}
return client, nil
}

func newEtcdClient(ctx context.Context, etcdClient etcd) (*Client, error) {
func newEtcdClient(ctx context.Context, etcdClient etcd, callTimeout time.Duration) (*Client, error) {
endpoints := etcdClient.Endpoints()
if len(endpoints) == 0 {
return nil, errors.New("etcd client was not configured with any endpoints")
}

ctx, cancel := context.WithTimeout(ctx, callTimeout)
defer cancel()

status, err := etcdClient.Status(ctx, endpoints[0])
if err != nil {
return nil, errors.Wrap(err, "failed to get etcd status")
}

return &Client{
Endpoint: endpoints[0],
EtcdClient: etcdClient,
LeaderID: status.Leader,
Errors: status.Errors,
Endpoint: endpoints[0],
EtcdClient: etcdClient,
LeaderID: status.Leader,
Errors: status.Errors,
CallTimeout: callTimeout,
}, nil
}

Expand All @@ -186,6 +201,9 @@ func (c *Client) Close() error {

// Members retrieves a list of etcd members.
func (c *Client) Members(ctx context.Context) ([]*Member, error) {
ctx, cancel := context.WithTimeout(ctx, c.CallTimeout)
defer cancel()

response, err := c.EtcdClient.MemberList(ctx)
if err != nil {
return nil, errors.Wrap(err, "failed to get list of members for etcd cluster")
Expand Down Expand Up @@ -214,18 +232,27 @@ func (c *Client) Members(ctx context.Context) ([]*Member, error) {

// MoveLeader moves the leader to the provided member ID.
func (c *Client) MoveLeader(ctx context.Context, newLeaderID uint64) error {
ctx, cancel := context.WithTimeout(ctx, c.CallTimeout)
defer cancel()

_, err := c.EtcdClient.MoveLeader(ctx, newLeaderID)
return errors.Wrapf(err, "failed to move etcd leader: %v", newLeaderID)
}

// RemoveMember removes a given member.
func (c *Client) RemoveMember(ctx context.Context, id uint64) error {
ctx, cancel := context.WithTimeout(ctx, c.CallTimeout)
defer cancel()

_, err := c.EtcdClient.MemberRemove(ctx, id)
return errors.Wrapf(err, "failed to remove member: %v", id)
}

// UpdateMemberPeerURLs updates the list of peer URLs.
func (c *Client) UpdateMemberPeerURLs(ctx context.Context, id uint64, peerURLs []string) ([]*Member, error) {
ctx, cancel := context.WithTimeout(ctx, c.CallTimeout)
defer cancel()

response, err := c.EtcdClient.MemberUpdate(ctx, id, peerURLs)
if err != nil {
return nil, errors.Wrapf(err, "failed to update etcd member %v's peer list to %+v", id, peerURLs)
Expand All @@ -241,6 +268,9 @@ func (c *Client) UpdateMemberPeerURLs(ctx context.Context, id uint64, peerURLs [

// Alarms retrieves all alarms on a cluster.
func (c *Client) Alarms(ctx context.Context) ([]MemberAlarm, error) {
ctx, cancel := context.WithTimeout(ctx, c.CallTimeout)
defer cancel()

alarmResponse, err := c.EtcdClient.AlarmList(ctx)
if err != nil {
return nil, errors.Wrap(err, "failed to get alarms for etcd cluster")
Expand Down
4 changes: 2 additions & 2 deletions controlplane/kubeadm/internal/etcd/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestEtcdMembers_WithErrors(t *testing.T) {
ErrorResponse: errors.New("something went wrong"),
}

client, err := newEtcdClient(ctx, fakeEtcdClient)
client, err := newEtcdClient(ctx, fakeEtcdClient, DefaultCallTimeout)
g.Expect(err).NotTo(HaveOccurred())

members, err := client.Members(ctx)
Expand Down Expand Up @@ -86,7 +86,7 @@ func TestEtcdMembers_WithSuccess(t *testing.T) {
StatusResponse: &clientv3.StatusResponse{},
}

client, err := newEtcdClient(ctx, fakeEtcdClient)
client, err := newEtcdClient(ctx, fakeEtcdClient, DefaultCallTimeout)
g.Expect(err).NotTo(HaveOccurred())

members, err := client.Members(ctx)
Expand Down
3 changes: 2 additions & 1 deletion controlplane/kubeadm/internal/etcd_client_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type clientCreator func(ctx context.Context, endpoints []string) (*etcd.Client,
var errEtcdNodeConnection = errors.New("failed to connect to etcd node")

// NewEtcdClientGenerator returns a new etcdClientGenerator instance.
func NewEtcdClientGenerator(restConfig *rest.Config, tlsConfig *tls.Config, etcdDialTimeout time.Duration) *EtcdClientGenerator {
func NewEtcdClientGenerator(restConfig *rest.Config, tlsConfig *tls.Config, etcdDialTimeout, etcdCallTimeout time.Duration) *EtcdClientGenerator {
ecg := &EtcdClientGenerator{restConfig: restConfig, tlsConfig: tlsConfig}

ecg.createClient = func(ctx context.Context, endpoints []string) (*etcd.Client, error) {
Expand All @@ -58,6 +58,7 @@ func NewEtcdClientGenerator(restConfig *rest.Config, tlsConfig *tls.Config, etcd
Proxy: p,
TLSConfig: tlsConfig,
DialTimeout: etcdDialTimeout,
CallTimeout: etcdCallTimeout,
})
}

Expand Down
6 changes: 3 additions & 3 deletions controlplane/kubeadm/internal/etcd_client_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ var (

func TestNewEtcdClientGenerator(t *testing.T) {
g := NewWithT(t)
subject = NewEtcdClientGenerator(&rest.Config{}, &tls.Config{MinVersion: tls.VersionTLS12}, 0)
subject = NewEtcdClientGenerator(&rest.Config{}, &tls.Config{MinVersion: tls.VersionTLS12}, 0, 0)
g.Expect(subject.createClient).To(Not(BeNil()))
}

Expand Down Expand Up @@ -90,7 +90,7 @@ func TestFirstAvailableNode(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
subject = NewEtcdClientGenerator(&rest.Config{}, &tls.Config{MinVersion: tls.VersionTLS12}, 0)
subject = NewEtcdClientGenerator(&rest.Config{}, &tls.Config{MinVersion: tls.VersionTLS12}, 0, 0)
subject.createClient = tt.cc

client, err := subject.forFirstAvailableNode(ctx, tt.nodes)
Expand Down Expand Up @@ -212,7 +212,7 @@ func TestForLeader(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)

subject = NewEtcdClientGenerator(&rest.Config{}, &tls.Config{MinVersion: tls.VersionTLS12}, 0)
subject = NewEtcdClientGenerator(&rest.Config{}, &tls.Config{MinVersion: tls.VersionTLS12}, 0, 0)
subject.createClient = tt.cc

client, err := subject.forLeader(ctx, tt.nodes)
Expand Down
6 changes: 6 additions & 0 deletions controlplane/kubeadm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
controlplanev1alpha4 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1alpha4"
controlplanev1 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1beta1"
kubeadmcontrolplanecontrollers "sigs.k8s.io/cluster-api/controlplane/kubeadm/controllers"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/etcd"
kcpwebhooks "sigs.k8s.io/cluster-api/controlplane/kubeadm/webhooks"
"sigs.k8s.io/cluster-api/feature"
"sigs.k8s.io/cluster-api/util/flags"
Expand Down Expand Up @@ -90,6 +91,7 @@ var (
webhookCertDir string
healthAddr string
etcdDialTimeout time.Duration
etcdCallTimeout time.Duration
tlsOptions = flags.TLSOptions{}
logOptions = logs.NewOptions()
)
Expand Down Expand Up @@ -141,6 +143,9 @@ func InitFlags(fs *pflag.FlagSet) {
fs.DurationVar(&etcdDialTimeout, "etcd-dial-timeout-duration", 10*time.Second,
"Duration that the etcd client waits at most to establish a connection with etcd")

fs.DurationVar(&etcdCallTimeout, "etcd-call-timeout-duration", etcd.DefaultCallTimeout,
"Duration that the etcd client waits at most for read and write operations to etcd.")

flags.AddTLSOptions(fs, &tlsOptions)

feature.MutableGates.AddFlag(fs)
Expand Down Expand Up @@ -266,6 +271,7 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) {
Tracker: tracker,
WatchFilterValue: watchFilterValue,
EtcdDialTimeout: etcdDialTimeout,
EtcdCallTimeout: etcdCallTimeout,
}).SetupWithManager(ctx, mgr, concurrency(kubeadmControlPlaneConcurrency)); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "KubeadmControlPlane")
os.Exit(1)
Expand Down

0 comments on commit e274f44

Please sign in to comment.