Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Numaflow Controller Rollout Auto Healing #75

Merged
merged 4 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 4 additions & 28 deletions internal/controller/isbservicerollout_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var _ = Describe("ISBServiceRollout Controller", func() {
var _ = Describe("ISBServiceRollout Controller", Ordered, func() {
const (
namespace = "default"
isbServiceRolloutName = "isbservicerollout-test"
timeout = 10 * time.Second
duration = 10 * time.Second
interval = 250 * time.Millisecond
)

ctx := context.Background()
Expand Down Expand Up @@ -256,30 +253,9 @@ var _ = Describe("ISBServiceRollout Controller", func() {

})

It("Should auto heal the InterStepBufferService with the ISBServiceRollout interstepbufferservice spec when the InterStepBufferService spec is changed", func() {
By("updating the InterStepBufferService")
currentISBService := &numaflowv1.InterStepBufferService{}
Expect(k8sClient.Get(ctx, resourceLookupKey, currentISBService)).To(Succeed())

originalJetstreamVersion := currentISBService.Spec.JetStream.Version
newJetstreamVersion := "1.2.3"
currentISBService.Spec.JetStream.Version = newJetstreamVersion

Expect(k8sClient.Update(ctx, currentISBService)).ToNot(HaveOccurred())

By("Verifying the changed field of the InterStepBufferService is the same as the original and not the modified version")
e := Consistently(func() (string, error) {
updatedResource := &numaflowv1.InterStepBufferService{}
err := k8sClient.Get(ctx, resourceLookupKey, updatedResource)
if err != nil {
return "", err
}

return updatedResource.Spec.JetStream.Version, nil
}, duration, interval)

e.Should(Equal(originalJetstreamVersion))
e.ShouldNot(Equal(newJetstreamVersion))
It("Should auto heal the InterStepBufferService with the ISBServiceRollout pipeline spec when the InterStepBufferService spec is changed", func() {
By("updating the InterStepBufferService and verifying the changed field is the same as the original and not the modified version")
verifyAutoHealing(ctx, numaflowv1.ISBGroupVersionKind, namespace, isbServiceRolloutName, "spec.jetstream.version", "1.2.3.4.5")
})

It("Should delete the ISBServiceRollout and InterStepBufferService", func() {
Expand Down
65 changes: 35 additions & 30 deletions internal/controller/numaflowcontrollerrollout_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,23 @@ import (
gitopsSync "github.com/argoproj/gitops-engine/pkg/sync"
gitopsSyncCommon "github.com/argoproj/gitops-engine/pkg/sync/common"
kubeUtil "github.com/argoproj/gitops-engine/pkg/utils/kube"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/client-go/rest"
"k8s.io/client-go/util/retry"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
runtimecontroller "sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"

"github.com/numaproj/numaplane/internal/common"
"github.com/numaproj/numaplane/internal/controller/config"
Expand All @@ -51,6 +58,7 @@ const (

const (
ControllerNumaflowControllerRollout = "numaflow-controller-rollout-controller"
NumaflowControllerDeploymentName = "numaflow-controller"
)

// NumaflowControllerRolloutReconciler reconciles a NumaflowControllerRollout object
Expand Down Expand Up @@ -135,7 +143,16 @@ func (r *NumaflowControllerRolloutReconciler) Reconcile(ctx context.Context, req

// Update the Status subresource
if numaflowControllerRollout.DeletionTimestamp.IsZero() { // would've already been deleted
if err := r.client.Status().Update(ctx, numaflowControllerRollout); err != nil {
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
latestNumaflowControllerRollout := &apiv1.NumaflowControllerRollout{}
if err := r.client.Get(ctx, req.NamespacedName, latestNumaflowControllerRollout); err != nil {
return err
}

latestNumaflowControllerRollout.Status = numaflowControllerRollout.Status
return r.client.Status().Update(ctx, latestNumaflowControllerRollout)
})
if err != nil {
numaLogger.Error(err, "Error Updating NumaflowControllerRollout Status", "NumaflowControllerRollout", numaflowControllerRollout)
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -352,38 +369,26 @@ func (r *NumaflowControllerRolloutReconciler) getResourceOperations() (kubeUtil.

// SetupWithManager sets up the controller with the Manager.
func (r *NumaflowControllerRolloutReconciler) SetupWithManager(mgr ctrl.Manager) error {
controller, err := runtimecontroller.New(ControllerNumaflowControllerRollout, mgr, runtimecontroller.Options{Reconciler: r})
if err != nil {
return err
}

return ctrl.NewControllerManagedBy(mgr).
// Reconcile NumaflowControllerRollouts when there's been a Generation changed (i.e. Spec change)
For(&apiv1.NumaflowControllerRollout{}).WithEventFilter(predicate.GenerationChangedPredicate{}).
Complete(r)

/*
controller, err := runtimecontroller.New(ControllerNumaflowControllerRollout, mgr, runtimecontroller.Options{Reconciler: r})

if err != nil {
return err
}

// Watch NumaflowControllerRollouts

if err := controller.Watch(source.Kind(mgr.GetCache(), &apiv1.NumaflowControllerRollout{}), &handler.EnqueueRequestForObject{}, predicate.GenerationChangedPredicate{}); err != nil {
return err
}

// Watch Deployments of numaflow-controller
// Can add other resources as well
numaflowControllerDeployments := appv1.Deployment{}
numaflowControllerDeployments.Name = "numaflow-controller" // not sure if this would work or not
if err := controller.Watch(source.Kind(mgr.GetCache(), &numaflowControllerDeployments),
// Watch NumaflowControllerRollouts
if err := controller.Watch(source.Kind(mgr.GetCache(), &apiv1.NumaflowControllerRollout{}), &handler.EnqueueRequestForObject{}, predicate.GenerationChangedPredicate{}); err != nil {
return err
}

handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &apiv1.NumaflowControllerRollout{}, handler.OnlyControllerOwner()),
predicate.GenerationChangedPredicate{}); err != nil {
return err
}
// Watch NumaflowControllerRollout child resources: numaflow-controller Deployment, ConfigMap, ServiceAccount, Role, RoleBinding
for _, kind := range []client.Object{&appsv1.Deployment{}, &corev1.ConfigMap{}, &corev1.ServiceAccount{}, &rbacv1.Role{}, &rbacv1.RoleBinding{}} {
if err := controller.Watch(source.Kind(mgr.GetCache(), kind),
handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &apiv1.NumaflowControllerRollout{}, handler.OnlyControllerOwner()),
predicate.GenerationChangedPredicate{}); err != nil {
return err
}
}

return nil
*/
return nil
}

// SplitYAMLToString splits a YAML file into strings. Returns list of yamls
Expand Down
59 changes: 28 additions & 31 deletions internal/controller/numaflowcontrollerrollout_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,30 @@ package controller

import (
"context"
"strings"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"strings"

apiv1 "github.com/numaproj/numaplane/pkg/apis/numaplane/v1alpha1"
)

var _ = Describe("NumaflowControllerRollout Controller", func() {
var _ = Describe("NumaflowControllerRollout Controller", Ordered, func() {
const (
namespace = "default"
namespace = "default"
resourceName = "numaflow-controller"
)
Context("When reconciling a resource", func() {
const resourceName = "numaflow-controller"

ctx := context.Background()
ctx := context.Background()

Context("When reconciling a resource", func() {
typeNamespacedName := types.NamespacedName{
Name: resourceName,
Namespace: namespace, // TODO(user):Modify as needed
Namespace: namespace,
}

numaflowControllerRollout := apiv1.NumaflowControllerRollout{
Expand All @@ -51,13 +54,6 @@ var _ = Describe("NumaflowControllerRollout Controller", func() {
},
}

AfterEach(func() {
// Cleanup the resource after each test and ignore the error if it doesn't exist
resource := &apiv1.NumaflowControllerRollout{}
_ = k8sClient.Get(ctx, typeNamespacedName, resource)
_ = k8sClient.Delete(ctx, resource)
})

It("Should throw a CR validation error", func() {
By("Creating a NumaflowControllerRollout resource with an invalid name")
resource := numaflowControllerRollout
Expand All @@ -79,21 +75,22 @@ var _ = Describe("NumaflowControllerRollout Controller", func() {
Expect(err.Error()).To(ContainSubstring("numaflowcontrollerrollouts.numaplane.numaproj.io \"numaflow-controller\" already exists"))
})

// It("should successfully reconcile the resource", func() {
// By("Reconciling the created resource")
// controllerReconciler := &NumaflowControllerRolloutReconciler{
// client: k8sClient,
// scheme: k8sClient.Scheme(),
// restConfig: cfg,
// }

// _, err := controllerReconciler.Reconcile(ctx, sigsReconcile.Request{
// NamespacedName: typeNamespacedName,
// })
// Expect(err).NotTo(HaveOccurred())
// // TODO(user): Add more specific assertions depending on your controller's reconciliation logic.
// // Example: If you expect a certain status condition after reconciliation, verify it here.
// })
It("Should auto heal the Numaflow Controller Deployment with the spec based on the NumaflowControllerRollout version field value when the Deployment spec is changed", func() {
By("updating the Numaflow Controller Deployment and verifying the changed field is the same as the original and not the modified version")
verifyAutoHealing(ctx, appsv1.SchemeGroupVersion.WithKind("Deployment"), namespace, "numaflow-controller", "spec.template.spec.serviceAccountName", "someothersaname")
})

It("Should auto heal the numaflow-cmd-params-config ConfigMap with the spec based on the NumaflowControllerRollout version field value when the ConfigMap spec is changed", func() {
By("updating the numaflow-cmd-params-config ConfigMap and verifying the changed field is the same as the original and not the modified version")
verifyAutoHealing(ctx, corev1.SchemeGroupVersion.WithKind("ConfigMap"), namespace, "numaflow-cmd-params-config", "data.namespaced", "false")
})

AfterAll(func() {
// Cleanup the resource after each test and ignore the error if it doesn't exist
resource := &apiv1.NumaflowControllerRollout{}
_ = k8sClient.Get(ctx, typeNamespacedName, resource)
_ = k8sClient.Delete(ctx, resource)
})
})
})

Expand Down Expand Up @@ -192,19 +189,19 @@ spec:
Namespace: "default",
},
}
It("should apply ownership reference correctly", func() {

It("should apply ownership reference correctly", func() {
manifests, err := applyOwnershipToManifests(manifests, resource)
Expect(err).To(BeNil())
Expect(strings.TrimSpace(manifests[0])).To(Equal(strings.TrimSpace(emanifests[0])))
Expect(strings.TrimSpace(manifests[1])).To(Equal(strings.TrimSpace(emanifests[1])))
})

It("should not apply ownership if it already exists", func() {
manifests, err := applyOwnershipToManifests(emanifests, resource)
Expect(err).To(BeNil())
Expect(strings.TrimSpace(manifests[0])).To(Equal(strings.TrimSpace(emanifests[0])))
Expect(strings.TrimSpace(manifests[1])).To(Equal(strings.TrimSpace(emanifests[1])))

})
})
})
30 changes: 3 additions & 27 deletions internal/controller/pipelinerollout_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,10 @@ import (
apiv1 "github.com/numaproj/numaplane/pkg/apis/numaplane/v1alpha1"
)

var _ = Describe("PipelineRollout Controller", func() {
var _ = Describe("PipelineRollout Controller", Ordered, func() {
const (
namespace = "default"
pipelineRolloutName = "pipelinerollout-test"
timeout = 10 * time.Second
duration = 10 * time.Second
interval = 250 * time.Millisecond
)

ctx := context.Background()
Expand Down Expand Up @@ -284,29 +281,8 @@ var _ = Describe("PipelineRollout Controller", func() {
})

It("Should auto heal the Numaflow Pipeline with the PipelineRollout pipeline spec when the Numaflow Pipeline spec is changed", func() {
By("updating the Numaflow Pipeline")
currentPipeline := &numaflowv1.Pipeline{}
Expect(k8sClient.Get(ctx, resourceLookupKey, currentPipeline)).To(Succeed())

originalISBServiceName := currentPipeline.Spec.InterStepBufferServiceName
newISBServiceName := "my-isbsvc-updated-in-child"
currentPipeline.Spec.InterStepBufferServiceName = newISBServiceName

Expect(k8sClient.Update(ctx, currentPipeline)).ToNot(HaveOccurred())

By("Verifying the changed field of the Numaflow Pipeline is the same as the original and not the modified version")
e := Consistently(func() (string, error) {
updatedResource := &numaflowv1.Pipeline{}
err := k8sClient.Get(ctx, resourceLookupKey, updatedResource)
if err != nil {
return "", err
}

return updatedResource.Spec.InterStepBufferServiceName, nil
}, duration, interval)

e.Should(Equal(originalISBServiceName))
e.ShouldNot(Equal(newISBServiceName))
By("updating the Numaflow Pipeline and verifying the changed field is the same as the original and not the modified version")
verifyAutoHealing(ctx, numaflowv1.PipelineGroupVersionKind, namespace, pipelineRolloutName, "spec.interStepBufferServiceName", "someotherisbsname")
})

It("Should delete the PipelineRollout and Numaflow Pipeline", func() {
Expand Down
Loading
Loading