Skip to content

Commit

Permalink
Add node watcher to MachinePool controller
Browse files Browse the repository at this point in the history
  • Loading branch information
CecileRobertMichon authored and k8s-infra-cherrypick-robot committed Apr 4, 2023
1 parent 0034d28 commit f995df3
Show file tree
Hide file tree
Showing 7 changed files with 321 additions and 1 deletion.
10 changes: 10 additions & 0 deletions api/v1beta1/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,15 @@ func AddDefaultIndexes(ctx context.Context, mgr ctrl.Manager) error {
}
}

if feature.Gates.Enabled(feature.MachinePool) {
if err := ByMachinePoolNode(ctx, mgr); err != nil {
return err
}

if err := ByMachinePoolProviderID(ctx, mgr); err != nil {
return err
}
}

return nil
}
105 changes: 105 additions & 0 deletions api/v1beta1/index/machinepool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package index

import (
"context"
"fmt"

"github.com/pkg/errors"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

"sigs.k8s.io/cluster-api/controllers/noderefutil"
expv1 "sigs.k8s.io/cluster-api/exp/api/v1beta1"
)

const (
// MachinePoolNodeNameField is used by the MachinePool Controller to index MachinePools by Node name, and add a watch on Nodes.
MachinePoolNodeNameField = "status.nodeRefs.name"

// MachinePoolProviderIDField is used to index MachinePools by ProviderID. It's useful to find MachinePools
// in a management cluster from Nodes in a workload cluster.
MachinePoolProviderIDField = "spec.providerIDList"
)

// ByMachinePoolNode adds the machinepool node name index to the
// managers cache.
func ByMachinePoolNode(ctx context.Context, mgr ctrl.Manager) error {
if err := mgr.GetCache().IndexField(ctx, &expv1.MachinePool{},
MachinePoolNodeNameField,
MachinePoolByNodeName,
); err != nil {
return errors.Wrap(err, "error setting index field")
}

return nil
}

// MachinePoolByNodeName contains the logic to index MachinePools by Node name.
func MachinePoolByNodeName(o client.Object) []string {
machinepool, ok := o.(*expv1.MachinePool)
if !ok {
panic(fmt.Sprintf("Expected a MachinePool but got a %T", o))
}

if len(machinepool.Status.NodeRefs) == 0 {
return nil
}

nodeNames := make([]string, 0, len(machinepool.Status.NodeRefs))
for _, ref := range machinepool.Status.NodeRefs {
nodeNames = append(nodeNames, ref.Name)
}
return nodeNames
}

// ByMachinePoolProviderID adds the machinepool providerID index to the
// managers cache.
func ByMachinePoolProviderID(ctx context.Context, mgr ctrl.Manager) error {
if err := mgr.GetCache().IndexField(ctx, &expv1.MachinePool{},
MachinePoolProviderIDField,
machinePoolByProviderID,
); err != nil {
return errors.Wrap(err, "error setting index field")
}

return nil
}

func machinePoolByProviderID(o client.Object) []string {
machinepool, ok := o.(*expv1.MachinePool)
if !ok {
panic(fmt.Sprintf("Expected a MachinePool but got a %T", o))
}

if len(machinepool.Spec.ProviderIDList) == 0 {
return nil
}

providerIDs := make([]string, 0, len(machinepool.Spec.ProviderIDList))
for _, id := range machinepool.Spec.ProviderIDList {
providerID, err := noderefutil.NewProviderID(id)
if err != nil {
// Failed to create providerID, skipping.
continue
}
providerIDs = append(providerIDs, providerID.IndexKey())
}

return providerIDs
}
112 changes: 112 additions & 0 deletions api/v1beta1/index/machinepool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package index

import (
"testing"

. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

"sigs.k8s.io/cluster-api/controllers/noderefutil"
expv1 "sigs.k8s.io/cluster-api/exp/api/v1beta1"
)

func TestIndexMachinePoolByNodeName(t *testing.T) {
testCases := []struct {
name string
object client.Object
expected []string
}{
{
name: "when the machinepool has no NodeRef",
object: &expv1.MachinePool{},
expected: []string{},
},
{
name: "when the machinepool has valid NodeRefs",
object: &expv1.MachinePool{
Status: expv1.MachinePoolStatus{
NodeRefs: []corev1.ObjectReference{
{
Name: "node1",
},
{
Name: "node2",
},
},
},
},
expected: []string{"node1", "node2"},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
g := NewWithT(t)
got := MachinePoolByNodeName(tc.object)
g.Expect(got).To(ConsistOf(tc.expected))
})
}
}

func TestIndexMachinePoolByProviderID(t *testing.T) {
g := NewWithT(t)
validProviderID, err := noderefutil.NewProviderID("aws://region/zone/1")
g.Expect(err).ToNot(HaveOccurred())
otherValidProviderID, err := noderefutil.NewProviderID("aws://region/zone/2")
g.Expect(err).ToNot(HaveOccurred())

testCases := []struct {
name string
object client.Object
expected []string
}{
{
name: "MachinePool has no providerID",
object: &expv1.MachinePool{},
expected: nil,
},
{
name: "MachinePool has invalid providerID",
object: &expv1.MachinePool{
Spec: expv1.MachinePoolSpec{
ProviderIDList: []string{"invalid"},
},
},
expected: []string{},
},
{
name: "MachinePool has valid providerIDs",
object: &expv1.MachinePool{
Spec: expv1.MachinePoolSpec{
ProviderIDList: []string{validProviderID.String(), otherValidProviderID.String()},
},
},
expected: []string{validProviderID.IndexKey(), otherValidProviderID.IndexKey()},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
g := NewWithT(t)
got := machinePoolByProviderID(tc.object)
g.Expect(got).To(BeEquivalentTo(tc.expected))
})
}
}
3 changes: 3 additions & 0 deletions exp/controllers/alias.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"

"sigs.k8s.io/cluster-api/controllers/remote"
machinepool "sigs.k8s.io/cluster-api/exp/internal/controllers"
)

// MachinePoolReconciler reconciles a MachinePool object.
type MachinePoolReconciler struct {
Client client.Client
APIReader client.Reader
Tracker *remote.ClusterCacheTracker

// WatchFilterValue is the label value used to filter events prior to reconciliation.
WatchFilterValue string
Expand All @@ -39,6 +41,7 @@ func (r *MachinePoolReconciler) SetupWithManager(ctx context.Context, mgr ctrl.M
return (&machinepool.MachinePoolReconciler{
Client: r.Client,
APIReader: r.APIReader,
Tracker: r.Tracker,
WatchFilterValue: r.WatchFilterValue,
}).SetupWithManager(ctx, mgr, options)
}
82 changes: 82 additions & 0 deletions exp/internal/controllers/machinepool_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controllers

import (
"context"
"fmt"
"sync"

"github.com/pkg/errors"
Expand All @@ -34,10 +35,13 @@ import (
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/api/v1beta1/index"
"sigs.k8s.io/cluster-api/controllers/external"
"sigs.k8s.io/cluster-api/controllers/noderefutil"
"sigs.k8s.io/cluster-api/controllers/remote"
expv1 "sigs.k8s.io/cluster-api/exp/api/v1beta1"
"sigs.k8s.io/cluster-api/util"
Expand All @@ -62,6 +66,7 @@ const (
type MachinePoolReconciler struct {
Client client.Client
APIReader client.Reader
Tracker *remote.ClusterCacheTracker

// WatchFilterValue is the label value used to filter events prior to reconciliation.
WatchFilterValue string
Expand Down Expand Up @@ -289,3 +294,80 @@ func (r *MachinePoolReconciler) reconcileDeleteExternal(ctx context.Context, m *
// Return true if there are no more external objects.
return len(objects) == 0, nil
}

func (r *MachinePoolReconciler) watchClusterNodes(ctx context.Context, cluster *clusterv1.Cluster) error {
log := ctrl.LoggerFrom(ctx)

if !conditions.IsTrue(cluster, clusterv1.ControlPlaneInitializedCondition) {
log.V(5).Info("Skipping node watching setup because control plane is not initialized")
return nil
}

// If there is no tracker, don't watch remote nodes
if r.Tracker == nil {
return nil
}

return r.Tracker.Watch(ctx, remote.WatchInput{
Name: "machinepool-watchNodes",
Cluster: util.ObjectKey(cluster),
Watcher: r.controller,
Kind: &corev1.Node{},
EventHandler: handler.EnqueueRequestsFromMapFunc(r.nodeToMachinePool),
})
}

func (r *MachinePoolReconciler) nodeToMachinePool(o client.Object) []reconcile.Request {
node, ok := o.(*corev1.Node)
if !ok {
panic(fmt.Sprintf("Expected a Node but got a %T", o))
}

var filters []client.ListOption
// Match by clusterName when the node has the annotation.
if clusterName, ok := node.GetAnnotations()[clusterv1.ClusterNameAnnotation]; ok {
filters = append(filters, client.MatchingLabels{
clusterv1.ClusterNameLabel: clusterName,
})
}

// Match by namespace when the node has the annotation.
if namespace, ok := node.GetAnnotations()[clusterv1.ClusterNamespaceAnnotation]; ok {
filters = append(filters, client.InNamespace(namespace))
}

// Match by nodeName and status.nodeRef.name.
machinePoolList := &expv1.MachinePoolList{}
if err := r.Client.List(
context.TODO(),
machinePoolList,
append(filters, client.MatchingFields{index.MachinePoolNodeNameField: node.Name})...); err != nil {
return nil
}

// There should be exactly 1 MachinePool for the node.
if len(machinePoolList.Items) == 1 {
return []reconcile.Request{{NamespacedName: util.ObjectKey(&machinePoolList.Items[0])}}
}

// Otherwise let's match by providerID. This is useful when e.g the NodeRef has not been set yet.
// Match by providerID
nodeProviderID, err := noderefutil.NewProviderID(node.Spec.ProviderID)
if err != nil {
return nil
}
machinePoolList = &expv1.MachinePoolList{}
if err := r.Client.List(
context.TODO(),
machinePoolList,
append(filters, client.MatchingFields{index.MachinePoolProviderIDField: nodeProviderID.IndexKey()})...); err != nil {
return nil
}

// There should be exactly 1 MachinePool for the node.
if len(machinePoolList.Items) == 1 {
return []reconcile.Request{{NamespacedName: util.ObjectKey(&machinePoolList.Items[0])}}
}

return nil
}
9 changes: 8 additions & 1 deletion exp/internal/controllers/machinepool_controller_noderef.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ type getNodeReferencesResult struct {

func (r *MachinePoolReconciler) reconcileNodeRefs(ctx context.Context, cluster *clusterv1.Cluster, mp *expv1.MachinePool) (ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx)

// Create a watch on the nodes in the Cluster.
if err := r.watchClusterNodes(ctx, cluster); err != nil {
return ctrl.Result{}, err
}

// Check that the MachinePool hasn't been deleted or in the process.
if !mp.DeletionTimestamp.IsZero() {
return ctrl.Result{}, nil
Expand Down Expand Up @@ -81,7 +87,8 @@ func (r *MachinePoolReconciler) reconcileNodeRefs(ctx context.Context, cluster *
if err != nil {
if err == errNoAvailableNodes {
log.Info("Cannot assign NodeRefs to MachinePool, no matching Nodes")
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
// No need to requeue here. Nodes emit an event that triggers reconciliation.
return ctrl.Result{}, nil
}
r.recorder.Event(mp, corev1.EventTypeWarning, "FailedSetNodeRef", err.Error())
return ctrl.Result{}, errors.Wrapf(err, "failed to get node references")
Expand Down
Loading

0 comments on commit f995df3

Please sign in to comment.