From b2b3039e6e9da1cd2f07cf7c731d229c5d7a2368 Mon Sep 17 00:00:00 2001 From: Jason Parraga Date: Mon, 18 Nov 2024 22:00:52 -0800 Subject: [PATCH 1/2] Delete cronjobs when pruning disabled and actually reconcile scheduler cronjobs Signed-off-by: Jason Parraga --- internal/controller/install/common_helpers.go | 38 ++- .../controller/install/lookout_controller.go | 25 +- .../install/lookout_controller_test.go | 182 +++++++++++++++ .../install/scheduler_controller.go | 25 +- .../install/scheduler_controller_test.go | 216 ++++++++++++++++++ 5 files changed, 460 insertions(+), 26 deletions(-) diff --git a/internal/controller/install/common_helpers.go b/internal/controller/install/common_helpers.go index f2bca45..af468c3 100644 --- a/internal/controller/install/common_helpers.go +++ b/internal/controller/install/common_helpers.go @@ -596,13 +596,13 @@ func upsertObjectIfNeeded( mutateFn controllerutil.MutateFn, logger logr.Logger, ) error { - if !isNil(object) { - logger.Info(fmt.Sprintf("Upserting %s %s object", componentName, object.GetObjectKind())) - if _, err := controllerutil.CreateOrUpdate(ctx, client, object, mutateFn); err != nil { - return err - } + if isNil(object) { + return nil } - return nil + + logger.Info(fmt.Sprintf("Upserting %s %s object", componentName, object.GetObjectKind())) + _, err := controllerutil.CreateOrUpdate(ctx, client, object, mutateFn) + return err } // Helper function to determine if the object is nil even if it's a pointer to a nil value @@ -619,6 +619,32 @@ func isNil(i any) bool { } } +// deleteObjectIfNeeded will delete the object if it exists. +func deleteObjectIfNeeded( + ctx context.Context, + client client.Client, + object client.Object, + componentName string, + logger logr.Logger, +) error { + if isNil(object) { + return nil + } + + err := client.Delete(ctx, object) + if err != nil { + if k8serrors.IsNotFound(err) { + return nil // nothing to do + } else { + return err + } + } + + logger.Info("Successfully deleted %s %s", componentName, object.GetObjectKind()) + + return nil +} + // getObject will get the object from Kubernetes and return if it is missing or an error. func getObject( ctx context.Context, diff --git a/internal/controller/install/lookout_controller.go b/internal/controller/install/lookout_controller.go index ba817bc..982ce25 100644 --- a/internal/controller/install/lookout_controller.go +++ b/internal/controller/install/lookout_controller.go @@ -124,8 +124,14 @@ func (r *LookoutReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct return ctrl.Result{}, err } - if err := upsertObjectIfNeeded(ctx, r.Client, components.CronJob, lookout.Kind, mutateFn, logger); err != nil { - return ctrl.Result{}, err + if enabled := lookout.Spec.DbPruningEnabled; enabled != nil && *enabled { + if err := upsertObjectIfNeeded(ctx, r.Client, components.CronJob, lookout.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err + } + } else { + if err := deleteObjectIfNeeded(ctx, r.Client, components.CronJob, lookout.Kind, logger); err != nil { + return ctrl.Result{}, err + } } if err := upsertObjectIfNeeded(ctx, r.Client, components.ServiceMonitor, lookout.Kind, mutateFn, logger); err != nil { @@ -210,15 +216,12 @@ func generateLookoutInstallComponents( return nil, err } - var cronJob *batchv1.CronJob - if enabled := lookout.Spec.DbPruningEnabled; enabled != nil && *enabled { - cronJob, err = createLookoutCronJob(lookout, serviceAccountName) - if err != nil { - return nil, err - } - if err := controllerutil.SetOwnerReference(lookout, cronJob, scheme); err != nil { - return nil, err - } + cronJob, err := createLookoutCronJob(lookout, serviceAccountName) + if err != nil { + return nil, err + } + if err := controllerutil.SetOwnerReference(lookout, cronJob, scheme); err != nil { + return nil, err } ingressHTTP, err := createLookoutIngressHttp(lookout, config) diff --git a/internal/controller/install/lookout_controller_test.go b/internal/controller/install/lookout_controller_test.go index 70cbb46..f00c58f 100644 --- a/internal/controller/install/lookout_controller_test.go +++ b/internal/controller/install/lookout_controller_test.go @@ -215,6 +215,188 @@ func TestLookoutReconciler_Reconcile(t *testing.T) { } } +func TestLookoutReconciler_ReconcilePruningDisabled(t *testing.T) { + t.Parallel() + + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + scheme, err := v1alpha1.SchemeBuilder.Build() + if err != nil { + t.Fatalf("should not return error when building schema") + } + + expectedNamespacedName := types.NamespacedName{Namespace: "default", Name: "lookout"} + dbPruningEnabled := false + dbPruningSchedule := "1d" + terminationGracePeriod := int64(20) + expectedLookout := v1alpha1.Lookout{ + TypeMeta: metav1.TypeMeta{ + Kind: "Lookout", + APIVersion: "install.armadaproject.io/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "lookout"}, + Spec: v1alpha1.LookoutSpec{ + Replicas: ptr.To[int32](2), + CommonSpecBase: installv1alpha1.CommonSpecBase{ + Labels: nil, + Image: v1alpha1.Image{ + Repository: "testrepo", + Tag: "1.0.0", + }, + ApplicationConfig: runtime.RawExtension{}, + Resources: &corev1.ResourceRequirements{}, + Prometheus: &installv1alpha1.PrometheusConfig{Enabled: true, ScrapeInterval: &metav1.Duration{Duration: 1 * time.Second}}, + TerminationGracePeriodSeconds: &terminationGracePeriod, + }, + ClusterIssuer: "test", + HostNames: []string{"localhost"}, + Ingress: &installv1alpha1.IngressConfig{ + IngressClass: "nginx", + Labels: map[string]string{"test": "hello"}, + Annotations: map[string]string{"test": "hello"}, + }, + DbPruningEnabled: &dbPruningEnabled, + DbPruningSchedule: &dbPruningSchedule, + }, + } + + mockK8sClient := k8sclient.NewMockClient(mockCtrl) + // Lookout + mockK8sClient. + EXPECT(). + Get(gomock.Any(), expectedNamespacedName, gomock.AssignableToTypeOf(&v1alpha1.Lookout{})). + Return(nil). + SetArg(2, expectedLookout) + + // Finalizer + mockK8sClient. + EXPECT(). + Update(gomock.Any(), gomock.AssignableToTypeOf(&installv1alpha1.Lookout{})). + Return(nil) + + // ServiceAccount + mockK8sClient. + EXPECT(). + Get(gomock.Any(), expectedNamespacedName, gomock.AssignableToTypeOf(&corev1.ServiceAccount{})). + Return(errors.NewNotFound(schema.GroupResource{}, "lookout")) + mockK8sClient. + EXPECT(). + Create(gomock.Any(), gomock.AssignableToTypeOf(&corev1.ServiceAccount{})). + Return(nil) + + mockK8sClient. + EXPECT(). + Get(gomock.Any(), expectedNamespacedName, gomock.AssignableToTypeOf(&corev1.Secret{})). + Return(errors.NewNotFound(schema.GroupResource{}, "lookout")) + mockK8sClient. + EXPECT(). + Create(gomock.Any(), gomock.AssignableToTypeOf(&corev1.Secret{})). + Return(nil) + + expectedJobName := types.NamespacedName{Namespace: "default", Name: "lookout-migration"} + expectedMigrationJob := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "lookout-migration", + }, + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "lookout-migration", + Image: "testrepo:1.0.0", + }, + }, + }, + }, + }, + Status: batchv1.JobStatus{ + Conditions: []batchv1.JobCondition{{ + Type: batchv1.JobComplete, + Status: corev1.ConditionTrue, + }}, + }, + } + + mockK8sClient. + EXPECT(). + Get(gomock.Any(), expectedJobName, gomock.AssignableToTypeOf(&batchv1.Job{})). + Return(errors.NewNotFound(schema.GroupResource{}, "lookout")) + mockK8sClient. + EXPECT(). + Create(gomock.Any(), gomock.AssignableToTypeOf(&batchv1.Job{})). + Return(nil) + mockK8sClient. + EXPECT(). + Get(gomock.Any(), expectedJobName, gomock.AssignableToTypeOf(&batchv1.Job{})). + Return(nil). + SetArg(2, *expectedMigrationJob) + + mockK8sClient. + EXPECT(). + Get(gomock.Any(), expectedNamespacedName, gomock.AssignableToTypeOf(&appsv1.Deployment{})). + Return(errors.NewNotFound(schema.GroupResource{}, "lookout")) + mockK8sClient. + EXPECT(). + Create(gomock.Any(), gomock.AssignableToTypeOf(&appsv1.Deployment{})). + Return(nil) + + mockK8sClient. + EXPECT(). + Get(gomock.Any(), expectedNamespacedName, gomock.AssignableToTypeOf(&corev1.Service{})). + Return(errors.NewNotFound(schema.GroupResource{}, "lookout")) + mockK8sClient. + EXPECT(). + Create(gomock.Any(), gomock.AssignableToTypeOf(&corev1.Service{})). + Return(nil) + + // IngressHttp + expectedIngressName := expectedNamespacedName + expectedIngressName.Name = expectedIngressName.Name + "-rest" + mockK8sClient. + EXPECT(). + Get(gomock.Any(), expectedIngressName, gomock.AssignableToTypeOf(&networkingv1.Ingress{})). + Return(errors.NewNotFound(schema.GroupResource{}, "lookout")) + mockK8sClient. + EXPECT(). + Create(gomock.Any(), gomock.AssignableToTypeOf(&networkingv1.Ingress{})). + Return(nil) + + // ServiceMonitor + mockK8sClient. + EXPECT(). + Get(gomock.Any(), expectedNamespacedName, gomock.AssignableToTypeOf(&monitoringv1.ServiceMonitor{})). + Return(errors.NewNotFound(schema.GroupResource{}, "armadaserver")) + mockK8sClient. + EXPECT(). + Create(gomock.Any(), gomock.AssignableToTypeOf(&monitoringv1.ServiceMonitor{})). + Return(nil) + + // CronJob should be deleted + expectedCronJobName := expectedNamespacedName + expectedCronJobName.Name = expectedCronJobName.Name + "-db-pruner" + mockK8sClient. + EXPECT(). + Delete(gomock.Any(), gomock.AssignableToTypeOf(&batchv1.CronJob{})). + Return(nil) + + r := LookoutReconciler{ + Client: mockK8sClient, + Scheme: scheme, + } + + req := ctrl.Request{ + NamespacedName: types.NamespacedName{Namespace: "default", Name: "lookout"}, + } + + _, err = r.Reconcile(context.Background(), req) + if err != nil { + t.Fatalf("reconcile should not return error") + } +} + func TestLookoutReconciler_ReconcileNoLookout(t *testing.T) { t.Parallel() diff --git a/internal/controller/install/scheduler_controller.go b/internal/controller/install/scheduler_controller.go index 9953fc2..ac8ffab 100644 --- a/internal/controller/install/scheduler_controller.go +++ b/internal/controller/install/scheduler_controller.go @@ -135,6 +135,16 @@ func (r *SchedulerReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( return ctrl.Result{}, err } + if scheduler.Spec.Pruner != nil && scheduler.Spec.Pruner.Enabled { + if err := upsertObjectIfNeeded(ctx, r.Client, components.CronJob, scheduler.Kind, mutateFn, logger); err != nil { + return ctrl.Result{}, err + } + } else { + if err := deleteObjectIfNeeded(ctx, r.Client, components.CronJob, scheduler.Kind, logger); err != nil { + return ctrl.Result{}, err + } + } + logger.Info("Successfully reconciled Scheduler object", "durationMillis", time.Since(started).Milliseconds()) return ctrl.Result{}, nil @@ -218,15 +228,12 @@ func generateSchedulerInstallComponents( return nil, err } - var cronJob *batchv1.CronJob - if scheduler.Spec.Pruner != nil && scheduler.Spec.Pruner.Enabled { - cronJob, err = newSchedulerCronJob(scheduler, serviceAccountName) - if err != nil { - return nil, err - } - if err := controllerutil.SetOwnerReference(scheduler, cronJob, scheme); err != nil { - return nil, err - } + cronJob, err := newSchedulerCronJob(scheduler, serviceAccountName) + if err != nil { + return nil, err + } + if err := controllerutil.SetOwnerReference(scheduler, cronJob, scheme); err != nil { + return nil, err } ingressGRPC, err := newSchedulerIngressGRPC(scheduler, config) diff --git a/internal/controller/install/scheduler_controller_test.go b/internal/controller/install/scheduler_controller_test.go index f1581e6..335fe46 100644 --- a/internal/controller/install/scheduler_controller_test.go +++ b/internal/controller/install/scheduler_controller_test.go @@ -187,6 +187,7 @@ func TestSchedulerReconciler_Reconcile(t *testing.T) { Create(gomock.Any(), gomock.AssignableToTypeOf(&monitoringv1.ServiceMonitor{})). Return(nil) + // PrometheusRule mockK8sClient. EXPECT(). Get(gomock.Any(), expectedNamespacedName, gomock.AssignableToTypeOf(&monitoringv1.PrometheusRule{})). @@ -196,6 +197,208 @@ func TestSchedulerReconciler_Reconcile(t *testing.T) { Create(gomock.Any(), gomock.AssignableToTypeOf(&monitoringv1.PrometheusRule{})). Return(nil) + // CronJob + expectedCronJobName := expectedNamespacedName + expectedCronJobName.Name = expectedCronJobName.Name + "-db-pruner" + mockK8sClient. + EXPECT(). + Get(gomock.Any(), expectedCronJobName, gomock.AssignableToTypeOf(&batchv1.CronJob{})). + Return(errors.NewNotFound(schema.GroupResource{}, "armadaserver")) + mockK8sClient. + EXPECT(). + Create(gomock.Any(), gomock.AssignableToTypeOf(&batchv1.CronJob{})). + Return(nil) + + r := SchedulerReconciler{ + Client: mockK8sClient, + Scheme: scheme, + } + + req := ctrl.Request{ + NamespacedName: types.NamespacedName{Namespace: "default", Name: "scheduler"}, + } + + _, err = r.Reconcile(context.Background(), req) + if err != nil { + t.Fatalf("reconcile should not return error") + } +} + +func TestSchedulerReconciler_ReconcilePruningDisabled(t *testing.T) { + t.Parallel() + + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + scheme, err := v1alpha1.SchemeBuilder.Build() + if err != nil { + t.Fatalf("should not return error when building schema") + } + + expectedNamespacedName := types.NamespacedName{Namespace: "default", Name: "scheduler"} + dbPruningEnabled := false + dbPruningSchedule := "1d" + terminationGracePeriod := int64(20) + expectedScheduler := v1alpha1.Scheduler{ + TypeMeta: metav1.TypeMeta{ + Kind: "Scheduler", + APIVersion: "install.armadaproject.io/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "scheduler"}, + Spec: v1alpha1.SchedulerSpec{ + Replicas: ptr.To[int32](2), + CommonSpecBase: installv1alpha1.CommonSpecBase{ + Labels: nil, + Image: v1alpha1.Image{ + Repository: "testrepo", + Tag: "1.0.0", + }, + ApplicationConfig: runtime.RawExtension{}, + Resources: &corev1.ResourceRequirements{}, + Prometheus: &installv1alpha1.PrometheusConfig{Enabled: true, ScrapeInterval: &metav1.Duration{Duration: 1 * time.Second}}, + TerminationGracePeriodSeconds: &terminationGracePeriod, + }, + ClusterIssuer: "test", + HostNames: []string{"localhost"}, + Ingress: &installv1alpha1.IngressConfig{ + IngressClass: "nginx", + Labels: map[string]string{"test": "hello"}, + Annotations: map[string]string{"test": "hello"}, + }, + Pruner: &installv1alpha1.PrunerConfig{ + Enabled: dbPruningEnabled, + Schedule: dbPruningSchedule, + }, + }, + } + + commonConfig, err := builders.ParseCommonApplicationConfig(expectedScheduler.Spec.ApplicationConfig) + if err != nil { + t.Fatalf("should not return error when parsing common application config") + } + scheduler, err := generateSchedulerInstallComponents(&expectedScheduler, scheme, commonConfig) + if err != nil { + t.Fatal("We should not fail on generating scheduler") + } + + mockK8sClient := k8sclient.NewMockClient(mockCtrl) + // Scheduler + mockK8sClient. + EXPECT(). + Get(gomock.Any(), expectedNamespacedName, gomock.AssignableToTypeOf(&v1alpha1.Scheduler{})). + Return(nil). + SetArg(2, expectedScheduler) + + // Finalizer + mockK8sClient. + EXPECT(). + Update(gomock.Any(), gomock.AssignableToTypeOf(&installv1alpha1.Scheduler{})). + Return(nil) + + // ServiceAccount + mockK8sClient. + EXPECT(). + Get(gomock.Any(), expectedNamespacedName, gomock.AssignableToTypeOf(&corev1.ServiceAccount{})). + Return(errors.NewNotFound(schema.GroupResource{}, "scheduler")) + mockK8sClient. + EXPECT(). + Create(gomock.Any(), gomock.AssignableToTypeOf(&corev1.ServiceAccount{})). + Return(nil). + SetArg(1, *scheduler.ServiceAccount) + + mockK8sClient. + EXPECT(). + Get(gomock.Any(), expectedNamespacedName, gomock.AssignableToTypeOf(&corev1.Secret{})). + Return(errors.NewNotFound(schema.GroupResource{}, "scheduler")) + mockK8sClient. + EXPECT(). + Create(gomock.Any(), gomock.AssignableToTypeOf(&corev1.Secret{})). + Return(nil). + SetArg(1, *scheduler.Secret) + + expectedJobName := types.NamespacedName{Namespace: "default", Name: "scheduler-migration"} + scheduler.Jobs[0].Status = batchv1.JobStatus{ + Conditions: []batchv1.JobCondition{{ + Type: batchv1.JobComplete, + Status: corev1.ConditionTrue, + }}, + } + mockK8sClient. + EXPECT(). + Get(gomock.Any(), expectedJobName, gomock.AssignableToTypeOf(&batchv1.Job{})). + Return(errors.NewNotFound(schema.GroupResource{}, "scheduler")) + mockK8sClient. + EXPECT(). + Create(gomock.Any(), gomock.AssignableToTypeOf(&batchv1.Job{})). + Return(nil). + SetArg(1, *scheduler.Jobs[0]) + mockK8sClient. + EXPECT(). + Get(gomock.Any(), expectedJobName, gomock.AssignableToTypeOf(&batchv1.Job{})). + Return(nil). + SetArg(2, *scheduler.Jobs[0]) + + mockK8sClient. + EXPECT(). + Get(gomock.Any(), expectedNamespacedName, gomock.AssignableToTypeOf(&appsv1.Deployment{})). + Return(errors.NewNotFound(schema.GroupResource{}, "scheduler")) + mockK8sClient. + EXPECT(). + Create(gomock.Any(), gomock.AssignableToTypeOf(&appsv1.Deployment{})). + Return(nil). + SetArg(1, *scheduler.Deployment) + + mockK8sClient. + EXPECT(). + Get(gomock.Any(), expectedNamespacedName, gomock.AssignableToTypeOf(&corev1.Service{})). + Return(errors.NewNotFound(schema.GroupResource{}, "scheduler")) + mockK8sClient. + EXPECT(). + Create(gomock.Any(), gomock.AssignableToTypeOf(&corev1.Service{})). + Return(nil). + SetArg(1, *scheduler.Service) + + // IngressGrpc + expectedIngressName := expectedNamespacedName + expectedIngressName.Name = expectedIngressName.Name + "-grpc" + mockK8sClient. + EXPECT(). + Get(gomock.Any(), expectedIngressName, gomock.AssignableToTypeOf(&networkingv1.Ingress{})). + Return(errors.NewNotFound(schema.GroupResource{}, "scheduler")) + mockK8sClient. + EXPECT(). + Create(gomock.Any(), gomock.AssignableToTypeOf(&networkingv1.Ingress{})). + Return(nil). + SetArg(1, *scheduler.IngressGrpc) + + // ServiceMonitor + mockK8sClient. + EXPECT(). + Get(gomock.Any(), expectedNamespacedName, gomock.AssignableToTypeOf(&monitoringv1.ServiceMonitor{})). + Return(errors.NewNotFound(schema.GroupResource{}, "scheduler")) + mockK8sClient. + EXPECT(). + Create(gomock.Any(), gomock.AssignableToTypeOf(&monitoringv1.ServiceMonitor{})). + Return(nil) + + // PrometheusRule + mockK8sClient. + EXPECT(). + Get(gomock.Any(), expectedNamespacedName, gomock.AssignableToTypeOf(&monitoringv1.PrometheusRule{})). + Return(errors.NewNotFound(schema.GroupResource{}, "scheduler")) + mockK8sClient. + EXPECT(). + Create(gomock.Any(), gomock.AssignableToTypeOf(&monitoringv1.PrometheusRule{})). + Return(nil) + + // CronJob + expectedCronJobName := expectedNamespacedName + expectedCronJobName.Name = expectedCronJobName.Name + "-db-pruner" + mockK8sClient. + EXPECT(). + Delete(gomock.Any(), gomock.AssignableToTypeOf(&batchv1.CronJob{})). + Return(nil) + r := SchedulerReconciler{ Client: mockK8sClient, Scheme: scheme, @@ -459,6 +662,7 @@ func TestSchedulerReconciler_ReconcileMissingResources(t *testing.T) { Create(gomock.Any(), gomock.AssignableToTypeOf(&monitoringv1.ServiceMonitor{})). Return(nil) + // PrometheusRule mockK8sClient. EXPECT(). Get(gomock.Any(), expectedNamespacedName, gomock.AssignableToTypeOf(&monitoringv1.PrometheusRule{})). @@ -468,6 +672,18 @@ func TestSchedulerReconciler_ReconcileMissingResources(t *testing.T) { Create(gomock.Any(), gomock.AssignableToTypeOf(&monitoringv1.PrometheusRule{})). Return(nil) + // CronJob + expectedCronJobName := expectedNamespacedName + expectedCronJobName.Name = expectedCronJobName.Name + "-db-pruner" + mockK8sClient. + EXPECT(). + Get(gomock.Any(), expectedCronJobName, gomock.AssignableToTypeOf(&batchv1.CronJob{})). + Return(errors.NewNotFound(schema.GroupResource{}, "armadaserver")) + mockK8sClient. + EXPECT(). + Create(gomock.Any(), gomock.AssignableToTypeOf(&batchv1.CronJob{})). + Return(nil) + r := SchedulerReconciler{ Client: mockK8sClient, Scheme: scheme, From 4238007b3559cf8290d01ebb79a96acec350596f Mon Sep 17 00:00:00 2001 From: Jason Parraga Date: Wed, 20 Nov 2024 18:02:44 -0800 Subject: [PATCH 2/2] Address comments Signed-off-by: Jason Parraga --- internal/controller/install/common_helpers.go | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/internal/controller/install/common_helpers.go b/internal/controller/install/common_helpers.go index af468c3..8002b4f 100644 --- a/internal/controller/install/common_helpers.go +++ b/internal/controller/install/common_helpers.go @@ -622,7 +622,7 @@ func isNil(i any) bool { // deleteObjectIfNeeded will delete the object if it exists. func deleteObjectIfNeeded( ctx context.Context, - client client.Client, + k8sClient client.Client, object client.Object, componentName string, logger logr.Logger, @@ -631,13 +631,8 @@ func deleteObjectIfNeeded( return nil } - err := client.Delete(ctx, object) - if err != nil { - if k8serrors.IsNotFound(err) { - return nil // nothing to do - } else { - return err - } + if err := k8sClient.Delete(ctx, object); err != nil { + return client.IgnoreNotFound(err) } logger.Info("Successfully deleted %s %s", componentName, object.GetObjectKind())