add pause handling for AzureMachine, AzureManagedMachinePool, AzureMachinePool #3808

Merged · 1 commit · Aug 13, 2023
38 changes: 28 additions & 10 deletions controllers/azuremachine_controller.go
@@ -92,7 +92,7 @@ func (amr *AzureMachineReconciler) SetupWithManager(ctx context.Context, mgr ctr
c, err := ctrl.NewControllerManagedBy(mgr).
WithOptions(options.Options).
For(&infrav1.AzureMachine{}).
WithEventFilter(predicates.ResourceNotPausedAndHasFilterLabel(log, amr.WatchFilterValue)).
WithEventFilter(predicates.ResourceHasFilterLabel(log, amr.WatchFilterValue)).
// watch for changes in CAPI Machine resources
Watches(
&source.Kind{Type: &clusterv1.Machine{}},
@@ -113,12 +113,12 @@ func (amr *AzureMachineReconciler) SetupWithManager(ctx context.Context, mgr ctr
return errors.Wrap(err, "failed to create mapper for Cluster to AzureMachines")
}

// Add a watch on clusterv1.Cluster object for unpause & ready notifications.
// Add a watch on clusterv1.Cluster object for pause/unpause & ready notifications.
if err := c.Watch(
&source.Kind{Type: &clusterv1.Cluster{}},
handler.EnqueueRequestsFromMapFunc(azureMachineMapper),
predicates.ClusterUnpausedAndInfrastructureReady(log),
predicates.ResourceNotPausedAndHasFilterLabel(log, amr.WatchFilterValue),
ClusterPauseChangeAndInfrastructureReady(log),
predicates.ResourceHasFilterLabel(log, amr.WatchFilterValue),
); err != nil {
return errors.Wrap(err, "failed adding a watch for ready clusters")
}
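
Note: the ClusterPauseChangeAndInfrastructureReady predicate used above is a new CAPZ helper whose definition is outside this excerpt. Judging from the inline composition it replaces in azuremanagedcontrolplane_controller.go later in this diff, a plausible reconstruction (a sketch, not the PR's actual code) is:

package controllers

import (
	"github.com/go-logr/logr"
	"sigs.k8s.io/cluster-api/util/predicates"
	"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// Plausible composition: enqueue Cluster events when infrastructure becomes
// ready on create or update, and additionally whenever the paused state
// changes, by unioning upstream CAPI predicates with CAPZ's existing
// ClusterUpdatePauseChange predicate.
func ClusterPauseChangeAndInfrastructureReady(log logr.Logger) predicate.Funcs {
	return predicates.Any(log,
		predicates.ClusterCreateInfraReady(log),
		predicates.ClusterUpdateInfraReady(log),
		ClusterUpdatePauseChange(log),
	)
}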
@@ -179,12 +179,6 @@ func (amr *AzureMachineReconciler) Reconcile(ctx context.Context, req ctrl.Reque

log = log.WithValues("cluster", cluster.Name)

// Return early if the object or Cluster is paused.
if annotations.IsPaused(cluster, azureMachine) {
log.Info("AzureMachine or linked Cluster is marked as paused. Won't reconcile")
return ctrl.Result{}, nil
}

log = log.WithValues("AzureCluster", cluster.Spec.InfrastructureRef.Name)
azureClusterName := client.ObjectKey{
Namespace: azureMachine.Namespace,
Expand Down Expand Up @@ -227,6 +221,12 @@ func (amr *AzureMachineReconciler) Reconcile(ctx context.Context, req ctrl.Reque
}
}()

// Return early if the object or Cluster is paused.
if annotations.IsPaused(cluster, azureMachine) {
log.Info("AzureMachine or linked Cluster is marked as paused. Won't reconcile normally")
return amr.reconcilePause(ctx, machineScope)
}

// Handle deleted machines
if !azureMachine.ObjectMeta.DeletionTimestamp.IsZero() {
return amr.reconcileDelete(ctx, machineScope, clusterScope)
@@ -341,6 +341,24 @@ func (amr *AzureMachineReconciler) reconcileNormal(ctx context.Context, machineS
return reconcile.Result{}, nil
}

func (amr *AzureMachineReconciler) reconcilePause(ctx context.Context, machineScope *scope.MachineScope) (reconcile.Result, error) {
ctx, log, done := tele.StartSpanWithLogger(ctx, "controllers.AzureMachine.reconcilePause")
defer done()

log.Info("Reconciling AzureMachine pause")

ams, err := amr.createAzureMachineService(machineScope)
if err != nil {
return reconcile.Result{}, errors.Wrap(err, "failed to create azure machine service")
}

if err := ams.Pause(ctx); err != nil {
return reconcile.Result{}, errors.Wrap(err, "failed to pause azure machine services")
}

return reconcile.Result{}, nil
}

func (amr *AzureMachineReconciler) reconcileDelete(ctx context.Context, machineScope *scope.MachineScope, clusterScope *scope.ClusterScope) (reconcile.Result, error) {
ctx, log, done := tele.StartSpanWithLogger(ctx, "controllers.AzureMachineReconciler.reconcileDelete")
defer done()
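For reference, the annotations.IsPaused helper that now routes into reconcilePause comes from upstream CAPI (sigs.k8s.io/cluster-api/util/annotations). A simplified equivalent of its check, to make the pause trigger concrete:

package controllers

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
)

// Simplified equivalent of annotations.IsPaused: an object counts as paused
// when the owning Cluster sets spec.paused, or when the object itself carries
// the cluster.x-k8s.io/paused annotation.
func isPaused(cluster *clusterv1.Cluster, o metav1.Object) bool {
	if cluster.Spec.Paused {
		return true
	}
	_, hasAnnotation := o.GetAnnotations()[clusterv1.PausedAnnotation]
	return hasAnnotation
}

Either trigger previously caused an early return; with this PR it runs reconcilePause instead, which is the behavioral core of the change.
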
55 changes: 45 additions & 10 deletions controllers/azuremachine_controller_test.go
@@ -111,16 +111,6 @@ func TestAzureMachineReconcile(t *testing.T) {
},
event: "Unable to get cluster from metadata",
},
"should return if cluster is paused": {
objects: []runtime.Object{
getFakeCluster(func(c *clusterv1.Cluster) {
c.Spec.Paused = true
}),
defaultAzureCluster,
defaultAzureMachine,
defaultMachine,
},
},
"should return if azureCluster does not yet available": {
objects: []runtime.Object{
defaultCluster,
@@ -249,6 +239,45 @@ }
}
}

func TestAzureMachineReconcilePause(t *testing.T) {
cases := map[string]TestReconcileInput{
"should pause successfully": {
createAzureMachineService: getFakeAzureMachineService,
cache: &scope.MachineCache{},
},
"should fail if failed to create azure machine service": {
createAzureMachineService: getFakeAzureMachineServiceWithFailure,
cache: &scope.MachineCache{},
expectedErr: "failed to create AzureMachineService",
},
"should fail to pause for errors": {
createAzureMachineService: getFakeAzureMachineServiceWithGeneralError,
cache: &scope.MachineCache{},
expectedErr: "failed to pause azure machine service",
},
}

for name, c := range cases {
tc := c
t.Run(name, func(t *testing.T) {
g := NewWithT(t)

reconciler, machineScope, _, err := getReconcileInputs(tc)
g.Expect(err).NotTo(HaveOccurred())

result, err := reconciler.reconcilePause(context.Background(), machineScope)
g.Expect(result).To(Equal(tc.expectedResult))

if tc.expectedErr != "" {
g.Expect(err).To(HaveOccurred())
g.Expect(err.Error()).To(ContainSubstring(tc.expectedErr))
} else {
g.Expect(err).NotTo(HaveOccurred())
}
})
}
}

func TestAzureMachineReconcileDelete(t *testing.T) {
cases := map[string]TestReconcileInput{
"should delete successfully": {
@@ -431,6 +460,9 @@ func getFakeAzureMachineServiceWithGeneralError(machineScope *scope.MachineScope
ams.Reconcile = func(context.Context) error {
return errors.New("foo error")
}
ams.Pause = func(context.Context) error {
return errors.New("foo error")
}
ams.Delete = func(context.Context) error {
return errors.New("foo error")
}
@@ -446,6 +478,9 @@ func getDefaultAzureMachineService(machineScope *scope.MachineScope, cache *reso
Reconcile: func(context.Context) error {
return nil
},
Pause: func(context.Context) error {
return nil
},
Delete: func(context.Context) error {
return nil
},
24 changes: 22 additions & 2 deletions controllers/azuremachine_reconciler.go
@@ -43,6 +43,7 @@ type azureMachineService struct {
services []azure.ServiceReconciler
skuCache *resourceskus.Cache
Reconcile func(context.Context) error
Pause func(context.Context) error
Delete func(context.Context) error
}

@@ -68,14 +69,15 @@ func newAzureMachineService(machineScope *scope.MachineScope) (*azureMachineServ
skuCache: cache,
}
ams.Reconcile = ams.reconcile
ams.Pause = ams.pause
ams.Delete = ams.delete

return ams, nil
}

// reconcile reconciles all the services in a predetermined order.
func (s *azureMachineService) reconcile(ctx context.Context) error {
ctx, _, done := tele.StartSpanWithLogger(ctx, "controllers.azureMachineService.Reconcile")
ctx, _, done := tele.StartSpanWithLogger(ctx, "controllers.azureMachineService.reconcile")
defer done()

// Ensure that the deprecated networking field values have been migrated to the new NetworkInterfaces field.
@@ -94,9 +96,27 @@ func (s *azureMachineService) reconcile(ctx context.Context) error {
return nil
}

// pause pauses all components making up the machine.
func (s *azureMachineService) pause(ctx context.Context) error {
ctx, _, done := tele.StartSpanWithLogger(ctx, "controllers.azureMachineService.pause")
defer done()

for _, service := range s.services {
pauser, ok := service.(azure.Pauser)
if !ok {
continue
}
if err := pauser.Pause(ctx); err != nil {
return errors.Wrapf(err, "failed to pause AzureMachine service %s", service.Name())
}
}

return nil
}

// delete deletes all the services in a predetermined order.
func (s *azureMachineService) delete(ctx context.Context) error {
ctx, _, done := tele.StartSpanWithLogger(ctx, "controllers.azureMachineService.Delete")
ctx, _, done := tele.StartSpanWithLogger(ctx, "controllers.azureMachineService.delete")
defer done()

// Delete services in reverse order of creation.
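The type assertion service.(azure.Pauser) in pause() above depends on an interface whose definition is outside this excerpt. Its assumed shape, inferred from usage (a sketch, not necessarily the PR's exact declaration):

package azure

import "context"

// Pauser is implemented by services that support pausing.
// azureMachineService.pause skips any service that does not implement it.
type Pauser interface {
	Pause(context.Context) error
}

Making Pause opt-in keeps the change incremental: each service can gain pause support independently without widening the ServiceReconciler contract.
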
70 changes: 70 additions & 0 deletions controllers/azuremachine_reconciler_test.go
@@ -104,6 +104,76 @@ func TestAzureMachineServiceReconcile(t *testing.T) {
}
}

func TestAzureMachineServicePause(t *testing.T) {
type pausingServiceReconciler struct {
*mock_azure.MockServiceReconciler
*mock_azure.MockPauser
}

cases := map[string]struct {
expectedError string
expect func(one pausingServiceReconciler, two pausingServiceReconciler, three pausingServiceReconciler)
}{
"all services are paused in order": {
expectedError: "",
expect: func(one pausingServiceReconciler, two pausingServiceReconciler, three pausingServiceReconciler) {
gomock.InOrder(
one.MockPauser.EXPECT().Pause(gomockinternal.AContext()).Return(nil),
two.MockPauser.EXPECT().Pause(gomockinternal.AContext()).Return(nil),
three.MockPauser.EXPECT().Pause(gomockinternal.AContext()).Return(nil))
},
},
"service pause fails": {
expectedError: "failed to pause AzureMachine service two: some error happened",
expect: func(one pausingServiceReconciler, two pausingServiceReconciler, _ pausingServiceReconciler) {
gomock.InOrder(
one.MockPauser.EXPECT().Pause(gomockinternal.AContext()).Return(nil),
two.MockPauser.EXPECT().Pause(gomockinternal.AContext()).Return(errors.New("some error happened")),
two.MockServiceReconciler.EXPECT().Name().Return("two"))
},
},
}

for name, tc := range cases {
tc := tc
t.Run(name, func(t *testing.T) {
g := NewWithT(t)

t.Parallel()
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

newPausingServiceReconciler := func() pausingServiceReconciler {
return pausingServiceReconciler{
mock_azure.NewMockServiceReconciler(mockCtrl),
mock_azure.NewMockPauser(mockCtrl),
}
}
svcOneMock := newPausingServiceReconciler()
svcTwoMock := newPausingServiceReconciler()
svcThreeMock := newPausingServiceReconciler()

tc.expect(svcOneMock, svcTwoMock, svcThreeMock)

s := &azureMachineService{
services: []azure.ServiceReconciler{
svcOneMock,
svcTwoMock,
svcThreeMock,
},
}

err := s.pause(context.TODO())
if tc.expectedError != "" {
g.Expect(err).To(HaveOccurred())
g.Expect(err).To(MatchError(tc.expectedError))
} else {
g.Expect(err).NotTo(HaveOccurred())
}
})
}
}

func TestAzureMachineServiceDelete(t *testing.T) {
cases := map[string]struct {
expectedError string
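The pausingServiceReconciler struct in TestAzureMachineServicePause satisfies two interfaces at once because Go promotes methods from embedded types. A minimal, self-contained illustration of the pattern with toy interfaces (not the real mocks):

package main

import "fmt"

type Reconciler interface{ Name() string }
type Pauser interface{ Pause() error }

type fakeReconciler struct{}

func (fakeReconciler) Name() string { return "fake" }

type fakePauser struct{}

func (fakePauser) Pause() error { return nil }

// Embedding both fakes promotes Name and Pause, so the composite value
// satisfies Reconciler and Pauser simultaneously, just as the test's struct
// satisfies azure.ServiceReconciler and azure.Pauser.
type pausingFake struct {
	fakeReconciler
	fakePauser
}

func main() {
	var v interface {
		Reconciler
		Pauser
	} = pausingFake{}
	fmt.Println(v.Name(), v.Pause()) // prints: fake <nil>
}
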
2 changes: 1 addition & 1 deletion controllers/azuremanagedcontrolplane_controller.go
@@ -99,7 +99,7 @@ func (amcpr *AzureManagedControlPlaneReconciler) SetupWithManager(ctx context.Co
if err = c.Watch(
&source.Kind{Type: &clusterv1.Cluster{}},
handler.EnqueueRequestsFromMapFunc(amcpr.ClusterToAzureManagedControlPlane),
predicates.Any(log, predicates.ClusterCreateInfraReady(log), predicates.ClusterUpdateInfraReady(log), ClusterUpdatePauseChange(log)),
ClusterPauseChangeAndInfrastructureReady(log),
predicates.ResourceHasFilterLabel(log, amcpr.WatchFilterValue),
); err != nil {
return errors.Wrap(err, "failed adding a watch for ready clusters")
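ClusterUpdatePauseChange, which the removed line above composed inline, is defined elsewhere in CAPZ and not shown in this excerpt. A hedged sketch of what such a pause-change predicate could look like (hypothetical reconstruction; the real implementation may also track the paused annotation):

package controllers

import (
	"github.com/go-logr/logr"
	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// clusterUpdatePauseChangeSketch enqueues a Cluster only when its paused state
// actually flips, so both pause and unpause transitions reach the reconcilers.
func clusterUpdatePauseChangeSketch(log logr.Logger) predicate.Funcs {
	return predicate.Funcs{
		UpdateFunc: func(e event.UpdateEvent) bool {
			oldCluster, ok := e.ObjectOld.(*clusterv1.Cluster)
			if !ok {
				return false
			}
			newCluster, ok := e.ObjectNew.(*clusterv1.Cluster)
			if !ok {
				return false
			}
			if oldCluster.Spec.Paused != newCluster.Spec.Paused {
				log.V(4).Info("cluster paused state changed, enqueuing", "cluster", newCluster.Name)
				return true
			}
			return false
		},
	}
}
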
38 changes: 28 additions & 10 deletions controllers/azuremanagedmachinepool_controller.go
@@ -91,7 +91,7 @@ func (ammpr *AzureManagedMachinePoolReconciler) SetupWithManager(ctx context.Con
c, err := ctrl.NewControllerManagedBy(mgr).
WithOptions(options.Options).
For(azManagedMachinePool).
WithEventFilter(predicates.ResourceNotPausedAndHasFilterLabel(log, ammpr.WatchFilterValue)).
WithEventFilter(predicates.ResourceHasFilterLabel(log, ammpr.WatchFilterValue)).
// watch for changes in CAPI MachinePool resources
Watches(
&source.Kind{Type: &expv1.MachinePool{}},
@@ -112,12 +112,12 @@ func (ammpr *AzureManagedMachinePoolReconciler) SetupWithManager(ctx context.Con
return errors.Wrap(err, "failed to create mapper for Cluster to AzureManagedMachinePools")
}

// Add a watch on clusterv1.Cluster object for unpause & ready notifications.
// Add a watch on clusterv1.Cluster object for pause/unpause & ready notifications.
if err = c.Watch(
&source.Kind{Type: &clusterv1.Cluster{}},
handler.EnqueueRequestsFromMapFunc(azureManagedMachinePoolMapper),
predicates.ClusterUnpausedAndInfrastructureReady(log),
predicates.ResourceNotPausedAndHasFilterLabel(log, ammpr.WatchFilterValue),
ClusterPauseChangeAndInfrastructureReady(log),
predicates.ResourceHasFilterLabel(log, ammpr.WatchFilterValue),
); err != nil {
return errors.Wrap(err, "failed adding a watch for ready clusters")
}
@@ -174,12 +174,6 @@ func (ammpr *AzureManagedMachinePoolReconciler) Reconcile(ctx context.Context, r

log = log.WithValues("ownerCluster", ownerCluster.Name)

// Return early if the object or Cluster is paused.
if annotations.IsPaused(ownerCluster, infraPool) {
log.Info("AzureManagedMachinePool or linked Cluster is marked as paused. Won't reconcile")
return ctrl.Result{}, nil
}

// Fetch the corresponding control plane which has all the interesting data.
controlPlane := &infrav1.AzureManagedControlPlane{}
controlPlaneName := client.ObjectKey{
@@ -233,6 +227,12 @@ func (ammpr *AzureManagedMachinePoolReconciler) Reconcile(ctx context.Context, r
}
}()

// Return early if the object or Cluster is paused.
if annotations.IsPaused(ownerCluster, infraPool) {
log.Info("AzureManagedMachinePool or linked Cluster is marked as paused. Won't reconcile normally")
return ammpr.reconcilePause(ctx, mcpScope)
}

// Handle deleted clusters
if !infraPool.DeletionTimestamp.IsZero() {
return ammpr.reconcileDelete(ctx, mcpScope)
@@ -294,6 +294,24 @@ func (ammpr *AzureManagedMachinePoolReconciler) reconcileNormal(ctx context.Cont
return reconcile.Result{}, nil
}

func (ammpr *AzureManagedMachinePoolReconciler) reconcilePause(ctx context.Context, scope *scope.ManagedMachinePoolScope) (reconcile.Result, error) {
ctx, log, done := tele.StartSpanWithLogger(ctx, "controllers.AzureManagedMachinePool.reconcilePause")
defer done()

log.Info("Reconciling AzureManagedMachinePool pause")

svc, err := ammpr.createAzureManagedMachinePoolService(scope)
if err != nil {
return reconcile.Result{}, errors.Wrap(err, "failed to create an AzureManageMachinePoolService")
}

if err := svc.Pause(ctx); err != nil {
return reconcile.Result{}, errors.Wrapf(err, "error pausing AzureManagedMachinePool %s/%s", scope.InfraMachinePool.Namespace, scope.InfraMachinePool.Name)
}

return reconcile.Result{}, nil
}

func (ammpr *AzureManagedMachinePoolReconciler) reconcileDelete(ctx context.Context, scope *scope.ManagedMachinePoolScope) (reconcile.Result, error) {
ctx, log, done := tele.StartSpanWithLogger(ctx, "controllers.AzureManagedMachinePoolReconciler.reconcileDelete")
defer done()