From d8b5fc14aa91dc47707d3a09854d914dfe555911 Mon Sep 17 00:00:00 2001 From: Cecile Robert-Michon Date: Fri, 31 Mar 2023 22:19:18 +0000 Subject: [PATCH] Add node watcher to MachinePool controller --- api/v1beta1/index/index.go | 10 ++ api/v1beta1/index/machinepool.go | 105 ++++++++++++++++ api/v1beta1/index/machinepool_test.go | 112 ++++++++++++++++++ exp/controllers/alias.go | 3 + .../controllers/machinepool_controller.go | 82 +++++++++++++ .../machinepool_controller_noderef.go | 9 +- main.go | 1 + 7 files changed, 321 insertions(+), 1 deletion(-) create mode 100644 api/v1beta1/index/machinepool.go create mode 100644 api/v1beta1/index/machinepool_test.go diff --git a/api/v1beta1/index/index.go b/api/v1beta1/index/index.go index 69f3278324e9..c6a17bf175bc 100644 --- a/api/v1beta1/index/index.go +++ b/api/v1beta1/index/index.go @@ -41,5 +41,15 @@ func AddDefaultIndexes(ctx context.Context, mgr ctrl.Manager) error { } } + if feature.Gates.Enabled(feature.MachinePool) { + if err := ByMachinePoolNode(ctx, mgr); err != nil { + return err + } + + if err := ByMachinePoolProviderID(ctx, mgr); err != nil { + return err + } + } + return nil } diff --git a/api/v1beta1/index/machinepool.go b/api/v1beta1/index/machinepool.go new file mode 100644 index 000000000000..612dafe65cfb --- /dev/null +++ b/api/v1beta1/index/machinepool.go @@ -0,0 +1,105 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package index + +import ( + "context" + "fmt" + + "github.com/pkg/errors" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + "sigs.k8s.io/cluster-api/controllers/noderefutil" + expv1 "sigs.k8s.io/cluster-api/exp/api/v1beta1" +) + +const ( + // MachinePoolNodeNameField is used by the MachinePool Controller to index MachinePools by Node name, and add a watch on Nodes. + MachinePoolNodeNameField = "status.nodeRefs.name" + + // MachinePoolProviderIDField is used to index MachinePools by ProviderID. It's useful to find MachinePools + // in a management cluster from Nodes in a workload cluster. + MachinePoolProviderIDField = "spec.providerIDList" +) + +// ByMachinePoolNode adds the machinepool node name index to the +// managers cache. +func ByMachinePoolNode(ctx context.Context, mgr ctrl.Manager) error { + if err := mgr.GetCache().IndexField(ctx, &expv1.MachinePool{}, + MachinePoolNodeNameField, + MachinePoolByNodeName, + ); err != nil { + return errors.Wrap(err, "error setting index field") + } + + return nil +} + +// MachinePoolByNodeName contains the logic to index MachinePools by Node name. +func MachinePoolByNodeName(o client.Object) []string { + machinepool, ok := o.(*expv1.MachinePool) + if !ok { + panic(fmt.Sprintf("Expected a MachinePool but got a %T", o)) + } + + if len(machinepool.Status.NodeRefs) == 0 { + return nil + } + + nodeNames := make([]string, 0, len(machinepool.Status.NodeRefs)) + for _, ref := range machinepool.Status.NodeRefs { + nodeNames = append(nodeNames, ref.Name) + } + return nodeNames +} + +// ByMachinePoolProviderID adds the machinepool providerID index to the +// managers cache. +func ByMachinePoolProviderID(ctx context.Context, mgr ctrl.Manager) error { + if err := mgr.GetCache().IndexField(ctx, &expv1.MachinePool{}, + MachinePoolProviderIDField, + machinePoolByProviderID, + ); err != nil { + return errors.Wrap(err, "error setting index field") + } + + return nil +} + +func machinePoolByProviderID(o client.Object) []string { + machinepool, ok := o.(*expv1.MachinePool) + if !ok { + panic(fmt.Sprintf("Expected a MachinePool but got a %T", o)) + } + + if len(machinepool.Spec.ProviderIDList) == 0 { + return nil + } + + providerIDs := make([]string, 0, len(machinepool.Spec.ProviderIDList)) + for _, id := range machinepool.Spec.ProviderIDList { + providerID, err := noderefutil.NewProviderID(id) + if err != nil { + // Failed to create providerID, skipping. + continue + } + providerIDs = append(providerIDs, providerID.IndexKey()) + } + + return providerIDs +} diff --git a/api/v1beta1/index/machinepool_test.go b/api/v1beta1/index/machinepool_test.go new file mode 100644 index 000000000000..eaaab680db31 --- /dev/null +++ b/api/v1beta1/index/machinepool_test.go @@ -0,0 +1,112 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package index + +import ( + "testing" + + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + + "sigs.k8s.io/cluster-api/controllers/noderefutil" + expv1 "sigs.k8s.io/cluster-api/exp/api/v1beta1" +) + +func TestIndexMachinePoolByNodeName(t *testing.T) { + testCases := []struct { + name string + object client.Object + expected []string + }{ + { + name: "when the machinepool has no NodeRef", + object: &expv1.MachinePool{}, + expected: []string{}, + }, + { + name: "when the machinepool has valid NodeRefs", + object: &expv1.MachinePool{ + Status: expv1.MachinePoolStatus{ + NodeRefs: []corev1.ObjectReference{ + { + Name: "node1", + }, + { + Name: "node2", + }, + }, + }, + }, + expected: []string{"node1", "node2"}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + g := NewWithT(t) + got := MachinePoolByNodeName(tc.object) + g.Expect(got).To(ConsistOf(tc.expected)) + }) + } +} + +func TestIndexMachinePoolByProviderID(t *testing.T) { + g := NewWithT(t) + validProviderID, err := noderefutil.NewProviderID("aws://region/zone/1") + g.Expect(err).ToNot(HaveOccurred()) + otherValidProviderID, err := noderefutil.NewProviderID("aws://region/zone/2") + g.Expect(err).ToNot(HaveOccurred()) + + testCases := []struct { + name string + object client.Object + expected []string + }{ + { + name: "MachinePool has no providerID", + object: &expv1.MachinePool{}, + expected: nil, + }, + { + name: "MachinePool has invalid providerID", + object: &expv1.MachinePool{ + Spec: expv1.MachinePoolSpec{ + ProviderIDList: []string{"invalid"}, + }, + }, + expected: []string{}, + }, + { + name: "MachinePool has valid providerIDs", + object: &expv1.MachinePool{ + Spec: expv1.MachinePoolSpec{ + ProviderIDList: []string{validProviderID.String(), otherValidProviderID.String()}, + }, + }, + expected: []string{validProviderID.IndexKey(), otherValidProviderID.IndexKey()}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + g := NewWithT(t) + got := machinePoolByProviderID(tc.object) + g.Expect(got).To(BeEquivalentTo(tc.expected)) + }) + } +} diff --git a/exp/controllers/alias.go b/exp/controllers/alias.go index 3cfc860ef87b..0db7dfb5f090 100644 --- a/exp/controllers/alias.go +++ b/exp/controllers/alias.go @@ -23,6 +23,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/cluster-api/controllers/remote" machinepool "sigs.k8s.io/cluster-api/exp/internal/controllers" ) @@ -30,6 +31,7 @@ import ( type MachinePoolReconciler struct { Client client.Client APIReader client.Reader + Tracker *remote.ClusterCacheTracker // WatchFilterValue is the label value used to filter events prior to reconciliation. WatchFilterValue string @@ -39,6 +41,7 @@ func (r *MachinePoolReconciler) SetupWithManager(ctx context.Context, mgr ctrl.M return (&machinepool.MachinePoolReconciler{ Client: r.Client, APIReader: r.APIReader, + Tracker: r.Tracker, WatchFilterValue: r.WatchFilterValue, }).SetupWithManager(ctx, mgr, options) } diff --git a/exp/internal/controllers/machinepool_controller.go b/exp/internal/controllers/machinepool_controller.go index cb7084f77d20..97cd4946a982 100644 --- a/exp/internal/controllers/machinepool_controller.go +++ b/exp/internal/controllers/machinepool_controller.go @@ -18,6 +18,7 @@ package controllers import ( "context" + "fmt" "sync" "github.com/pkg/errors" @@ -34,10 +35,13 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" + "sigs.k8s.io/cluster-api/api/v1beta1/index" "sigs.k8s.io/cluster-api/controllers/external" + "sigs.k8s.io/cluster-api/controllers/noderefutil" "sigs.k8s.io/cluster-api/controllers/remote" expv1 "sigs.k8s.io/cluster-api/exp/api/v1beta1" "sigs.k8s.io/cluster-api/util" @@ -62,6 +66,7 @@ const ( type MachinePoolReconciler struct { Client client.Client APIReader client.Reader + Tracker *remote.ClusterCacheTracker // WatchFilterValue is the label value used to filter events prior to reconciliation. WatchFilterValue string @@ -289,3 +294,80 @@ func (r *MachinePoolReconciler) reconcileDeleteExternal(ctx context.Context, m * // Return true if there are no more external objects. return len(objects) == 0, nil } + +func (r *MachinePoolReconciler) watchClusterNodes(ctx context.Context, cluster *clusterv1.Cluster) error { + log := ctrl.LoggerFrom(ctx) + + if !conditions.IsTrue(cluster, clusterv1.ControlPlaneInitializedCondition) { + log.V(5).Info("Skipping node watching setup because control plane is not initialized") + return nil + } + + // If there is no tracker, don't watch remote nodes + if r.Tracker == nil { + return nil + } + + return r.Tracker.Watch(ctx, remote.WatchInput{ + Name: "machinepool-watchNodes", + Cluster: util.ObjectKey(cluster), + Watcher: r.controller, + Kind: &corev1.Node{}, + EventHandler: handler.EnqueueRequestsFromMapFunc(r.nodeToMachinePool), + }) +} + +func (r *MachinePoolReconciler) nodeToMachinePool(o client.Object) []reconcile.Request { + node, ok := o.(*corev1.Node) + if !ok { + panic(fmt.Sprintf("Expected a Node but got a %T", o)) + } + + var filters []client.ListOption + // Match by clusterName when the node has the annotation. + if clusterName, ok := node.GetAnnotations()[clusterv1.ClusterNameAnnotation]; ok { + filters = append(filters, client.MatchingLabels{ + clusterv1.ClusterNameLabel: clusterName, + }) + } + + // Match by namespace when the node has the annotation. + if namespace, ok := node.GetAnnotations()[clusterv1.ClusterNamespaceAnnotation]; ok { + filters = append(filters, client.InNamespace(namespace)) + } + + // Match by nodeName and status.nodeRef.name. + machinePoolList := &expv1.MachinePoolList{} + if err := r.Client.List( + context.TODO(), + machinePoolList, + append(filters, client.MatchingFields{index.MachinePoolNodeNameField: node.Name})...); err != nil { + return nil + } + + // There should be exactly 1 MachinePool for the node. + if len(machinePoolList.Items) == 1 { + return []reconcile.Request{{NamespacedName: util.ObjectKey(&machinePoolList.Items[0])}} + } + + // Otherwise let's match by providerID. This is useful when e.g the NodeRef has not been set yet. + // Match by providerID + nodeProviderID, err := noderefutil.NewProviderID(node.Spec.ProviderID) + if err != nil { + return nil + } + machinePoolList = &expv1.MachinePoolList{} + if err := r.Client.List( + context.TODO(), + machinePoolList, + append(filters, client.MatchingFields{index.MachinePoolProviderIDField: nodeProviderID.IndexKey()})...); err != nil { + return nil + } + + // There should be exactly 1 MachinePool for the node. + if len(machinePoolList.Items) == 1 { + return []reconcile.Request{{NamespacedName: util.ObjectKey(&machinePoolList.Items[0])}} + } + + return nil +} diff --git a/exp/internal/controllers/machinepool_controller_noderef.go b/exp/internal/controllers/machinepool_controller_noderef.go index 1d680ed058fe..e1d1e023c82e 100644 --- a/exp/internal/controllers/machinepool_controller_noderef.go +++ b/exp/internal/controllers/machinepool_controller_noderef.go @@ -49,6 +49,12 @@ type getNodeReferencesResult struct { func (r *MachinePoolReconciler) reconcileNodeRefs(ctx context.Context, cluster *clusterv1.Cluster, mp *expv1.MachinePool) (ctrl.Result, error) { log := ctrl.LoggerFrom(ctx) + + // Create a watch on the nodes in the Cluster. + if err := r.watchClusterNodes(ctx, cluster); err != nil { + return ctrl.Result{}, err + } + // Check that the MachinePool hasn't been deleted or in the process. if !mp.DeletionTimestamp.IsZero() { return ctrl.Result{}, nil @@ -80,7 +86,8 @@ func (r *MachinePoolReconciler) reconcileNodeRefs(ctx context.Context, cluster * if err != nil { if err == errNoAvailableNodes { log.Info("Cannot assign NodeRefs to MachinePool, no matching Nodes") - return ctrl.Result{RequeueAfter: 10 * time.Second}, nil + // No need to requeue here. Nodes emit an event that triggers reconciliation. + return ctrl.Result{}, nil } r.recorder.Event(mp, corev1.EventTypeWarning, "FailedSetNodeRef", err.Error()) return ctrl.Result{}, errors.Wrapf(err, "failed to get node references") diff --git a/main.go b/main.go index 881e0186617c..95c1a1a4fb67 100644 --- a/main.go +++ b/main.go @@ -462,6 +462,7 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) { if err := (&expcontrollers.MachinePoolReconciler{ Client: mgr.GetClient(), APIReader: mgr.GetAPIReader(), + Tracker: tracker, WatchFilterValue: watchFilterValue, }).SetupWithManager(ctx, mgr, concurrency(machinePoolConcurrency)); err != nil { setupLog.Error(err, "unable to create controller", "controller", "MachinePool")