Skip to content

Commit

Permalink
add pause handling for AzureCluster controller
Browse files Browse the repository at this point in the history
  • Loading branch information
nojnhuh committed Jul 18, 2023
1 parent 87345f7 commit 339603a
Show file tree
Hide file tree
Showing 7 changed files with 267 additions and 21 deletions.
5 changes: 5 additions & 0 deletions azure/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ type Reconciler interface {
Delete(ctx context.Context) error
}

// Pauser may be implemented for a ServiceReconciler that requires additional work to stop reconciliation.
type Pauser interface {
Pause(context.Context) error
}

// ServiceReconciler is an Azure service reconciler which can reconcile an Azure service.
type ServiceReconciler interface {
Name() string
Expand Down
37 changes: 37 additions & 0 deletions azure/mock_azure/azure_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

60 changes: 39 additions & 21 deletions controllers/azurecluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,19 +85,19 @@ func (acr *AzureClusterReconciler) SetupWithManager(ctx context.Context, mgr ctr
c, err := ctrl.NewControllerManagedBy(mgr).
WithOptions(options.Options).
For(&infrav1.AzureCluster{}).
WithEventFilter(predicates.ResourceNotPausedAndHasFilterLabel(log, acr.WatchFilterValue)).
WithEventFilter(predicates.ResourceHasFilterLabel(log, acr.WatchFilterValue)).
WithEventFilter(predicates.ResourceIsNotExternallyManaged(log)).
Build(r)
if err != nil {
return errors.Wrap(err, "error creating controller")
}

// Add a watch on clusterv1.Cluster object for unpause notifications.
// Add a watch on clusterv1.Cluster object for pause/unpause notifications.
if err = c.Watch(
&source.Kind{Type: &clusterv1.Cluster{}},
handler.EnqueueRequestsFromMapFunc(util.ClusterToInfrastructureMapFunc(ctx, infrav1.GroupVersion.WithKind("AzureCluster"), mgr.GetClient(), &infrav1.AzureCluster{})),
predicates.ClusterUnpaused(log),
predicates.ResourceNotPausedAndHasFilterLabel(log, acr.WatchFilterValue),
ClusterUpdatePauseChange(log),
predicates.ResourceHasFilterLabel(log, acr.WatchFilterValue),
); err != nil {
return errors.Wrap(err, "failed adding a watch for ready clusters")
}
Expand Down Expand Up @@ -151,23 +151,6 @@ func (acr *AzureClusterReconciler) Reconcile(ctx context.Context, req ctrl.Reque

log = log.WithValues("cluster", cluster.Name)

// Return early if the object or Cluster is paused.
if annotations.IsPaused(cluster, azureCluster) {
acr.Recorder.Eventf(azureCluster, corev1.EventTypeNormal, "ClusterPaused", "AzureCluster or linked Cluster is marked as paused. Won't reconcile")
log.Info("AzureCluster or linked Cluster is marked as paused. Won't reconcile")
return ctrl.Result{}, nil
}

if azureCluster.Spec.IdentityRef != nil {
err := EnsureClusterIdentity(ctx, acr.Client, azureCluster, azureCluster.Spec.IdentityRef, infrav1.ClusterFinalizer)
if err != nil {
return reconcile.Result{}, err
}
} else {
log.Info(fmt.Sprintf("WARNING, %s", deprecatedManagerCredsWarning))
acr.Recorder.Eventf(azureCluster, corev1.EventTypeWarning, "AzureClusterIdentity", deprecatedManagerCredsWarning)
}

// Create the scope.
clusterScope, err := scope.NewClusterScope(ctx, scope.ClusterScopeParams{
Client: acr.Client,
Expand All @@ -187,6 +170,23 @@ func (acr *AzureClusterReconciler) Reconcile(ctx context.Context, req ctrl.Reque
}
}()

// Return early if the object or Cluster is paused.
if annotations.IsPaused(cluster, azureCluster) {
acr.Recorder.Eventf(azureCluster, corev1.EventTypeNormal, "ClusterPaused", "AzureCluster or linked Cluster is marked as paused. Won't reconcile normally")
log.Info("AzureCluster or linked Cluster is marked as paused. Won't reconcile normally")
return acr.reconcilePause(ctx, clusterScope)
}

if azureCluster.Spec.IdentityRef != nil {
err := EnsureClusterIdentity(ctx, acr.Client, azureCluster, azureCluster.Spec.IdentityRef, infrav1.ClusterFinalizer)
if err != nil {
return reconcile.Result{}, err
}
} else {
log.Info(fmt.Sprintf("WARNING, %s", deprecatedManagerCredsWarning))
acr.Recorder.Eventf(azureCluster, corev1.EventTypeWarning, "AzureClusterIdentity", deprecatedManagerCredsWarning)
}

// Handle deleted clusters
if !azureCluster.DeletionTimestamp.IsZero() {
return acr.reconcileDelete(ctx, clusterScope)
Expand Down Expand Up @@ -257,6 +257,24 @@ func (acr *AzureClusterReconciler) reconcileNormal(ctx context.Context, clusterS
return reconcile.Result{}, nil
}

func (acr *AzureClusterReconciler) reconcilePause(ctx context.Context, clusterScope *scope.ClusterScope) (reconcile.Result, error) {
ctx, log, done := tele.StartSpanWithLogger(ctx, "controllers.AzureClusterReconciler.reconcilePause")
defer done()

log.Info("Reconciling AzureCluster pause")

acs, err := acr.createAzureClusterService(clusterScope)
if err != nil {
return reconcile.Result{}, errors.Wrap(err, "failed to create a new azureClusterService")
}

if err := acs.Pause(ctx); err != nil {
return reconcile.Result{}, errors.Wrap(err, "failed to pause cluster services")
}

return reconcile.Result{}, nil
}

func (acr *AzureClusterReconciler) reconcileDelete(ctx context.Context, clusterScope *scope.ClusterScope) (reconcile.Result, error) {
ctx, log, done := tele.StartSpanWithLogger(ctx, "controllers.AzureClusterReconciler.reconcileDelete")
defer done()
Expand Down
65 changes: 65 additions & 0 deletions controllers/azurecluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,20 @@ package controllers

import (
"context"
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
infrav1 "sigs.k8s.io/cluster-api-provider-azure/api/v1beta1"
"sigs.k8s.io/cluster-api-provider-azure/internal/test"
"sigs.k8s.io/cluster-api-provider-azure/util/reconciler"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

var _ = Describe("AzureClusterReconciler", func() {
Expand All @@ -51,3 +56,63 @@ var _ = Describe("AzureClusterReconciler", func() {
})
})
})

func TestAzureClusterReconcilePaused(t *testing.T) {
g := NewWithT(t)

ctx := context.Background()

sb := runtime.NewSchemeBuilder(
clusterv1.AddToScheme,
infrav1.AddToScheme,
)
s := runtime.NewScheme()
g.Expect(sb.AddToScheme(s)).To(Succeed())
c := fake.NewClientBuilder().
WithScheme(s).
Build()

recorder := record.NewFakeRecorder(1)

reconciler := NewAzureClusterReconciler(c, recorder, reconciler.DefaultLoopTimeout, "")
name := test.RandomName("paused", 10)

cluster := &clusterv1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: "default",
},
Spec: clusterv1.ClusterSpec{
Paused: true,
},
}
g.Expect(c.Create(ctx, cluster)).To(Succeed())

instance := &infrav1.AzureCluster{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: "default",
OwnerReferences: []metav1.OwnerReference{
{
Kind: "Cluster",
APIVersion: clusterv1.GroupVersion.String(),
Name: cluster.Name,
UID: cluster.UID,
},
},
},
}
g.Expect(c.Create(ctx, instance)).To(Succeed())

result, err := reconciler.Reconcile(context.Background(), ctrl.Request{
NamespacedName: client.ObjectKey{
Namespace: instance.Namespace,
Name: instance.Name,
},
})

g.Expect(err).To(BeNil())
g.Expect(result.RequeueAfter).To(BeZero())

g.Eventually(recorder.Events).Should(Receive(Equal("Normal ClusterPaused AzureCluster or linked Cluster is marked as paused. Won't reconcile normally")))
}
18 changes: 18 additions & 0 deletions controllers/azurecluster_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,24 @@ func (s *azureClusterService) Reconcile(ctx context.Context) error {
return nil
}

// Pause pauses all components making up the cluster.
func (s *azureClusterService) Pause(ctx context.Context) error {
ctx, _, done := tele.StartSpanWithLogger(ctx, "controllers.azureClusterService.Pause")
defer done()

for _, service := range s.services {
pauser, ok := service.(azure.Pauser)
if !ok {
continue
}
if err := pauser.Pause(ctx); err != nil {
return errors.Wrapf(err, "failed to pause AzureCluster service %s", service.Name())
}
}

return nil
}

// Delete reconciles all the services in a predetermined order.
func (s *azureClusterService) Delete(ctx context.Context) error {
ctx, _, done := tele.StartSpanWithLogger(ctx, "controllers.azureClusterService.Delete")
Expand Down
70 changes: 70 additions & 0 deletions controllers/azurecluster_reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,76 @@ func TestAzureClusterServiceReconcile(t *testing.T) {
}
}

func TestAzureClusterServicePause(t *testing.T) {
type pausingServiceReconciler struct {
*mock_azure.MockServiceReconciler
*mock_azure.MockPauser
}

cases := map[string]struct {
expectedError string
expect func(one pausingServiceReconciler, two pausingServiceReconciler, three pausingServiceReconciler)
}{
"all services are paused in order": {
expectedError: "",
expect: func(one pausingServiceReconciler, two pausingServiceReconciler, three pausingServiceReconciler) {
gomock.InOrder(
one.MockPauser.EXPECT().Pause(gomockinternal.AContext()).Return(nil),
two.MockPauser.EXPECT().Pause(gomockinternal.AContext()).Return(nil),
three.MockPauser.EXPECT().Pause(gomockinternal.AContext()).Return(nil))
},
},
"service pause fails": {
expectedError: "failed to pause AzureCluster service two: some error happened",
expect: func(one pausingServiceReconciler, two pausingServiceReconciler, _ pausingServiceReconciler) {
gomock.InOrder(
one.MockPauser.EXPECT().Pause(gomockinternal.AContext()).Return(nil),
two.MockPauser.EXPECT().Pause(gomockinternal.AContext()).Return(errors.New("some error happened")),
two.MockServiceReconciler.EXPECT().Name().Return("two"))
},
},
}

for name, tc := range cases {
tc := tc
t.Run(name, func(t *testing.T) {
g := NewWithT(t)

t.Parallel()
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

newPausingServiceReconciler := func() pausingServiceReconciler {
return pausingServiceReconciler{
mock_azure.NewMockServiceReconciler(mockCtrl),
mock_azure.NewMockPauser(mockCtrl),
}
}
svcOneMock := newPausingServiceReconciler()
svcTwoMock := newPausingServiceReconciler()
svcThreeMock := newPausingServiceReconciler()

tc.expect(svcOneMock, svcTwoMock, svcThreeMock)

s := &azureClusterService{
services: []azure.ServiceReconciler{
svcOneMock,
svcTwoMock,
svcThreeMock,
},
}

err := s.Pause(context.TODO())
if tc.expectedError != "" {
g.Expect(err).To(HaveOccurred())
g.Expect(err).To(MatchError(tc.expectedError))
} else {
g.Expect(err).NotTo(HaveOccurred())
}
})
}
}

func TestAzureClusterServiceDelete(t *testing.T) {
cases := map[string]struct {
expectedError string
Expand Down
33 changes: 33 additions & 0 deletions controllers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
infrav1 "sigs.k8s.io/cluster-api-provider-azure/api/v1beta1"
"sigs.k8s.io/cluster-api-provider-azure/azure"
"sigs.k8s.io/cluster-api-provider-azure/azure/scope"
Expand All @@ -53,7 +54,9 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

Expand Down Expand Up @@ -1023,3 +1026,33 @@ func MachinePoolToAzureManagedControlPlaneMapFunc(ctx context.Context, c client.
return nil
}
}

// ClusterUpdatePauseChange returns a predicate that returns true for an update event when a cluster's
// Spec.Paused changes between any two distinct values.
func ClusterUpdatePauseChange(logger logr.Logger) predicate.Funcs {
return predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
log := logger.WithValues("predicate", "ClusterUpdatePauseChange", "eventType", "update")

oldCluster, ok := e.ObjectOld.(*clusterv1.Cluster)
if !ok {
log.V(4).Info("Expected Cluster", "type", fmt.Sprintf("%T", e.ObjectOld))
return false
}
log = log.WithValues("Cluster", klog.KObj(oldCluster))

newCluster := e.ObjectNew.(*clusterv1.Cluster)

if oldCluster.Spec.Paused != newCluster.Spec.Paused {
log.V(4).Info("Cluster paused status changed, allowing further processing")
return true
}

log.V(6).Info("Cluster paused status remained the same, blocking further processing")
return false
},
CreateFunc: func(e event.CreateEvent) bool { return false },
DeleteFunc: func(e event.DeleteEvent) bool { return false },
GenericFunc: func(e event.GenericEvent) bool { return false },
}
}

0 comments on commit 339603a

Please sign in to comment.