From c7f88a810b97e962f715ffe23f1707bdb952290e Mon Sep 17 00:00:00 2001 From: Josef Karasek Date: Mon, 13 Nov 2023 03:56:26 +0100 Subject: [PATCH] suggested improvements after PR review (#1) Signed-off-by: Josef Karasek Signed-off-by: SpiritZhou --- apis/eventing/v1alpha1/condition_types.go | 28 +++ apis/keda/v1alpha1/condition_types.go | 4 +- cmd/operator/main.go | 10 +- .../eventing/cloudeventsource_controller.go | 50 ++-- controllers/keda/scaledobject_controller.go | 4 +- controllers/keda/suite_test.go | 2 +- pkg/eventemitter/cloudevent_http_handler.go | 40 ++-- pkg/eventemitter/eventdata/eventdata.go | 2 +- pkg/eventemitter/eventemitter.go | 226 +++++++++++------- pkg/eventemitter/eventemitter_test.go | 36 +-- pkg/scaling/executor/scale_scaledobjects.go | 6 +- 11 files changed, 244 insertions(+), 164 deletions(-) create mode 100644 apis/eventing/v1alpha1/condition_types.go diff --git a/apis/eventing/v1alpha1/condition_types.go b/apis/eventing/v1alpha1/condition_types.go new file mode 100644 index 00000000000..c00905d93b0 --- /dev/null +++ b/apis/eventing/v1alpha1/condition_types.go @@ -0,0 +1,28 @@ +/* +Copyright 2023 The KEDA Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +const ( + // CloudEventSourceConditionActiveReason defines the active condition reason for CloudEventSource + CloudEventSourceConditionActiveReason = "CloudEventSourceActive" + // CloudEventSourceConditionFailedReason defines the failed condition reason for CloudEventSource + CloudEventSourceConditionFailedReason = "CloudEventSourceFailed" + // CloudEventSourceConditionActiveMessage defines the active condition message for CloudEventSource + CloudEventSourceConditionActiveMessage = "Is configured to send events to the configured destination" + // CloudEventSourceConditionFailedMessage defines the failed condition message for CloudEventSource + CloudEventSourceConditionFailedMessage = "Failed to send events to the configured destination" +) diff --git a/apis/keda/v1alpha1/condition_types.go b/apis/keda/v1alpha1/condition_types.go index 4f3a182ee77..310831a681e 100644 --- a/apis/keda/v1alpha1/condition_types.go +++ b/apis/keda/v1alpha1/condition_types.go @@ -37,8 +37,8 @@ const ( ) const ( - // ScaledObjectConditionReadySucccesReason defines the default Reason for correct ScaledObject - ScaledObjectConditionReadySucccesReason = "ScaledObjectReady" + // ScaledObjectConditionReadySuccessReason defines the default Reason for correct ScaledObject + ScaledObjectConditionReadySuccessReason = "ScaledObjectReady" // ScaledObjectConditionReadySuccessMessage defines the default Message for correct ScaledObject ScaledObjectConditionReadySuccessMessage = "ScaledObject is defined correctly and is ready for scaling" // ScaledObjectConditionPausedReason defines the default Reason for paused ScaledObject diff --git a/cmd/operator/main.go b/cmd/operator/main.go index 6b9169f6342..5a7901f1431 100644 --- a/cmd/operator/main.go +++ b/cmd/operator/main.go @@ -220,7 +220,7 @@ func main() { Recorder: eventRecorder, ScaleClient: scaleClient, ScaleHandler: scaledHandler, - EventEmitter: *eventEmitter, + EventEmitter: eventEmitter, }).SetupWithManager(mgr, controller.Options{ MaxConcurrentReconciles: scaledObjectMaxReconciles, }); err != nil { @@ -254,10 +254,10 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", "ClusterTriggerAuthentication") os.Exit(1) } - if err = (&eventingcontrollers.CloudEventSourceReconciler{ - Client: mgr.GetClient(), - EventEmitter: *eventEmitter, - }).SetupWithManager(mgr); err != nil { + if err = (eventingcontrollers.NewCloudEventSourceReconciler( + mgr.GetClient(), + eventEmitter, + )).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "CloudEventSource") os.Exit(1) } diff --git a/controllers/eventing/cloudeventsource_controller.go b/controllers/eventing/cloudeventsource_controller.go index 22483432ca8..e378cc99f26 100644 --- a/controllers/eventing/cloudeventsource_controller.go +++ b/controllers/eventing/cloudeventsource_controller.go @@ -38,23 +38,22 @@ import ( // CloudEventSourceReconciler reconciles a EventSource object type CloudEventSourceReconciler struct { client.Client - EventEmitter eventemitter.EventEmitter + eventEmitter eventemitter.EventHandler cloudEventSourceGenerations *sync.Map + eventSourcePromMetricsMap map[string]string + eventSourcePromMetricsLock *sync.Mutex } -type cloudEventSourceMetricsData struct { - namespace string -} - -var ( - eventSourcePromMetricsMap map[string]cloudEventSourceMetricsData - eventSourcePromMetricsLock *sync.Mutex -) - -func init() { - eventSourcePromMetricsMap = make(map[string]cloudEventSourceMetricsData) - eventSourcePromMetricsLock = &sync.Mutex{} +// NewCloudEventSourceReconciler creates a new CloudEventSourceReconciler +func NewCloudEventSourceReconciler(c client.Client, e eventemitter.EventHandler) *CloudEventSourceReconciler { + return &CloudEventSourceReconciler{ + Client: c, + eventEmitter: e, + cloudEventSourceGenerations: &sync.Map{}, + eventSourcePromMetricsMap: make(map[string]string), + eventSourcePromMetricsLock: &sync.Mutex{}, + } } // +kubebuilder:rbac:groups=eventing.keda.sh,resources=cloudeventsources;cloudeventsources/status,verbs="*" @@ -114,7 +113,6 @@ func (r *CloudEventSourceReconciler) Reconcile(ctx context.Context, req ctrl.Req // SetupWithManager sets up the controller with the Manager. func (r *CloudEventSourceReconciler) SetupWithManager(mgr ctrl.Manager) error { - r.cloudEventSourceGenerations = &sync.Map{} return ctrl.NewControllerManagedBy(mgr). For(&eventingv1alpha1.CloudEventSource{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})). Complete(r) @@ -130,7 +128,7 @@ func (r *CloudEventSourceReconciler) requestEventLoop(ctx context.Context, logge return err } - if err = r.EventEmitter.HandleCloudEventSource(ctx, eventSource); err != nil { + if err = r.eventEmitter.HandleCloudEventSource(ctx, eventSource); err != nil { return err } @@ -148,7 +146,7 @@ func (r *CloudEventSourceReconciler) stopEventLoop(logger logr.Logger, eventSour return err } - if err := r.EventEmitter.DeleteCloudEventSource(eventSource); err != nil { + if err := r.eventEmitter.DeleteCloudEventSource(eventSource); err != nil { return err } // delete CloudEventSource's current Generation @@ -175,25 +173,25 @@ func (r *CloudEventSourceReconciler) cloudEventSourceGenerationChanged(logger lo } func (r *CloudEventSourceReconciler) updatePromMetrics(eventSource *eventingv1alpha1.CloudEventSource, namespacedName string) { - eventSourcePromMetricsLock.Lock() - defer eventSourcePromMetricsLock.Unlock() + r.eventSourcePromMetricsLock.Lock() + defer r.eventSourcePromMetricsLock.Unlock() - if metricsData, ok := eventSourcePromMetricsMap[namespacedName]; ok { - metricscollector.DecrementCRDTotal(metricscollector.CloudEventSourceResource, metricsData.namespace) + if ns, ok := r.eventSourcePromMetricsMap[namespacedName]; ok { + metricscollector.DecrementCRDTotal(metricscollector.CloudEventSourceResource, ns) } metricscollector.IncrementCRDTotal(metricscollector.CloudEventSourceResource, eventSource.Namespace) - eventSourcePromMetricsMap[namespacedName] = cloudEventSourceMetricsData{namespace: eventSource.Namespace} + r.eventSourcePromMetricsMap[namespacedName] = eventSource.Namespace } // UpdatePromMetricsOnDelete is idempotent, so it can be called multiple times without side-effects func (r *CloudEventSourceReconciler) UpdatePromMetricsOnDelete(namespacedName string) { - eventSourcePromMetricsLock.Lock() - defer eventSourcePromMetricsLock.Unlock() + r.eventSourcePromMetricsLock.Lock() + defer r.eventSourcePromMetricsLock.Unlock() - if metricsData, ok := eventSourcePromMetricsMap[namespacedName]; ok { - metricscollector.DecrementCRDTotal(metricscollector.CloudEventSourceResource, metricsData.namespace) + if ns, ok := r.eventSourcePromMetricsMap[namespacedName]; ok { + metricscollector.DecrementCRDTotal(metricscollector.CloudEventSourceResource, ns) } - delete(eventSourcePromMetricsMap, namespacedName) + delete(r.eventSourcePromMetricsMap, namespacedName) } diff --git a/controllers/keda/scaledobject_controller.go b/controllers/keda/scaledobject_controller.go index 20d20d8f943..b0420c1e9a3 100755 --- a/controllers/keda/scaledobject_controller.go +++ b/controllers/keda/scaledobject_controller.go @@ -69,7 +69,7 @@ type ScaledObjectReconciler struct { Recorder record.EventRecorder ScaleClient scale.ScalesGetter ScaleHandler scaling.ScaleHandler - EventEmitter eventemitter.EventEmitter + EventEmitter eventemitter.EventHandler restMapper meta.RESTMapper scaledObjectsGenerations *sync.Map @@ -191,7 +191,7 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request r.EventEmitter.Emit(scaledObject, req.NamespacedName, corev1.EventTypeNormal, eventreason.ScaledObjectReady, message.ScalerReadyMsg) } reqLogger.V(1).Info(msg) - conditions.SetReadyCondition(metav1.ConditionTrue, kedav1alpha1.ScaledObjectConditionReadySucccesReason, msg) + conditions.SetReadyCondition(metav1.ConditionTrue, kedav1alpha1.ScaledObjectConditionReadySuccessReason, msg) } if err := kedastatus.SetStatusConditions(ctx, r.Client, reqLogger, scaledObject, &conditions); err != nil { diff --git a/controllers/keda/suite_test.go b/controllers/keda/suite_test.go index b40e412fa8c..799cd46cacc 100644 --- a/controllers/keda/suite_test.go +++ b/controllers/keda/suite_test.go @@ -97,7 +97,7 @@ var _ = BeforeSuite(func() { Recorder: k8sManager.GetEventRecorderFor("keda-operator"), ScaleHandler: scaling.NewScaleHandler(k8sManager.GetClient(), scaleClient, k8sManager.GetScheme(), time.Duration(10), k8sManager.GetEventRecorderFor("keda-operator"), nil), ScaleClient: scaleClient, - EventEmitter: *eventemitter.NewEventEmitter(k8sManager.GetClient(), k8sManager.GetEventRecorderFor("keda-operator"), "kubernetes-default"), + EventEmitter: eventemitter.NewEventEmitter(k8sManager.GetClient(), k8sManager.GetEventRecorderFor("keda-operator"), "kubernetes-default"), }).SetupWithManager(k8sManager, controller.Options{}) Expect(err).ToNot(HaveOccurred()) diff --git a/pkg/eventemitter/cloudevent_http_handler.go b/pkg/eventemitter/cloudevent_http_handler.go index e163c981e5d..fde47388622 100644 --- a/pkg/eventemitter/cloudevent_http_handler.go +++ b/pkg/eventemitter/cloudevent_http_handler.go @@ -33,13 +33,17 @@ import ( "github.com/kedacore/keda/v2/pkg/eventemitter/eventdata" ) +const ( + cloudEventSourceType = "com.cloudeventsource.keda" +) + type CloudEventHTTPHandler struct { - Endpoint string - Client cloudevents.Client - ClusterName string - ActiveStatus metav1.ConditionStatus ctx context.Context logger logr.Logger + endpoint string + client cloudevents.Client + clusterName string + activeStatus metav1.ConditionStatus } func NewCloudEventHTTPHandler(context context.Context, clusterName string, uri string, logger logr.Logger) (*CloudEventHTTPHandler, error) { @@ -55,47 +59,47 @@ func NewCloudEventHTTPHandler(context context.Context, clusterName string, uri s logger.Info("Create new cloudevents http handler with endPoint: " + uri) return &CloudEventHTTPHandler{ - Client: client, - Endpoint: uri, - ClusterName: clusterName, - ActiveStatus: metav1.ConditionTrue, + client: client, + endpoint: uri, + clusterName: clusterName, + activeStatus: metav1.ConditionTrue, ctx: ctx, logger: logger, }, nil } func (c *CloudEventHTTPHandler) SetActiveStatus(status metav1.ConditionStatus) { - c.ActiveStatus = status + c.activeStatus = status } func (c *CloudEventHTTPHandler) GetActiveStatus() metav1.ConditionStatus { - return c.ActiveStatus + return c.activeStatus } func (c *CloudEventHTTPHandler) CloseHandler() { - + c.logger.V(1).Info("Closing CloudEvent HTTP handler") } func (c *CloudEventHTTPHandler) EmitEvent(eventData eventdata.EventData, failureFunc func(eventData eventdata.EventData, err error)) { - source := "/" + c.ClusterName + "/" + eventData.Namespace + "/keda" - subject := "/" + c.ClusterName + "/" + eventData.Namespace + "/workload/" + eventData.ObjectName + source := "/" + c.clusterName + "/" + eventData.Namespace + "/keda" + subject := "/" + c.clusterName + "/" + eventData.Namespace + "/workload/" + eventData.ObjectName event := cloudevents.NewEvent() event.SetSource(source) event.SetSubject(subject) - event.SetType(CloudEventSourceType) + event.SetType(cloudEventSourceType) if err := event.SetData(cloudevents.ApplicationJSON, EmitData{Reason: eventData.Reason, Message: eventData.Message}); err != nil { - c.logger.Error(err, "Failed to set data to cloudevent") + c.logger.Error(err, "Failed to set data to CloudEvents receiver") return } - err := c.Client.Send(c.ctx, event) + err := c.client.Send(c.ctx, event) if protocol.IsNACK(err) || protocol.IsUndelivered(err) { - c.logger.Error(err, "Failed to send event to cloudevent") + c.logger.Error(err, "Failed to send event to CloudEvents receiver") failureFunc(eventData, err) return } - c.logger.V(1).Info("Publish Event to CloudEvents receiver Successfully") + c.logger.V(1).Info("Successfully published event to CloudEvents receiver") } diff --git a/pkg/eventemitter/eventdata/eventdata.go b/pkg/eventemitter/eventdata/eventdata.go index 9d39774484b..70ec5a30e2a 100644 --- a/pkg/eventemitter/eventdata/eventdata.go +++ b/pkg/eventemitter/eventdata/eventdata.go @@ -24,7 +24,7 @@ import ( type EventData struct { Namespace string ObjectName string - Eventtype string + EventType string Reason string Message string Time time.Time diff --git a/pkg/eventemitter/eventemitter.go b/pkg/eventemitter/eventemitter.go index 6cc7464a0eb..758dd2b1b07 100644 --- a/pkg/eventemitter/eventemitter.go +++ b/pkg/eventemitter/eventemitter.go @@ -41,28 +41,36 @@ import ( logf "sigs.k8s.io/controller-runtime/pkg/log" eventingv1alpha1 "github.com/kedacore/keda/v2/apis/eventing/v1alpha1" - v1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" "github.com/kedacore/keda/v2/pkg/eventemitter/eventdata" kedastatus "github.com/kedacore/keda/v2/pkg/status" ) -var log = logf.Log.WithName("event_emitter") -var ch chan eventdata.EventData - -const CloudEventSourceType = "com.cloudeventsource.keda" -const MaxRetryTimes = 5 -const MaxChannelBuffer = 1024 -const MaxWaitingEnqueueTime = 20 +const ( + maxRetryTimes = 5 + maxChannelBuffer = 1024 + maxWaitingEnqueueTime = 10 +) +// EventEmitter is the main struct for eventemitter package type EventEmitter struct { - client.Client - record.EventRecorder - clustername string - eventHandlersCache map[string]EventDataHandler - eventHandlersCachesLock *sync.RWMutex - eventLoopContexts *sync.Map + log logr.Logger + client client.Client + recorder record.EventRecorder + clusterName string + eventHandlersCache map[string]EventDataHandler + eventHandlersCacheLock *sync.RWMutex + eventLoopContexts *sync.Map + cloudEventProcessingChan chan eventdata.EventData +} + +// EventHandler defines the behavior for EventEmitter clients +type EventHandler interface { + DeleteCloudEventSource(cloudEventSource *eventingv1alpha1.CloudEventSource) error + HandleCloudEventSource(ctx context.Context, cloudEventSource *eventingv1alpha1.CloudEventSource) error + Emit(object runtime.Object, namesapce types.NamespacedName, eventType, reason, message string) } +// EventDataHandler defines the behavior for different event handlers type EventDataHandler interface { EmitEvent(eventData eventdata.EventData, failureFunc func(eventData eventdata.EventData, err error)) SetActiveStatus(status metav1.ConditionStatus) @@ -70,23 +78,27 @@ type EventDataHandler interface { CloseHandler() } +// EmitData defines the data structure for emitting event type EmitData struct { Reason string `json:"reason"` Message string `json:"message"` } const ( - CloudEventHTTP = "CloudEventHTTP" + cloudEventHandlerTypeHTTP = "http" ) -func NewEventEmitter(client client.Client, recorder record.EventRecorder, clustername string) *EventEmitter { +// NewEventEmitter creates a new EventEmitter +func NewEventEmitter(client client.Client, recorder record.EventRecorder, clusterName string) EventHandler { return &EventEmitter{ - Client: client, - EventRecorder: recorder, - clustername: clustername, - eventHandlersCache: map[string]EventDataHandler{}, - eventHandlersCachesLock: &sync.RWMutex{}, - eventLoopContexts: &sync.Map{}, + log: logf.Log.WithName("event_emitter"), + client: client, + recorder: recorder, + clusterName: clusterName, + eventHandlersCache: map[string]EventDataHandler{}, + eventHandlersCacheLock: &sync.RWMutex{}, + eventLoopContexts: &sync.Map{}, + cloudEventProcessingChan: make(chan eventdata.EventData, maxChannelBuffer), } } @@ -104,7 +116,7 @@ func (e *EventEmitter) HandleCloudEventSource(ctx context.Context, cloudEventSou } key := cloudEventSource.GenerateIdentifier() - ctx, cancel := context.WithCancel(ctx) + cancelCtx, cancel := context.WithCancel(ctx) // cancel the outdated EventLoop for the same CloudEventSource (if exists) value, loaded := e.eventLoopContexts.LoadOrStore(key, cancel) @@ -114,14 +126,19 @@ func (e *EventEmitter) HandleCloudEventSource(ctx context.Context, cloudEventSou cancelValue() } e.eventLoopContexts.Store(key, cancel) + } else { + if updateErr := e.setCloudEventSourceStatusActive(ctx, cloudEventSource); updateErr != nil { + e.log.Error(updateErr, "Failed to update CloudEventSource status") + return updateErr + } } // a mutex is used to synchronize handler per cloudEventSource eventingMutex := &sync.Mutex{} // passing deep copy of CloudEventSource to the eventLoop go routines, it's a precaution to not have global objects shared between threads - log.V(1).Info("Start CloudEventSource loop.") - go e.startEventLoop(ctx, cloudEventSource, eventingMutex) + e.log.V(1).Info("Start CloudEventSource loop.") + go e.startEventLoop(cancelCtx, cloudEventSource.DeepCopy(), eventingMutex) return nil } @@ -137,7 +154,7 @@ func (e *EventEmitter) DeleteCloudEventSource(cloudEventSource *eventingv1alpha1 e.eventLoopContexts.Delete(key) e.clearEventHandlersCache(cloudEventSource) } else { - log.V(1).Info("CloudEventSource was not found in controller cache", "key", key) + e.log.V(1).Info("CloudEventSource was not found in controller cache", "key", key) } return nil @@ -146,39 +163,42 @@ func (e *EventEmitter) DeleteCloudEventSource(cloudEventSource *eventingv1alpha1 // createEventHandlers will create different handler as defined in CloudEventSource, and store them in cache for repeated // use in the loop. func (e *EventEmitter) createEventHandlers(ctx context.Context, cloudEventSource *eventingv1alpha1.CloudEventSource) { - e.eventHandlersCachesLock.Lock() - defer e.eventHandlersCachesLock.Unlock() + e.eventHandlersCacheLock.Lock() + defer e.eventHandlersCacheLock.Unlock() key := cloudEventSource.GenerateIdentifier() clusterName := cloudEventSource.Spec.ClusterName if clusterName == "" { - clusterName = e.clustername + clusterName = e.clusterName } - // Create different event destination here. + // Create different event destinations here if cloudEventSource.Spec.Destination.HTTP != nil { - var eventHandler EventDataHandler eventHandler, err := NewCloudEventHTTPHandler(ctx, clusterName, cloudEventSource.Spec.Destination.HTTP.URI, initializeLogger(cloudEventSource, "cloudevent_http")) - if err != nil { - log.Error(err, "create CloudEvent HTTP handler failed") - } else { - e.eventHandlersCache[key+CloudEventHTTP] = eventHandler + e.log.Error(err, "create CloudEvent HTTP handler failed") + return } + + eventHandlerKey := newEventHandlerKey(key, cloudEventHandlerTypeHTTP) + if h, ok := e.eventHandlersCache[eventHandlerKey]; ok { + h.CloseHandler() + } + e.eventHandlersCache[eventHandlerKey] = eventHandler } } // clearEventHandlersCache will clear all event handlers that created by the passing CloudEventSource func (e *EventEmitter) clearEventHandlersCache(cloudEventSource *eventingv1alpha1.CloudEventSource) { - e.eventHandlersCachesLock.Lock() - defer e.eventHandlersCachesLock.Unlock() + e.eventHandlersCacheLock.Lock() + defer e.eventHandlersCacheLock.Unlock() key := cloudEventSource.GenerateIdentifier() // Clear different event destination here. if cloudEventSource.Spec.Destination.HTTP != nil { - eventHandlerKey := key + CloudEventHTTP + eventHandlerKey := newEventHandlerKey(key, cloudEventHandlerTypeHTTP) if eventHandler, found := e.eventHandlersCache[eventHandlerKey]; found { eventHandler.CloseHandler() delete(e.eventHandlersCache, key) @@ -188,8 +208,8 @@ func (e *EventEmitter) clearEventHandlersCache(cloudEventSource *eventingv1alpha // clearEventHandlersCache will check if the event handlers that were created by passing CloudEventSource exist func (e *EventEmitter) checkIfEventHandlersExist(cloudEventSource *eventingv1alpha1.CloudEventSource) bool { - e.eventHandlersCachesLock.RLock() - defer e.eventHandlersCachesLock.RUnlock() + e.eventHandlersCacheLock.RLock() + defer e.eventHandlersCacheLock.RUnlock() key := cloudEventSource.GenerateIdentifier() @@ -202,24 +222,14 @@ func (e *EventEmitter) checkIfEventHandlersExist(cloudEventSource *eventingv1alp } func (e *EventEmitter) startEventLoop(ctx context.Context, cloudEventSource *eventingv1alpha1.CloudEventSource, cloudEventSourceMutex sync.Locker) { - consumingInterval := 500 * time.Millisecond - if ch == nil { - ch = make(chan eventdata.EventData, MaxChannelBuffer) - } - for { - tmr := time.NewTimer(consumingInterval) - - e.checkEventHandlers(ctx, cloudEventSource, cloudEventSourceMutex) - select { - case <-tmr.C: - tmr.Stop() - case eventData := <-ch: - log.V(1).Info("Consuming events in queue.") + case eventData := <-e.cloudEventProcessingChan: + e.log.V(1).Info("Consuming events from CloudEventSource.") e.emitEventByHandler(eventData) + e.checkEventHandlers(ctx, cloudEventSource, cloudEventSourceMutex) case <-ctx.Done(): - tmr.Stop() + e.log.V(1).Info("CloudEventSource loop has stopped.") return } } @@ -227,51 +237,45 @@ func (e *EventEmitter) startEventLoop(ctx context.Context, cloudEventSource *eve // checkEventHandlers will check each eventhandler active status func (e *EventEmitter) checkEventHandlers(ctx context.Context, cloudEventSource *eventingv1alpha1.CloudEventSource, cloudEventSourceMutex sync.Locker) { + e.log.V(1).Info("Checking event handlers status.") cloudEventSourceMutex.Lock() defer cloudEventSourceMutex.Unlock() // Get the latest object - err := e.Client.Get(ctx, types.NamespacedName{Name: cloudEventSource.Name, Namespace: cloudEventSource.Namespace}, cloudEventSource) + err := e.client.Get(ctx, types.NamespacedName{Name: cloudEventSource.Name, Namespace: cloudEventSource.Namespace}, cloudEventSource) if err != nil { - log.Error(err, "error getting cloudEventSource", "cloudEventSource", cloudEventSource) + e.log.Error(err, "error getting cloudEventSource", "cloudEventSource", cloudEventSource) return } - keyprefix := cloudEventSource.GenerateIdentifier() + keyPrefix := cloudEventSource.GenerateIdentifier() needUpdate := false + cloudEventSourceStatus := cloudEventSource.Status.DeepCopy() for k, v := range e.eventHandlersCache { - if strings.Contains(k, keyprefix) { + e.log.V(1).Info("Checking event handler status.", "handler", k, "status", cloudEventSource.Status.Conditions.GetActiveCondition().Status) + if strings.Contains(k, keyPrefix) { if v.GetActiveStatus() != cloudEventSource.Status.Conditions.GetActiveCondition().Status { needUpdate = true - cloudEventSource.Status.Conditions.SetActiveCondition(metav1.ConditionFalse, v1alpha1.ScaledObjectConditionReadySucccesReason, v1alpha1.ScaledObjectConditionReadySuccessMessage) + cloudEventSourceStatus.Conditions.SetActiveCondition( + metav1.ConditionFalse, + eventingv1alpha1.CloudEventSourceConditionFailedReason, + eventingv1alpha1.CloudEventSourceConditionFailedMessage, + ) } } } if needUpdate { - cloudEventSourceStatus := cloudEventSource.Status.DeepCopy() - - transform := func(runtimeObj client.Object, target interface{}) error { - status, ok := target.(*eventingv1alpha1.CloudEventSourceStatus) - if !ok { - return fmt.Errorf("transform target is not eventingv1alpha1.CloudEventSourceStatus type %v", target) - } - switch obj := runtimeObj.(type) { - case *eventingv1alpha1.CloudEventSource: - obj.Status = *status - default: - } - return nil - } - - if err := kedastatus.TransformObject(ctx, e.Client, log, cloudEventSource, cloudEventSourceStatus, transform); err != nil { - log.Error(err, "Failed to update CloudEventSourceStatus") + if updateErr := e.updateCloudEventSourceStatus(ctx, cloudEventSource, cloudEventSourceStatus); updateErr != nil { + e.log.Error(updateErr, "Failed to update CloudEventSource status") } } } // Emit is emitting event to both local kubernetes and custom CloudEventSource handler. After emit event to local kubernetes, event will inqueue and waitng for handler's consuming. -func (e *EventEmitter) Emit(object runtime.Object, namesapce types.NamespacedName, eventtype, reason, message string) { - e.EventRecorder.Event(object, eventtype, reason, message) +func (e *EventEmitter) Emit(object runtime.Object, namesapce types.NamespacedName, eventType, reason, message string) { + e.recorder.Event(object, eventType, reason, message) + e.eventHandlersCacheLock.RLock() + defer e.eventHandlersCacheLock.RUnlock() if len(e.eventHandlersCache) == 0 { return } @@ -280,7 +284,7 @@ func (e *EventEmitter) Emit(object runtime.Object, namesapce types.NamespacedNam eventData := eventdata.EventData{ Namespace: namesapce.Namespace, ObjectName: name, - Eventtype: eventtype, + EventType: eventType, Reason: reason, Message: message, Time: time.Now().UTC(), @@ -290,10 +294,10 @@ func (e *EventEmitter) Emit(object runtime.Object, namesapce types.NamespacedNam func (e *EventEmitter) enqueueEventData(eventData eventdata.EventData) { select { - case ch <- eventData: - log.V(1).Info("Event enqueued successfully.") - case <-time.After(MaxWaitingEnqueueTime * time.Second): - log.Error(nil, "Event cannot enqueue. Need to be check if handler can emit events.") + case e.cloudEventProcessingChan <- eventData: + e.log.V(1).Info("Event enqueued successfully.") + case <-time.After(maxWaitingEnqueueTime * time.Second): + e.log.Error(nil, "Failed to enqueue CloudEvent. Need to be check if handler can emit events.") } } @@ -302,9 +306,13 @@ func (e *EventEmitter) enqueueEventData(eventData eventdata.EventData) { // 2. Once there is an error when emitting event, record the handler's key and reqeueu this EventData. // 3. If the maximum number of retries has been exceeded, discard this event. func (e *EventEmitter) emitEventByHandler(eventData eventdata.EventData) { - if eventData.RetryTimes >= MaxRetryTimes { - log.Error(eventData.Err, "Failed to emit Event multiple times. Will drop this event and need to check if event endpoint works well", "CloudEventSource", eventData.ObjectName) - e.emitErrorHandle(eventData, eventData.Err) + if eventData.RetryTimes >= maxRetryTimes { + e.log.Error(eventData.Err, "Failed to emit Event multiple times. Will drop this event and need to check if event endpoint works well", "CloudEventSource", eventData.ObjectName) + handler, found := e.eventHandlersCache[eventData.HandlerKey] + if found { + e.log.V(1).Info("Set handler failure status. 1", "handler", eventData.HandlerKey) + handler.SetActiveStatus(metav1.ConditionFalse) + } return } @@ -314,11 +322,11 @@ func (e *EventEmitter) emitEventByHandler(eventData eventdata.EventData) { if handler.GetActiveStatus() == metav1.ConditionTrue { go handler.EmitEvent(eventData, e.emitErrorHandle) } else { - log.V(1).Info("EventHandler's status is not active. Please check if event endpoint works well", "CloudEventSource", eventData.ObjectName) + e.log.V(1).Info("EventHandler's status is not active. Please check if event endpoint works well", "CloudEventSource", eventData.ObjectName) } } } else { - log.Info("Reemit event failed", "handler", eventData.HandlerKey, "retry times", eventData.RetryTimes) + e.log.Info("Failed to emit event", "handler", eventData.HandlerKey, "retry times", fmt.Sprintf("%d/%d", eventData.RetryTimes, maxRetryTimes), "error", eventData.Err) handler, found := e.eventHandlersCache[eventData.HandlerKey] if found && handler.GetActiveStatus() == metav1.ConditionTrue { go handler.EmitEvent(eventData, e.emitErrorHandle) @@ -327,8 +335,8 @@ func (e *EventEmitter) emitEventByHandler(eventData eventdata.EventData) { } func (e *EventEmitter) emitErrorHandle(eventData eventdata.EventData, err error) { - if eventData.RetryTimes >= MaxRetryTimes { - log.V(1).Info("Failed to emit Event multiple times. Will set handler failure status.", "handler", eventData.HandlerKey, "retry times", eventData.RetryTimes) + if eventData.RetryTimes >= maxRetryTimes { + e.log.V(1).Info("Failed to emit Event multiple times. Will set handler failure status.", "handler", eventData.HandlerKey, "retry times", eventData.RetryTimes) handler, found := e.eventHandlersCache[eventData.HandlerKey] if found { handler.SetActiveStatus(metav1.ConditionFalse) @@ -342,3 +350,41 @@ func (e *EventEmitter) emitErrorHandle(eventData eventdata.EventData, err error) requeueData.Err = err e.enqueueEventData(requeueData) } + +func (e *EventEmitter) setCloudEventSourceStatusActive(ctx context.Context, cloudEventSource *eventingv1alpha1.CloudEventSource) error { + cloudEventSourceStatus := cloudEventSource.Status.DeepCopy() + cloudEventSourceStatus.Conditions.SetActiveCondition( + metav1.ConditionTrue, + eventingv1alpha1.CloudEventSourceConditionActiveReason, + eventingv1alpha1.CloudEventSourceConditionActiveMessage, + ) + return e.updateCloudEventSourceStatus(ctx, cloudEventSource, cloudEventSourceStatus) +} + +func (e *EventEmitter) updateCloudEventSourceStatus(ctx context.Context, cloudEventSource *eventingv1alpha1.CloudEventSource, cloudEventSourceStatus *eventingv1alpha1.CloudEventSourceStatus) error { + e.log.V(1).Info("Updating CloudEventSource status", "CloudEventSource", cloudEventSource.Name) + transform := func(runtimeObj client.Object, target interface{}) error { + status, ok := target.(*eventingv1alpha1.CloudEventSourceStatus) + if !ok { + return fmt.Errorf("transform target is not eventingv1alpha1.CloudEventSourceStatus type %v", target) + } + switch obj := runtimeObj.(type) { + case *eventingv1alpha1.CloudEventSource: + e.log.V(1).Info("New CloudEventSource status", "status", *status) + obj.Status = *status + default: + } + return nil + } + + if err := kedastatus.TransformObject(ctx, e.client, e.log, cloudEventSource, cloudEventSourceStatus, transform); err != nil { + e.log.Error(err, "Failed to update CloudEventSourceStatus") + return err + } + + return nil +} + +func newEventHandlerKey(kindNamespaceName string, handlerType string) string { + return fmt.Sprintf("%s.%s", kindNamespaceName, handlerType) +} diff --git a/pkg/eventemitter/eventemitter_test.go b/pkg/eventemitter/eventemitter_test.go index baf89dc1607..b9426952cf2 100644 --- a/pkg/eventemitter/eventemitter_test.go +++ b/pkg/eventemitter/eventemitter_test.go @@ -63,21 +63,23 @@ func TestEventHandler_FailedEmitEvent(t *testing.T) { } caches := map[string]EventDataHandler{} - caches[cloudEventSource.GenerateIdentifier()+CloudEventHTTP] = eventHandler + key := newEventHandlerKey(cloudEventSource.GenerateIdentifier(), cloudEventHandlerTypeHTTP) + caches[key] = eventHandler eventEmitter := EventEmitter{ - Client: mockClient, - EventRecorder: recorder, - clustername: "cluster-name", - eventHandlersCache: caches, - eventHandlersCachesLock: &sync.RWMutex{}, - eventLoopContexts: &sync.Map{}, + client: mockClient, + recorder: recorder, + clusterName: "cluster-name", + eventHandlersCache: caches, + eventHandlersCacheLock: &sync.RWMutex{}, + eventLoopContexts: &sync.Map{}, + cloudEventProcessingChan: make(chan eventdata.EventData, 1), } eventData := eventdata.EventData{ Namespace: "aaa", ObjectName: "bbb", - Eventtype: "ccc", + EventType: "ccc", Reason: "ddd", Message: "eee", Time: time.Now().UTC(), @@ -123,21 +125,23 @@ func TestEventHandler_DirectCall(t *testing.T) { } caches := map[string]EventDataHandler{} - caches[cloudEventSource.GenerateIdentifier()+CloudEventHTTP] = eventHandler + key := newEventHandlerKey(cloudEventSource.GenerateIdentifier(), cloudEventHandlerTypeHTTP) + caches[key] = eventHandler eventEmitter := EventEmitter{ - Client: mockClient, - EventRecorder: recorder, - clustername: "cluster-name", - eventHandlersCache: caches, - eventHandlersCachesLock: &sync.RWMutex{}, - eventLoopContexts: &sync.Map{}, + client: mockClient, + recorder: recorder, + clusterName: "cluster-name", + eventHandlersCache: caches, + eventHandlersCacheLock: &sync.RWMutex{}, + eventLoopContexts: &sync.Map{}, + cloudEventProcessingChan: make(chan eventdata.EventData, 1), } eventData := eventdata.EventData{ Namespace: "aaa", ObjectName: "bbb", - Eventtype: "ccc", + EventType: "ccc", Reason: "ddd", Message: "eee", Time: time.Now().UTC(), diff --git a/pkg/scaling/executor/scale_scaledobjects.go b/pkg/scaling/executor/scale_scaledobjects.go index 2e1ed376a96..908237a275c 100644 --- a/pkg/scaling/executor/scale_scaledobjects.go +++ b/pkg/scaling/executor/scale_scaledobjects.go @@ -74,7 +74,7 @@ func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1al readyCondition := scaledObject.Status.Conditions.GetReadyCondition() if !isError && !readyCondition.IsTrue() { if err := e.setReadyCondition(ctx, logger, scaledObject, metav1.ConditionTrue, - kedav1alpha1.ScaledObjectConditionReadySucccesReason, kedav1alpha1.ScaledObjectConditionReadySuccessMessage); err != nil { + kedav1alpha1.ScaledObjectConditionReadySuccessReason, kedav1alpha1.ScaledObjectConditionReadySuccessMessage); err != nil { logger.Error(err, "error setting ready condition") } } @@ -83,7 +83,7 @@ func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1al pausedCount, err := GetPausedReplicaCount(scaledObject) if err != nil { if err := e.setReadyCondition(ctx, logger, scaledObject, metav1.ConditionFalse, - kedav1alpha1.ScaledObjectConditionReadySucccesReason, kedav1alpha1.ScaledObjectConditionReadySuccessMessage); err != nil { + kedav1alpha1.ScaledObjectConditionReadySuccessReason, kedav1alpha1.ScaledObjectConditionReadySuccessMessage); err != nil { logger.Error(err, "error setting ready condition") } logger.Error(err, "error getting the paused replica count on the current ScaledObject.") @@ -97,7 +97,7 @@ func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1al if err != nil { logger.Error(err, "error scaling target to paused replicas count", "paused replicas", *pausedCount) if err := e.setReadyCondition(ctx, logger, scaledObject, metav1.ConditionUnknown, - kedav1alpha1.ScaledObjectConditionReadySucccesReason, kedav1alpha1.ScaledObjectConditionReadySuccessMessage); err != nil { + kedav1alpha1.ScaledObjectConditionReadySuccessReason, kedav1alpha1.ScaledObjectConditionReadySuccessMessage); err != nil { logger.Error(err, "error setting ready condition") } return