Skip to content

Commit

Permalink
Emit kubernetes events from KEDA (kedacore#1523)
Browse files Browse the repository at this point in the history
* Emit kubernetes events from KEDA

Signed-off-by: Ahmed ElSayed <[email protected]>

* CR comments

Signed-off-by: Ahmed ElSayed <[email protected]>

* Fix CI errors

Signed-off-by: Ahmed ElSayed <[email protected]>

* goimports

Signed-off-by: Ahmed ElSayed <[email protected]>

* Code review comments

Signed-off-by: Ahmed ElSayed <[email protected]>

* Fix CHANGELOG.md

Signed-off-by: Ahmed ElSayed <[email protected]>
  • Loading branch information
ahmelsayed authored and ycabrer committed Mar 1, 2021
1 parent faefe77 commit 331325b
Show file tree
Hide file tree
Showing 15 changed files with 218 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
### New

- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
- Emit Kubernetes Events on KEDA events ([#1523](https://github.com/kedacore/keda/pull/1523))

### Improvements

Expand Down
7 changes: 6 additions & 1 deletion adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import (
"strconv"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"

appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/util/wait"
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
Expand Down Expand Up @@ -70,7 +73,9 @@ func (a *Adapter) makeProvider(globalHTTPTimeout time.Duration) (provider.Metric
return nil, fmt.Errorf("unable to construct new client (%s)", err)
}

handler := scaling.NewScaleHandler(kubeclient, nil, scheme, globalHTTPTimeout)
broadcaster := record.NewBroadcaster()
recorder := broadcaster.NewRecorder(scheme, corev1.EventSource{Component: "keda-metrics-adapter"})
handler := scaling.NewScaleHandler(kubeclient, nil, scheme, globalHTTPTimeout, recorder)

namespace, err := getWatchNamespace()
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions api/v1alpha1/condition_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,14 @@ func (c *Conditions) GetActiveCondition() Condition {
return c.getCondition(ConditionActive)
}

// GetReadyCondition returns Condition of type Ready
func (c *Conditions) GetReadyCondition() Condition {
if *c == nil {
c = GetInitializedConditions()
}
return c.getCondition(ConditionReady)
}

func (c Conditions) getCondition(conditionType ConditionType) Condition {
for i := range c {
if c[i].Type == conditionType {
Expand Down
13 changes: 12 additions & 1 deletion controllers/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ import (
"fmt"
"time"

"github.com/kedacore/keda/v2/pkg/eventreason"
corev1 "k8s.io/api/core/v1"

"k8s.io/client-go/tools/record"

"github.com/go-logr/logr"
batchv1 "k8s.io/api/batch/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -29,12 +34,13 @@ type ScaledJobReconciler struct {
Log logr.Logger
Scheme *runtime.Scheme
GlobalHTTPTimeout time.Duration
Recorder record.EventRecorder
scaleHandler scaling.ScaleHandler
}

// SetupWithManager initializes the ScaledJobReconciler instance and starts a new controller managed by the passed Manager instance.
func (r *ScaledJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), nil, mgr.GetScheme(), r.GlobalHTTPTimeout)
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), nil, mgr.GetScheme(), r.GlobalHTTPTimeout, mgr.GetEventRecorderFor("scale-handler"))

return ctrl.NewControllerManagedBy(mgr).
// Ignore updates to ScaledJob Status (in this case metadata.Generation does not change)
Expand Down Expand Up @@ -84,7 +90,12 @@ func (r *ScaledJobReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
reqLogger.Error(err, msg)
conditions.SetReadyCondition(metav1.ConditionFalse, "ScaledJobCheckFailed", msg)
conditions.SetActiveCondition(metav1.ConditionUnknown, "UnknownState", "ScaledJob check failed")
r.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.ScaledJobCheckFailed, msg)
} else {
wasReady := conditions.GetReadyCondition()
if wasReady.IsFalse() || wasReady.IsUnknown() {
r.Recorder.Event(scaledJob, corev1.EventTypeNormal, eventreason.ScaledJobReady, "ScaledJob is ready for scaling")
}
reqLogger.V(1).Info(msg)
conditions.SetReadyCondition(metav1.ConditionTrue, "ScaledJobReady", msg)
}
Expand Down
4 changes: 4 additions & 0 deletions controllers/scaledjob_finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package controllers
import (
"context"

"github.com/kedacore/keda/v2/pkg/eventreason"
corev1 "k8s.io/api/core/v1"

"github.com/go-logr/logr"

kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1"
Expand Down Expand Up @@ -33,6 +36,7 @@ func (r *ScaledJobReconciler) finalizeScaledJob(logger logr.Logger, scaledJob *k
}

logger.Info("Successfully finalized ScaledJob")
r.Recorder.Event(scaledJob, corev1.EventTypeNormal, eventreason.ScaledJobDeleted, "ScaledJob was deleted")
return nil
}

Expand Down
14 changes: 13 additions & 1 deletion controllers/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ import (
"sync"
"time"

"github.com/kedacore/keda/v2/pkg/eventreason"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"

"github.com/go-logr/logr"
autoscalingv1 "k8s.io/api/autoscaling/v1"
autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2"
Expand Down Expand Up @@ -46,6 +50,7 @@ type ScaledObjectReconciler struct {
Client client.Client
Scheme *runtime.Scheme
GlobalHTTPTimeout time.Duration
Recorder record.EventRecorder

scaleClient *scale.ScalesGetter
restMapper meta.RESTMapper
Expand Down Expand Up @@ -91,7 +96,7 @@ func (r *ScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager) error {
// Init the rest of ScaledObjectReconciler
r.restMapper = mgr.GetRESTMapper()
r.scaledObjectsGenerations = &sync.Map{}
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), r.scaleClient, mgr.GetScheme(), r.GlobalHTTPTimeout)
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), r.scaleClient, mgr.GetScheme(), r.GlobalHTTPTimeout, r.Recorder)

// Start controller
return ctrl.NewControllerManagedBy(mgr).
Expand Down Expand Up @@ -159,13 +164,20 @@ func (r *ScaledObjectReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error
reqLogger.Error(err, msg)
conditions.SetReadyCondition(metav1.ConditionFalse, "ScaledObjectCheckFailed", msg)
conditions.SetActiveCondition(metav1.ConditionUnknown, "UnkownState", "ScaledObject check failed")
r.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.ScaledObjectCheckFailed, msg)
} else {
wasReady := conditions.GetReadyCondition()
if wasReady.IsFalse() || wasReady.IsUnknown() {
r.Recorder.Event(scaledObject, corev1.EventTypeNormal, eventreason.ScaledObjectReady, "ScaledObject is ready for scaling")
}
reqLogger.V(1).Info(msg)
conditions.SetReadyCondition(metav1.ConditionTrue, "ScaledObjectReady", msg)
}

if err := kedacontrollerutil.SetStatusConditions(r.Client, reqLogger, scaledObject, &conditions); err != nil {
return ctrl.Result{}, err
}

return ctrl.Result{}, err
}

Expand Down
4 changes: 4 additions & 0 deletions controllers/scaledobject_finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package controllers
import (
"context"

"github.com/kedacore/keda/v2/pkg/eventreason"
corev1 "k8s.io/api/core/v1"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -54,6 +57,7 @@ func (r *ScaledObjectReconciler) finalizeScaledObject(logger logr.Logger, scaled
}

logger.Info("Successfully finalized ScaledObject")
r.Recorder.Event(scaledObject, corev1.EventTypeNormal, eventreason.ScaledObjectDeleted, "ScaledObject was deleted")
return nil
}

Expand Down
58 changes: 58 additions & 0 deletions controllers/triggerauthentication_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package controllers

import (
"context"

"github.com/go-logr/logr"
kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1"
"github.com/kedacore/keda/v2/pkg/eventreason"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// +kubebuilder:rbac:groups=keda.sh,resources=triggerauthentications;triggerauthentications/status,verbs="*"

// TriggerAuthenticationReconciler reconciles a TriggerAuthentication object
type TriggerAuthenticationReconciler struct {
Client client.Client
Log logr.Logger
Recorder record.EventRecorder
}

// Reconcile performs reconciliation on the identified TriggerAuthentication resource based on the request information passed, returns the result and an error (if any).
func (r *TriggerAuthenticationReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
reqLogger := r.Log.WithValues("TriggerAuthentication.Namespace", req.Namespace, "TriggerAuthentication.Name", req.Name)

triggerAuthentication := &kedav1alpha1.TriggerAuthentication{}
err := r.Client.Get(context.TODO(), req.NamespacedName, triggerAuthentication)
if err != nil {
if errors.IsNotFound(err) {
return ctrl.Result{}, nil
}
reqLogger.Error(err, "Failed ot get TriggerAuthentication")
return ctrl.Result{}, err
}

if triggerAuthentication.GetDeletionTimestamp() != nil {
r.Recorder.Event(triggerAuthentication, corev1.EventTypeNormal, eventreason.TriggerAuthenticationDeleted, "TriggerAuthentication was deleted")
return ctrl.Result{}, nil
}

if triggerAuthentication.ObjectMeta.Generation == 1 {
r.Recorder.Event(triggerAuthentication, corev1.EventTypeNormal, eventreason.TriggerAuthenticationAdded, "New TriggerAuthentication configured")
}

return ctrl.Result{}, nil
}

// SetupWithManager initializes the TriggerAuthenticationReconciler instance and starts a new controller managed by the passed Manager instance.
func (r *TriggerAuthenticationReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&kedav1alpha1.TriggerAuthentication{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Complete(r)
}
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -958,6 +958,7 @@ github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfV
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
github.com/kedacore/keda v1.5.0 h1:c8xA1Vo3H7rPwFiWUX3CBXnjBSrbYDmUs9iEfDlf4bQ=
github.com/kelseyhightower/envconfig v1.3.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM=
Expand Down
11 changes: 11 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,14 @@ func main() {
}

globalHTTPTimeout := time.Duration(globalHTTPTimeoutMS) * time.Millisecond
eventRecorder := mgr.GetEventRecorderFor("keda-operator")

if err = (&controllers.ScaledObjectReconciler{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("ScaledObject"),
Scheme: mgr.GetScheme(),
GlobalHTTPTimeout: globalHTTPTimeout,
Recorder: eventRecorder,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ScaledObject")
os.Exit(1)
Expand All @@ -138,10 +140,19 @@ func main() {
Log: ctrl.Log.WithName("controllers").WithName("ScaledJob"),
Scheme: mgr.GetScheme(),
GlobalHTTPTimeout: globalHTTPTimeout,
Recorder: eventRecorder,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ScaledJob")
os.Exit(1)
}
if err = (&controllers.TriggerAuthenticationReconciler{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("TriggerAuthentication"),
Recorder: eventRecorder,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "TriggerAuthentication")
os.Exit(1)
}
// +kubebuilder:scaffold:builder

setupLog.Info("Starting manager")
Expand Down
67 changes: 67 additions & 0 deletions pkg/eventreason/eventreason.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
Copyright 2020 The KEDA Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package eventreason

const (
// ScaledObjectReady is for event when a new ScaledObject is ready
ScaledObjectReady = "ScaledObjectReady"

// ScaledJobReady is for event when a new ScaledJob is ready
ScaledJobReady = "ScaledJobReady"

// ScaledObjectCheckFailed is for event when ScaledObject validation check fails
ScaledObjectCheckFailed = "ScaledObjectCheckFailed"

// ScaledJobCheckFailed is for event when ScaledJob validation check fails
ScaledJobCheckFailed = "ScaledJobCheckFailed"

// ScaledObjectDeleted is for event when ScaledObject is deleted
ScaledObjectDeleted = "ScaledObjectDeleted"

// ScaledJobDeleted is for event when ScaledJob is deleted
ScaledJobDeleted = "ScaledJobDeleted"

// KEDAScalersStarted is for event when scalers watch started for ScaledObject or ScaledJob
KEDAScalersStarted = "KEDAScalersStarted"

// KEDAScalersStopped is for event when scalers watch was stopped for ScaledObject or ScaledJob
KEDAScalersStopped = "KEDAScalersStopped"

// KEDAScalerFailed is for event when a scaler fails for a ScaledJob or a ScaledObject
KEDAScalerFailed = "KEDAScalerFailed"

// KEDAScaleTargetActivated is for event when the scale target of ScaledObject was activated
KEDAScaleTargetActivated = "KEDAScaleTargetActivated"

// KEDAScaleTargetDeactivated is for event when the scale target for ScaledObject was deactivated
KEDAScaleTargetDeactivated = "KEDAScaleTargetDeactivated"

// KEDAScaleTargetActivationFailed is for event when the activation the scale target for ScaledObject fails
KEDAScaleTargetActivationFailed = "KEDAScaleTargetActivationFailed"

// KEDAScaleTargetDeactivationFailed is for event when the deactivation of the scale target for ScaledObject fails
KEDAScaleTargetDeactivationFailed = "KEDAScaleTargetDeactivationFailed"

// KEDAJobsCreated is for event when jobs for ScaledJob are created
KEDAJobsCreated = "KEDAJobsCreated"

// TriggerAuthenticationDeleted is for event when a TriggerAuthentication is deleted
TriggerAuthenticationDeleted = "TriggerAuthenticationDeleted"

// TriggerAuthenticationAdded is for event when a TriggerAuthentication is added
TriggerAuthenticationAdded = "TriggerAuthenticationAdded"
)
6 changes: 5 additions & 1 deletion pkg/scaling/executor/scale_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"fmt"

"k8s.io/client-go/tools/record"

"github.com/go-logr/logr"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -30,15 +32,17 @@ type scaleExecutor struct {
scaleClient *scale.ScalesGetter
reconcilerScheme *runtime.Scheme
logger logr.Logger
recorder record.EventRecorder
}

// NewScaleExecutor creates a ScaleExecutor object
func NewScaleExecutor(client client.Client, scaleClient *scale.ScalesGetter, reconcilerScheme *runtime.Scheme) ScaleExecutor {
func NewScaleExecutor(client client.Client, scaleClient *scale.ScalesGetter, reconcilerScheme *runtime.Scheme, recorder record.EventRecorder) ScaleExecutor {
return &scaleExecutor{
client: client,
scaleClient: scaleClient,
reconcilerScheme: reconcilerScheme,
logger: logf.Log.WithName("scaleexecutor"),
recorder: recorder,
}
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/scaling/executor/scale_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"sort"
"strconv"

"github.com/kedacore/keda/v2/pkg/eventreason"

"github.com/go-logr/logr"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -108,6 +110,7 @@ func (e *scaleExecutor) createJobs(logger logr.Logger, scaledJob *kedav1alpha1.S
}
}
logger.Info("Created jobs", "Number of jobs", scaleTo)
e.recorder.Eventf(scaledJob, corev1.EventTypeNormal, eventreason.KEDAJobsCreated, "Created %d jobs", scaleTo)
}

func (e *scaleExecutor) isJobFinished(j *batchv1.Job) bool {
Expand Down
Loading

0 comments on commit 331325b

Please sign in to comment.