diff --git a/CHANGELOG.md b/CHANGELOG.md
index da19e7796fa..1a8e2cf3d02 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -20,6 +20,7 @@
 ### New
 
 - TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
+- Emit Kubernetes Events on KEDA events ([#1523](https://github.com/kedacore/keda/pull/1523))
 
 ### Improvements
diff --git a/adapter/main.go b/adapter/main.go
index 655163fa968..a7103c050b2 100644
--- a/adapter/main.go
+++ b/adapter/main.go
@@ -8,6 +8,9 @@ import (
 	"strconv"
 	"time"
 
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/client-go/tools/record"
+
 	appsv1 "k8s.io/api/apps/v1"
 	"k8s.io/apimachinery/pkg/util/wait"
 	openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
@@ -70,7 +73,9 @@ func (a *Adapter) makeProvider(globalHTTPTimeout time.Duration) (provider.Metric
 		return nil, fmt.Errorf("unable to construct new client (%s)", err)
 	}
 
-	handler := scaling.NewScaleHandler(kubeclient, nil, scheme, globalHTTPTimeout)
+	broadcaster := record.NewBroadcaster()
+	recorder := broadcaster.NewRecorder(scheme, corev1.EventSource{Component: "keda-metrics-adapter"})
+	handler := scaling.NewScaleHandler(kubeclient, nil, scheme, globalHTTPTimeout, recorder)
 
 	namespace, err := getWatchNamespace()
 	if err != nil {
diff --git a/api/v1alpha1/condition_types.go b/api/v1alpha1/condition_types.go
index 823a7893e50..2e1c68d450a 100644
--- a/api/v1alpha1/condition_types.go
+++ b/api/v1alpha1/condition_types.go
@@ -115,6 +115,14 @@ func (c *Conditions) GetActiveCondition() Condition {
 	return c.getCondition(ConditionActive)
 }
 
+// GetReadyCondition returns Condition of type Ready
+func (c *Conditions) GetReadyCondition() Condition {
+	if *c == nil {
+		c = GetInitializedConditions()
+	}
+	return c.getCondition(ConditionReady)
+}
+
 func (c Conditions) getCondition(conditionType ConditionType) Condition {
 	for i := range c {
 		if c[i].Type == conditionType {
diff --git a/controllers/scaledjob_controller.go b/controllers/scaledjob_controller.go
index 8585137b6e0..8c4b0c84d67 100644
--- a/controllers/scaledjob_controller.go
+++ b/controllers/scaledjob_controller.go
@@ -5,6 +5,11 @@ import (
 	"fmt"
 	"time"
 
+	"github.com/kedacore/keda/v2/pkg/eventreason"
+	corev1 "k8s.io/api/core/v1"
+
+	"k8s.io/client-go/tools/record"
+
 	"github.com/go-logr/logr"
 	batchv1 "k8s.io/api/batch/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
@@ -29,12 +34,13 @@ type ScaledJobReconciler struct {
 	Log               logr.Logger
 	Scheme            *runtime.Scheme
 	GlobalHTTPTimeout time.Duration
+	Recorder          record.EventRecorder
 
 	scaleHandler scaling.ScaleHandler
 }
 
 // SetupWithManager initializes the ScaledJobReconciler instance and starts a new controller managed by the passed Manager instance.
 func (r *ScaledJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
-	r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), nil, mgr.GetScheme(), r.GlobalHTTPTimeout)
+	r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), nil, mgr.GetScheme(), r.GlobalHTTPTimeout, mgr.GetEventRecorderFor("scale-handler"))
 
 	return ctrl.NewControllerManagedBy(mgr).
 		// Ignore updates to ScaledJob Status (in this case metadata.Generation does not change)
@@ -84,7 +90,12 @@ func (r *ScaledJobReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
 		reqLogger.Error(err, msg)
 		conditions.SetReadyCondition(metav1.ConditionFalse, "ScaledJobCheckFailed", msg)
 		conditions.SetActiveCondition(metav1.ConditionUnknown, "UnknownState", "ScaledJob check failed")
+		r.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.ScaledJobCheckFailed, msg)
 	} else {
+		wasReady := conditions.GetReadyCondition()
+		if wasReady.IsFalse() || wasReady.IsUnknown() {
+			r.Recorder.Event(scaledJob, corev1.EventTypeNormal, eventreason.ScaledJobReady, "ScaledJob is ready for scaling")
+		}
 		reqLogger.V(1).Info(msg)
 		conditions.SetReadyCondition(metav1.ConditionTrue, "ScaledJobReady", msg)
 	}
diff --git a/controllers/scaledjob_finalizer.go b/controllers/scaledjob_finalizer.go
index aa0ca78c7a6..a31ba4669f8 100644
--- a/controllers/scaledjob_finalizer.go
+++ b/controllers/scaledjob_finalizer.go
@@ -3,6 +3,9 @@ package controllers
 import (
 	"context"
 
+	"github.com/kedacore/keda/v2/pkg/eventreason"
+	corev1 "k8s.io/api/core/v1"
+
 	"github.com/go-logr/logr"
 
 	kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1"
@@ -33,6 +36,7 @@ func (r *ScaledJobReconciler) finalizeScaledJob(logger logr.Logger, scaledJob *k
 	}
 
 	logger.Info("Successfully finalized ScaledJob")
+	r.Recorder.Event(scaledJob, corev1.EventTypeNormal, eventreason.ScaledJobDeleted, "ScaledJob was deleted")
 	return nil
 }
diff --git a/controllers/scaledobject_controller.go b/controllers/scaledobject_controller.go
index a308b8d9955..e9321f5f131 100644
--- a/controllers/scaledobject_controller.go
+++ b/controllers/scaledobject_controller.go
@@ -6,6 +6,10 @@ import (
 	"sync"
 	"time"
 
+	"github.com/kedacore/keda/v2/pkg/eventreason"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/client-go/tools/record"
+
 	"github.com/go-logr/logr"
 	autoscalingv1 "k8s.io/api/autoscaling/v1"
 	autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2"
@@ -46,6 +50,7 @@ type ScaledObjectReconciler struct {
 	Client            client.Client
 	Scheme            *runtime.Scheme
 	GlobalHTTPTimeout time.Duration
+	Recorder          record.EventRecorder
 
 	scaleClient *scale.ScalesGetter
 	restMapper  meta.RESTMapper
@@ -91,7 +96,7 @@ func (r *ScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager) error {
 	// Init the rest of ScaledObjectReconciler
 	r.restMapper = mgr.GetRESTMapper()
 	r.scaledObjectsGenerations = &sync.Map{}
-	r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), r.scaleClient, mgr.GetScheme(), r.GlobalHTTPTimeout)
+	r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), r.scaleClient, mgr.GetScheme(), r.GlobalHTTPTimeout, r.Recorder)
 
 	// Start controller
 	return ctrl.NewControllerManagedBy(mgr).
@@ -159,13 +164,20 @@ func (r *ScaledObjectReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error
 		reqLogger.Error(err, msg)
 		conditions.SetReadyCondition(metav1.ConditionFalse, "ScaledObjectCheckFailed", msg)
 		conditions.SetActiveCondition(metav1.ConditionUnknown, "UnkownState", "ScaledObject check failed")
+		r.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.ScaledObjectCheckFailed, msg)
 	} else {
+		wasReady := conditions.GetReadyCondition()
+		if wasReady.IsFalse() || wasReady.IsUnknown() {
+			r.Recorder.Event(scaledObject, corev1.EventTypeNormal, eventreason.ScaledObjectReady, "ScaledObject is ready for scaling")
+		}
 		reqLogger.V(1).Info(msg)
 		conditions.SetReadyCondition(metav1.ConditionTrue, "ScaledObjectReady", msg)
 	}
+
 	if err := kedacontrollerutil.SetStatusConditions(r.Client, reqLogger, scaledObject, &conditions); err != nil {
 		return ctrl.Result{}, err
 	}
+
 	return ctrl.Result{}, err
 }
diff --git a/controllers/scaledobject_finalizer.go b/controllers/scaledobject_finalizer.go
index 778ee67ee25..a4e3c72b5c7 100644
--- a/controllers/scaledobject_finalizer.go
+++ b/controllers/scaledobject_finalizer.go
@@ -3,6 +3,9 @@ package controllers
 import (
 	"context"
 
+	"github.com/kedacore/keda/v2/pkg/eventreason"
+	corev1 "k8s.io/api/core/v1"
+
 	"github.com/go-logr/logr"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -54,6 +57,7 @@ func (r *ScaledObjectReconciler) finalizeScaledObject(logger logr.Logger, scaled
 	}
 
 	logger.Info("Successfully finalized ScaledObject")
+	r.Recorder.Event(scaledObject, corev1.EventTypeNormal, eventreason.ScaledObjectDeleted, "ScaledObject was deleted")
 	return nil
 }
diff --git a/controllers/triggerauthentication_controller.go b/controllers/triggerauthentication_controller.go
new file mode 100644
index 00000000000..e96cc430391
--- /dev/null
+++ b/controllers/triggerauthentication_controller.go
@@ -0,0 +1,58 @@
+package controllers
+
+import (
+	"context"
+
+	"github.com/go-logr/logr"
+	kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1"
+	"github.com/kedacore/keda/v2/pkg/eventreason"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/client-go/tools/record"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/builder"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/predicate"
+)
+
+// +kubebuilder:rbac:groups=keda.sh,resources=triggerauthentications;triggerauthentications/status,verbs="*"
+
+// TriggerAuthenticationReconciler reconciles a TriggerAuthentication object
+type TriggerAuthenticationReconciler struct {
+	Client   client.Client
+	Log      logr.Logger
+	Recorder record.EventRecorder
+}
+
+// Reconcile performs reconciliation on the identified TriggerAuthentication resource based on the request information passed, returns the result and an error (if any).
+func (r *TriggerAuthenticationReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
+	reqLogger := r.Log.WithValues("TriggerAuthentication.Namespace", req.Namespace, "TriggerAuthentication.Name", req.Name)
+
+	triggerAuthentication := &kedav1alpha1.TriggerAuthentication{}
+	err := r.Client.Get(context.TODO(), req.NamespacedName, triggerAuthentication)
+	if err != nil {
+		if errors.IsNotFound(err) {
+			return ctrl.Result{}, nil
+		}
+		reqLogger.Error(err, "Failed to get TriggerAuthentication")
+		return ctrl.Result{}, err
+	}
+
+	if triggerAuthentication.GetDeletionTimestamp() != nil {
+		r.Recorder.Event(triggerAuthentication, corev1.EventTypeNormal, eventreason.TriggerAuthenticationDeleted, "TriggerAuthentication was deleted")
+		return ctrl.Result{}, nil
+	}
+
+	if triggerAuthentication.ObjectMeta.Generation == 1 {
+		r.Recorder.Event(triggerAuthentication, corev1.EventTypeNormal, eventreason.TriggerAuthenticationAdded, "New TriggerAuthentication configured")
+	}
+
+	return ctrl.Result{}, nil
+}
+
+// SetupWithManager initializes the TriggerAuthenticationReconciler instance and starts a new controller managed by the passed Manager instance.
+func (r *TriggerAuthenticationReconciler) SetupWithManager(mgr ctrl.Manager) error {
+	return ctrl.NewControllerManagedBy(mgr).
+		For(&kedav1alpha1.TriggerAuthentication{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
+		Complete(r)
+}
diff --git a/go.sum b/go.sum
index b2a877794fa..e9d1d2de36c 100644
--- a/go.sum
+++ b/go.sum
@@ -958,6 +958,7 @@ github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfV
 github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
 github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
 github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
+github.com/kedacore/keda v1.5.0 h1:c8xA1Vo3H7rPwFiWUX3CBXnjBSrbYDmUs9iEfDlf4bQ=
 github.com/kelseyhightower/envconfig v1.3.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
 github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
 github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM=
diff --git a/main.go b/main.go
index 40f66988efe..8b3786f387b 100644
--- a/main.go
+++ b/main.go
@@ -123,12 +123,14 @@ func main() {
 	}
 	globalHTTPTimeout := time.Duration(globalHTTPTimeoutMS) * time.Millisecond
 
+	eventRecorder := mgr.GetEventRecorderFor("keda-operator")
 	if err = (&controllers.ScaledObjectReconciler{
 		Client:            mgr.GetClient(),
 		Log:               ctrl.Log.WithName("controllers").WithName("ScaledObject"),
 		Scheme:            mgr.GetScheme(),
 		GlobalHTTPTimeout: globalHTTPTimeout,
+		Recorder:          eventRecorder,
 	}).SetupWithManager(mgr); err != nil {
 		setupLog.Error(err, "unable to create controller", "controller", "ScaledObject")
 		os.Exit(1)
@@ -138,10 +140,19 @@ func main() {
 		Log:               ctrl.Log.WithName("controllers").WithName("ScaledJob"),
 		Scheme:            mgr.GetScheme(),
 		GlobalHTTPTimeout: globalHTTPTimeout,
+		Recorder:          eventRecorder,
 	}).SetupWithManager(mgr); err != nil {
 		setupLog.Error(err, "unable to create controller", "controller", "ScaledJob")
 		os.Exit(1)
 	}
+	if err = (&controllers.TriggerAuthenticationReconciler{
+		Client:   mgr.GetClient(),
+		Log:      ctrl.Log.WithName("controllers").WithName("TriggerAuthentication"),
+		Recorder: eventRecorder,
+	}).SetupWithManager(mgr); err != nil {
+		setupLog.Error(err, "unable to create controller", "controller", "TriggerAuthentication")
controller", "controller", "TriggerAuthentication") + os.Exit(1) + } // +kubebuilder:scaffold:builder setupLog.Info("Starting manager") diff --git a/pkg/eventreason/eventreason.go b/pkg/eventreason/eventreason.go new file mode 100644 index 00000000000..b9a3e15739f --- /dev/null +++ b/pkg/eventreason/eventreason.go @@ -0,0 +1,67 @@ +/* +Copyright 2020 The KEDA Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventreason + +const ( + // ScaledObjectReady is for event when a new ScaledObject is ready + ScaledObjectReady = "ScaledObjectReady" + + // ScaledJobReady is for event when a new ScaledJob is ready + ScaledJobReady = "ScaledJobReady" + + // ScaledObjectCheckFailed is for event when ScaledObject validation check fails + ScaledObjectCheckFailed = "ScaledObjectCheckFailed" + + // ScaledJobCheckFailed is for event when ScaledJob validation check fails + ScaledJobCheckFailed = "ScaledJobCheckFailed" + + // ScaledObjectDeleted is for event when ScaledObject is deleted + ScaledObjectDeleted = "ScaledObjectDeleted" + + // ScaledJobDeleted is for event when ScaledJob is deleted + ScaledJobDeleted = "ScaledJobDeleted" + + // KEDAScalersStarted is for event when scalers watch started for ScaledObject or ScaledJob + KEDAScalersStarted = "KEDAScalersStarted" + + // KEDAScalersStopped is for event when scalers watch was stopped for ScaledObject or ScaledJob + KEDAScalersStopped = "KEDAScalersStopped" + + // KEDAScalerFailed is for event when a scaler fails for a ScaledJob or a ScaledObject + KEDAScalerFailed = "KEDAScalerFailed" + + // KEDAScaleTargetActivated is for event when the scale target of ScaledObject was activated + KEDAScaleTargetActivated = "KEDAScaleTargetActivated" + + // KEDAScaleTargetDeactivated is for event when the scale target for ScaledObject was deactivated + KEDAScaleTargetDeactivated = "KEDAScaleTargetDeactivated" + + // KEDAScaleTargetActivationFailed is for event when the activation the scale target for ScaledObject fails + KEDAScaleTargetActivationFailed = "KEDAScaleTargetActivationFailed" + + // KEDAScaleTargetDeactivationFailed is for event when the deactivation of the scale target for ScaledObject fails + KEDAScaleTargetDeactivationFailed = "KEDAScaleTargetDeactivationFailed" + + // KEDAJobsCreated is for event when jobs for ScaledJob are created + KEDAJobsCreated = "KEDAJobsCreated" + + // TriggerAuthenticationDeleted is for event when a TriggerAuthentication is deleted + TriggerAuthenticationDeleted = "TriggerAuthenticationDeleted" + + // TriggerAuthenticationAdded is for event when a TriggerAuthentication is added + TriggerAuthenticationAdded = "TriggerAuthenticationAdded" +) diff --git a/pkg/scaling/executor/scale_executor.go b/pkg/scaling/executor/scale_executor.go index 4a703afe8d8..b8fd1d144ed 100644 --- a/pkg/scaling/executor/scale_executor.go +++ b/pkg/scaling/executor/scale_executor.go @@ -4,6 +4,8 @@ import ( "context" "fmt" + "k8s.io/client-go/tools/record" + "github.com/go-logr/logr" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" 
"k8s.io/apimachinery/pkg/runtime" @@ -30,15 +32,17 @@ type scaleExecutor struct { scaleClient *scale.ScalesGetter reconcilerScheme *runtime.Scheme logger logr.Logger + recorder record.EventRecorder } // NewScaleExecutor creates a ScaleExecutor object -func NewScaleExecutor(client client.Client, scaleClient *scale.ScalesGetter, reconcilerScheme *runtime.Scheme) ScaleExecutor { +func NewScaleExecutor(client client.Client, scaleClient *scale.ScalesGetter, reconcilerScheme *runtime.Scheme, recorder record.EventRecorder) ScaleExecutor { return &scaleExecutor{ client: client, scaleClient: scaleClient, reconcilerScheme: reconcilerScheme, logger: logf.Log.WithName("scaleexecutor"), + recorder: recorder, } } diff --git a/pkg/scaling/executor/scale_jobs.go b/pkg/scaling/executor/scale_jobs.go index 43c72272ee5..57eb5b637d1 100644 --- a/pkg/scaling/executor/scale_jobs.go +++ b/pkg/scaling/executor/scale_jobs.go @@ -5,6 +5,8 @@ import ( "sort" "strconv" + "github.com/kedacore/keda/v2/pkg/eventreason" + "github.com/go-logr/logr" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" @@ -108,6 +110,7 @@ func (e *scaleExecutor) createJobs(logger logr.Logger, scaledJob *kedav1alpha1.S } } logger.Info("Created jobs", "Number of jobs", scaleTo) + e.recorder.Eventf(scaledJob, corev1.EventTypeNormal, eventreason.KEDAJobsCreated, "Created %d jobs", scaleTo) } func (e *scaleExecutor) isJobFinished(j *batchv1.Job) bool { diff --git a/pkg/scaling/executor/scale_scaledobjects.go b/pkg/scaling/executor/scale_scaledobjects.go index 6e4a5c1cc4b..fd87bc1e5a3 100644 --- a/pkg/scaling/executor/scale_scaledobjects.go +++ b/pkg/scaling/executor/scale_scaledobjects.go @@ -4,6 +4,9 @@ import ( "context" "time" + "github.com/kedacore/keda/v2/pkg/eventreason" + corev1 "k8s.io/api/core/v1" + "github.com/go-logr/logr" appsv1 "k8s.io/api/apps/v1" autoscalingv1 "k8s.io/api/autoscaling/v1" @@ -121,13 +124,16 @@ func (e *scaleExecutor) scaleToZero(ctx context.Context, logger logr.Logger, sca if scaledObject.Status.LastActiveTime == nil || scaledObject.Status.LastActiveTime.Add(cooldownPeriod).Before(time.Now()) { // or last time a trigger was active was > cooldown period, so scale down. 
-		_, err := e.updateScaleOnScaleTarget(ctx, scaledObject, scale, 0)
+		currentReplicas, err := e.updateScaleOnScaleTarget(ctx, scaledObject, scale, 0)
 		if err == nil {
 			logger.Info("Successfully scaled ScaleTarget to 0 replicas")
+			e.recorder.Eventf(scaledObject, corev1.EventTypeNormal, eventreason.KEDAScaleTargetDeactivated, "Deactivated %s %s/%s from %d to %d", scaledObject.Spec.ScaleTargetRef.Kind, scaledObject.Namespace, scaledObject.Spec.ScaleTargetRef.Name, currentReplicas, 0)
 
 			if err := e.setActiveCondition(ctx, logger, scaledObject, metav1.ConditionFalse, "ScalerNotActive", "Scaling is not performed because triggers are not active"); err != nil {
 				logger.Error(err, "Error in setting active condition")
 				return
 			}
+		} else {
+			e.recorder.Eventf(scaledObject, corev1.EventTypeWarning, eventreason.KEDAScaleTargetDeactivationFailed, "Failed to deactivate %s %s/%s from %d to %d", scaledObject.Spec.ScaleTargetRef.Kind, scaledObject.Namespace, scaledObject.Spec.ScaleTargetRef.Name, currentReplicas, 0)
 		}
 	} else {
 		logger.V(1).Info("ScaleTarget cooling down",
@@ -158,12 +164,15 @@ func (e *scaleExecutor) scaleFromZero(ctx context.Context, logger logr.Logger, s
 		logger.Info("Successfully updated ScaleTarget",
 			"Original Replicas Count", currentReplicas,
 			"New Replicas Count", replicas)
+		e.recorder.Eventf(scaledObject, corev1.EventTypeNormal, eventreason.KEDAScaleTargetActivated, "Scaled %s %s/%s from %d to %d", scaledObject.Spec.ScaleTargetRef.Kind, scaledObject.Namespace, scaledObject.Spec.ScaleTargetRef.Name, currentReplicas, replicas)
 
 		// Scale was successful. Update lastScaleTime and lastActiveTime on the scaledObject
 		if err := e.updateLastActiveTime(ctx, logger, scaledObject); err != nil {
 			logger.Error(err, "Error in Updating lastScaleTime and lastActiveTime on the scaledObject")
 			return
 		}
+	} else {
+		e.recorder.Eventf(scaledObject, corev1.EventTypeWarning, eventreason.KEDAScaleTargetActivationFailed, "Failed to scale %s %s/%s from %d to %d", scaledObject.Spec.ScaleTargetRef.Kind, scaledObject.Namespace, scaledObject.Spec.ScaleTargetRef.Name, currentReplicas, replicas)
 	}
 }
diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go
index 4a71a3c84e5..af55825e97e 100644
--- a/pkg/scaling/scale_handler.go
+++ b/pkg/scaling/scale_handler.go
@@ -6,6 +6,9 @@ import (
 	"sync"
 	"time"
 
+	"github.com/kedacore/keda/v2/pkg/eventreason"
+	"k8s.io/client-go/tools/record"
+
 	"github.com/go-logr/logr"
 	appsv1 "k8s.io/api/apps/v1"
 	"k8s.io/api/autoscaling/v2beta2"
@@ -44,16 +47,18 @@ type scaleHandler struct {
 	scaleLoopContexts *sync.Map
 	scaleExecutor     executor.ScaleExecutor
 	globalHTTPTimeout time.Duration
+	recorder          record.EventRecorder
 }
 
 // NewScaleHandler creates a ScaleHandler object
-func NewScaleHandler(client client.Client, scaleClient *scale.ScalesGetter, reconcilerScheme *runtime.Scheme, globalHTTPTimeout time.Duration) ScaleHandler {
+func NewScaleHandler(client client.Client, scaleClient *scale.ScalesGetter, reconcilerScheme *runtime.Scheme, globalHTTPTimeout time.Duration, recorder record.EventRecorder) ScaleHandler {
 	return &scaleHandler{
 		client:            client,
 		logger:            logf.Log.WithName("scalehandler"),
 		scaleLoopContexts: &sync.Map{},
-		scaleExecutor:     executor.NewScaleExecutor(client, scaleClient, reconcilerScheme),
+		scaleExecutor:     executor.NewScaleExecutor(client, scaleClient, reconcilerScheme, recorder),
 		globalHTTPTimeout: globalHTTPTimeout,
+		recorder:          recorder,
 	}
 }
 
@@ -90,6 +95,8 @@ func (h *scaleHandler) HandleScalableObject(scalableObject interface{}) error {
 			cancelValue()
 		}
 		h.scaleLoopContexts.Store(key, cancel)
+	} else {
+		h.recorder.Event(withTriggers, corev1.EventTypeNormal, eventreason.KEDAScalersStarted, "Started scalers watch")
 	}
 
 	// a mutex is used to synchronize scale requests per scalableObject
@@ -115,6 +122,7 @@ func (h *scaleHandler) DeleteScalableObject(scalableObject interface{}) error {
 			cancel()
 		}
 		h.scaleLoopContexts.Delete(key)
+		h.recorder.Event(withTriggers, corev1.EventTypeNormal, eventreason.KEDAScalersStopped, "Stopped scalers watch")
 	} else {
 		h.logger.V(1).Info("ScaleObject was not found in controller cache", "key", key)
 	}
@@ -194,7 +202,7 @@ func (h *scaleHandler) checkScalers(ctx context.Context, scalableObject interfac
 	defer scalingMutex.Unlock()
 	switch obj := scalableObject.(type) {
 	case *kedav1alpha1.ScaledObject:
-		h.scaleExecutor.RequestScale(ctx, obj, h.checkScaledObjectScalers(ctx, scalers))
+		h.scaleExecutor.RequestScale(ctx, obj, h.checkScaledObjectScalers(ctx, scalers, obj))
 	case *kedav1alpha1.ScaledJob:
 		scaledJob := scalableObject.(*kedav1alpha1.ScaledJob)
 		isActive, scaleTo, maxScale := h.checkScaledJobScalers(ctx, scalers, scaledJob)
@@ -202,7 +210,7 @@ func (h *scaleHandler) checkScalers(ctx context.Context, scalableObject interfac
 	}
 }
 
-func (h *scaleHandler) checkScaledObjectScalers(ctx context.Context, scalers []scalers.Scaler) bool {
+func (h *scaleHandler) checkScaledObjectScalers(ctx context.Context, scalers []scalers.Scaler, scaledObject *kedav1alpha1.ScaledObject) bool {
 	isActive := false
 	for i, scaler := range scalers {
 		isTriggerActive, err := scaler.IsActive(ctx)
@@ -210,6 +218,7 @@ func (h *scaleHandler) checkScaledObjectScalers(ctx context.Context, scalers []s
 
 		if err != nil {
 			h.logger.V(1).Info("Error getting scale decision", "Error", err)
+			h.recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
 			continue
 		} else if isTriggerActive {
 			isActive = true
@@ -264,6 +273,7 @@ func (h *scaleHandler) checkScaledJobScalers(ctx context.Context, scalers []scal
 		scaler.Close()
 		if err != nil {
 			scalerLogger.V(1).Info("Error getting scale decision, but continue", "Error", err)
+			h.recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
 			continue
 		} else if isTriggerActive {
 			isActive = true
@@ -364,6 +374,7 @@ func (h *scaleHandler) buildScalers(withTriggers *kedav1alpha1.WithTriggers, pod
 		scaler, err := buildScaler(trigger.Type, config)
 		if err != nil {
 			closeScalers(scalersRes)
+			h.recorder.Event(withTriggers, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
 			return []scalers.Scaler{}, fmt.Errorf("error getting scaler for trigger #%d: %s", i, err)
 		}
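
Illustrative usage sketch (not part of the changeset above): the diff threads a single record.EventRecorder from the manager into the controllers and the scale handler, and every emission follows the same call shape — involved object, event type, reason constant from pkg/eventreason, message. The snippet below is a minimal, hypothetical example of that pattern; the package and helper name are placeholders, not KEDA code.

package example

import (
	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/record"

	kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1"
	"github.com/kedacore/keda/v2/pkg/eventreason"
)

// emitReadyEvent is a hypothetical helper showing the recorder call pattern
// used throughout the diff: object, corev1 event type, reason constant, message.
func emitReadyEvent(recorder record.EventRecorder, scaledObject *kedav1alpha1.ScaledObject) {
	recorder.Event(scaledObject, corev1.EventTypeNormal, eventreason.ScaledObjectReady, "ScaledObject is ready for scaling")
}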