diff --git a/test/infrastructure/inmemory/internal/cloud/api/v1alpha1/etcdcluster_annotations.go b/test/infrastructure/inmemory/internal/cloud/api/v1alpha1/etcdcluster_annotations.go
index 862e90bc6f58..05c398d90ec3 100644
--- a/test/infrastructure/inmemory/internal/cloud/api/v1alpha1/etcdcluster_annotations.go
+++ b/test/infrastructure/inmemory/internal/cloud/api/v1alpha1/etcdcluster_annotations.go
@@ -34,4 +34,7 @@ const (
 	// By using this mechanism leadership can be forwarded to another pod with an atomic operation
 	// (add/update of the annotation to the pod/etcd member we are forwarding leadership to).
 	EtcdLeaderFromAnnotationName = "etcd.inmemory.infrastructure.cluster.x-k8s.io/leader-from"
+
+	// EtcdMemberRemoved is added to etcd pods which have been removed from the etcd cluster.
+	EtcdMemberRemoved = "etcd.inmemory.infrastructure.cluster.x-k8s.io/member-removed"
 )
diff --git a/test/infrastructure/inmemory/internal/cloud/runtime/cache/client.go b/test/infrastructure/inmemory/internal/cloud/runtime/cache/client.go
index d83047cc151c..ff01f7523095 100644
--- a/test/infrastructure/inmemory/internal/cloud/runtime/cache/client.go
+++ b/test/infrastructure/inmemory/internal/cloud/runtime/cache/client.go
@@ -21,10 +21,12 @@ import (
 	"time"

 	jsonpatch "github.com/evanphx/json-patch/v5"
+	corev1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
@@ -117,6 +119,15 @@ func (c *cache) List(resourceGroup string, list client.ObjectList, opts ...clien
 			}
 		}

+		// TODO(killianmuldoon): This only matches the nodeName field for pods. No other fieldSelectors are implemented. This should return an error if another fieldselector is used.
+		if pod, ok := obj.(*corev1.Pod); ok {
+			if listOpts.FieldSelector != nil && !listOpts.FieldSelector.Empty() {
+				if !listOpts.FieldSelector.Matches(fields.Set{"spec.nodeName": pod.Spec.NodeName}) {
+					continue
+				}
+			}
+		}
+
 		obj := obj.DeepCopyObject().(client.Object)
 		switch list.(type) {
 		case *unstructured.UnstructuredList:
@@ -272,11 +283,6 @@ func updateTrackerOwnerReferences(tracker *resourceGroupTracker, oldObj, newObj
 }

 func (c *cache) Patch(resourceGroup string, obj client.Object, patch client.Patch) error {
-	obj = obj.DeepCopyObject().(client.Object)
-	if err := c.Get(resourceGroup, client.ObjectKeyFromObject(obj), obj); err != nil {
-		return err
-	}
-
 	patchData, err := patch.Data(obj)
 	if err != nil {
 		return apierrors.NewInternalError(err)
diff --git a/test/infrastructure/inmemory/internal/controllers/inmemorymachine_controller.go b/test/infrastructure/inmemory/internal/controllers/inmemorymachine_controller.go
index 6064f282eb69..d03fa6938cbc 100644
--- a/test/infrastructure/inmemory/internal/controllers/inmemorymachine_controller.go
+++ b/test/infrastructure/inmemory/internal/controllers/inmemorymachine_controller.go
@@ -427,6 +427,9 @@ func (r *InMemoryMachineReconciler) reconcileNormalETCD(ctx context.Context, clu
 				"tier":      "control-plane",
 			},
 		},
+		Spec: corev1.PodSpec{
+			NodeName: inMemoryMachine.Name,
+		},
 		Status: corev1.PodStatus{
 			Phase: corev1.PodRunning,
 			Conditions: []corev1.PodCondition{
@@ -443,7 +446,7 @@ func (r *InMemoryMachineReconciler) reconcileNormalETCD(ctx context.Context, clu
 	}

 	// Gets info about the current etcd cluster, if any.
-	info, err := r.inspectEtcd(ctx, cloudClient)
+	info, err := r.getEtcdInfo(ctx, cloudClient)
 	if err != nil {
 		return ctrl.Result{}, err
 	}
@@ -527,7 +530,7 @@ type etcdInfo struct {
 	members sets.Set[string]
 }

-func (r *InMemoryMachineReconciler) inspectEtcd(ctx context.Context, cloudClient cclient.Client) (etcdInfo, error) {
+func (r *InMemoryMachineReconciler) getEtcdInfo(ctx context.Context, cloudClient cclient.Client) (etcdInfo, error) {
 	etcdPods := &corev1.PodList{}
 	if err := cloudClient.List(ctx, etcdPods,
 		client.InNamespace(metav1.NamespaceSystem),
@@ -547,6 +550,9 @@ func (r *InMemoryMachineReconciler) inspectEtcd(ctx context.Context, cloudClient
 	}
 	var leaderFrom time.Time
 	for _, pod := range etcdPods.Items {
+		if _, ok := pod.Annotations[cloudv1.EtcdMemberRemoved]; ok {
+			continue
+		}
 		if info.clusterID == "" {
 			info.clusterID = pod.Annotations[cloudv1.EtcdClusterIDAnnotationName]
 		} else if pod.Annotations[cloudv1.EtcdClusterIDAnnotationName] != info.clusterID {
@@ -626,6 +632,9 @@ func (r *InMemoryMachineReconciler) reconcileNormalAPIServer(ctx context.Context
 				"tier":      "control-plane",
 			},
 		},
+		Spec: corev1.PodSpec{
+			NodeName: inMemoryMachine.Name,
+		},
 		Status: corev1.PodStatus{
 			Phase: corev1.PodRunning,
 			Conditions: []corev1.PodCondition{
@@ -712,6 +721,9 @@ func (r *InMemoryMachineReconciler) reconcileNormalScheduler(ctx context.Context
 				"tier":      "control-plane",
 			},
 		},
+		Spec: corev1.PodSpec{
+			NodeName: inMemoryMachine.Name,
+		},
 		Status: corev1.PodStatus{
 			Phase: corev1.PodRunning,
 			Conditions: []corev1.PodCondition{
@@ -757,6 +769,9 @@ func (r *InMemoryMachineReconciler) reconcileNormalControllerManager(ctx context
 				"tier":      "control-plane",
 			},
 		},
+		Spec: corev1.PodSpec{
+			NodeName: inMemoryMachine.Name,
+		},
 		Status: corev1.PodStatus{
 			Phase: corev1.PodRunning,
 			Conditions: []corev1.PodCondition{
@@ -831,9 +846,12 @@ func (r *InMemoryMachineReconciler) reconcileNormalKubeadmObjects(ctx context.Co
 			Name:      "kubeadm-config",
 			Namespace: metav1.NamespaceSystem,
 		},
+		Data: map[string]string{
+			"ClusterConfiguration": "",
+		},
 	}
 	if err := cloudClient.Create(ctx, cm); err != nil && !apierrors.IsAlreadyExists(err) {
-		return ctrl.Result{}, errors.Wrapf(err, "failed to create ubeadm-config ConfigMap")
+		return ctrl.Result{}, errors.Wrapf(err, "failed to create kubeadm-config ConfigMap")
 	}

 	return ctrl.Result{}, nil
@@ -1012,6 +1030,8 @@ func (r *InMemoryMachineReconciler) reconcileDeleteNode(ctx context.Context, clu
 			Name: inMemoryMachine.Name,
 		},
 	}
+
+	// TODO(killianmuldoon): check if we can drop this given that the MachineController is already draining pods and deleting nodes.
 	if err := cloudClient.Delete(ctx, node); err != nil && !apierrors.IsNotFound(err) {
 		return ctrl.Result{}, errors.Wrapf(err, "failed to delete Node")
 	}
diff --git a/test/infrastructure/inmemory/internal/server/api/handler.go b/test/infrastructure/inmemory/internal/server/api/handler.go
index 532881551651..bd01e303c060 100644
--- a/test/infrastructure/inmemory/internal/server/api/handler.go
+++ b/test/infrastructure/inmemory/internal/server/api/handler.go
@@ -30,6 +30,7 @@ import (
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/runtime/serializer"
@@ -252,6 +253,16 @@ func (h *apiServerHandler) apiV1List(req *restful.Request, resp *restful.Respons
 		listOpts = append(listOpts, client.InNamespace(req.PathParameter("namespace")))
 	}

+	// TODO: The only field selector which works is for `spec.nodeName` on pods.
+	selector, err := fields.ParseSelector(req.QueryParameter("fieldSelector"))
+	if err != nil {
+		_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
+		return
+	}
+	if selector != nil {
+		listOpts = append(listOpts, client.MatchingFieldsSelector{Selector: selector})
+	}
+
 	if err := cloudClient.List(ctx, list, listOpts...); err != nil {
 		_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
 		return
@@ -412,6 +423,10 @@ func (h *apiServerHandler) apiV1Patch(req *restful.Request, resp *restful.Respon
 	obj.SetName(req.PathParameter("name"))
 	obj.SetNamespace(req.PathParameter("namespace"))

+	if err := cloudClient.Get(ctx, client.ObjectKeyFromObject(obj), obj); err != nil {
+		_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
+		return
+	}
 	if err := cloudClient.Patch(ctx, obj, patch); err != nil {
 		_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
 		return
diff --git a/test/infrastructure/inmemory/internal/server/etcd/handler.go b/test/infrastructure/inmemory/internal/server/etcd/handler.go
index a798f0ab42e2..cce90ff6a2e1 100644
--- a/test/infrastructure/inmemory/internal/server/etcd/handler.go
+++ b/test/infrastructure/inmemory/internal/server/etcd/handler.go
@@ -113,8 +113,48 @@ func (m *maintenanceServer) Snapshot(_ *pb.SnapshotRequest, _ pb.Maintenance_Sna
 	return fmt.Errorf("not implemented: Snapshot")
 }

-func (m *maintenanceServer) MoveLeader(_ context.Context, _ *pb.MoveLeaderRequest) (*pb.MoveLeaderResponse, error) {
-	return nil, fmt.Errorf("not implemented: MoveLeader")
+func (m *maintenanceServer) MoveLeader(ctx context.Context, req *pb.MoveLeaderRequest) (*pb.MoveLeaderResponse, error) {
+	out := new(pb.MoveLeaderResponse)
+	resourceGroup, _, err := m.getResourceGroupAndMember(ctx)
+	if err != nil {
+		return nil, err
+	}
+	etcdPods := &corev1.PodList{}
+	cloudClient := m.manager.GetResourceGroup(resourceGroup).GetClient()
+	if err := cloudClient.List(ctx, etcdPods,
+		client.InNamespace(metav1.NamespaceSystem),
+		client.MatchingLabels{
+			"component": "etcd",
+			"tier":      "control-plane"},
+	); err != nil {
+		return nil, errors.Wrap(err, "failed to list etcd members")
+	}
+
+	if len(etcdPods.Items) == 0 {
+		return nil, errors.New("failed to list etcd members: no etcd pods found")
+	}
+
+	for i := range etcdPods.Items {
+		pod := &etcdPods.Items[i]
+		for k, v := range pod.GetAnnotations() {
+			if k == cloudv1.EtcdMemberIDAnnotationName {
+				target := strconv.FormatInt(int64(req.TargetID), 10)
+				if v == target {
+					updatedPod := pod.DeepCopy()
+					annotations := updatedPod.GetAnnotations()
+					annotations[cloudv1.EtcdLeaderFromAnnotationName] = time.Now().Format(time.RFC3339)
+					updatedPod.SetAnnotations(annotations)
+					err := cloudClient.Patch(ctx, updatedPod, client.MergeFrom(pod))
+					if err != nil {
+						return nil, err
+					}
+					return out, nil
+				}
+			}
+		}
+	}
+	// If we reach this point leadership was not moved.
+	return nil, errors.Errorf("etcd member with ID %d did not become the leader: expected etcd Pod not found", req.TargetID)
 }

 func (m *maintenanceServer) Downgrade(_ context.Context, _ *pb.DowngradeRequest) (*pb.DowngradeResponse, error) {
@@ -130,8 +170,39 @@ func (c *clusterServerServer) MemberAdd(_ context.Context, _ *pb.MemberAddReques
 	return nil, fmt.Errorf("not implemented: MemberAdd")
 }

-func (c *clusterServerServer) MemberRemove(_ context.Context, _ *pb.MemberRemoveRequest) (*pb.MemberRemoveResponse, error) {
-	return nil, fmt.Errorf("not implemented: MemberRemove")
+func (c *clusterServerServer) MemberRemove(ctx context.Context, req *pb.MemberRemoveRequest) (*pb.MemberRemoveResponse, error) {
+	out := new(pb.MemberRemoveResponse)
+	resourceGroup, _, err := c.getResourceGroupAndMember(ctx)
+	if err != nil {
+		return nil, err
+	}
+	cloudClient := c.manager.GetResourceGroup(resourceGroup).GetClient()
+
+	etcdPods := &corev1.PodList{}
+
+	if err := cloudClient.List(ctx, etcdPods,
+		client.InNamespace(metav1.NamespaceSystem),
+		client.MatchingLabels{
+			"component": "etcd",
+			"tier":      "control-plane"},
+	); err != nil {
+		return nil, errors.Wrap(err, "failed to list etcd members")
+	}
+
+	for i := range etcdPods.Items {
+		pod := etcdPods.Items[i]
+		memberID := pod.Annotations[cloudv1.EtcdMemberIDAnnotationName]
+		if memberID != fmt.Sprintf("%d", req.ID) {
+			continue
+		}
+		updatedPod := pod.DeepCopy()
+		updatedPod.Annotations[cloudv1.EtcdMemberRemoved] = ""
+		if err := cloudClient.Patch(ctx, updatedPod, client.MergeFrom(&pod)); err != nil {
+			return nil, err
+		}
+		return out, nil
+	}
+	return nil, errors.Errorf("no etcd member with id %d found", req.ID)
 }

 func (c *clusterServerServer) MemberUpdate(_ context.Context, _ *pb.MemberUpdateRequest) (*pb.MemberUpdateResponse, error) {
@@ -197,6 +268,12 @@ func (b *baseServer) inspectEtcd(ctx context.Context, cloudClient cclient.Client
 	var leaderID int
 	var leaderFrom time.Time
 	for _, pod := range etcdPods.Items {
+		if _, ok := pod.Annotations[cloudv1.EtcdMemberRemoved]; ok {
+			if pod.Name == fmt.Sprintf("%s%s", "etcd-", etcdMember) {
+				return nil, nil, errors.New("inspect called on etcd which has been removed")
+			}
+			continue
+		}
 		if clusterID == 0 {
 			var err error
 			clusterID, err = strconv.Atoi(pod.Annotations[cloudv1.EtcdClusterIDAnnotationName])
@@ -224,7 +301,6 @@ func (b *baseServer) inspectEtcd(ctx context.Context, cloudClient cclient.Client
 			ClusterId: uint64(clusterID),
 			MemberId:  uint64(memberID),
 		}
-		statusResponse.Header = memberList.Header
 	}

 	memberList.Members = append(memberList.Members, &pb.Member{
diff --git a/test/infrastructure/inmemory/internal/server/etcd/handler_test.go b/test/infrastructure/inmemory/internal/server/etcd/handler_test.go
new file mode 100644
index 000000000000..15336e52bf7d
--- /dev/null
+++ b/test/infrastructure/inmemory/internal/server/etcd/handler_test.go
@@ -0,0 +1,121 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package etcd
+
+import (
+	"context"
+	"fmt"
+	"testing"
+	"time"
+
+	. "github.com/onsi/gomega"
+	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
+	"google.golang.org/grpc/metadata"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
+	"sigs.k8s.io/controller-runtime/pkg/log"
+
+	cloudv1 "sigs.k8s.io/cluster-api/test/infrastructure/inmemory/internal/cloud/api/v1alpha1"
+	"sigs.k8s.io/cluster-api/test/infrastructure/inmemory/internal/cloud/runtime/manager"
+)
+
+func Test_etcd_scalingflow(t *testing.T) {
+	scheme := runtime.NewScheme()
+	_ = clientgoscheme.AddToScheme(scheme)
+
+	// During a scale down event - for example during upgrade - KCP will call `MoveLeader` and `MemberRemove` in sequence.
+	g := NewWithT(t)
+	ctx := metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{":authority": "etcd-1"}))
+	manager := manager.New(scheme)
+	resourceGroupResolver := func(host string) (string, error) { return "group1", nil }
+	c := &clusterServerServer{
+		baseServer: &baseServer{
+			log:                   log.FromContext(ctx),
+			manager:               manager,
+			resourceGroupResolver: resourceGroupResolver,
+		},
+	}
+
+	m := &maintenanceServer{
+		baseServer: &baseServer{
+			log:                   log.FromContext(ctx),
+			manager:               manager,
+			resourceGroupResolver: resourceGroupResolver,
+		},
+	}
+	c.manager.AddResourceGroup("group1")
+	cloudClient := c.manager.GetResourceGroup("group1").GetClient()
+
+	for i := 1; i <= 3; i++ {
+		etcdMember := fmt.Sprintf("etcd-%d", i)
+		etcdPod := &corev1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				Namespace: metav1.NamespaceSystem,
+				Name:      etcdMember,
+				Labels: map[string]string{
+					"component": "etcd",
+					"tier":      "control-plane",
+				},
+				Annotations: map[string]string{
+					cloudv1.EtcdMemberIDAnnotationName:  fmt.Sprintf("%d", i),
+					cloudv1.EtcdClusterIDAnnotationName: "15",
+				},
+			},
+			Spec: corev1.PodSpec{
+				NodeName: fmt.Sprintf("etcd-%d", i),
+			},
+			Status: corev1.PodStatus{
+				Phase: corev1.PodRunning,
+				Conditions: []corev1.PodCondition{
+					{
+						Type:   corev1.PodReady,
+						Status: corev1.ConditionTrue,
+					},
+				},
+			},
+		}
+		// Initially set leader to `etcd-1`
+		if i == 1 {
+			etcdPod.Annotations[cloudv1.EtcdLeaderFromAnnotationName] = time.Date(2020, 07, 03, 14, 25, 58, 651387237, time.UTC).Format(time.RFC3339)
+		}
+		g.Expect(cloudClient.Create(ctx, etcdPod)).To(Succeed())
+	}
+	var etcdMemberToRemove uint64 = 2
+	var etcdMemberToBeLeader uint64 = 3
+
+	t.Run("move leader and remove etcd member", func(t *testing.T) {
+		_, err := m.MoveLeader(ctx, &pb.MoveLeaderRequest{TargetID: etcdMemberToBeLeader})
+		g.Expect(err).NotTo(HaveOccurred())
+
+		_, err = c.MemberRemove(ctx, &pb.MemberRemoveRequest{ID: etcdMemberToRemove})
+		g.Expect(err).NotTo(HaveOccurred())
+
+		// Expect the inspect call to fail on a member which has been removed.
+		_, _, err = c.inspectEtcd(ctx, cloudClient, fmt.Sprintf("%d", etcdMemberToRemove))
+		g.Expect(err).To(HaveOccurred())
+
+		// inspectEtcd should succeed when calling on a member that has not been removed.
+		members, status, err := c.inspectEtcd(ctx, cloudClient, fmt.Sprintf("%d", etcdMemberToBeLeader))
+		g.Expect(err).ToNot(HaveOccurred())
+
+		g.Expect(status.Leader).To(Equal(etcdMemberToBeLeader))
+		g.Expect(members.GetMembers()).To(HaveLen(2))
+		g.Expect(members.GetMembers()).NotTo(ContainElement(fmt.Sprintf("etcd-%d", etcdMemberToRemove)))
+	})
+}
diff --git a/test/infrastructure/inmemory/internal/server/mux_test.go b/test/infrastructure/inmemory/internal/server/mux_test.go
index 757bae109152..7072df7a9518 100644
--- a/test/infrastructure/inmemory/internal/server/mux_test.go
+++ b/test/infrastructure/inmemory/internal/server/mux_test.go
@@ -35,6 +35,7 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	rbacv1 "k8s.io/api/rbac/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/klog/v2"
 	ctrl "sigs.k8s.io/controller-runtime"
@@ -148,6 +149,23 @@ func TestAPI_corev1_CRUD(t *testing.T) {
 	g.Expect(nl.Items).To(HaveLen(1))
 	g.Expect(nl.Items[0].Name).To(Equal("foo"))

+	// list with nodeName selector on pod
+	g.Expect(c.Create(ctx, &corev1.Pod{
+		ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: metav1.NamespaceDefault},
+		Spec:       corev1.PodSpec{NodeName: n.Name},
+	})).To(Succeed())
+	g.Expect(c.Create(ctx, &corev1.Pod{
+		ObjectMeta: metav1.ObjectMeta{Name: "notSelectedPod", Namespace: metav1.NamespaceDefault},
+	})).To(Succeed())
+
+	pl := &corev1.PodList{}
+	nodeNameSelector := &client.ListOptions{
+		FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": n.Name}),
+	}
+	g.Expect(c.List(ctx, pl, nodeNameSelector)).To(Succeed())
+	g.Expect(pl.Items).To(HaveLen(1))
+	g.Expect(pl.Items[0].Name).To(Equal("bar"))
+
 	// get
 	n = &corev1.Node{}

@@ -162,18 +180,16 @@ func TestAPI_corev1_CRUD(t *testing.T) {
 	g.Expect(err).ToNot(HaveOccurred())

 	n3 := n2.DeepCopy()
-	// TODO: n doesn't have taints, so not sure what we are testing here.
-	taints := []corev1.Taint{}
-	for _, taint := range n.Spec.Taints {
-		if taint.Key == "foo" {
-			continue
-		}
-		taints = append(taints, taint)
-	}
+	taints := []corev1.Taint{{Key: "foo"}}
+	n3.Spec.Taints = taints
 	err = c.Patch(ctx, n3, client.StrategicMergeFrom(n2))
 	g.Expect(err).ToNot(HaveOccurred())

+	node := &corev1.Node{}
+	g.Expect(c.Get(ctx, client.ObjectKeyFromObject(n3), node)).To(Succeed())
+	g.Expect(node.Spec.Taints).To(Equal(taints))
+
 	// delete
 	err = c.Delete(ctx, n)