Skip to content

Commit

Permalink
improve etcd management in CAPIM
Browse files Browse the repository at this point in the history
  • Loading branch information
fabriziopandini committed Jun 23, 2023
1 parent 6073e52 commit c54236a
Show file tree
Hide file tree
Showing 5 changed files with 242 additions and 30 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1alpha1

// defines annotations to be applied to in memory etcd pods in order to track etcd cluster
// info belonging to the etcd member each pod represent.
const (
// EtcdClusterIDAnnotationName defines the name of the annotation applied to in memory etcd
// pods to track the cluster ID of the etcd member each pod represent.
EtcdClusterIDAnnotationName = "etcd.inmemory.infrastructure.cluster.x-k8s.io/cluster-id"

// EtcdMemberIDAnnotationName defines the name of the annotation applied to in memory etcd
// pods to track the member ID of the etcd member each pod represent.
EtcdMemberIDAnnotationName = "etcd.inmemory.infrastructure.cluster.x-k8s.io/member-id"

// EtcdLeaderFromAnnotationName defines the name of the annotation applied to in memory etcd
// pods to track leadership status of the etcd member each pod represent.
// Note: We are tracking the time from an etcd member is leader; if more than one pod has this
// annotation, the last etcd member that became leader is the current leader.
// By using this mechanism leadership can be forwarded to another pod with an atomic operation
// (add/update of the annotation to the pod/etcd member we are forwarding leadership to).
EtcdLeaderFromAnnotationName = "etcd.inmemory.infrastructure.cluster.x-k8s.io/leader-from"
)
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -44,6 +45,7 @@ import (
infrav1 "sigs.k8s.io/cluster-api/test/infrastructure/inmemory/api/v1alpha1"
"sigs.k8s.io/cluster-api/test/infrastructure/inmemory/internal/cloud"
cloudv1 "sigs.k8s.io/cluster-api/test/infrastructure/inmemory/internal/cloud/api/v1alpha1"
cclient "sigs.k8s.io/cluster-api/test/infrastructure/inmemory/internal/cloud/runtime/client"
"sigs.k8s.io/cluster-api/test/infrastructure/inmemory/internal/server"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/annotations"
Expand Down Expand Up @@ -421,13 +423,6 @@ func (r *InMemoryMachineReconciler) reconcileNormalETCD(ctx context.Context, clu
"component": "etcd",
"tier": "control-plane",
},
Annotations: map[string]string{
// TODO: read this from existing etcd pods, if any, otherwise all the member will get a different ClusterID.
"etcd.inmemory.infrastructure.cluster.x-k8s.io/cluster-id": fmt.Sprintf("%d", rand.Uint32()), //nolint:gosec // weak random number generator is good enough here
"etcd.inmemory.infrastructure.cluster.x-k8s.io/member-id": fmt.Sprintf("%d", rand.Uint32()), //nolint:gosec // weak random number generator is good enough here
// TODO: set this only if there are no other leaders.
"etcd.inmemory.infrastructure.cluster.x-k8s.io/leader-from": time.Now().Format(time.RFC3339),
},
},
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
Expand All @@ -444,6 +439,42 @@ func (r *InMemoryMachineReconciler) reconcileNormalETCD(ctx context.Context, clu
return ctrl.Result{}, errors.Wrapf(err, "failed to get etcd Pod")
}

// Gets info about the current etcd cluster, if any.
info, err := r.inspectEtcd(ctx, cloudClient)
if err != nil {
return ctrl.Result{}, err
}

// If this is the first etcd member in the cluster, assign a cluster ID
if info.clusterID == "" {
for {
info.clusterID = fmt.Sprintf("%d", rand.Uint32()) //nolint:gosec // weak random number generator is good enough here
if info.clusterID != "0" {
break
}
}
}

// Computes a unique memberID.
var memberID string
for {
memberID = fmt.Sprintf("%d", rand.Uint32()) //nolint:gosec // weak random number generator is good enough here
if !info.members.Has(memberID) && memberID != "0" {
break
}
}

// Annotate the pod with the info about the etcd cluster.
etcdPod.Annotations = map[string]string{
cloudv1.EtcdClusterIDAnnotationName: info.clusterID,
cloudv1.EtcdMemberIDAnnotationName: memberID,
}

// If the etcd cluster is being created it doesn't have a leader yet, so set this member as a leader.
if info.leaderID == "" {
etcdPod.Annotations[cloudv1.EtcdLeaderFromAnnotationName] = time.Now().Format(time.RFC3339)
}

// NOTE: for the first control plane machine we might create the etcd pod before the API server pod is running
// but this is not an issue, because it won't be visible to CAPI until the API server start serving requests.
if err := cloudClient.Create(ctx, etcdPod); err != nil && !apierrors.IsAlreadyExists(err) {
Expand Down Expand Up @@ -487,6 +518,58 @@ func (r *InMemoryMachineReconciler) reconcileNormalETCD(ctx context.Context, clu
return ctrl.Result{}, nil
}

type etcdInfo struct {
clusterID string
leaderID string
members sets.Set[string]
}

func (r *InMemoryMachineReconciler) inspectEtcd(ctx context.Context, cloudClient cclient.Client) (etcdInfo, error) {
etcdPods := &corev1.PodList{}
if err := cloudClient.List(ctx, etcdPods,
client.InNamespace(metav1.NamespaceSystem),
client.MatchingLabels{
"component": "etcd",
"tier": "control-plane"},
); err != nil {
return etcdInfo{}, errors.Wrap(err, "failed to list etcd members")
}

if len(etcdPods.Items) == 0 {
return etcdInfo{}, nil
}

info := etcdInfo{
members: sets.New[string](),
}
var leaderFrom time.Time
for _, pod := range etcdPods.Items {
if info.clusterID == "" {
info.clusterID = pod.Annotations[cloudv1.EtcdClusterIDAnnotationName]
} else if pod.Annotations[cloudv1.EtcdClusterIDAnnotationName] != info.clusterID {
return etcdInfo{}, errors.New("invalid etcd cluster, members have different cluster ID")
}
memberID := pod.Annotations[cloudv1.EtcdMemberIDAnnotationName]
info.members.Insert(memberID)

if t, err := time.Parse(time.RFC3339, pod.Annotations[cloudv1.EtcdLeaderFromAnnotationName]); err == nil {
if t.After(leaderFrom) {
info.leaderID = memberID
leaderFrom = t
}
}
}

if info.leaderID == "" {
// TODO: consider if and how to automatically recover from this case
// note: this can happen also when reading etcd members in the server, might be it is something we have to take case before deletion...
// for now it should not be an issue because KCP forward etcd leadership before deletion.
return etcdInfo{}, errors.New("invalid etcd cluster, no leader found")
}

return info, nil
}

func (r *InMemoryMachineReconciler) reconcileNormalAPIServer(ctx context.Context, cluster *clusterv1.Cluster, machine *clusterv1.Machine, inMemoryMachine *infrav1.InMemoryMachine) (ctrl.Result, error) {
// No-op if the machine is not a control plane machine.
if !util.IsControlPlaneMachine(machine) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,13 +253,13 @@ func TestReconcileNormalNode(t *testing.T) {
func TestReconcileNormalEtcd(t *testing.T) {
inMemoryMachineWithNodeNotYetProvisioned := &infrav1.InMemoryMachine{
ObjectMeta: metav1.ObjectMeta{
Name: "bar",
Name: "bar0",
},
}

inMemoryMachineWithNodeProvisioned := &infrav1.InMemoryMachine{
inMemoryMachineWithNodeProvisioned1 := &infrav1.InMemoryMachine{
ObjectMeta: metav1.ObjectMeta{
Name: "bar",
Name: "bar1",
},
Spec: infrav1.InMemoryMachineSpec{
Behaviour: &infrav1.InMemoryMachineBehaviour{
Expand Down Expand Up @@ -313,7 +313,7 @@ func TestReconcileNormalEtcd(t *testing.T) {

manager := cmanager.New(scheme)

host := "127.0.0.1"
host := "127.0.0.1" //nolint:goconst
wcmux, err := server.NewWorkloadClustersMux(manager, host, server.CustomPorts{
// NOTE: make sure to use ports different than other tests, so we can run tests in parallel
MinPort: server.DefaultMinPort + 1000,
Expand All @@ -332,15 +332,15 @@ func TestReconcileNormalEtcd(t *testing.T) {
r.CloudManager.AddResourceGroup(klog.KObj(cluster).String())
c := r.CloudManager.GetResourceGroup(klog.KObj(cluster).String()).GetClient()

res, err := r.reconcileNormalETCD(ctx, cluster, cpMachine, inMemoryMachineWithNodeProvisioned)
res, err := r.reconcileNormalETCD(ctx, cluster, cpMachine, inMemoryMachineWithNodeProvisioned1)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(res.IsZero()).To(BeFalse())
g.Expect(conditions.IsFalse(inMemoryMachineWithNodeProvisioned, infrav1.EtcdProvisionedCondition)).To(BeTrue())
g.Expect(conditions.IsFalse(inMemoryMachineWithNodeProvisioned1, infrav1.EtcdProvisionedCondition)).To(BeTrue())

got := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: metav1.NamespaceSystem,
Name: fmt.Sprintf("etcd-%s", inMemoryMachineWithNodeNotYetProvisioned.Name),
Name: fmt.Sprintf("etcd-%s", inMemoryMachineWithNodeProvisioned1.Name),
},
}
err = c.Get(ctx, client.ObjectKeyFromObject(got), got)
Expand All @@ -350,32 +350,111 @@ func TestReconcileNormalEtcd(t *testing.T) {
g := NewWithT(t)

g.Eventually(func() bool {
res, err := r.reconcileNormalETCD(ctx, cluster, cpMachine, inMemoryMachineWithNodeProvisioned)
res, err := r.reconcileNormalETCD(ctx, cluster, cpMachine, inMemoryMachineWithNodeProvisioned1)
g.Expect(err).ToNot(HaveOccurred())
if !res.IsZero() {
time.Sleep(res.RequeueAfter / 100 * 90)
}
return res.IsZero()
}, inMemoryMachineWithNodeProvisioned.Spec.Behaviour.Etcd.Provisioning.StartupDuration.Duration*2).Should(BeTrue())
}, inMemoryMachineWithNodeProvisioned1.Spec.Behaviour.Etcd.Provisioning.StartupDuration.Duration*2).Should(BeTrue())

err = c.Get(ctx, client.ObjectKeyFromObject(got), got)
g.Expect(err).ToNot(HaveOccurred())

g.Expect(conditions.IsTrue(inMemoryMachineWithNodeProvisioned, infrav1.EtcdProvisionedCondition)).To(BeTrue())
g.Expect(conditions.Get(inMemoryMachineWithNodeProvisioned, infrav1.EtcdProvisionedCondition).LastTransitionTime.Time).To(BeTemporally(">", conditions.Get(inMemoryMachineWithNodeProvisioned, infrav1.NodeProvisionedCondition).LastTransitionTime.Time, inMemoryMachineWithNodeProvisioned.Spec.Behaviour.Etcd.Provisioning.StartupDuration.Duration))
g.Expect(got.Annotations).To(HaveKey(cloudv1.EtcdClusterIDAnnotationName))
g.Expect(got.Annotations).To(HaveKey(cloudv1.EtcdMemberIDAnnotationName))
g.Expect(got.Annotations).To(HaveKey(cloudv1.EtcdLeaderFromAnnotationName))

g.Expect(conditions.IsTrue(inMemoryMachineWithNodeProvisioned1, infrav1.EtcdProvisionedCondition)).To(BeTrue())
g.Expect(conditions.Get(inMemoryMachineWithNodeProvisioned1, infrav1.EtcdProvisionedCondition).LastTransitionTime.Time).To(BeTemporally(">", conditions.Get(inMemoryMachineWithNodeProvisioned1, infrav1.NodeProvisionedCondition).LastTransitionTime.Time, inMemoryMachineWithNodeProvisioned1.Spec.Behaviour.Etcd.Provisioning.StartupDuration.Duration))
})

t.Run("no-op after it is provisioned", func(t *testing.T) {
g := NewWithT(t)

res, err := r.reconcileNormalETCD(ctx, cluster, cpMachine, inMemoryMachineWithNodeProvisioned)
res, err := r.reconcileNormalETCD(ctx, cluster, cpMachine, inMemoryMachineWithNodeProvisioned1)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(res.IsZero()).To(BeTrue())
})

err = wcmux.Shutdown(ctx)
g.Expect(err).ToNot(HaveOccurred())
})

t.Run("takes care of the etcd cluster annotations", func(t *testing.T) {
g := NewWithT(t)

inMemoryMachineWithNodeProvisioned1 := inMemoryMachineWithNodeProvisioned1.DeepCopy()
inMemoryMachineWithNodeProvisioned1.Spec = infrav1.InMemoryMachineSpec{}

inMemoryMachineWithNodeProvisioned2 := inMemoryMachineWithNodeProvisioned1.DeepCopy()
inMemoryMachineWithNodeProvisioned2.Name = "bar2"

manager := cmanager.New(scheme)

host := "127.0.0.1"
wcmux, err := server.NewWorkloadClustersMux(manager, host, server.CustomPorts{
// NOTE: make sure to use ports different than other tests, so we can run tests in parallel
MinPort: server.DefaultMinPort + 1200,
MaxPort: server.DefaultMinPort + 1299,
DebugPort: server.DefaultDebugPort + 20,
})
g.Expect(err).ToNot(HaveOccurred())
_, err = wcmux.InitWorkloadClusterListener(klog.KObj(cluster).String())
g.Expect(err).ToNot(HaveOccurred())

r := InMemoryMachineReconciler{
Client: fake.NewClientBuilder().WithScheme(scheme).WithObjects(createCASecret(t, cluster, secretutil.EtcdCA)).Build(),
CloudManager: manager,
APIServerMux: wcmux,
}
r.CloudManager.AddResourceGroup(klog.KObj(cluster).String())
c := r.CloudManager.GetResourceGroup(klog.KObj(cluster).String()).GetClient()

// first etcd pod gets annotated with clusterID, memberID, and also set as a leader

res, err := r.reconcileNormalETCD(ctx, cluster, cpMachine, inMemoryMachineWithNodeProvisioned1)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(res.IsZero()).To(BeTrue())
g.Expect(conditions.IsTrue(inMemoryMachineWithNodeProvisioned1, infrav1.EtcdProvisionedCondition)).To(BeTrue())

got1 := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: metav1.NamespaceSystem,
Name: fmt.Sprintf("etcd-%s", inMemoryMachineWithNodeProvisioned1.Name),
},
}

err = c.Get(ctx, client.ObjectKeyFromObject(got1), got1)
g.Expect(err).ToNot(HaveOccurred())

g.Expect(got1.Annotations).To(HaveKey(cloudv1.EtcdClusterIDAnnotationName))
g.Expect(got1.Annotations).To(HaveKey(cloudv1.EtcdMemberIDAnnotationName))
g.Expect(got1.Annotations).To(HaveKey(cloudv1.EtcdLeaderFromAnnotationName))

// second etcd pod gets annotated with the same clusterID, a new memberID (but it is not set as a leader

res, err = r.reconcileNormalETCD(ctx, cluster, cpMachine, inMemoryMachineWithNodeProvisioned2)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(res.IsZero()).To(BeTrue())
g.Expect(conditions.IsTrue(inMemoryMachineWithNodeProvisioned2, infrav1.EtcdProvisionedCondition)).To(BeTrue())

got2 := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: metav1.NamespaceSystem,
Name: fmt.Sprintf("etcd-%s", inMemoryMachineWithNodeProvisioned2.Name),
},
}

err = c.Get(ctx, client.ObjectKeyFromObject(got2), got2)
g.Expect(err).ToNot(HaveOccurred())

g.Expect(got2.Annotations).To(HaveKey(cloudv1.EtcdClusterIDAnnotationName))
g.Expect(got1.Annotations[cloudv1.EtcdClusterIDAnnotationName]).To(Equal(got2.Annotations[cloudv1.EtcdClusterIDAnnotationName]))
g.Expect(got2.Annotations).To(HaveKey(cloudv1.EtcdMemberIDAnnotationName))
g.Expect(got1.Annotations[cloudv1.EtcdMemberIDAnnotationName]).ToNot(Equal(got2.Annotations[cloudv1.EtcdMemberIDAnnotationName]))
g.Expect(got2.Annotations).ToNot(HaveKey(cloudv1.EtcdLeaderFromAnnotationName))
})
}

func TestReconcileNormalApiServer(t *testing.T) {
Expand Down
25 changes: 20 additions & 5 deletions test/infrastructure/inmemory/internal/server/etcd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

cloudv1 "sigs.k8s.io/cluster-api/test/infrastructure/inmemory/internal/cloud/api/v1alpha1"
cclient "sigs.k8s.io/cluster-api/test/infrastructure/inmemory/internal/cloud/runtime/client"
cmanager "sigs.k8s.io/cluster-api/test/infrastructure/inmemory/internal/cloud/runtime/manager"
)
Expand Down Expand Up @@ -192,19 +193,26 @@ func (b *baseServer) inspectEtcd(ctx context.Context, cloudClient cclient.Client

memberList := &pb.MemberListResponse{}
statusResponse := &pb.StatusResponse{}
var clusterID int
var leaderID int
var leaderFrom time.Time
for _, pod := range etcdPods.Items {
clusterID, err := strconv.Atoi(pod.Annotations["etcd.inmemory.infrastructure.cluster.x-k8s.io/cluster-id"])
if err != nil {
return nil, nil, errors.Wrapf(err, "failed read cluster ID annotation from etcd member with name %s", pod.Name)
if clusterID == 0 {
var err error
clusterID, err = strconv.Atoi(pod.Annotations[cloudv1.EtcdClusterIDAnnotationName])
if err != nil {
return nil, nil, errors.Wrapf(err, "failed read cluster ID annotation from etcd member with name %s", pod.Name)
}
} else if pod.Annotations[cloudv1.EtcdClusterIDAnnotationName] != fmt.Sprintf("%d", clusterID) {
return nil, nil, errors.New("invalid etcd cluster, members have different cluster ID")
}
memberID, err := strconv.Atoi(pod.Annotations["etcd.inmemory.infrastructure.cluster.x-k8s.io/member-id"])

memberID, err := strconv.Atoi(pod.Annotations[cloudv1.EtcdMemberIDAnnotationName])
if err != nil {
return nil, nil, errors.Wrapf(err, "failed read member ID annotation from etcd member with name %s", pod.Name)
}

if t, err := time.Parse(time.RFC3339, pod.Annotations["etcd.inmemory.infrastructure.cluster.x-k8s.io/leader-from"]); err == nil {
if t, err := time.Parse(time.RFC3339, pod.Annotations[cloudv1.EtcdLeaderFromAnnotationName]); err == nil {
if t.After(leaderFrom) {
leaderID = memberID
leaderFrom = t
Expand All @@ -224,6 +232,13 @@ func (b *baseServer) inspectEtcd(ctx context.Context, cloudClient cclient.Client
Name: strings.TrimPrefix(pod.Name, "etcd-"),
})
}

if leaderID == 0 {
// TODO: consider if and how to automatically recover from this case
// note: this can happen also when adding a new etcd members in the handler, might be it is something we have to take case before deletion...
// for now it should not be an issue because KCP forwards etcd leadership before deletion.
return nil, nil, errors.New("invalid etcd cluster, no leader found")
}
statusResponse.Leader = uint64(leaderID)

return memberList, statusResponse, nil
Expand Down
Loading

0 comments on commit c54236a

Please sign in to comment.