Skip to content

Commit

Permalink
Adds forNodes that will use all available etcd endpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
gab-satchi committed May 25, 2020
1 parent 99cb87a commit 4a4261f
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 38 deletions.
2 changes: 1 addition & 1 deletion controlplane/kubeadm/internal/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (m *Management) GetWorkloadCluster(ctx context.Context, clusterKey client.O
RootCAs: caPool,
Certificates: []tls.Certificate{clientCert},
}

cfg.InsecureSkipVerify = true
return &Workload{
Client: c,
CoreDNSMigrator: &CoreDNSMigrator{},
Expand Down
4 changes: 2 additions & 2 deletions controlplane/kubeadm/internal/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,9 @@ func pbMemberToMember(m *etcdserverpb.Member) *Member {
}

// NewEtcdClient creates a new etcd client with a custom dialer and is configuration with optional functions.
func NewEtcdClient(endpoint string, dialer GRPCDial, tlsConfig *tls.Config) (*clientv3.Client, error) {
func NewEtcdClient(endpoints []string, dialer GRPCDial, tlsConfig *tls.Config) (*clientv3.Client, error) {
etcdClient, err := clientv3.New(clientv3.Config{
Endpoints: []string{endpoint},
Endpoints: endpoints,
DialTimeout: etcdTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
Expand Down
33 changes: 18 additions & 15 deletions controlplane/kubeadm/internal/etcd_client_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ package internal
import (
"context"
"crypto/tls"

"github.com/pkg/errors"
kerrors "k8s.io/apimachinery/pkg/util/errors"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/rest"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/etcd"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/proxy"
Expand All @@ -35,21 +35,24 @@ type etcdClientGenerator struct {
tlsConfig *tls.Config
}

func (c *etcdClientGenerator) forNode(ctx context.Context, name string) (*etcd.Client, error) {
// This does not support external etcd.
func (c *etcdClientGenerator) forNodes(ctx context.Context, nodes []corev1.Node) (*etcd.Client, error) {
endpoints := make([]string, len(nodes))
for i, node := range nodes {
endpoints[i] = staticPodName("etcd", node.Name)
}

p := proxy.Proxy{
Kind: "pods",
Namespace: metav1.NamespaceSystem, // TODO, can etcd ever run in a different namespace?
ResourceName: staticPodName("etcd", name),
KubeConfig: c.restConfig,
TLSConfig: c.tlsConfig,
Port: 2379, // TODO: the pod doesn't expose a port. Is this a problem?
Kind: "pods",
Namespace: metav1.NamespaceSystem,
KubeConfig: c.restConfig,
TLSConfig: c.tlsConfig,
Port: 2379,
}
dialer, err := proxy.NewDialer(p)
if err != nil {
return nil, err
}
etcdclient, err := etcd.NewEtcdClient("127.0.0.1", dialer.DialContextWithAddr, c.tlsConfig)
etcdclient, err := etcd.NewEtcdClient(endpoints, dialer.DialContextWithAddr, c.tlsConfig)
if err != nil {
return nil, err
}
Expand All @@ -61,11 +64,11 @@ func (c *etcdClientGenerator) forNode(ctx context.Context, name string) (*etcd.C
}

// forLeader takes a list of nodes and returns a client to the leader node
func (c *etcdClientGenerator) forLeader(ctx context.Context, nodes *corev1.NodeList) (*etcd.Client, error) {
func (c *etcdClientGenerator) forLeader(ctx context.Context, nodes []corev1.Node) (*etcd.Client, error) {
var errs []error

for _, node := range nodes.Items {
client, err := c.forNode(ctx, node.Name)
for _, node := range nodes {
client, err := c.forNodes(ctx, []corev1.Node{node})
if err != nil {
errs = append(errs, err)
continue
Expand All @@ -78,7 +81,7 @@ func (c *etcdClientGenerator) forLeader(ctx context.Context, nodes *corev1.NodeL
}
for _, member := range members {
if member.ID == client.LeaderID {
return c.forNode(ctx, member.Name)
return c.forNodes(ctx, []corev1.Node{node})
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion controlplane/kubeadm/internal/proxy/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (d *Dialer) DialContext(_ context.Context, network string, addr string) (ne
Post().
Resource(d.proxy.Kind).
Namespace(d.proxy.Namespace).
Name(d.proxy.ResourceName).
Name(addr).
SubResource("portforward")

dialer := spdy.NewDialer(d.upgrader, &http.Client{Transport: d.proxyTransport}, "POST", req.URL())
Expand Down
14 changes: 6 additions & 8 deletions controlplane/kubeadm/internal/workload_cluster_etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ import (
)

type etcdClientFor interface {
forNode(ctx context.Context, name string) (*etcd.Client, error)
forLeader(ctx context.Context, nodes *corev1.NodeList) (*etcd.Client, error)
forNodes(ctx context.Context, nodes []corev1.Node) (*etcd.Client, error)
forLeader(ctx context.Context, nodes []corev1.Node) (*etcd.Client, error)
}

// EtcdIsHealthy runs checks for every etcd member in the cluster to satisfy our definition of healthy.
Expand Down Expand Up @@ -78,7 +78,7 @@ func (w *Workload) EtcdIsHealthy(ctx context.Context) (HealthCheckResult, error)
expectedMembers++

// Create the etcd Client for the etcd Pod scheduled on the Node
etcdClient, err := w.etcdClientGenerator.forNode(ctx, name)
etcdClient, err := w.etcdClientGenerator.forNodes(ctx, []corev1.Node{node})
if err != nil {
response[name] = errors.Wrap(err, "failed to create etcd client")
continue
Expand Down Expand Up @@ -142,10 +142,8 @@ func (w *Workload) ReconcileEtcdMembers(ctx context.Context) error {

errs := []error{}
for _, node := range controlPlaneNodes.Items {
name := node.Name

// Create the etcd Client for the etcd Pod scheduled on the Node
etcdClient, err := w.etcdClientGenerator.forNode(ctx, name)
etcdClient, err := w.etcdClientGenerator.forNodes(ctx, []corev1.Node{node})
if err != nil {
continue
}
Expand Down Expand Up @@ -217,7 +215,7 @@ func (w *Workload) removeMemberForNode(ctx context.Context, name string) error {
if len(controlPlaneNodes.Items) < 2 {
return ErrControlPlaneMinNodes
}
etcdClient, err := w.etcdClientGenerator.forLeader(ctx, controlPlaneNodes)
etcdClient, err := w.etcdClientGenerator.forNodes(ctx, controlPlaneNodes.Items)
if err != nil {
return errors.Wrap(err, "failed to create etcd client")
}
Expand Down Expand Up @@ -258,7 +256,7 @@ func (w *Workload) ForwardEtcdLeadership(ctx context.Context, machine *clusterv1
return errors.Wrap(err, "failed to list control plane nodes")
}

etcdClient, err := w.etcdClientGenerator.forLeader(ctx, nodes)
etcdClient, err := w.etcdClientGenerator.forLeader(ctx, nodes.Items)
if err != nil {
return errors.Wrap(err, "failed to create etcd client")
}
Expand Down
22 changes: 11 additions & 11 deletions controlplane/kubeadm/internal/workload_cluster_etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestWorkload_EtcdIsHealthy(t *testing.T) {
},
},
etcdClientGenerator: &fakeEtcdClientGenerator{
forNodeClient: &etcd.Client{
forNodesClient: &etcd.Client{
EtcdClient: &fake2.FakeEtcdClient{
EtcdEndpoints: []string{},
MemberListResponse: &clientv3.MemberListResponse{
Expand Down Expand Up @@ -228,15 +228,15 @@ func TestRemoveEtcdMemberForMachine(t *testing.T) {
name: "returns an error if it fails to create the etcd client",
machine: machine,
objs: []runtime.Object{cp1, cp2},
etcdClientGenerator: &fakeEtcdClientGenerator{forLeaderErr: errors.New("no client")},
etcdClientGenerator: &fakeEtcdClientGenerator{forNodesErr: errors.New("no client")},
expectErr: true,
},
{
name: "returns an error if the client errors getting etcd members",
machine: machine,
objs: []runtime.Object{cp1, cp2},
etcdClientGenerator: &fakeEtcdClientGenerator{
forLeaderClient: &etcd.Client{
forNodesClient: &etcd.Client{
EtcdClient: &fake2.FakeEtcdClient{
ErrorResponse: errors.New("cannot get etcd members"),
},
Expand All @@ -249,7 +249,7 @@ func TestRemoveEtcdMemberForMachine(t *testing.T) {
machine: machine,
objs: []runtime.Object{cp1, cp2},
etcdClientGenerator: &fakeEtcdClientGenerator{
forLeaderClient: &etcd.Client{
forNodesClient: &etcd.Client{
EtcdClient: &fake2.FakeEtcdClient{
ErrorResponse: errors.New("cannot remove etcd member"),
MemberListResponse: &clientv3.MemberListResponse{
Expand All @@ -272,7 +272,7 @@ func TestRemoveEtcdMemberForMachine(t *testing.T) {
machine: machine,
objs: []runtime.Object{cp1, cp2},
etcdClientGenerator: &fakeEtcdClientGenerator{
forLeaderClient: &etcd.Client{
forNodesClient: &etcd.Client{
EtcdClient: &fake2.FakeEtcdClient{
MemberListResponse: &clientv3.MemberListResponse{
Members: []*pb.Member{
Expand Down Expand Up @@ -368,7 +368,7 @@ func TestForwardEtcdLeadership(t *testing.T) {
leaderCandidate: defaultMachine(),
k8sClient: &fakeClient{},
etcdClientGenerator: &fakeEtcdClientGenerator{
forNodeClient: &etcd.Client{
forLeaderClient: &etcd.Client{
EtcdClient: &fake2.FakeEtcdClient{
ErrorResponse: errors.New("cannot get etcd members"),
},
Expand Down Expand Up @@ -507,17 +507,17 @@ func TestForwardEtcdLeadership(t *testing.T) {
}

type fakeEtcdClientGenerator struct {
forNodeClient *etcd.Client
forNodesClient *etcd.Client
forLeaderClient *etcd.Client
forNodeErr error
forNodesErr error
forLeaderErr error
}

func (c *fakeEtcdClientGenerator) forNode(_ context.Context, _ string) (*etcd.Client, error) {
return c.forNodeClient, c.forNodeErr
func (c *fakeEtcdClientGenerator) forNodes(_ context.Context, _ []corev1.Node) (*etcd.Client, error) {
return c.forNodesClient, c.forNodesErr
}

func (c *fakeEtcdClientGenerator) forLeader(_ context.Context, _ *corev1.NodeList) (*etcd.Client, error) {
func (c *fakeEtcdClientGenerator) forLeader(_ context.Context, _ []corev1.Node) (*etcd.Client, error) {
return c.forLeaderClient, c.forLeaderErr
}

Expand Down

0 comments on commit 4a4261f

Please sign in to comment.