Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add cloud events support #1843

Merged
merged 4 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/content/en/docs/crd-ref/options/v1alpha1/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,6 @@ _Appears in:_
| --- | --- |
| `OTelCollectorUrl` _string_ | OTelCollectorUrl can be used to set the Open Telemetry collector that the lifecycle operator should use |
| `keptnAppCreationRequestTimeoutSeconds` _integer_ | KeptnAppCreationRequestTimeoutSeconds is used to set the interval in which automatic app discovery searches for workload to put into the same auto-generated KeptnApp |
| `cloudEventsEndpoint` _string_ | CloudEventsEndpoint can be used to set the endpoint where Cloud Events should be posted by the lifecycle operator |


4 changes: 4 additions & 0 deletions helm/chart/templates/keptnconfig-crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ spec:
description: OTelCollectorUrl can be used to set the Open Telemetry
collector that the lifecycle operator should use
type: string
cloudEventsEndpoint:
description: CloudEventsEndpoint can be used to set the endpoint where
Cloud Events should be posted by the lifecycle operator
type: string
keptnAppCreationRequestTimeoutSeconds:
default: 30
description: KeptnAppCreationRequestTimeoutSeconds is used to set the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,16 @@ type KeptnConfigSpec struct {
// OTelCollectorUrl can be used to set the Open Telemetry collector that the lifecycle operator should use
// +optional
OTelCollectorUrl string `json:"OTelCollectorUrl,omitempty"`

// KeptnAppCreationRequestTimeoutSeconds is used to set the interval in which automatic app discovery
// searches for workload to put into the same auto-generated KeptnApp
// +kubebuilder:default:=30
// +optional
KeptnAppCreationRequestTimeoutSeconds uint `json:"keptnAppCreationRequestTimeoutSeconds,omitempty"`

// CloudEventsEndpoint can be used to set the endpoint where Cloud Events should be posted by the lifecycle operator
// +optional
CloudEventsEndpoint string `json:"cloudEventsEndpoint,omitempty"`
}

// +kubebuilder:object:root=true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ spec:
description: OTelCollectorUrl can be used to set the Open Telemetry
collector that the lifecycle operator should use
type: string
cloudEventsEndpoint:
description: CloudEventsEndpoint can be used to set the endpoint where
Cloud Events should be posted by the lifecycle operator
type: string
keptnAppCreationRequestTimeoutSeconds:
default: 30
description: KeptnAppCreationRequestTimeoutSeconds is used to set
Expand Down
11 changes: 11 additions & 0 deletions lifecycle-operator/controllers/common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@ const defaultKeptnAppCreationRequestTimeout = 30 * time.Second
type IConfig interface {
SetCreationRequestTimeout(value time.Duration)
GetCreationRequestTimeout() time.Duration
SetCloudEventsEndpoint(endpoint string)
GetCloudEventsEndpoint() string
}

type ControllerConfig struct {
keptnAppCreationRequestTimeout time.Duration
cloudEventsEndpoint string
}

var instance *ControllerConfig
Expand All @@ -34,3 +37,11 @@ func (o *ControllerConfig) SetCreationRequestTimeout(value time.Duration) {
func (o *ControllerConfig) GetCreationRequestTimeout() time.Duration {
return o.keptnAppCreationRequestTimeout
}

func (o *ControllerConfig) SetCloudEventsEndpoint(endpoint string) {
o.cloudEventsEndpoint = endpoint
}

func (o *ControllerConfig) GetCloudEventsEndpoint() string {
return o.cloudEventsEndpoint
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func TestEvaluationHandler(t *testing.T) {
handler := EvaluationHandler{
SpanHandler: &spanHandlerMock,
Log: ctrl.Log.WithName("controller"),
EventSender: NewEventSender(fakeRecorder),
EventSender: NewK8sSender(fakeRecorder),
Client: fake.NewClientBuilder().WithObjects(&tt.evalObj).Build(),
Tracer: trace.NewNoopTracerProvider().Tracer("tracer"),
Scheme: scheme.Scheme,
Expand Down Expand Up @@ -349,7 +349,7 @@ func TestEvaluationHandler_createEvaluation(t *testing.T) {
handler := EvaluationHandler{
SpanHandler: &kltfake.ISpanHandlerMock{},
Log: ctrl.Log.WithName("controller"),
EventSender: NewEventSender(record.NewFakeRecorder(100)),
EventSender: NewK8sSender(record.NewFakeRecorder(100)),
Client: fake.NewClientBuilder().Build(),
Tracer: trace.NewNoopTracerProvider().Tracer("tracer"),
Scheme: scheme.Scheme,
Expand Down
91 changes: 84 additions & 7 deletions lifecycle-operator/controllers/common/eventsender.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package common

import (
"context"
"fmt"
"strings"

ce "github.com/cloudevents/sdk-go/v2"
"github.com/go-logr/logr"
apicommon "github.com/keptn/lifecycle-toolkit/lifecycle-operator/apis/lifecycle/v1alpha3/common"
"github.com/keptn/lifecycle-toolkit/lifecycle-operator/controllers/common/config"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand All @@ -15,28 +20,100 @@ type IEvent interface {

// ===== Main =====

func NewEventSender(recorder record.EventRecorder) IEvent {
return newK8sSender(recorder)
type EventMultiplexer struct {
logger logr.Logger
emitters []IEvent
}

func NewEventMultiplexer(logger logr.Logger, recorder record.EventRecorder, client ce.Client) *EventMultiplexer {
multiplexer := &EventMultiplexer{
logger: logger,
}
multiplexer.register(newCloudEventSender(logger, client))
multiplexer.register(NewK8sSender(recorder))
return multiplexer
}

func (e *EventMultiplexer) register(emitter IEvent) {
if emitter != nil {
e.emitters = append(e.emitters, emitter)
}
}

func (e *EventMultiplexer) Emit(phase apicommon.KeptnPhaseType, eventType string, reconcileObject client.Object, status string, message string, version string) {
for _, emitter := range e.emitters {
thisthat marked this conversation as resolved.
Show resolved Hide resolved
e.logger.Info(fmt.Sprintf("Emitting event using %T", emitter))
emitter.Emit(phase, eventType, reconcileObject, status, message, version)
}
}

// ===== Cloud Event Sender =====
// TODO: implement Cloud Event logic

type cloudEvent struct {
client ce.Client
logger logr.Logger
}

func newCloudEventSender(logger logr.Logger, client ce.Client) *cloudEvent {
odubajDT marked this conversation as resolved.
Show resolved Hide resolved
return &cloudEvent{
client: client,
logger: logger,
}
}

// Emit creates a Cloud Event and send it to the endpoint
func (e *cloudEvent) Emit(phase apicommon.KeptnPhaseType, eventType string, reconcileObject client.Object, status string, message string, version string) {
endpoint := config.Instance().GetCloudEventsEndpoint()
if endpoint == "" {
// if no endpoint is configured we don't emit any event
if !strings.HasPrefix(endpoint, "http") {
e.logger.V(5).Info(fmt.Sprintf("CloudEvent endpoint configured but it does not start with http: %s", endpoint))
}
return
}
event := ce.NewEvent()
event.SetSource("keptn.sh")
event.SetType(fmt.Sprintf("%s.%s", phase.LongName, status))
thisthat marked this conversation as resolved.
Show resolved Hide resolved

msg := setEventMessage(phase, reconcileObject, message, version)
bacherfl marked this conversation as resolved.
Show resolved Hide resolved
err := event.SetData(ce.ApplicationJSON, map[string]interface{}{
"message": msg,
"type": eventType,
"version": version,
thisthat marked this conversation as resolved.
Show resolved Hide resolved
"resource": map[string]string{
"group": reconcileObject.GetObjectKind().GroupVersionKind().Group,
"kind": reconcileObject.GetObjectKind().GroupVersionKind().Kind,
"version": reconcileObject.GetObjectKind().GroupVersionKind().Version,
"name": reconcileObject.GetName(),
"namespace": reconcileObject.GetNamespace(),
},
})
if err != nil {
e.logger.V(5).Info(fmt.Sprintf("Failed to set data for CloudEvent: %v", err))
return
}

ctx := ce.ContextWithTarget(context.TODO(), endpoint)
bacherfl marked this conversation as resolved.
Show resolved Hide resolved
if result := e.client.Send(ctx, event); ce.IsUndelivered(result) {
e.logger.V(5).Info(fmt.Sprintf("Failed to send CloudEvent: %v", event))
}
}

// ===== K8s Event Sender =====

type k8sEvent struct {
recorder record.EventRecorder
}

func newK8sSender(recorder record.EventRecorder) IEvent {
func NewK8sSender(recorder record.EventRecorder) IEvent {
odubajDT marked this conversation as resolved.
Show resolved Hide resolved
return &k8sEvent{
recorder: recorder,
}
}

// SendEvent creates k8s Event and adds it to Eventqueue
func (s *k8sEvent) Emit(phase apicommon.KeptnPhaseType, eventType string, reconcileObject client.Object, status string, message string, version string) {
// Emit creates k8s Event and adds it to Eventqueue
func (e *k8sEvent) Emit(phase apicommon.KeptnPhaseType, eventType string, reconcileObject client.Object, status string, message string, version string) {
msg := setEventMessage(phase, reconcileObject, message, version)
annotations := setAnnotations(reconcileObject, phase)
s.recorder.AnnotatedEventf(reconcileObject, annotations, eventType, fmt.Sprintf("%s%s", phase.ShortName, status), msg)
e.recorder.AnnotatedEventf(reconcileObject, annotations, eventType, fmt.Sprintf("%s%s", phase.ShortName, status), msg)
}
Loading