diff --git a/internal/controller/isbservicerollout_controller_test.go b/internal/controller/isbservicerollout_controller_test.go index 595fc6e7..ca1898ef 100644 --- a/internal/controller/isbservicerollout_controller_test.go +++ b/internal/controller/isbservicerollout_controller_test.go @@ -33,13 +33,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -var _ = Describe("ISBServiceRollout Controller", func() { +var _ = Describe("ISBServiceRollout Controller", Ordered, func() { const ( namespace = "default" isbServiceRolloutName = "isbservicerollout-test" - timeout = 10 * time.Second - duration = 10 * time.Second - interval = 250 * time.Millisecond ) ctx := context.Background() @@ -256,30 +253,9 @@ var _ = Describe("ISBServiceRollout Controller", func() { }) - It("Should auto heal the InterStepBufferService with the ISBServiceRollout interstepbufferservice spec when the InterStepBufferService spec is changed", func() { - By("updating the InterStepBufferService") - currentISBService := &numaflowv1.InterStepBufferService{} - Expect(k8sClient.Get(ctx, resourceLookupKey, currentISBService)).To(Succeed()) - - originalJetstreamVersion := currentISBService.Spec.JetStream.Version - newJetstreamVersion := "1.2.3" - currentISBService.Spec.JetStream.Version = newJetstreamVersion - - Expect(k8sClient.Update(ctx, currentISBService)).ToNot(HaveOccurred()) - - By("Verifying the changed field of the InterStepBufferService is the same as the original and not the modified version") - e := Consistently(func() (string, error) { - updatedResource := &numaflowv1.InterStepBufferService{} - err := k8sClient.Get(ctx, resourceLookupKey, updatedResource) - if err != nil { - return "", err - } - - return updatedResource.Spec.JetStream.Version, nil - }, duration, interval) - - e.Should(Equal(originalJetstreamVersion)) - e.ShouldNot(Equal(newJetstreamVersion)) + It("Should auto heal the InterStepBufferService with the ISBServiceRollout pipeline spec when the InterStepBufferService spec is changed", func() { + By("updating the InterStepBufferService and verifying the changed field is the same as the original and not the modified version") + verifyAutoHealing(ctx, numaflowv1.ISBGroupVersionKind, namespace, isbServiceRolloutName, "spec.jetstream.version", "1.2.3.4.5") }) It("Should delete the ISBServiceRollout and InterStepBufferService", func() { diff --git a/internal/controller/numaflowcontrollerrollout_controller.go b/internal/controller/numaflowcontrollerrollout_controller.go index e0a1b290..1d43e6fc 100644 --- a/internal/controller/numaflowcontrollerrollout_controller.go +++ b/internal/controller/numaflowcontrollerrollout_controller.go @@ -26,16 +26,23 @@ import ( gitopsSync "github.com/argoproj/gitops-engine/pkg/sync" gitopsSyncCommon "github.com/argoproj/gitops-engine/pkg/sync/common" kubeUtil "github.com/argoproj/gitops-engine/pkg/utils/kube" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/yaml" "k8s.io/client-go/rest" + "k8s.io/client-go/util/retry" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + runtimecontroller "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/source" "github.com/numaproj/numaplane/internal/common" "github.com/numaproj/numaplane/internal/controller/config" @@ -51,6 +58,7 @@ const ( const ( ControllerNumaflowControllerRollout = "numaflow-controller-rollout-controller" + NumaflowControllerDeploymentName = "numaflow-controller" ) // NumaflowControllerRolloutReconciler reconciles a NumaflowControllerRollout object @@ -135,7 +143,16 @@ func (r *NumaflowControllerRolloutReconciler) Reconcile(ctx context.Context, req // Update the Status subresource if numaflowControllerRollout.DeletionTimestamp.IsZero() { // would've already been deleted - if err := r.client.Status().Update(ctx, numaflowControllerRollout); err != nil { + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + latestNumaflowControllerRollout := &apiv1.NumaflowControllerRollout{} + if err := r.client.Get(ctx, req.NamespacedName, latestNumaflowControllerRollout); err != nil { + return err + } + + latestNumaflowControllerRollout.Status = numaflowControllerRollout.Status + return r.client.Status().Update(ctx, latestNumaflowControllerRollout) + }) + if err != nil { numaLogger.Error(err, "Error Updating NumaflowControllerRollout Status", "NumaflowControllerRollout", numaflowControllerRollout) return ctrl.Result{}, err } @@ -352,38 +369,26 @@ func (r *NumaflowControllerRolloutReconciler) getResourceOperations() (kubeUtil. // SetupWithManager sets up the controller with the Manager. func (r *NumaflowControllerRolloutReconciler) SetupWithManager(mgr ctrl.Manager) error { + controller, err := runtimecontroller.New(ControllerNumaflowControllerRollout, mgr, runtimecontroller.Options{Reconciler: r}) + if err != nil { + return err + } - return ctrl.NewControllerManagedBy(mgr). - // Reconcile NumaflowControllerRollouts when there's been a Generation changed (i.e. Spec change) - For(&apiv1.NumaflowControllerRollout{}).WithEventFilter(predicate.GenerationChangedPredicate{}). - Complete(r) - - /* - controller, err := runtimecontroller.New(ControllerNumaflowControllerRollout, mgr, runtimecontroller.Options{Reconciler: r}) - - if err != nil { - return err - } - - // Watch NumaflowControllerRollouts - - if err := controller.Watch(source.Kind(mgr.GetCache(), &apiv1.NumaflowControllerRollout{}), &handler.EnqueueRequestForObject{}, predicate.GenerationChangedPredicate{}); err != nil { - return err - } - - // Watch Deployments of numaflow-controller - // Can add other resources as well - numaflowControllerDeployments := appv1.Deployment{} - numaflowControllerDeployments.Name = "numaflow-controller" // not sure if this would work or not - if err := controller.Watch(source.Kind(mgr.GetCache(), &numaflowControllerDeployments), + // Watch NumaflowControllerRollouts + if err := controller.Watch(source.Kind(mgr.GetCache(), &apiv1.NumaflowControllerRollout{}), &handler.EnqueueRequestForObject{}, predicate.GenerationChangedPredicate{}); err != nil { + return err + } - handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &apiv1.NumaflowControllerRollout{}, handler.OnlyControllerOwner()), - predicate.GenerationChangedPredicate{}); err != nil { - return err - } + // Watch NumaflowControllerRollout child resources: numaflow-controller Deployment, ConfigMap, ServiceAccount, Role, RoleBinding + for _, kind := range []client.Object{&appsv1.Deployment{}, &corev1.ConfigMap{}, &corev1.ServiceAccount{}, &rbacv1.Role{}, &rbacv1.RoleBinding{}} { + if err := controller.Watch(source.Kind(mgr.GetCache(), kind), + handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &apiv1.NumaflowControllerRollout{}, handler.OnlyControllerOwner()), + predicate.GenerationChangedPredicate{}); err != nil { + return err + } + } - return nil - */ + return nil } // SplitYAMLToString splits a YAML file into strings. Returns list of yamls diff --git a/internal/controller/numaflowcontrollerrollout_controller_test.go b/internal/controller/numaflowcontrollerrollout_controller_test.go index ab41446b..0e80600a 100644 --- a/internal/controller/numaflowcontrollerrollout_controller_test.go +++ b/internal/controller/numaflowcontrollerrollout_controller_test.go @@ -18,27 +18,30 @@ package controller import ( "context" + "strings" + . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "strings" apiv1 "github.com/numaproj/numaplane/pkg/apis/numaplane/v1alpha1" ) -var _ = Describe("NumaflowControllerRollout Controller", func() { +var _ = Describe("NumaflowControllerRollout Controller", Ordered, func() { const ( - namespace = "default" + namespace = "default" + resourceName = "numaflow-controller" ) - Context("When reconciling a resource", func() { - const resourceName = "numaflow-controller" - ctx := context.Background() + ctx := context.Background() + Context("When reconciling a resource", func() { typeNamespacedName := types.NamespacedName{ Name: resourceName, - Namespace: namespace, // TODO(user):Modify as needed + Namespace: namespace, } numaflowControllerRollout := apiv1.NumaflowControllerRollout{ @@ -51,13 +54,6 @@ var _ = Describe("NumaflowControllerRollout Controller", func() { }, } - AfterEach(func() { - // Cleanup the resource after each test and ignore the error if it doesn't exist - resource := &apiv1.NumaflowControllerRollout{} - _ = k8sClient.Get(ctx, typeNamespacedName, resource) - _ = k8sClient.Delete(ctx, resource) - }) - It("Should throw a CR validation error", func() { By("Creating a NumaflowControllerRollout resource with an invalid name") resource := numaflowControllerRollout @@ -79,21 +75,22 @@ var _ = Describe("NumaflowControllerRollout Controller", func() { Expect(err.Error()).To(ContainSubstring("numaflowcontrollerrollouts.numaplane.numaproj.io \"numaflow-controller\" already exists")) }) - // It("should successfully reconcile the resource", func() { - // By("Reconciling the created resource") - // controllerReconciler := &NumaflowControllerRolloutReconciler{ - // client: k8sClient, - // scheme: k8sClient.Scheme(), - // restConfig: cfg, - // } - - // _, err := controllerReconciler.Reconcile(ctx, sigsReconcile.Request{ - // NamespacedName: typeNamespacedName, - // }) - // Expect(err).NotTo(HaveOccurred()) - // // TODO(user): Add more specific assertions depending on your controller's reconciliation logic. - // // Example: If you expect a certain status condition after reconciliation, verify it here. - // }) + It("Should auto heal the Numaflow Controller Deployment with the spec based on the NumaflowControllerRollout version field value when the Deployment spec is changed", func() { + By("updating the Numaflow Controller Deployment and verifying the changed field is the same as the original and not the modified version") + verifyAutoHealing(ctx, appsv1.SchemeGroupVersion.WithKind("Deployment"), namespace, "numaflow-controller", "spec.template.spec.serviceAccountName", "someothersaname") + }) + + It("Should auto heal the numaflow-cmd-params-config ConfigMap with the spec based on the NumaflowControllerRollout version field value when the ConfigMap spec is changed", func() { + By("updating the numaflow-cmd-params-config ConfigMap and verifying the changed field is the same as the original and not the modified version") + verifyAutoHealing(ctx, corev1.SchemeGroupVersion.WithKind("ConfigMap"), namespace, "numaflow-cmd-params-config", "data.namespaced", "false") + }) + + AfterAll(func() { + // Cleanup the resource after each test and ignore the error if it doesn't exist + resource := &apiv1.NumaflowControllerRollout{} + _ = k8sClient.Get(ctx, typeNamespacedName, resource) + _ = k8sClient.Delete(ctx, resource) + }) }) }) @@ -192,19 +189,19 @@ spec: Namespace: "default", }, } - It("should apply ownership reference correctly", func() { + It("should apply ownership reference correctly", func() { manifests, err := applyOwnershipToManifests(manifests, resource) Expect(err).To(BeNil()) Expect(strings.TrimSpace(manifests[0])).To(Equal(strings.TrimSpace(emanifests[0]))) Expect(strings.TrimSpace(manifests[1])).To(Equal(strings.TrimSpace(emanifests[1]))) }) + It("should not apply ownership if it already exists", func() { manifests, err := applyOwnershipToManifests(emanifests, resource) Expect(err).To(BeNil()) Expect(strings.TrimSpace(manifests[0])).To(Equal(strings.TrimSpace(emanifests[0]))) Expect(strings.TrimSpace(manifests[1])).To(Equal(strings.TrimSpace(emanifests[1]))) - }) }) }) diff --git a/internal/controller/pipelinerollout_controller_test.go b/internal/controller/pipelinerollout_controller_test.go index 93374220..9fddc657 100644 --- a/internal/controller/pipelinerollout_controller_test.go +++ b/internal/controller/pipelinerollout_controller_test.go @@ -36,13 +36,10 @@ import ( apiv1 "github.com/numaproj/numaplane/pkg/apis/numaplane/v1alpha1" ) -var _ = Describe("PipelineRollout Controller", func() { +var _ = Describe("PipelineRollout Controller", Ordered, func() { const ( namespace = "default" pipelineRolloutName = "pipelinerollout-test" - timeout = 10 * time.Second - duration = 10 * time.Second - interval = 250 * time.Millisecond ) ctx := context.Background() @@ -284,29 +281,8 @@ var _ = Describe("PipelineRollout Controller", func() { }) It("Should auto heal the Numaflow Pipeline with the PipelineRollout pipeline spec when the Numaflow Pipeline spec is changed", func() { - By("updating the Numaflow Pipeline") - currentPipeline := &numaflowv1.Pipeline{} - Expect(k8sClient.Get(ctx, resourceLookupKey, currentPipeline)).To(Succeed()) - - originalISBServiceName := currentPipeline.Spec.InterStepBufferServiceName - newISBServiceName := "my-isbsvc-updated-in-child" - currentPipeline.Spec.InterStepBufferServiceName = newISBServiceName - - Expect(k8sClient.Update(ctx, currentPipeline)).ToNot(HaveOccurred()) - - By("Verifying the changed field of the Numaflow Pipeline is the same as the original and not the modified version") - e := Consistently(func() (string, error) { - updatedResource := &numaflowv1.Pipeline{} - err := k8sClient.Get(ctx, resourceLookupKey, updatedResource) - if err != nil { - return "", err - } - - return updatedResource.Spec.InterStepBufferServiceName, nil - }, duration, interval) - - e.Should(Equal(originalISBServiceName)) - e.ShouldNot(Equal(newISBServiceName)) + By("updating the Numaflow Pipeline and verifying the changed field is the same as the original and not the modified version") + verifyAutoHealing(ctx, numaflowv1.PipelineGroupVersionKind, namespace, pipelineRolloutName, "spec.interStepBufferServiceName", "someotherisbsname") }) It("Should delete the PipelineRollout and Numaflow Pipeline", func() { diff --git a/internal/controller/suite_test.go b/internal/controller/suite_test.go index 670771f3..9577e5b3 100644 --- a/internal/controller/suite_test.go +++ b/internal/controller/suite_test.go @@ -17,17 +17,27 @@ limitations under the License. package controller import ( + "context" "fmt" "io" "net/http" "os" "path/filepath" "runtime" + "strings" "testing" + "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "github.com/numaproj/numaplane/internal/controller/config" + "github.com/numaproj/numaplane/internal/util/kubernetes" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/yaml" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" ctrl "sigs.k8s.io/controller-runtime" @@ -37,12 +47,19 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log/zap" numaflowv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" + "github.com/numaproj/numaplane/internal/sync" apiv1 "github.com/numaproj/numaplane/pkg/apis/numaplane/v1alpha1" ) // These tests use Ginkgo (BDD-style Go testing framework). Refer to // http://onsi.github.io/ginkgo/ to learn more about Ginkgo. +const ( + timeout = 10 * time.Second + duration = 10 * time.Second + interval = 250 * time.Millisecond +) + var ( cfg *rest.Config k8sClient client.Client @@ -125,6 +142,27 @@ var _ = BeforeSuite(func() { }).SetupWithManager(k8sManager) Expect(err).ToNot(HaveOccurred()) + stateCache := sync.NewLiveStateCache(cfg) + err = stateCache.Init(nil) + Expect(err).ToNot(HaveOccurred()) + + configManager := config.GetConfigManagerInstance() + err = configManager.LoadAllConfigs(func(err error) { + Expect(err).ToNot(HaveOccurred()) + }) + Expect(err).ToNot(HaveOccurred()) + config.GetConfigManagerInstance().UpdateControllerDefinitionConfig(getNumaflowControllerDefinitions()) + + err = (&NumaflowControllerRolloutReconciler{ + client: k8sManager.GetClient(), + scheme: k8sManager.GetScheme(), + restConfig: cfg, + rawConfig: cfg, + kubectl: kubernetes.NewKubectl(), + stateCache: stateCache, + }).SetupWithManager(k8sManager) + Expect(err).ToNot(HaveOccurred()) + go func() { defer GinkgoRecover() err = k8sManager.Start(ctrl.SetupSignalHandler()) @@ -164,3 +202,66 @@ func downloadCRD(url string, downloadDir string) { _, err = io.Copy(out, resp.Body) Expect(err).ToNot(HaveOccurred()) } + +func getNumaflowControllerDefinitions() config.NumaflowControllerDefinitionConfig { + // Read definitions config file + // TODO: use this file instead "../../tests/config/controller-definitions-config.yaml" + data, err := os.ReadFile("../../config/manager/numaflow-controller-definitions-config.yaml") + Expect(err).ToNot(HaveOccurred()) + + // Decode the yaml into a ConfigMap object + configMap := corev1.ConfigMap{} + err = yaml.NewYAMLOrJSONDecoder(strings.NewReader(string(data)), len(data)).Decode(&configMap) + Expect(err).ToNot(HaveOccurred()) + + // Decode the sub-yaml string into a NumaflowControllerDefinitionConfig object + mp := configMap.Data["controller_definitions.yaml"] + ncdc := config.NumaflowControllerDefinitionConfig{} + err = yaml.NewYAMLOrJSONDecoder(strings.NewReader(mp), len(mp)).Decode(&ncdc) + Expect(err).ToNot(HaveOccurred()) + + return ncdc +} + +// verifyAutoHealing tests the auto healing feature +func verifyAutoHealing(ctx context.Context, gvk schema.GroupVersionKind, namespace string, resourceName string, pathToValue string, newValue any) { + lookupKey := types.NamespacedName{Name: resourceName, Namespace: namespace} + + // Get current resource + currentResource := unstructured.Unstructured{} + currentResource.SetGroupVersionKind(gvk) + Eventually(func() error { + return k8sClient.Get(ctx, lookupKey, ¤tResource) + }, timeout, interval).Should(Succeed()) + Expect(currentResource.Object).ToNot(BeEmpty()) + + // Get the original value at the specified path (pathToValue) + pathSlice := strings.Split(pathToValue, ".") + originalValue, found, err := unstructured.NestedFieldNoCopy(currentResource.Object, pathSlice...) + Expect(err).ToNot(HaveOccurred()) + Expect(found).To(BeTrue()) + + // Set new value and update resource + err = unstructured.SetNestedField(currentResource.Object, newValue, pathSlice...) + Expect(err).ToNot(HaveOccurred()) + Expect(k8sClient.Update(ctx, ¤tResource)).ToNot(HaveOccurred()) + + // Get updated resource and the value at the specified path (pathToValue) + e := Eventually(func() (any, error) { + updatedResource := unstructured.Unstructured{} + updatedResource.SetGroupVersionKind(gvk) + if err := k8sClient.Get(ctx, lookupKey, &updatedResource); err != nil { + return nil, err + } + + currentValue, found, err := unstructured.NestedFieldNoCopy(updatedResource.Object, pathSlice...) + Expect(err).ToNot(HaveOccurred()) + Expect(found).To(BeTrue()) + + return currentValue, nil + }, timeout, interval) + + // Verify that the value matches the original value and not the new value + e.Should(Equal(originalValue)) + e.ShouldNot(Equal(newValue)) +}