Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add pause handling for AzureCluster controller #3735

Merged
merged 1 commit into from
Jul 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions azure/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ type Reconciler interface {
Delete(ctx context.Context) error
}

// Pauser may be implemented for a ServiceReconciler that requires additional work to stop reconciliation.
type Pauser interface {
Pause(context.Context) error
}

// ServiceReconciler is an Azure service reconciler which can reconcile an Azure service.
type ServiceReconciler interface {
Name() string
Expand Down
37 changes: 37 additions & 0 deletions azure/mock_azure/azure_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

60 changes: 39 additions & 21 deletions controllers/azurecluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,19 +85,19 @@ func (acr *AzureClusterReconciler) SetupWithManager(ctx context.Context, mgr ctr
c, err := ctrl.NewControllerManagedBy(mgr).
WithOptions(options.Options).
For(&infrav1.AzureCluster{}).
WithEventFilter(predicates.ResourceNotPausedAndHasFilterLabel(log, acr.WatchFilterValue)).
WithEventFilter(predicates.ResourceHasFilterLabel(log, acr.WatchFilterValue)).
WithEventFilter(predicates.ResourceIsNotExternallyManaged(log)).
Build(r)
if err != nil {
return errors.Wrap(err, "error creating controller")
}

// Add a watch on clusterv1.Cluster object for unpause notifications.
// Add a watch on clusterv1.Cluster object for pause/unpause notifications.
if err = c.Watch(
&source.Kind{Type: &clusterv1.Cluster{}},
handler.EnqueueRequestsFromMapFunc(util.ClusterToInfrastructureMapFunc(ctx, infrav1.GroupVersion.WithKind("AzureCluster"), mgr.GetClient(), &infrav1.AzureCluster{})),
predicates.ClusterUnpaused(log),
predicates.ResourceNotPausedAndHasFilterLabel(log, acr.WatchFilterValue),
ClusterUpdatePauseChange(log),
predicates.ResourceHasFilterLabel(log, acr.WatchFilterValue),
); err != nil {
return errors.Wrap(err, "failed adding a watch for ready clusters")
}
Expand Down Expand Up @@ -151,23 +151,6 @@ func (acr *AzureClusterReconciler) Reconcile(ctx context.Context, req ctrl.Reque

log = log.WithValues("cluster", cluster.Name)

// Return early if the object or Cluster is paused.
if annotations.IsPaused(cluster, azureCluster) {
acr.Recorder.Eventf(azureCluster, corev1.EventTypeNormal, "ClusterPaused", "AzureCluster or linked Cluster is marked as paused. Won't reconcile")
log.Info("AzureCluster or linked Cluster is marked as paused. Won't reconcile")
return ctrl.Result{}, nil
}

if azureCluster.Spec.IdentityRef != nil {
err := EnsureClusterIdentity(ctx, acr.Client, azureCluster, azureCluster.Spec.IdentityRef, infrav1.ClusterFinalizer)
if err != nil {
return reconcile.Result{}, err
}
} else {
log.Info(fmt.Sprintf("WARNING, %s", deprecatedManagerCredsWarning))
acr.Recorder.Eventf(azureCluster, corev1.EventTypeWarning, "AzureClusterIdentity", deprecatedManagerCredsWarning)
}

// Create the scope.
clusterScope, err := scope.NewClusterScope(ctx, scope.ClusterScopeParams{
Client: acr.Client,
Expand All @@ -187,6 +170,23 @@ func (acr *AzureClusterReconciler) Reconcile(ctx context.Context, req ctrl.Reque
}
}()

// Return early if the object or Cluster is paused.
if annotations.IsPaused(cluster, azureCluster) {
acr.Recorder.Eventf(azureCluster, corev1.EventTypeNormal, "ClusterPaused", "AzureCluster or linked Cluster is marked as paused. Won't reconcile normally")
log.Info("AzureCluster or linked Cluster is marked as paused. Won't reconcile normally")
return acr.reconcilePause(ctx, clusterScope)
}

if azureCluster.Spec.IdentityRef != nil {
err := EnsureClusterIdentity(ctx, acr.Client, azureCluster, azureCluster.Spec.IdentityRef, infrav1.ClusterFinalizer)
if err != nil {
return reconcile.Result{}, err
}
} else {
log.Info(fmt.Sprintf("WARNING, %s", deprecatedManagerCredsWarning))
acr.Recorder.Eventf(azureCluster, corev1.EventTypeWarning, "AzureClusterIdentity", deprecatedManagerCredsWarning)
}

// Handle deleted clusters
if !azureCluster.DeletionTimestamp.IsZero() {
return acr.reconcileDelete(ctx, clusterScope)
Expand Down Expand Up @@ -257,6 +257,24 @@ func (acr *AzureClusterReconciler) reconcileNormal(ctx context.Context, clusterS
return reconcile.Result{}, nil
}

func (acr *AzureClusterReconciler) reconcilePause(ctx context.Context, clusterScope *scope.ClusterScope) (reconcile.Result, error) {
ctx, log, done := tele.StartSpanWithLogger(ctx, "controllers.AzureClusterReconciler.reconcilePause")
defer done()

log.Info("Reconciling AzureCluster pause")

acs, err := acr.createAzureClusterService(clusterScope)
if err != nil {
return reconcile.Result{}, errors.Wrap(err, "failed to create a new azureClusterService")
}

if err := acs.Pause(ctx); err != nil {
return reconcile.Result{}, errors.Wrap(err, "failed to pause cluster services")
}

return reconcile.Result{}, nil
}

func (acr *AzureClusterReconciler) reconcileDelete(ctx context.Context, clusterScope *scope.ClusterScope) (reconcile.Result, error) {
ctx, log, done := tele.StartSpanWithLogger(ctx, "controllers.AzureClusterReconciler.reconcileDelete")
defer done()
Expand Down
70 changes: 70 additions & 0 deletions controllers/azurecluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,20 @@ package controllers

import (
"context"
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
infrav1 "sigs.k8s.io/cluster-api-provider-azure/api/v1beta1"
"sigs.k8s.io/cluster-api-provider-azure/internal/test"
"sigs.k8s.io/cluster-api-provider-azure/util/reconciler"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

var _ = Describe("AzureClusterReconciler", func() {
Expand All @@ -51,3 +56,68 @@ var _ = Describe("AzureClusterReconciler", func() {
})
})
})

func TestAzureClusterReconcilePaused(t *testing.T) {
Copy link
Contributor Author

@nojnhuh nojnhuh Jul 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I originally tried to specify this test like the one above with envtest, but the race detector was flagging something in the logging calls whenever two objects were being created, even serially and with a 10s sleep in between. It was easier to rework this test to use a fake client than try to fix the race which seemed unrelated to the rest of these changes. The test itself hasn't lost any fidelity during that rewrite AFAICT.

g := NewWithT(t)

ctx := context.Background()

sb := runtime.NewSchemeBuilder(
clusterv1.AddToScheme,
infrav1.AddToScheme,
)
s := runtime.NewScheme()
g.Expect(sb.AddToScheme(s)).To(Succeed())
c := fake.NewClientBuilder().
WithScheme(s).
Build()

recorder := record.NewFakeRecorder(1)

reconciler := NewAzureClusterReconciler(c, recorder, reconciler.DefaultLoopTimeout, "")
name := test.RandomName("paused", 10)

cluster := &clusterv1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: "default",
},
Spec: clusterv1.ClusterSpec{
Paused: true,
},
}
g.Expect(c.Create(ctx, cluster)).To(Succeed())

instance := &infrav1.AzureCluster{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: "default",
OwnerReferences: []metav1.OwnerReference{
{
Kind: "Cluster",
APIVersion: clusterv1.GroupVersion.String(),
Name: cluster.Name,
UID: cluster.UID,
},
},
},
Spec: infrav1.AzureClusterSpec{
AzureClusterClassSpec: infrav1.AzureClusterClassSpec{
SubscriptionID: "something",
},
},
}
g.Expect(c.Create(ctx, instance)).To(Succeed())

result, err := reconciler.Reconcile(context.Background(), ctrl.Request{
NamespacedName: client.ObjectKey{
Namespace: instance.Namespace,
Name: instance.Name,
},
})

g.Expect(err).To(BeNil())
g.Expect(result.RequeueAfter).To(BeZero())

g.Eventually(recorder.Events).Should(Receive(Equal("Normal ClusterPaused AzureCluster or linked Cluster is marked as paused. Won't reconcile normally")))
}
18 changes: 18 additions & 0 deletions controllers/azurecluster_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,24 @@ func (s *azureClusterService) Reconcile(ctx context.Context) error {
return nil
}

// Pause pauses all components making up the cluster.
func (s *azureClusterService) Pause(ctx context.Context) error {
ctx, _, done := tele.StartSpanWithLogger(ctx, "controllers.azureClusterService.Pause")
defer done()

for _, service := range s.services {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the order in which services are paused is significant, at least for ASO.

pauser, ok := service.(azure.Pauser)
if !ok {
continue
}
if err := pauser.Pause(ctx); err != nil {
return errors.Wrapf(err, "failed to pause AzureCluster service %s", service.Name())
}
}

return nil
}

// Delete reconciles all the services in a predetermined order.
func (s *azureClusterService) Delete(ctx context.Context) error {
ctx, _, done := tele.StartSpanWithLogger(ctx, "controllers.azureClusterService.Delete")
Expand Down
70 changes: 70 additions & 0 deletions controllers/azurecluster_reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,76 @@ func TestAzureClusterServiceReconcile(t *testing.T) {
}
}

func TestAzureClusterServicePause(t *testing.T) {
type pausingServiceReconciler struct {
*mock_azure.MockServiceReconciler
*mock_azure.MockPauser
}

cases := map[string]struct {
expectedError string
expect func(one pausingServiceReconciler, two pausingServiceReconciler, three pausingServiceReconciler)
}{
"all services are paused in order": {
expectedError: "",
expect: func(one pausingServiceReconciler, two pausingServiceReconciler, three pausingServiceReconciler) {
gomock.InOrder(
one.MockPauser.EXPECT().Pause(gomockinternal.AContext()).Return(nil),
two.MockPauser.EXPECT().Pause(gomockinternal.AContext()).Return(nil),
three.MockPauser.EXPECT().Pause(gomockinternal.AContext()).Return(nil))
},
},
"service pause fails": {
expectedError: "failed to pause AzureCluster service two: some error happened",
expect: func(one pausingServiceReconciler, two pausingServiceReconciler, _ pausingServiceReconciler) {
gomock.InOrder(
one.MockPauser.EXPECT().Pause(gomockinternal.AContext()).Return(nil),
two.MockPauser.EXPECT().Pause(gomockinternal.AContext()).Return(errors.New("some error happened")),
two.MockServiceReconciler.EXPECT().Name().Return("two"))
},
},
}

for name, tc := range cases {
tc := tc
t.Run(name, func(t *testing.T) {
g := NewWithT(t)

t.Parallel()
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

newPausingServiceReconciler := func() pausingServiceReconciler {
return pausingServiceReconciler{
mock_azure.NewMockServiceReconciler(mockCtrl),
mock_azure.NewMockPauser(mockCtrl),
}
}
svcOneMock := newPausingServiceReconciler()
svcTwoMock := newPausingServiceReconciler()
svcThreeMock := newPausingServiceReconciler()

tc.expect(svcOneMock, svcTwoMock, svcThreeMock)

s := &azureClusterService{
services: []azure.ServiceReconciler{
svcOneMock,
svcTwoMock,
svcThreeMock,
},
}

err := s.Pause(context.TODO())
if tc.expectedError != "" {
g.Expect(err).To(HaveOccurred())
g.Expect(err).To(MatchError(tc.expectedError))
} else {
g.Expect(err).NotTo(HaveOccurred())
}
})
}
}

func TestAzureClusterServiceDelete(t *testing.T) {
cases := map[string]struct {
expectedError string
Expand Down
33 changes: 33 additions & 0 deletions controllers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
infrav1 "sigs.k8s.io/cluster-api-provider-azure/api/v1beta1"
"sigs.k8s.io/cluster-api-provider-azure/azure"
"sigs.k8s.io/cluster-api-provider-azure/azure/scope"
Expand All @@ -53,7 +54,9 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

Expand Down Expand Up @@ -1023,3 +1026,33 @@ func MachinePoolToAzureManagedControlPlaneMapFunc(ctx context.Context, c client.
return nil
}
}

// ClusterUpdatePauseChange returns a predicate that returns true for an update event when a cluster's
// Spec.Paused changes between any two distinct values.
func ClusterUpdatePauseChange(logger logr.Logger) predicate.Funcs {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is mostly copy-paste from CAPI's ClusterUpdateUnpaused.

return predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
log := logger.WithValues("predicate", "ClusterUpdatePauseChange", "eventType", "update")

oldCluster, ok := e.ObjectOld.(*clusterv1.Cluster)
if !ok {
log.V(4).Info("Expected Cluster", "type", fmt.Sprintf("%T", e.ObjectOld))
return false
}
log = log.WithValues("Cluster", klog.KObj(oldCluster))

newCluster := e.ObjectNew.(*clusterv1.Cluster)

if oldCluster.Spec.Paused != newCluster.Spec.Paused {
log.V(4).Info("Cluster paused status changed, allowing further processing")
return true
}

log.V(6).Info("Cluster paused status remained the same, blocking further processing")
return false
},
CreateFunc: func(e event.CreateEvent) bool { return false },
DeleteFunc: func(e event.DeleteEvent) bool { return false },
GenericFunc: func(e event.GenericEvent) bool { return false },
}
}