diff --git a/azure/interfaces.go b/azure/interfaces.go index 525838a97b18..12ce42324450 100644 --- a/azure/interfaces.go +++ b/azure/interfaces.go @@ -31,6 +31,11 @@ type Reconciler interface { Delete(ctx context.Context) error } +// Pauser may be implemented for a ServiceReconciler that requires additional work to stop reconciliation. +type Pauser interface { + Pause(context.Context) error +} + // ServiceReconciler is an Azure service reconciler which can reconcile an Azure service. type ServiceReconciler interface { Name() string diff --git a/azure/mock_azure/azure_mock.go b/azure/mock_azure/azure_mock.go index 51363998d888..ff4979158d68 100644 --- a/azure/mock_azure/azure_mock.go +++ b/azure/mock_azure/azure_mock.go @@ -82,6 +82,43 @@ func (mr *MockReconcilerMockRecorder) Reconcile(ctx interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reconcile", reflect.TypeOf((*MockReconciler)(nil).Reconcile), ctx) } +// MockPauser is a mock of Pauser interface. +type MockPauser struct { + ctrl *gomock.Controller + recorder *MockPauserMockRecorder +} + +// MockPauserMockRecorder is the mock recorder for MockPauser. +type MockPauserMockRecorder struct { + mock *MockPauser +} + +// NewMockPauser creates a new mock instance. +func NewMockPauser(ctrl *gomock.Controller) *MockPauser { + mock := &MockPauser{ctrl: ctrl} + mock.recorder = &MockPauserMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockPauser) EXPECT() *MockPauserMockRecorder { + return m.recorder +} + +// Pause mocks base method. +func (m *MockPauser) Pause(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Pause", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Pause indicates an expected call of Pause. +func (mr *MockPauserMockRecorder) Pause(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Pause", reflect.TypeOf((*MockPauser)(nil).Pause), arg0) +} + // MockServiceReconciler is a mock of ServiceReconciler interface. type MockServiceReconciler struct { ctrl *gomock.Controller diff --git a/controllers/azurecluster_controller.go b/controllers/azurecluster_controller.go index 876f25f9aa8c..8650954a36bb 100644 --- a/controllers/azurecluster_controller.go +++ b/controllers/azurecluster_controller.go @@ -85,19 +85,19 @@ func (acr *AzureClusterReconciler) SetupWithManager(ctx context.Context, mgr ctr c, err := ctrl.NewControllerManagedBy(mgr). WithOptions(options.Options). For(&infrav1.AzureCluster{}). - WithEventFilter(predicates.ResourceNotPausedAndHasFilterLabel(log, acr.WatchFilterValue)). + WithEventFilter(predicates.ResourceHasFilterLabel(log, acr.WatchFilterValue)). WithEventFilter(predicates.ResourceIsNotExternallyManaged(log)). Build(r) if err != nil { return errors.Wrap(err, "error creating controller") } - // Add a watch on clusterv1.Cluster object for unpause notifications. + // Add a watch on clusterv1.Cluster object for pause/unpause notifications. if err = c.Watch( &source.Kind{Type: &clusterv1.Cluster{}}, handler.EnqueueRequestsFromMapFunc(util.ClusterToInfrastructureMapFunc(ctx, infrav1.GroupVersion.WithKind("AzureCluster"), mgr.GetClient(), &infrav1.AzureCluster{})), - predicates.ClusterUnpaused(log), - predicates.ResourceNotPausedAndHasFilterLabel(log, acr.WatchFilterValue), + ClusterUpdatePauseChange(log), + predicates.ResourceHasFilterLabel(log, acr.WatchFilterValue), ); err != nil { return errors.Wrap(err, "failed adding a watch for ready clusters") } @@ -151,23 +151,6 @@ func (acr *AzureClusterReconciler) Reconcile(ctx context.Context, req ctrl.Reque log = log.WithValues("cluster", cluster.Name) - // Return early if the object or Cluster is paused. - if annotations.IsPaused(cluster, azureCluster) { - acr.Recorder.Eventf(azureCluster, corev1.EventTypeNormal, "ClusterPaused", "AzureCluster or linked Cluster is marked as paused. Won't reconcile") - log.Info("AzureCluster or linked Cluster is marked as paused. Won't reconcile") - return ctrl.Result{}, nil - } - - if azureCluster.Spec.IdentityRef != nil { - err := EnsureClusterIdentity(ctx, acr.Client, azureCluster, azureCluster.Spec.IdentityRef, infrav1.ClusterFinalizer) - if err != nil { - return reconcile.Result{}, err - } - } else { - log.Info(fmt.Sprintf("WARNING, %s", deprecatedManagerCredsWarning)) - acr.Recorder.Eventf(azureCluster, corev1.EventTypeWarning, "AzureClusterIdentity", deprecatedManagerCredsWarning) - } - // Create the scope. clusterScope, err := scope.NewClusterScope(ctx, scope.ClusterScopeParams{ Client: acr.Client, @@ -187,6 +170,23 @@ func (acr *AzureClusterReconciler) Reconcile(ctx context.Context, req ctrl.Reque } }() + // Return early if the object or Cluster is paused. + if annotations.IsPaused(cluster, azureCluster) { + acr.Recorder.Eventf(azureCluster, corev1.EventTypeNormal, "ClusterPaused", "AzureCluster or linked Cluster is marked as paused. Won't reconcile normally") + log.Info("AzureCluster or linked Cluster is marked as paused. Won't reconcile normally") + return acr.reconcilePause(ctx, clusterScope) + } + + if azureCluster.Spec.IdentityRef != nil { + err := EnsureClusterIdentity(ctx, acr.Client, azureCluster, azureCluster.Spec.IdentityRef, infrav1.ClusterFinalizer) + if err != nil { + return reconcile.Result{}, err + } + } else { + log.Info(fmt.Sprintf("WARNING, %s", deprecatedManagerCredsWarning)) + acr.Recorder.Eventf(azureCluster, corev1.EventTypeWarning, "AzureClusterIdentity", deprecatedManagerCredsWarning) + } + // Handle deleted clusters if !azureCluster.DeletionTimestamp.IsZero() { return acr.reconcileDelete(ctx, clusterScope) @@ -257,6 +257,24 @@ func (acr *AzureClusterReconciler) reconcileNormal(ctx context.Context, clusterS return reconcile.Result{}, nil } +func (acr *AzureClusterReconciler) reconcilePause(ctx context.Context, clusterScope *scope.ClusterScope) (reconcile.Result, error) { + ctx, log, done := tele.StartSpanWithLogger(ctx, "controllers.AzureClusterReconciler.reconcilePause") + defer done() + + log.Info("Reconciling AzureCluster pause") + + acs, err := acr.createAzureClusterService(clusterScope) + if err != nil { + return reconcile.Result{}, errors.Wrap(err, "failed to create a new azureClusterService") + } + + if err := acs.Pause(ctx); err != nil { + return reconcile.Result{}, errors.Wrap(err, "failed to pause cluster services") + } + + return reconcile.Result{}, nil +} + func (acr *AzureClusterReconciler) reconcileDelete(ctx context.Context, clusterScope *scope.ClusterScope) (reconcile.Result, error) { ctx, log, done := tele.StartSpanWithLogger(ctx, "controllers.AzureClusterReconciler.reconcileDelete") defer done() diff --git a/controllers/azurecluster_controller_test.go b/controllers/azurecluster_controller_test.go index f978ba39489d..a1669db1c77d 100644 --- a/controllers/azurecluster_controller_test.go +++ b/controllers/azurecluster_controller_test.go @@ -18,15 +18,20 @@ package controllers import ( "context" + "testing" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/record" infrav1 "sigs.k8s.io/cluster-api-provider-azure/api/v1beta1" "sigs.k8s.io/cluster-api-provider-azure/internal/test" "sigs.k8s.io/cluster-api-provider-azure/util/reconciler" + clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" ) var _ = Describe("AzureClusterReconciler", func() { @@ -51,3 +56,63 @@ var _ = Describe("AzureClusterReconciler", func() { }) }) }) + +func TestAzureClusterReconcilePaused(t *testing.T) { + g := NewWithT(t) + + ctx := context.Background() + + sb := runtime.NewSchemeBuilder( + clusterv1.AddToScheme, + infrav1.AddToScheme, + ) + s := runtime.NewScheme() + g.Expect(sb.AddToScheme(s)).To(Succeed()) + c := fake.NewClientBuilder(). + WithScheme(s). + Build() + + recorder := record.NewFakeRecorder(1) + + reconciler := NewAzureClusterReconciler(c, recorder, reconciler.DefaultLoopTimeout, "") + name := test.RandomName("paused", 10) + + cluster := &clusterv1.Cluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "default", + }, + Spec: clusterv1.ClusterSpec{ + Paused: true, + }, + } + g.Expect(c.Create(ctx, cluster)).To(Succeed()) + + instance := &infrav1.AzureCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "default", + OwnerReferences: []metav1.OwnerReference{ + { + Kind: "Cluster", + APIVersion: clusterv1.GroupVersion.String(), + Name: cluster.Name, + UID: cluster.UID, + }, + }, + }, + } + g.Expect(c.Create(ctx, instance)).To(Succeed()) + + result, err := reconciler.Reconcile(context.Background(), ctrl.Request{ + NamespacedName: client.ObjectKey{ + Namespace: instance.Namespace, + Name: instance.Name, + }, + }) + + g.Expect(err).To(BeNil()) + g.Expect(result.RequeueAfter).To(BeZero()) + + g.Eventually(recorder.Events).Should(Receive(Equal("Normal ClusterPaused AzureCluster or linked Cluster is marked as paused. Won't reconcile normally"))) +} diff --git a/controllers/azurecluster_reconciler.go b/controllers/azurecluster_reconciler.go index c78141730de8..9e1bb070081c 100644 --- a/controllers/azurecluster_reconciler.go +++ b/controllers/azurecluster_reconciler.go @@ -98,6 +98,24 @@ func (s *azureClusterService) Reconcile(ctx context.Context) error { return nil } +// Pause pauses all components making up the cluster. +func (s *azureClusterService) Pause(ctx context.Context) error { + ctx, _, done := tele.StartSpanWithLogger(ctx, "controllers.azureClusterService.Pause") + defer done() + + for _, service := range s.services { + pauser, ok := service.(azure.Pauser) + if !ok { + continue + } + if err := pauser.Pause(ctx); err != nil { + return errors.Wrapf(err, "failed to pause AzureCluster service %s", service.Name()) + } + } + + return nil +} + // Delete reconciles all the services in a predetermined order. func (s *azureClusterService) Delete(ctx context.Context) error { ctx, _, done := tele.StartSpanWithLogger(ctx, "controllers.azureClusterService.Delete") diff --git a/controllers/azurecluster_reconciler_test.go b/controllers/azurecluster_reconciler_test.go index 7cafec1e4ba4..ac51a1e87345 100644 --- a/controllers/azurecluster_reconciler_test.go +++ b/controllers/azurecluster_reconciler_test.go @@ -98,6 +98,76 @@ func TestAzureClusterServiceReconcile(t *testing.T) { } } +func TestAzureClusterServicePause(t *testing.T) { + type pausingServiceReconciler struct { + *mock_azure.MockServiceReconciler + *mock_azure.MockPauser + } + + cases := map[string]struct { + expectedError string + expect func(one pausingServiceReconciler, two pausingServiceReconciler, three pausingServiceReconciler) + }{ + "all services are paused in order": { + expectedError: "", + expect: func(one pausingServiceReconciler, two pausingServiceReconciler, three pausingServiceReconciler) { + gomock.InOrder( + one.MockPauser.EXPECT().Pause(gomockinternal.AContext()).Return(nil), + two.MockPauser.EXPECT().Pause(gomockinternal.AContext()).Return(nil), + three.MockPauser.EXPECT().Pause(gomockinternal.AContext()).Return(nil)) + }, + }, + "service pause fails": { + expectedError: "failed to pause AzureCluster service two: some error happened", + expect: func(one pausingServiceReconciler, two pausingServiceReconciler, _ pausingServiceReconciler) { + gomock.InOrder( + one.MockPauser.EXPECT().Pause(gomockinternal.AContext()).Return(nil), + two.MockPauser.EXPECT().Pause(gomockinternal.AContext()).Return(errors.New("some error happened")), + two.MockServiceReconciler.EXPECT().Name().Return("two")) + }, + }, + } + + for name, tc := range cases { + tc := tc + t.Run(name, func(t *testing.T) { + g := NewWithT(t) + + t.Parallel() + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + newPausingServiceReconciler := func() pausingServiceReconciler { + return pausingServiceReconciler{ + mock_azure.NewMockServiceReconciler(mockCtrl), + mock_azure.NewMockPauser(mockCtrl), + } + } + svcOneMock := newPausingServiceReconciler() + svcTwoMock := newPausingServiceReconciler() + svcThreeMock := newPausingServiceReconciler() + + tc.expect(svcOneMock, svcTwoMock, svcThreeMock) + + s := &azureClusterService{ + services: []azure.ServiceReconciler{ + svcOneMock, + svcTwoMock, + svcThreeMock, + }, + } + + err := s.Pause(context.TODO()) + if tc.expectedError != "" { + g.Expect(err).To(HaveOccurred()) + g.Expect(err).To(MatchError(tc.expectedError)) + } else { + g.Expect(err).NotTo(HaveOccurred()) + } + }) + } +} + func TestAzureClusterServiceDelete(t *testing.T) { cases := map[string]struct { expectedError string diff --git a/controllers/helpers.go b/controllers/helpers.go index 7b4bd3a49e8f..05fd93695e5e 100644 --- a/controllers/helpers.go +++ b/controllers/helpers.go @@ -33,6 +33,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" infrav1 "sigs.k8s.io/cluster-api-provider-azure/api/v1beta1" "sigs.k8s.io/cluster-api-provider-azure/azure" "sigs.k8s.io/cluster-api-provider-azure/azure/scope" @@ -53,7 +54,9 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client/apiutil" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) @@ -1023,3 +1026,33 @@ func MachinePoolToAzureManagedControlPlaneMapFunc(ctx context.Context, c client. return nil } } + +// ClusterUpdatePauseChange returns a predicate that returns true for an update event when a cluster's +// Spec.Paused changes between any two distinct values. +func ClusterUpdatePauseChange(logger logr.Logger) predicate.Funcs { + return predicate.Funcs{ + UpdateFunc: func(e event.UpdateEvent) bool { + log := logger.WithValues("predicate", "ClusterUpdatePauseChange", "eventType", "update") + + oldCluster, ok := e.ObjectOld.(*clusterv1.Cluster) + if !ok { + log.V(4).Info("Expected Cluster", "type", fmt.Sprintf("%T", e.ObjectOld)) + return false + } + log = log.WithValues("Cluster", klog.KObj(oldCluster)) + + newCluster := e.ObjectNew.(*clusterv1.Cluster) + + if oldCluster.Spec.Paused != newCluster.Spec.Paused { + log.V(4).Info("Cluster paused status changed, allowing further processing") + return true + } + + log.V(6).Info("Cluster paused status remained the same, blocking further processing") + return false + }, + CreateFunc: func(e event.CreateEvent) bool { return false }, + DeleteFunc: func(e event.DeleteEvent) bool { return false }, + GenericFunc: func(e event.GenericEvent) bool { return false }, + } +}