Skip to content

Commit

Permalink
CR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Ahmed ElSayed <[email protected]>
  • Loading branch information
ahmelsayed committed Jan 29, 2021
1 parent f1ee747 commit d3191b5
Show file tree
Hide file tree
Showing 12 changed files with 135 additions and 36 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
- Global authentication credentials can be managed using `ClusterTriggerAuthentication` objects ([#1452](https://github.com/kedacore/keda/pull/1452))
- Introducing OpenStack Swift scaler ([#1342](https://github.com/kedacore/keda/issues/1342))
- Introducing MongoDB scaler ([#1467](https://github.com/kedacore/keda/pull/1467))
- Emit Kubernetes Events on KEDA events ([#1523]())
- Emit Kubernetes Events on KEDA events ([#1523](https://github.com/kedacore/keda/pull/1523))

### Improvements

Expand Down
7 changes: 7 additions & 0 deletions api/v1alpha1/condition_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,13 @@ func (c *Conditions) GetActiveCondition() Condition {
return c.getCondition(ConditionActive)
}

func (c *Conditions) GetReadyCondition() Condition {
if *c == nil {
c = GetInitializedConditions()
}
return c.getCondition(ConditionReady)
}

func (c Conditions) getCondition(conditionType ConditionType) Condition {
for i := range c {
if c[i].Type == conditionType {
Expand Down
11 changes: 7 additions & 4 deletions controllers/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package controllers
import (
"context"
"fmt"
"time"

"github.com/kedacore/keda/v2/pkg/eventreason"
corev1 "k8s.io/api/core/v1"
"time"

"k8s.io/client-go/tools/record"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -89,11 +89,14 @@ func (r *ScaledJobReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
reqLogger.Error(err, msg)
conditions.SetReadyCondition(metav1.ConditionFalse, "ScaledJobCheckFailed", msg)
conditions.SetActiveCondition(metav1.ConditionUnknown, "UnknownState", "ScaledJob check failed")
r.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.CheckFailed, msg)
r.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.ScaledJobCheckFailed, msg)
} else {
wasReady := conditions.GetReadyCondition()
if wasReady.IsFalse() || wasReady.IsUnknown() {
r.Recorder.Event(scaledJob, corev1.EventTypeNormal, eventreason.ScaledJobReady, "ScaledJob is ready for scaling")
}
reqLogger.V(1).Info(msg)
conditions.SetReadyCondition(metav1.ConditionTrue, "ScaledJobReady", msg)
r.Recorder.Event(scaledJob, corev1.EventTypeNormal, eventreason.Ready, msg)
}

return ctrl.Result{}, err
Expand Down
2 changes: 1 addition & 1 deletion controllers/scaledjob_finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (r *ScaledJobReconciler) finalizeScaledJob(logger logr.Logger, scaledJob *k
}

logger.Info("Successfully finalized ScaledJob")
r.Recorder.Event(scaledJob, corev1.EventTypeNormal, eventreason.Deleted, "ScaledJob was deleted")
r.Recorder.Event(scaledJob, corev1.EventTypeNormal, eventreason.ScaledJobDeleted, "ScaledJob was deleted")
return nil
}

Expand Down
9 changes: 7 additions & 2 deletions controllers/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,15 +164,20 @@ func (r *ScaledObjectReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error
reqLogger.Error(err, msg)
conditions.SetReadyCondition(metav1.ConditionFalse, "ScaledObjectCheckFailed", msg)
conditions.SetActiveCondition(metav1.ConditionUnknown, "UnkownState", "ScaledObject check failed")
r.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.CheckFailed, msg)
r.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.ScaledObjectCheckFailed, msg)
} else {
wasReady := conditions.GetReadyCondition()
if wasReady.IsFalse() || wasReady.IsUnknown() {
r.Recorder.Event(scaledObject, corev1.EventTypeNormal, eventreason.ScaledObjectReady, "ScaledObject is ready for scaling")
}
reqLogger.V(1).Info(msg)
conditions.SetReadyCondition(metav1.ConditionTrue, "ScaledObjectReady", msg)
r.Recorder.Event(scaledObject, corev1.EventTypeNormal, eventreason.Ready, msg)
}

if err := kedacontrollerutil.SetStatusConditions(r.Client, reqLogger, scaledObject, &conditions); err != nil {
return ctrl.Result{}, err
}

return ctrl.Result{}, err
}

Expand Down
2 changes: 1 addition & 1 deletion controllers/scaledobject_finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (r *ScaledObjectReconciler) finalizeScaledObject(logger logr.Logger, scaled
}

logger.Info("Successfully finalized ScaledObject")
r.Recorder.Event(scaledObject, corev1.EventTypeNormal, eventreason.Deleted, "ScaledObject was deleted")
r.Recorder.Event(scaledObject, corev1.EventTypeNormal, eventreason.ScaledObjectDeleted, "ScaledObject was deleted")
return nil
}

Expand Down
55 changes: 55 additions & 0 deletions controllers/triggerauthentication_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package controllers

import (
"context"
"github.com/go-logr/logr"
kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1"
"github.com/kedacore/keda/v2/pkg/eventreason"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// +kubebuilder:rbac:groups=keda.sh,resources=triggerauthentications;triggerauthentications/status,verbs="*"

// TriggerAuthenticationReconciler reconciles a TriggerAuthentication object
type TriggerAuthenticationReconciler struct {
Client client.Client
Log logr.Logger
Recorder record.EventRecorder
}

func (r *TriggerAuthenticationReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
reqLogger := r.Log.WithValues("TriggerAuthentication.Namespace", req.Namespace, "TriggerAuthentication.Name", req.Name)

triggerAuthentication := &kedav1alpha1.TriggerAuthentication{}
err := r.Client.Get(context.TODO(), req.NamespacedName, triggerAuthentication)
if err != nil {
if errors.IsNotFound(err) {
return ctrl.Result{}, nil
}
reqLogger.Error(err, "Failed ot get TriggerAuthentication")
return ctrl.Result{}, err
}

if triggerAuthentication.GetDeletionTimestamp() != nil {
r.Recorder.Event(triggerAuthentication, corev1.EventTypeNormal, eventreason.TriggerAuthenticationDeleted, "TriggerAuthentication was deleted")
return ctrl.Result{}, nil
}

if triggerAuthentication.ObjectMeta.Generation == 1 {
r.Recorder.Event(triggerAuthentication, corev1.EventTypeNormal, eventreason.TriggerAuthenticationAdded, "New TriggerAuthentication configured")
}

return ctrl.Result{}, nil
}

func (r* TriggerAuthenticationReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&kedav1alpha1.TriggerAuthentication{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Complete(r)
}
8 changes: 8 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,14 @@ func main() {
setupLog.Error(err, "unable to create controller", "controller", "ScaledJob")
os.Exit(1)
}
if err = (&controllers.TriggerAuthenticationReconciler{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("TriggerAuthentication"),
Recorder: mgr.GetEventRecorderFor("triggerauthentication-controller"),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "TriggerAuthentication")
os.Exit(1)
}
// +kubebuilder:scaffold:builder

setupLog.Info("Starting manager")
Expand Down
52 changes: 35 additions & 17 deletions pkg/eventreason/eventreason.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,36 +17,54 @@ limitations under the License.
package eventreason

const (
// Ready is for event when ScaledObject or ScaledJob is ready
Ready = "Ready"
// ScaledObjectReady is for event when a new ScaledObject is ready
ScaledObjectReady = "ScaledObjectReady"

// CheckFailed is for event when ScaledObject or ScaledJob validation check failed
CheckFailed = "CheckFailed"
// ScaledJobReady is for event when a new ScaledJob is ready
ScaledJobReady = "ScaledJobReady"

// Deleted is for event when ScaledObject or ScaledJob is deleted
Deleted = "Deleted"
// ScaledObjectCheckFailed is for event when ScaledObject validation check fails
ScaledObjectCheckFailed = "ScaledObjectCheckFailed"

// ScalersStarted is for event when scalers watch started for ScaledObject or ScaledJob
ScalersStarted = "ScalersStarted"
// ScaledJobCheckFailed is for event when ScaledJob validation check fails
ScaledJobCheckFailed = "ScaledJobCheckFailed"

// ScalersRestarted is for event when scalers watch was restarted for ScaledObject or ScaledJob
ScalersRestarted = "ScalersRestarted"
// ScaledObjectDeleted is for event when ScaledObject is deleted
ScaledObjectDeleted = "ScaledObjectDeleted"

// ScalersStopped is for event when scalers watch was stopped for ScaledObject or ScaledJob
ScalersStopped = "ScalersStopped"
// ScaledJobDeleted is for event when ScaledJob is deleted
ScaledJobDeleted = "ScaledJobDeleted"

// KEDAScalersStarted is for event when scalers watch started for ScaledObject or ScaledJob
KEDAScalersStarted = "KEDAScalersStarted"

// KEDAScalersRestarted is for event when scalers watch was restarted for ScaledObject or ScaledJob
KEDAScalersRestarted = "KEDAScalersRestarted"

// KEDAScalersStopped is for event when scalers watch was stopped for ScaledObject or ScaledJob
KEDAScalersStopped = "KEDAScalersStopped"

// KEDAScalerFailed is for event when a scaler fails for a ScaledJob or a ScaledObject
KEDAScalerFailed = "KEDAScalerFailed"

// ScaleTargetActivated is for event when the scale target of ScaledObject was activated
ScaleTargetActivated = "ScaleTargetActivated"
KEDAScaleTargetActivated = "KEDAScaleTargetActivated"

// ScaleTargetDeactivated is for event when the scale target for ScaledObject was deactivated
ScaleTargetDeactivated = "ScaleTargetDeactivated"
KEDAScaleTargetDeactivated = "KEDAScaleTargetDeactivated"

// ScaleTargetActivationFailed is for event when the activation the scale target for ScaledObject fails
ScaleTargetActivationFailed = "ScaleTargetActivationFailed"
KEDAScaleTargetActivationFailed = "KEDAScaleTargetActivationFailed"

// ScaleTargetDeactivationFailed is for event when the deactivation of the scale target for ScaledObject fails
ScaleTargetDeactivationFailed = "ScaleTargetDeactivationFailed"
KEDAScaleTargetDeactivationFailed = "KEDAScaleTargetDeactivationFailed"

// JobsCreated is for event when jobs for ScaledJob are created
JobsCreated = "JobsCreated"
KEDAJobsCreated = "KEDAJobsCreated"

// TriggerAuthenticationDeleted is for event when a TriggerAuthentication is deleted
TriggerAuthenticationDeleted = "TriggerAuthenticationDeleted"

// TriggerAuthenticationAdded is for event when a TriggerAuthentication is added
TriggerAuthenticationAdded = "TriggerAuthenticationAdded"
)
2 changes: 1 addition & 1 deletion pkg/scaling/executor/scale_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (e *scaleExecutor) createJobs(logger logr.Logger, scaledJob *kedav1alpha1.S
}
}
logger.Info("Created jobs", "Number of jobs", scaleTo)
e.recorder.Eventf(scaledJob, corev1.EventTypeNormal, eventreason.JobsCreated, "Created %d jobs", scaleTo)
e.recorder.Eventf(scaledJob, corev1.EventTypeNormal, eventreason.KEDAJobsCreated, "Created %d jobs", scaleTo)
}

func (e *scaleExecutor) isJobFinished(j *batchv1.Job) bool {
Expand Down
8 changes: 4 additions & 4 deletions pkg/scaling/executor/scale_scaledobjects.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,13 @@ func (e *scaleExecutor) scaleToZero(ctx context.Context, logger logr.Logger, sca
currentReplicas, err := e.updateScaleOnScaleTarget(ctx, scaledObject, scale, 0)
if err == nil {
logger.Info("Successfully scaled ScaleTarget to 0 replicas")
e.recorder.Eventf(scaledObject, corev1.EventTypeNormal, eventreason.ScaleTargetDeactivated, "Deactivated %s %s/%s from %d to %d", scaledObject.Spec.ScaleTargetRef.Kind, scaledObject.Namespace, scaledObject.Spec.ScaleTargetRef.Name, currentReplicas, 0)
e.recorder.Eventf(scaledObject, corev1.EventTypeNormal, eventreason.KEDAScaleTargetDeactivated, "Deactivated %s %s/%s from %d to %d", scaledObject.Spec.ScaleTargetRef.Kind, scaledObject.Namespace, scaledObject.Spec.ScaleTargetRef.Name, currentReplicas, 0)
if err := e.setActiveCondition(ctx, logger, scaledObject, metav1.ConditionFalse, "ScalerNotActive", "Scaling is not performed because triggers are not active"); err != nil {
logger.Error(err, "Error in setting active condition")
return
}
} else {
e.recorder.Eventf(scaledObject, corev1.EventTypeWarning, eventreason.ScaleTargetDeactivationFailed, "Failed to deactivated %s %s/%s", scaledObject.Spec.ScaleTargetRef.Kind, scaledObject.Namespace, scaledObject.Spec.ScaleTargetRef.Name, currentReplicas, 0)
e.recorder.Eventf(scaledObject, corev1.EventTypeWarning, eventreason.KEDAScaleTargetDeactivationFailed, "Failed to deactivated %s %s/%s", scaledObject.Spec.ScaleTargetRef.Kind, scaledObject.Namespace, scaledObject.Spec.ScaleTargetRef.Name, currentReplicas, 0)
}
} else {
logger.V(1).Info("ScaleTarget cooling down",
Expand Down Expand Up @@ -164,15 +164,15 @@ func (e *scaleExecutor) scaleFromZero(ctx context.Context, logger logr.Logger, s
logger.Info("Successfully updated ScaleTarget",
"Original Replicas Count", currentReplicas,
"New Replicas Count", replicas)
e.recorder.Eventf(scaledObject, corev1.EventTypeNormal, eventreason.ScaleTargetActivated, "Scaled %s %s/%s from %d to %d", scaledObject.Spec.ScaleTargetRef.Kind, scaledObject.Namespace, scaledObject.Spec.ScaleTargetRef.Name, currentReplicas, replicas)
e.recorder.Eventf(scaledObject, corev1.EventTypeNormal, eventreason.KEDAScaleTargetActivated, "Scaled %s %s/%s from %d to %d", scaledObject.Spec.ScaleTargetRef.Kind, scaledObject.Namespace, scaledObject.Spec.ScaleTargetRef.Name, currentReplicas, replicas)

// Scale was successful. Update lastScaleTime and lastActiveTime on the scaledObject
if err := e.updateLastActiveTime(ctx, logger, scaledObject); err != nil {
logger.Error(err, "Error in Updating lastScaleTime and lastActiveTime on the scaledObject")
return
}
} else {
e.recorder.Eventf(scaledObject, corev1.EventTypeWarning, eventreason.ScaleTargetActivationFailed, "Failed to scaled %s %s/%s from %d to %d", scaledObject.Spec.ScaleTargetRef.Kind, scaledObject.Namespace, scaledObject.Spec.ScaleTargetRef.Name, currentReplicas, replicas)
e.recorder.Eventf(scaledObject, corev1.EventTypeWarning, eventreason.KEDAScaleTargetActivationFailed, "Failed to scaled %s %s/%s from %d to %d", scaledObject.Spec.ScaleTargetRef.Kind, scaledObject.Namespace, scaledObject.Spec.ScaleTargetRef.Name, currentReplicas, replicas)
}
}

Expand Down
13 changes: 8 additions & 5 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ func (h *scaleHandler) HandleScalableObject(scalableObject interface{}) error {
cancelValue()
}
h.scaleLoopContexts.Store(key, cancel)
h.recorder.Event(withTriggers, corev1.EventTypeNormal, eventreason.ScalersRestarted, "Restarted scalers watch")
h.recorder.Event(withTriggers, corev1.EventTypeNormal, eventreason.KEDAScalersRestarted, "Restarted scalers watch")
} else {
h.recorder.Event(withTriggers, corev1.EventTypeNormal, eventreason.ScalersStarted, "Started scalers watch")
h.recorder.Event(withTriggers, corev1.EventTypeNormal, eventreason.KEDAScalersStarted, "Started scalers watch")
}

// a mutex is used to synchronize scale requests per scalableObject
Expand All @@ -123,7 +123,7 @@ func (h *scaleHandler) DeleteScalableObject(scalableObject interface{}) error {
cancel()
}
h.scaleLoopContexts.Delete(key)
h.recorder.Event(withTriggers, corev1.EventTypeNormal, eventreason.ScalersStopped, "Stopped scalers watch")
h.recorder.Event(withTriggers, corev1.EventTypeNormal, eventreason.KEDAScalersStopped, "Stopped scalers watch")
} else {
h.logger.V(1).Info("ScaleObject was not found in controller cache", "key", key)
}
Expand Down Expand Up @@ -201,22 +201,23 @@ func (h *scaleHandler) checkScalers(ctx context.Context, scalableObject interfac
defer scalingMutex.Unlock()
switch obj := scalableObject.(type) {
case *kedav1alpha1.ScaledObject:
h.scaleExecutor.RequestScale(ctx, obj, h.checkScaledObjectScalers(ctx, scalers))
h.scaleExecutor.RequestScale(ctx, obj, h.checkScaledObjectScalers(ctx, scalers, obj))
case *kedav1alpha1.ScaledJob:
scaledJob := scalableObject.(*kedav1alpha1.ScaledJob)
isActive, scaleTo, maxScale := h.checkScaledJobScalers(ctx, scalers, scaledJob)
h.scaleExecutor.RequestJobScale(ctx, obj, isActive, scaleTo, maxScale)
}
}

func (h *scaleHandler) checkScaledObjectScalers(ctx context.Context, scalers []scalers.Scaler) bool {
func (h *scaleHandler) checkScaledObjectScalers(ctx context.Context, scalers []scalers.Scaler, scaledObject *kedav1alpha1.ScaledObject) bool {
isActive := false
for i, scaler := range scalers {
isTriggerActive, err := scaler.IsActive(ctx)
scaler.Close()

if err != nil {
h.logger.V(1).Info("Error getting scale decision", "Error", err)
h.recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
continue
} else if isTriggerActive {
isActive = true
Expand Down Expand Up @@ -271,6 +272,7 @@ func (h *scaleHandler) checkScaledJobScalers(ctx context.Context, scalers []scal
scaler.Close()
if err != nil {
scalerLogger.V(1).Info("Error getting scale decision, but continue", "Error", err)
h.recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
continue
} else if isTriggerActive {
isActive = true
Expand Down Expand Up @@ -371,6 +373,7 @@ func (h *scaleHandler) buildScalers(withTriggers *kedav1alpha1.WithTriggers, pod
scaler, err := buildScaler(trigger.Type, config)
if err != nil {
closeScalers(scalersRes)
h.recorder.Event(withTriggers, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
return []scalers.Scaler{}, fmt.Errorf("error getting scaler for trigger #%d: %s", i, err)
}

Expand Down

0 comments on commit d3191b5

Please sign in to comment.