Skip to content

Commit

Permalink
Add event recorder for isb service, pipeline and numaflow controller
Browse files Browse the repository at this point in the history
Signed-off-by: chandankumar4 <[email protected]>
  • Loading branch information
chandankumar4 committed Aug 12, 2024
1 parent 9300aca commit 3667dc9
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 12 deletions.
3 changes: 3 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func main() {
mgr.GetScheme(),
newRawConfig,
customMetrics,
mgr.GetEventRecorderFor(apiv1.RolloutPipeline),
)

if err = pipelineRolloutReconciler.SetupWithManager(mgr); err != nil {
Expand All @@ -166,6 +167,7 @@ func main() {
newRawConfig,
kubectl,
customMetrics,
mgr.GetEventRecorderFor(apiv1.RolloutNumaflowController),
)
if err != nil {
numaLogger.Fatal(err, "Unable to create NumaflowControllerRollout controller")
Expand All @@ -180,6 +182,7 @@ func main() {
mgr.GetScheme(),
newRawConfig,
customMetrics,
mgr.GetEventRecorderFor(apiv1.RolloutISBSvc),
)

if err = isbServiceRolloutReconciler.SetupWithManager(mgr); err != nil {
Expand Down
13 changes: 13 additions & 0 deletions internal/controller/isbservicerollout_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -32,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/selection"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
runtimecontroller "sigs.k8s.io/controller-runtime/pkg/controller"
Expand Down Expand Up @@ -59,20 +61,23 @@ type ISBServiceRolloutReconciler struct {
scheme *runtime.Scheme
restConfig *rest.Config
customMetrics *metrics.CustomMetrics
recorder record.EventRecorder
}

func NewISBServiceRolloutReconciler(
client client.Client,
s *runtime.Scheme,
restConfig *rest.Config,
customMetrics *metrics.CustomMetrics,
recorder record.EventRecorder,
) *ISBServiceRolloutReconciler {

return &ISBServiceRolloutReconciler{
client,
s,
restConfig,
customMetrics,
recorder,
}
}

Expand All @@ -99,6 +104,7 @@ func (r *ISBServiceRolloutReconciler) Reconcile(ctx context.Context, req ctrl.Re
} else {
r.customMetrics.ISBServicesSyncFailed.WithLabelValues().Inc()
numaLogger.Error(err, "Unable to get ISBServiceRollout", "request", req)
r.recorder.Eventf(isbServiceRollout, corev1.EventTypeWarning, "GetISBServiceFailed", "Failed to get isb service rollout: %v", err.Error())
return ctrl.Result{}, err
}
}
Expand All @@ -112,10 +118,12 @@ func (r *ISBServiceRolloutReconciler) Reconcile(ctx context.Context, req ctrl.Re
result, err := r.reconcile(ctx, isbServiceRollout, syncStartTime)
if err != nil {
numaLogger.Errorf(err, "ISBServiceRollout %v reconcile returned error: %v", req.NamespacedName, err)
r.recorder.Eventf(isbServiceRollout, corev1.EventTypeWarning, "ReconcileFailed", "Failed to reconcile isb service rollout: %v", err.Error())
r.customMetrics.ISBServicesSyncFailed.WithLabelValues().Inc()
statusUpdateErr := r.updateISBServiceRolloutStatusToFailed(ctx, isbServiceRollout, err)
if statusUpdateErr != nil {
r.customMetrics.ISBServicesSyncFailed.WithLabelValues().Inc()
r.recorder.Eventf(isbServiceRollout, corev1.EventTypeWarning, "UpdateStatusFailed", "Failed to update isb service rollout status: %v", err.Error())
return ctrl.Result{}, statusUpdateErr
}
return ctrl.Result{}, err
Expand All @@ -126,10 +134,12 @@ func (r *ISBServiceRolloutReconciler) Reconcile(ctx context.Context, req ctrl.Re
isbServiceRolloutStatus := isbServiceRollout.Status
if err := r.client.Update(ctx, isbServiceRollout); err != nil {
numaLogger.Error(err, "Error Updating ISBServiceRollout", "ISBServiceRollout", isbServiceRollout)
r.recorder.Eventf(isbServiceRollout, corev1.EventTypeWarning, "UpdateFailed", "Failed to update isb service rollout: %v", err.Error())
r.customMetrics.ISBServicesSyncFailed.WithLabelValues().Inc()
statusUpdateErr := r.updateISBServiceRolloutStatusToFailed(ctx, isbServiceRollout, err)
if statusUpdateErr != nil {
r.customMetrics.ISBServicesSyncFailed.WithLabelValues().Inc()
r.recorder.Eventf(isbServiceRollout, corev1.EventTypeWarning, "UpdateStatusFailed", "Failed to update isb service rollout status: %v", err.Error())
return ctrl.Result{}, statusUpdateErr
}
return ctrl.Result{}, err
Expand All @@ -143,12 +153,14 @@ func (r *ISBServiceRolloutReconciler) Reconcile(ctx context.Context, req ctrl.Re
statusUpdateErr := r.updateISBServiceRolloutStatus(ctx, isbServiceRollout)
if statusUpdateErr != nil {
r.customMetrics.ISBServicesSyncFailed.WithLabelValues().Inc()
r.recorder.Eventf(isbServiceRollout, corev1.EventTypeWarning, "UpdateStatusFailed", "Failed to update isb service rollout status: %v", err.Error())
return ctrl.Result{}, statusUpdateErr
}
}

// generate metrics for ISB Service.
r.customMetrics.IncISBServiceMetrics(isbServiceRollout.Name, isbServiceRollout.Namespace)
r.recorder.Eventf(isbServiceRollout, corev1.EventTypeNormal, "ReconcilationSuccessful", "Reconciliation successful")
numaLogger.Debug("reconciliation successful")

return result, nil
Expand Down Expand Up @@ -283,6 +295,7 @@ func (r *ISBServiceRolloutReconciler) processExistingISBService(ctx context.Cont

if common.DataLossPrevention {
return processChildObjectWithoutDataLoss(ctx, isbServiceRollout.Namespace, isbServiceRollout.Name, r, isbServiceNeedsUpdating, isbServiceIsUpdating, func() error {
r.recorder.Eventf(isbServiceRollout, corev1.EventTypeNormal, "PipelinesPaused", "All Pipelines have paused for ISBService update")
err = r.updateISBService(ctx, isbServiceRollout, newISBServiceDef)
if err != nil {
return err
Expand Down
16 changes: 15 additions & 1 deletion internal/controller/numaflowcontrollerrollout_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
runtimecontroller "sigs.k8s.io/controller-runtime/pkg/controller"
Expand Down Expand Up @@ -77,6 +78,7 @@ type NumaflowControllerRolloutReconciler struct {
kubectl kubeUtil.Kubectl
stateCache sync.LiveStateCache
customMetrics *metrics.CustomMetrics
recorder record.EventRecorder
}

func NewNumaflowControllerRolloutReconciler(
Expand All @@ -85,6 +87,7 @@ func NewNumaflowControllerRolloutReconciler(
rawConfig *rest.Config,
kubectl kubeUtil.Kubectl,
customMetrics *metrics.CustomMetrics,
recorder record.EventRecorder,
) (*NumaflowControllerRolloutReconciler, error) {
stateCache := sync.NewLiveStateCache(rawConfig, customMetrics)
numaLogger := logger.GetBaseLogger().WithName("state cache").WithValues("numaflowcontrollerrollout")
Expand All @@ -106,6 +109,7 @@ func NewNumaflowControllerRolloutReconciler(
kubectl,
stateCache,
customMetrics,
recorder,
}, nil
}

Expand All @@ -132,6 +136,7 @@ func (r *NumaflowControllerRolloutReconciler) Reconcile(ctx context.Context, req
} else {
r.customMetrics.NumaflowControllersSyncFailed.WithLabelValues().Inc()
numaLogger.Error(err, "Unable to get NumaflowControllerRollout", "request", req)
r.recorder.Eventf(numaflowControllerRollout, corev1.EventTypeWarning, "GetNumaflowControllerFailed", "Failed to get numaflow controller rollout: %v", err.Error())
return ctrl.Result{}, err
}
}
Expand All @@ -145,23 +150,27 @@ func (r *NumaflowControllerRolloutReconciler) Reconcile(ctx context.Context, req
result, err := r.reconcile(ctx, numaflowControllerRollout, req.Namespace, syncStartTime)
if err != nil {
numaLogger.Errorf(err, "NumaflowControllerRollout %v reconcile returned error: %v", req.NamespacedName, err)
r.recorder.Eventf(numaflowControllerRollout, corev1.EventTypeWarning, "ReconcileFailed", "Failed to reconcile numaflow controller rollout: %v", err.Error())
r.customMetrics.NumaflowControllersSyncFailed.WithLabelValues().Inc()
statusUpdateErr := r.updateNumaflowControllerRolloutStatusToFailed(ctx, numaflowControllerRollout, err)
if statusUpdateErr != nil {
r.customMetrics.NumaflowControllersSyncFailed.WithLabelValues().Inc()
r.recorder.Eventf(numaflowControllerRollout, corev1.EventTypeWarning, "UpdateStatusFailed", "Failed to update status of numaflow controller rollout: %v", statusUpdateErr.Error())
return ctrl.Result{}, statusUpdateErr
}
return ctrl.Result{}, err
}

deployment, _, err := r.getNumaflowControllerDeployment(ctx, numaflowControllerRollout)
if err != nil {
r.recorder.Eventf(numaflowControllerRollout, corev1.EventTypeWarning, "GetDeploymentFailed", "Failed to get numaflow controller deployment: %v", err.Error())
return ctrl.Result{}, err
}

// update our Status with the Deployment's Status
err = r.processNumaflowControllerStatus(ctx, numaflowControllerRollout, deployment)
if err != nil {
r.recorder.Eventf(numaflowControllerRollout, corev1.EventTypeWarning, "ProcessStatusFailed", "Failed to process numaflow controller status: %v", err.Error())
return ctrl.Result{}, err
}

Expand All @@ -170,10 +179,12 @@ func (r *NumaflowControllerRolloutReconciler) Reconcile(ctx context.Context, req
numaflowControllerRolloutStatus := numaflowControllerRollout.Status
if err := r.client.Update(ctx, numaflowControllerRollout); err != nil {
numaLogger.Error(err, "Error Updating NumaflowControllerRollout", "NumaflowControllerRollout", numaflowControllerRollout)
r.recorder.Eventf(numaflowControllerRollout, corev1.EventTypeWarning, "UpdateFailed", "Failed to update numaflow controller rollout: %v", err.Error())
r.customMetrics.NumaflowControllersSyncFailed.WithLabelValues().Inc()
statusUpdateErr := r.updateNumaflowControllerRolloutStatusToFailed(ctx, numaflowControllerRollout, err)
if statusUpdateErr != nil {
r.customMetrics.NumaflowControllersSyncFailed.WithLabelValues().Inc()
r.recorder.Eventf(numaflowControllerRollout, corev1.EventTypeWarning, "UpdateStatusFailed", "Failed to update status of numaflow controller rollout: %v", statusUpdateErr.Error())
return ctrl.Result{}, statusUpdateErr
}
return ctrl.Result{}, err
Expand All @@ -187,6 +198,7 @@ func (r *NumaflowControllerRolloutReconciler) Reconcile(ctx context.Context, req
statusUpdateErr := r.updateNumaflowControllerRolloutStatus(ctx, numaflowControllerRollout)
if statusUpdateErr != nil {
r.customMetrics.NumaflowControllersSyncFailed.WithLabelValues().Inc()
r.recorder.Eventf(numaflowControllerRollout, corev1.EventTypeWarning, "UpdateStatusFailed", "Failed to update status of numaflow controller rollout: %v", statusUpdateErr.Error())
return ctrl.Result{}, statusUpdateErr
}
}
Expand All @@ -195,7 +207,7 @@ func (r *NumaflowControllerRolloutReconciler) Reconcile(ctx context.Context, req
r.customMetrics.IncNumaflowControllerMetrics(numaflowControllerRollout.Name, numaflowControllerRollout.Namespace, numaflowControllerRollout.Spec.Controller.Version)

numaLogger.Debug("reconciliation successful")

r.recorder.Eventf(numaflowControllerRollout, corev1.EventTypeNormal, "ReconcileSuccess", "Reconciliation successful")
return result, nil
}

Expand Down Expand Up @@ -225,6 +237,7 @@ func (r *NumaflowControllerRolloutReconciler) reconcile(

if !controllerRollout.DeletionTimestamp.IsZero() {
numaLogger.Info("Deleting NumaflowControllerRollout")
r.recorder.Eventf(controllerRollout, corev1.EventTypeNormal, "Deleting", "Deleting NumaflowControllerRollout")
if controllerutil.ContainsFinalizer(controllerRollout, finalizerName) {
GetPauseModule().deletePauseRequest(controllerKey)
controllerutil.RemoveFinalizer(controllerRollout, finalizerName)
Expand Down Expand Up @@ -275,6 +288,7 @@ func (r *NumaflowControllerRolloutReconciler) reconcile(

needsRequeue, err := processChildObjectWithoutDataLoss(ctx, controllerRollout.Namespace, controllerRollout.Name, r, controllerDeploymentNeedsUpdating,
controllerDeploymentIsUpdating, func() error {
r.recorder.Eventf(controllerRollout, corev1.EventTypeNormal, "AllPipelinesPaused", "All Pipelines have paused so Numaflow Controller can safely update")
phase, err := r.sync(controllerRollout, namespace, numaLogger)
if err != nil {
return err
Expand Down
18 changes: 17 additions & 1 deletion internal/controller/pipelinerollout_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -73,13 +74,15 @@ type PipelineRolloutReconciler struct {
shutdownWorkerWaitGroup *sync.WaitGroup
// customMetrics is used to generate the custom metrics for the Pipeline
customMetrics *metrics.CustomMetrics
recorder record.EventRecorder
}

func NewPipelineRolloutReconciler(
client client.Client,
s *runtime.Scheme,
restConfig *rest.Config,
customMetrics *metrics.CustomMetrics,
recorder record.EventRecorder,
) *PipelineRolloutReconciler {

numaLogger := logger.GetBaseLogger().WithName(loggerName)
Expand All @@ -97,6 +100,7 @@ func NewPipelineRolloutReconciler(
pipelineRolloutQueue,
&sync.WaitGroup{},
customMetrics,
recorder,
}
pipelineROReconciler = r

Expand Down Expand Up @@ -142,6 +146,7 @@ func (r *PipelineRolloutReconciler) processPipelineRollout(ctx context.Context,
} else {
r.customMetrics.PipelinesSyncFailed.WithLabelValues().Inc()
numaLogger.Error(err, "Unable to get PipelineRollout")
r.recorder.Eventf(pipelineRollout, "Warning", "GetPipelineRolloutFailed", "Failed to get PipelineRollout: %v", err)
return ctrl.Result{}, err
}
}
Expand All @@ -154,9 +159,11 @@ func (r *PipelineRolloutReconciler) processPipelineRollout(ctx context.Context,

requeue, err := r.reconcile(ctx, pipelineRollout, syncStartTime)
if err != nil {
r.recorder.Eventf(pipelineRollout, "Warning", "ReconcileFailed", "Failed to reconcile PipelineRollout: %v", err)
statusUpdateErr := r.updatePipelineRolloutStatusToFailed(ctx, pipelineRollout, err)
if statusUpdateErr != nil {
r.customMetrics.PipelinesSyncFailed.WithLabelValues().Inc()
r.recorder.Eventf(pipelineRollout, "Warning", "UpdateStatusFailed", "Failed to update PipelineRollout status: %v", statusUpdateErr)
return ctrl.Result{}, statusUpdateErr
}

Expand All @@ -166,9 +173,11 @@ func (r *PipelineRolloutReconciler) processPipelineRollout(ctx context.Context,
// Update PipelineRollout Status based on child resource (Pipeline) Status
err = r.processPipelineStatus(ctx, pipelineRollout)
if err != nil {
r.recorder.Eventf(pipelineRollout, "Warning", "ProcessPipelineStatusFailed", "Failed to process Pipeline Status: %v", err)
statusUpdateErr := r.updatePipelineRolloutStatusToFailed(ctx, pipelineRollout, err)
if statusUpdateErr != nil {
r.customMetrics.PipelinesSyncFailed.WithLabelValues().Inc()
r.recorder.Eventf(pipelineRollout, "Warning", "UpdateStatusFailed", "Failed to update PipelineRollout status: %v", statusUpdateErr)
return ctrl.Result{}, statusUpdateErr
}

Expand All @@ -180,13 +189,14 @@ func (r *PipelineRolloutReconciler) processPipelineRollout(ctx context.Context,
pipelineRolloutStatus := pipelineRollout.Status
if err := r.client.Update(ctx, pipelineRollout); err != nil {
numaLogger.Error(err, "Error Updating PipelineRollout", "PipelineRollout", pipelineRollout)
r.recorder.Eventf(pipelineRollout, "Warning", "UpdateFailed", "Failed to update PipelineRollout: %v", err)
r.customMetrics.PipelinesSyncFailed.WithLabelValues().Inc()
statusUpdateErr := r.updatePipelineRolloutStatusToFailed(ctx, pipelineRollout, err)
if statusUpdateErr != nil {
r.customMetrics.PipelinesSyncFailed.WithLabelValues().Inc()
r.recorder.Eventf(pipelineRollout, "Warning", "UpdateStatusFailed", "Failed to update PipelineRollout status: %v", statusUpdateErr)
return ctrl.Result{}, statusUpdateErr
}

return ctrl.Result{}, err
}
// restore the original status, which would've been wiped in the previous call to Update()
Expand All @@ -198,6 +208,7 @@ func (r *PipelineRolloutReconciler) processPipelineRollout(ctx context.Context,
statusUpdateErr := r.updatePipelineRolloutStatus(ctx, pipelineRollout)
if statusUpdateErr != nil {
r.customMetrics.PipelinesSyncFailed.WithLabelValues().Inc()
r.recorder.Eventf(pipelineRollout, "Warning", "UpdateStatusFailed", "Failed to update PipelineRollout status: %v", statusUpdateErr)
return ctrl.Result{}, statusUpdateErr
}
}
Expand All @@ -209,6 +220,7 @@ func (r *PipelineRolloutReconciler) processPipelineRollout(ctx context.Context,
return ctrl.Result{Requeue: true, RequeueAfter: 30 * time.Second}, nil
}

r.recorder.Eventf(pipelineRollout, "Normal", "ReconcileSuccess", "Reconciliation successful")
numaLogger.Debug("reconciliation successful")

return ctrl.Result{}, nil
Expand Down Expand Up @@ -455,6 +467,7 @@ func (r *PipelineRolloutReconciler) processExistingPipelineWithoutDataLoss(ctx c
if pipelineNeedsToUpdate {
if !*shouldBePaused || (*shouldBePaused && isPipelinePaused(ctx, existingPipelineDef)) {
numaLogger.Infof("it's safe to update Pipeline so updating now")
r.recorder.Eventf(pipelineRollout, "Normal", "PipelineUpdate", "it's safe to update Pipeline so updating now")
err = updatePipelineSpec(ctx, r.restConfig, newPipelineDef)
if err != nil {
return err
Expand Down Expand Up @@ -560,18 +573,21 @@ func (r *PipelineRolloutReconciler) setPipelineLifecycle(ctx context.Context, pa

if paused && !lifeCycleIsPaused {
numaLogger.Info("pausing pipeline")
r.recorder.Eventf(existingPipelineDef, "Normal", "PipelinePause", "pausing pipeline")
if err := GetPauseModule().pausePipeline(ctx, r.restConfig, existingPipelineDef); err != nil {
return err
}
} else if !paused && lifeCycleIsPaused {
numaLogger.Info("resuming pipeline")
r.recorder.Eventf(existingPipelineDef, "Normal", "PipelineResume", "resuming pipeline")

run, err := GetPauseModule().runPipelineIfSafe(ctx, r.restConfig, existingPipelineDef)
if err != nil {
return err
}
if !run {
numaLogger.Infof("new pause request, can't resume pipeline at this time, will try again later")
r.recorder.Eventf(existingPipelineDef, "Normal", "PipelineResumeFailed", "new pause request, can't resume pipeline at this time, will try again later")
}
}
return nil
Expand Down
16 changes: 6 additions & 10 deletions internal/controller/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,16 +137,12 @@ var _ = BeforeSuite(func() {

customMetrics = metrics.RegisterCustomMetrics()

err = NewPipelineRolloutReconciler(
k8sManager.GetClient(),
k8sManager.GetScheme(),
cfg, customMetrics).SetupWithManager(k8sManager)
err = NewPipelineRolloutReconciler(k8sManager.GetClient(), k8sManager.GetScheme(), cfg, customMetrics,
k8sManager.GetEventRecorderFor(apiv1.RolloutPipeline)).SetupWithManager(k8sManager)
Expect(err).ToNot(HaveOccurred())

err = NewISBServiceRolloutReconciler(
k8sManager.GetClient(),
k8sManager.GetScheme(),
cfg, customMetrics).SetupWithManager(k8sManager)
err = NewISBServiceRolloutReconciler(k8sManager.GetClient(), k8sManager.GetScheme(), cfg, customMetrics,
k8sManager.GetEventRecorderFor(apiv1.RolloutISBSvc)).SetupWithManager(k8sManager)
Expect(err).ToNot(HaveOccurred())

stateCache := sync.NewLiveStateCache(cfg, customMetrics)
Expand All @@ -160,8 +156,8 @@ var _ = BeforeSuite(func() {
Expect(err).ToNot(HaveOccurred())
config.GetConfigManagerInstance().GetControllerDefinitionsMgr().UpdateControllerDefinitionConfig(getNumaflowControllerDefinitions())

numaflowControllerReconciler, err := NewNumaflowControllerRolloutReconciler(k8sManager.GetClient(),
k8sManager.GetScheme(), cfg, kubernetes.NewKubectl(), customMetrics)
numaflowControllerReconciler, err := NewNumaflowControllerRolloutReconciler(k8sManager.GetClient(), k8sManager.GetScheme(),
cfg, kubernetes.NewKubectl(), customMetrics, k8sManager.GetEventRecorderFor(apiv1.RolloutNumaflowController))
Expect(err).ToNot(HaveOccurred())
err = numaflowControllerReconciler.SetupWithManager(k8sManager)
Expect(err).ToNot(HaveOccurred())
Expand Down
Loading

0 comments on commit 3667dc9

Please sign in to comment.