diff --git a/test/infrastructure/inmemory/internal/cloud/api/v1alpha1/etcdcluster_annotations.go b/test/infrastructure/inmemory/internal/cloud/api/v1alpha1/etcdcluster_annotations.go new file mode 100644 index 000000000000..862e90bc6f58 --- /dev/null +++ b/test/infrastructure/inmemory/internal/cloud/api/v1alpha1/etcdcluster_annotations.go @@ -0,0 +1,37 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +// defines annotations to be applied to in memory etcd pods in order to track etcd cluster +// info belonging to the etcd member each pod represent. +const ( + // EtcdClusterIDAnnotationName defines the name of the annotation applied to in memory etcd + // pods to track the cluster ID of the etcd member each pod represent. + EtcdClusterIDAnnotationName = "etcd.inmemory.infrastructure.cluster.x-k8s.io/cluster-id" + + // EtcdMemberIDAnnotationName defines the name of the annotation applied to in memory etcd + // pods to track the member ID of the etcd member each pod represent. + EtcdMemberIDAnnotationName = "etcd.inmemory.infrastructure.cluster.x-k8s.io/member-id" + + // EtcdLeaderFromAnnotationName defines the name of the annotation applied to in memory etcd + // pods to track leadership status of the etcd member each pod represent. + // Note: We are tracking the time from an etcd member is leader; if more than one pod has this + // annotation, the last etcd member that became leader is the current leader. + // By using this mechanism leadership can be forwarded to another pod with an atomic operation + // (add/update of the annotation to the pod/etcd member we are forwarding leadership to). + EtcdLeaderFromAnnotationName = "etcd.inmemory.infrastructure.cluster.x-k8s.io/leader-from" +) diff --git a/test/infrastructure/inmemory/internal/controllers/inmemorymachine_controller.go b/test/infrastructure/inmemory/internal/controllers/inmemorymachine_controller.go index 5786c0459bdc..8546ab910fef 100644 --- a/test/infrastructure/inmemory/internal/controllers/inmemorymachine_controller.go +++ b/test/infrastructure/inmemory/internal/controllers/inmemorymachine_controller.go @@ -31,6 +31,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" "k8s.io/utils/pointer" ctrl "sigs.k8s.io/controller-runtime" @@ -44,6 +45,7 @@ import ( infrav1 "sigs.k8s.io/cluster-api/test/infrastructure/inmemory/api/v1alpha1" "sigs.k8s.io/cluster-api/test/infrastructure/inmemory/internal/cloud" cloudv1 "sigs.k8s.io/cluster-api/test/infrastructure/inmemory/internal/cloud/api/v1alpha1" + cclient "sigs.k8s.io/cluster-api/test/infrastructure/inmemory/internal/cloud/runtime/client" "sigs.k8s.io/cluster-api/test/infrastructure/inmemory/internal/server" "sigs.k8s.io/cluster-api/util" "sigs.k8s.io/cluster-api/util/annotations" @@ -421,13 +423,6 @@ func (r *InMemoryMachineReconciler) reconcileNormalETCD(ctx context.Context, clu "component": "etcd", "tier": "control-plane", }, - Annotations: map[string]string{ - // TODO: read this from existing etcd pods, if any, otherwise all the member will get a different ClusterID. - "etcd.inmemory.infrastructure.cluster.x-k8s.io/cluster-id": fmt.Sprintf("%d", rand.Uint32()), //nolint:gosec // weak random number generator is good enough here - "etcd.inmemory.infrastructure.cluster.x-k8s.io/member-id": fmt.Sprintf("%d", rand.Uint32()), //nolint:gosec // weak random number generator is good enough here - // TODO: set this only if there are no other leaders. - "etcd.inmemory.infrastructure.cluster.x-k8s.io/leader-from": time.Now().Format(time.RFC3339), - }, }, Status: corev1.PodStatus{ Phase: corev1.PodRunning, @@ -444,6 +439,42 @@ func (r *InMemoryMachineReconciler) reconcileNormalETCD(ctx context.Context, clu return ctrl.Result{}, errors.Wrapf(err, "failed to get etcd Pod") } + // Gets info about the current etcd cluster, if any. + info, err := r.inspectEtcd(ctx, cloudClient) + if err != nil { + return ctrl.Result{}, err + } + + // If this is the first etcd member in the cluster, assign a cluster ID + if info.clusterID == "" { + for { + info.clusterID = fmt.Sprintf("%d", rand.Uint32()) //nolint:gosec // weak random number generator is good enough here + if info.clusterID != "0" { + break + } + } + } + + // Computes a unique memberID. + var memberID string + for { + memberID = fmt.Sprintf("%d", rand.Uint32()) //nolint:gosec // weak random number generator is good enough here + if !info.members.Has(memberID) && memberID != "0" { + break + } + } + + // Annotate the pod with the info about the etcd cluster. + etcdPod.Annotations = map[string]string{ + cloudv1.EtcdClusterIDAnnotationName: info.clusterID, + cloudv1.EtcdMemberIDAnnotationName: memberID, + } + + // If the etcd cluster is being created it doesn't have a leader yet, so set this member as a leader. + if info.leaderID == "" { + etcdPod.Annotations[cloudv1.EtcdLeaderFromAnnotationName] = time.Now().Format(time.RFC3339) + } + // NOTE: for the first control plane machine we might create the etcd pod before the API server pod is running // but this is not an issue, because it won't be visible to CAPI until the API server start serving requests. if err := cloudClient.Create(ctx, etcdPod); err != nil && !apierrors.IsAlreadyExists(err) { @@ -487,6 +518,58 @@ func (r *InMemoryMachineReconciler) reconcileNormalETCD(ctx context.Context, clu return ctrl.Result{}, nil } +type etcdInfo struct { + clusterID string + leaderID string + members sets.Set[string] +} + +func (r *InMemoryMachineReconciler) inspectEtcd(ctx context.Context, cloudClient cclient.Client) (etcdInfo, error) { + etcdPods := &corev1.PodList{} + if err := cloudClient.List(ctx, etcdPods, + client.InNamespace(metav1.NamespaceSystem), + client.MatchingLabels{ + "component": "etcd", + "tier": "control-plane"}, + ); err != nil { + return etcdInfo{}, errors.Wrap(err, "failed to list etcd members") + } + + if len(etcdPods.Items) == 0 { + return etcdInfo{}, nil + } + + info := etcdInfo{ + members: sets.New[string](), + } + var leaderFrom time.Time + for _, pod := range etcdPods.Items { + if info.clusterID == "" { + info.clusterID = pod.Annotations[cloudv1.EtcdClusterIDAnnotationName] + } else if pod.Annotations[cloudv1.EtcdClusterIDAnnotationName] != info.clusterID { + return etcdInfo{}, errors.New("invalid etcd cluster, members have different cluster ID") + } + memberID := pod.Annotations[cloudv1.EtcdMemberIDAnnotationName] + info.members.Insert(memberID) + + if t, err := time.Parse(time.RFC3339, pod.Annotations[cloudv1.EtcdLeaderFromAnnotationName]); err == nil { + if t.After(leaderFrom) { + info.leaderID = memberID + leaderFrom = t + } + } + } + + if info.leaderID == "" { + // TODO: consider if and how to automatically recover from this case + // note: this can happen also when reading etcd members in the server, might be it is something we have to take case before deletion... + // for now it should not be an issue because KCP forward etcd leadership before deletion. + return etcdInfo{}, errors.New("invalid etcd cluster, no leader found") + } + + return info, nil +} + func (r *InMemoryMachineReconciler) reconcileNormalAPIServer(ctx context.Context, cluster *clusterv1.Cluster, machine *clusterv1.Machine, inMemoryMachine *infrav1.InMemoryMachine) (ctrl.Result, error) { // No-op if the machine is not a control plane machine. if !util.IsControlPlaneMachine(machine) { diff --git a/test/infrastructure/inmemory/internal/controllers/inmemorymachine_controller_test.go b/test/infrastructure/inmemory/internal/controllers/inmemorymachine_controller_test.go index 65d7c7e57fad..803e2edd4340 100644 --- a/test/infrastructure/inmemory/internal/controllers/inmemorymachine_controller_test.go +++ b/test/infrastructure/inmemory/internal/controllers/inmemorymachine_controller_test.go @@ -253,13 +253,13 @@ func TestReconcileNormalNode(t *testing.T) { func TestReconcileNormalEtcd(t *testing.T) { inMemoryMachineWithNodeNotYetProvisioned := &infrav1.InMemoryMachine{ ObjectMeta: metav1.ObjectMeta{ - Name: "bar", + Name: "bar0", }, } - inMemoryMachineWithNodeProvisioned := &infrav1.InMemoryMachine{ + inMemoryMachineWithNodeProvisioned1 := &infrav1.InMemoryMachine{ ObjectMeta: metav1.ObjectMeta{ - Name: "bar", + Name: "bar1", }, Spec: infrav1.InMemoryMachineSpec{ Behaviour: &infrav1.InMemoryMachineBehaviour{ @@ -313,7 +313,7 @@ func TestReconcileNormalEtcd(t *testing.T) { manager := cmanager.New(scheme) - host := "127.0.0.1" + host := "127.0.0.1" //nolint:goconst wcmux, err := server.NewWorkloadClustersMux(manager, host, server.CustomPorts{ // NOTE: make sure to use ports different than other tests, so we can run tests in parallel MinPort: server.DefaultMinPort + 1000, @@ -332,15 +332,15 @@ func TestReconcileNormalEtcd(t *testing.T) { r.CloudManager.AddResourceGroup(klog.KObj(cluster).String()) c := r.CloudManager.GetResourceGroup(klog.KObj(cluster).String()).GetClient() - res, err := r.reconcileNormalETCD(ctx, cluster, cpMachine, inMemoryMachineWithNodeProvisioned) + res, err := r.reconcileNormalETCD(ctx, cluster, cpMachine, inMemoryMachineWithNodeProvisioned1) g.Expect(err).ToNot(HaveOccurred()) g.Expect(res.IsZero()).To(BeFalse()) - g.Expect(conditions.IsFalse(inMemoryMachineWithNodeProvisioned, infrav1.EtcdProvisionedCondition)).To(BeTrue()) + g.Expect(conditions.IsFalse(inMemoryMachineWithNodeProvisioned1, infrav1.EtcdProvisionedCondition)).To(BeTrue()) got := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Namespace: metav1.NamespaceSystem, - Name: fmt.Sprintf("etcd-%s", inMemoryMachineWithNodeNotYetProvisioned.Name), + Name: fmt.Sprintf("etcd-%s", inMemoryMachineWithNodeProvisioned1.Name), }, } err = c.Get(ctx, client.ObjectKeyFromObject(got), got) @@ -350,25 +350,29 @@ func TestReconcileNormalEtcd(t *testing.T) { g := NewWithT(t) g.Eventually(func() bool { - res, err := r.reconcileNormalETCD(ctx, cluster, cpMachine, inMemoryMachineWithNodeProvisioned) + res, err := r.reconcileNormalETCD(ctx, cluster, cpMachine, inMemoryMachineWithNodeProvisioned1) g.Expect(err).ToNot(HaveOccurred()) if !res.IsZero() { time.Sleep(res.RequeueAfter / 100 * 90) } return res.IsZero() - }, inMemoryMachineWithNodeProvisioned.Spec.Behaviour.Etcd.Provisioning.StartupDuration.Duration*2).Should(BeTrue()) + }, inMemoryMachineWithNodeProvisioned1.Spec.Behaviour.Etcd.Provisioning.StartupDuration.Duration*2).Should(BeTrue()) err = c.Get(ctx, client.ObjectKeyFromObject(got), got) g.Expect(err).ToNot(HaveOccurred()) - g.Expect(conditions.IsTrue(inMemoryMachineWithNodeProvisioned, infrav1.EtcdProvisionedCondition)).To(BeTrue()) - g.Expect(conditions.Get(inMemoryMachineWithNodeProvisioned, infrav1.EtcdProvisionedCondition).LastTransitionTime.Time).To(BeTemporally(">", conditions.Get(inMemoryMachineWithNodeProvisioned, infrav1.NodeProvisionedCondition).LastTransitionTime.Time, inMemoryMachineWithNodeProvisioned.Spec.Behaviour.Etcd.Provisioning.StartupDuration.Duration)) + g.Expect(got.Annotations).To(HaveKey(cloudv1.EtcdClusterIDAnnotationName)) + g.Expect(got.Annotations).To(HaveKey(cloudv1.EtcdMemberIDAnnotationName)) + g.Expect(got.Annotations).To(HaveKey(cloudv1.EtcdLeaderFromAnnotationName)) + + g.Expect(conditions.IsTrue(inMemoryMachineWithNodeProvisioned1, infrav1.EtcdProvisionedCondition)).To(BeTrue()) + g.Expect(conditions.Get(inMemoryMachineWithNodeProvisioned1, infrav1.EtcdProvisionedCondition).LastTransitionTime.Time).To(BeTemporally(">", conditions.Get(inMemoryMachineWithNodeProvisioned1, infrav1.NodeProvisionedCondition).LastTransitionTime.Time, inMemoryMachineWithNodeProvisioned1.Spec.Behaviour.Etcd.Provisioning.StartupDuration.Duration)) }) t.Run("no-op after it is provisioned", func(t *testing.T) { g := NewWithT(t) - res, err := r.reconcileNormalETCD(ctx, cluster, cpMachine, inMemoryMachineWithNodeProvisioned) + res, err := r.reconcileNormalETCD(ctx, cluster, cpMachine, inMemoryMachineWithNodeProvisioned1) g.Expect(err).ToNot(HaveOccurred()) g.Expect(res.IsZero()).To(BeTrue()) }) @@ -376,6 +380,81 @@ func TestReconcileNormalEtcd(t *testing.T) { err = wcmux.Shutdown(ctx) g.Expect(err).ToNot(HaveOccurred()) }) + + t.Run("takes care of the etcd cluster annotations", func(t *testing.T) { + g := NewWithT(t) + + inMemoryMachineWithNodeProvisioned1 := inMemoryMachineWithNodeProvisioned1.DeepCopy() + inMemoryMachineWithNodeProvisioned1.Spec = infrav1.InMemoryMachineSpec{} + + inMemoryMachineWithNodeProvisioned2 := inMemoryMachineWithNodeProvisioned1.DeepCopy() + inMemoryMachineWithNodeProvisioned2.Name = "bar2" + + manager := cmanager.New(scheme) + + host := "127.0.0.1" + wcmux, err := server.NewWorkloadClustersMux(manager, host, server.CustomPorts{ + // NOTE: make sure to use ports different than other tests, so we can run tests in parallel + MinPort: server.DefaultMinPort + 1200, + MaxPort: server.DefaultMinPort + 1299, + DebugPort: server.DefaultDebugPort + 20, + }) + g.Expect(err).ToNot(HaveOccurred()) + _, err = wcmux.InitWorkloadClusterListener(klog.KObj(cluster).String()) + g.Expect(err).ToNot(HaveOccurred()) + + r := InMemoryMachineReconciler{ + Client: fake.NewClientBuilder().WithScheme(scheme).WithObjects(createCASecret(t, cluster, secretutil.EtcdCA)).Build(), + CloudManager: manager, + APIServerMux: wcmux, + } + r.CloudManager.AddResourceGroup(klog.KObj(cluster).String()) + c := r.CloudManager.GetResourceGroup(klog.KObj(cluster).String()).GetClient() + + // first etcd pod gets annotated with clusterID, memberID, and also set as a leader + + res, err := r.reconcileNormalETCD(ctx, cluster, cpMachine, inMemoryMachineWithNodeProvisioned1) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(res.IsZero()).To(BeTrue()) + g.Expect(conditions.IsTrue(inMemoryMachineWithNodeProvisioned1, infrav1.EtcdProvisionedCondition)).To(BeTrue()) + + got1 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: metav1.NamespaceSystem, + Name: fmt.Sprintf("etcd-%s", inMemoryMachineWithNodeProvisioned1.Name), + }, + } + + err = c.Get(ctx, client.ObjectKeyFromObject(got1), got1) + g.Expect(err).ToNot(HaveOccurred()) + + g.Expect(got1.Annotations).To(HaveKey(cloudv1.EtcdClusterIDAnnotationName)) + g.Expect(got1.Annotations).To(HaveKey(cloudv1.EtcdMemberIDAnnotationName)) + g.Expect(got1.Annotations).To(HaveKey(cloudv1.EtcdLeaderFromAnnotationName)) + + // second etcd pod gets annotated with the same clusterID, a new memberID (but it is not set as a leader + + res, err = r.reconcileNormalETCD(ctx, cluster, cpMachine, inMemoryMachineWithNodeProvisioned2) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(res.IsZero()).To(BeTrue()) + g.Expect(conditions.IsTrue(inMemoryMachineWithNodeProvisioned2, infrav1.EtcdProvisionedCondition)).To(BeTrue()) + + got2 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: metav1.NamespaceSystem, + Name: fmt.Sprintf("etcd-%s", inMemoryMachineWithNodeProvisioned2.Name), + }, + } + + err = c.Get(ctx, client.ObjectKeyFromObject(got2), got2) + g.Expect(err).ToNot(HaveOccurred()) + + g.Expect(got2.Annotations).To(HaveKey(cloudv1.EtcdClusterIDAnnotationName)) + g.Expect(got1.Annotations[cloudv1.EtcdClusterIDAnnotationName]).To(Equal(got2.Annotations[cloudv1.EtcdClusterIDAnnotationName])) + g.Expect(got2.Annotations).To(HaveKey(cloudv1.EtcdMemberIDAnnotationName)) + g.Expect(got1.Annotations[cloudv1.EtcdMemberIDAnnotationName]).ToNot(Equal(got2.Annotations[cloudv1.EtcdMemberIDAnnotationName])) + g.Expect(got2.Annotations).ToNot(HaveKey(cloudv1.EtcdLeaderFromAnnotationName)) + }) } func TestReconcileNormalApiServer(t *testing.T) { diff --git a/test/infrastructure/inmemory/internal/server/etcd/handler.go b/test/infrastructure/inmemory/internal/server/etcd/handler.go index 5aaffe28d34d..d697c20210c6 100644 --- a/test/infrastructure/inmemory/internal/server/etcd/handler.go +++ b/test/infrastructure/inmemory/internal/server/etcd/handler.go @@ -33,6 +33,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" + cloudv1 "sigs.k8s.io/cluster-api/test/infrastructure/inmemory/internal/cloud/api/v1alpha1" cclient "sigs.k8s.io/cluster-api/test/infrastructure/inmemory/internal/cloud/runtime/client" cmanager "sigs.k8s.io/cluster-api/test/infrastructure/inmemory/internal/cloud/runtime/manager" ) @@ -192,19 +193,26 @@ func (b *baseServer) inspectEtcd(ctx context.Context, cloudClient cclient.Client memberList := &pb.MemberListResponse{} statusResponse := &pb.StatusResponse{} + var clusterID int var leaderID int var leaderFrom time.Time for _, pod := range etcdPods.Items { - clusterID, err := strconv.Atoi(pod.Annotations["etcd.inmemory.infrastructure.cluster.x-k8s.io/cluster-id"]) - if err != nil { - return nil, nil, errors.Wrapf(err, "failed read cluster ID annotation from etcd member with name %s", pod.Name) + if clusterID == 0 { + var err error + clusterID, err = strconv.Atoi(pod.Annotations[cloudv1.EtcdClusterIDAnnotationName]) + if err != nil { + return nil, nil, errors.Wrapf(err, "failed read cluster ID annotation from etcd member with name %s", pod.Name) + } + } else if pod.Annotations[cloudv1.EtcdClusterIDAnnotationName] != fmt.Sprintf("%d", clusterID) { + return nil, nil, errors.New("invalid etcd cluster, members have different cluster ID") } - memberID, err := strconv.Atoi(pod.Annotations["etcd.inmemory.infrastructure.cluster.x-k8s.io/member-id"]) + + memberID, err := strconv.Atoi(pod.Annotations[cloudv1.EtcdMemberIDAnnotationName]) if err != nil { return nil, nil, errors.Wrapf(err, "failed read member ID annotation from etcd member with name %s", pod.Name) } - if t, err := time.Parse(time.RFC3339, pod.Annotations["etcd.inmemory.infrastructure.cluster.x-k8s.io/leader-from"]); err == nil { + if t, err := time.Parse(time.RFC3339, pod.Annotations[cloudv1.EtcdLeaderFromAnnotationName]); err == nil { if t.After(leaderFrom) { leaderID = memberID leaderFrom = t @@ -224,6 +232,13 @@ func (b *baseServer) inspectEtcd(ctx context.Context, cloudClient cclient.Client Name: strings.TrimPrefix(pod.Name, "etcd-"), }) } + + if leaderID == 0 { + // TODO: consider if and how to automatically recover from this case + // note: this can happen also when adding a new etcd members in the handler, might be it is something we have to take case before deletion... + // for now it should not be an issue because KCP forwards etcd leadership before deletion. + return nil, nil, errors.New("invalid etcd cluster, no leader found") + } statusResponse.Leader = uint64(leaderID) return memberList, statusResponse, nil diff --git a/test/infrastructure/inmemory/internal/server/mux_test.go b/test/infrastructure/inmemory/internal/server/mux_test.go index 1d45eda41d32..757bae109152 100644 --- a/test/infrastructure/inmemory/internal/server/mux_test.go +++ b/test/infrastructure/inmemory/internal/server/mux_test.go @@ -25,7 +25,6 @@ import ( "crypto/x509/pkix" "fmt" "math/big" - "math/rand" "testing" "time" @@ -41,6 +40,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + cloudv1 "sigs.k8s.io/cluster-api/test/infrastructure/inmemory/internal/cloud/api/v1alpha1" cmanager "sigs.k8s.io/cluster-api/test/infrastructure/inmemory/internal/cloud/runtime/manager" "sigs.k8s.io/cluster-api/test/infrastructure/inmemory/internal/server/proxy" "sigs.k8s.io/cluster-api/util/certs" @@ -282,11 +282,9 @@ func TestAPI_PortForward(t *testing.T) { "tier": "control-plane", }, Annotations: map[string]string{ - // TODO: read this from existing etcd pods, if any. - "etcd.inmemory.infrastructure.cluster.x-k8s.io/cluster-id": fmt.Sprintf("%d", rand.Uint32()), //nolint:gosec // weak random number generator is good enough here - "etcd.inmemory.infrastructure.cluster.x-k8s.io/member-id": fmt.Sprintf("%d", rand.Uint32()), //nolint:gosec // weak random number generator is good enough here - // TODO: set this only if there are no other leaders. - "etcd.inmemory.infrastructure.cluster.x-k8s.io/leader-from": time.Now().Format(time.RFC3339), + cloudv1.EtcdClusterIDAnnotationName: "1", + cloudv1.EtcdMemberIDAnnotationName: "2", + cloudv1.EtcdLeaderFromAnnotationName: time.Now().Format(time.RFC3339), }, }, }