Skip to content
This repository has been archived by the owner on Dec 21, 2023. It is now read-only.

Commit

Permalink
feat: Introduce skipping of automatic event responses per task handler (
Browse files Browse the repository at this point in the history
#537)

* feat: Introduce skipping of automatic event responses per task handler

Signed-off-by: warber <[email protected]>

* cleanup

Signed-off-by: warber <[email protected]>

Signed-off-by: warber <[email protected]>
  • Loading branch information
warber authored Sep 7, 2022
1 parent f07eb2f commit 278ad8b
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 6 deletions.
35 changes: 29 additions & 6 deletions pkg/sdk/keptn.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,21 +116,41 @@ func (w *nopWG) Wait() {
// --
}

// WithTaskHandler registers a handler which is responsible for processing a .triggered event
// WithTaskHandler registers a handler which is responsible for processing a .triggered event.
// Note, that if you want to have more control on configuring the behavior of the task handler,
// you can use WithTaskEventHandler instead
func WithTaskHandler(eventType string, handler TaskHandler, filters ...func(keptnHandle IKeptn, event KeptnEvent) bool) KeptnOption {
return WithTaskEventHandler(eventType, handler, TaskHandlerOptions{
Filters: filters,
SkipAutomaticResponse: false,
})
}

// WithTaskEventHandler registers a handler which is responsible for processing a received .triggered event
func WithTaskEventHandler(eventType string, handler TaskHandler, options TaskHandlerOptions) KeptnOption {
return func(k *Keptn) {
k.taskRegistry.Add(eventType, taskEntry{taskHandler: handler, eventFilters: filters})
k.taskRegistry.Add(eventType, taskEntry{taskHandler: handler, eventFilters: options.Filters, taskHandlerOpts: options})
}
}

// WithAutomaticResponse sets the option to instruct the sdk to automatically send a .started and .finished event.
// Per default this behavior is turned on and can be disabled with this function
// Per default this behavior is turned on and can be disabled with this function. Note, that this affects ALL
// task handlers. If you want to disable automatic event responses for a specific task handler, this can be done
// with the respective TaskHandlerOptions passed to WithTaskEventHandler
func WithAutomaticResponse(autoResponse bool) KeptnOption {
return func(k *Keptn) {
k.automaticEventResponse = autoResponse
}
}

// TaskHandlerOptions are specific options for a task handler
type TaskHandlerOptions struct {
// Filters specifies functions that determine whether the event shall be handled or ignored
Filters []func(IKeptn, KeptnEvent) bool
// SkipAutomaticResponse determines whether automatic sending of .started/.finished events should be skipped
SkipAutomaticResponse bool
}

// WithGracefulShutdown sets the option to ensure running tasks/handlers will finish in case of interrupt or forced termination
// Per default this behavior is turned on and can be disabled with this function
func WithGracefulShutdown(gracefulShutdown bool) KeptnOption {
Expand Down Expand Up @@ -251,8 +271,11 @@ func (k *Keptn) OnEvent(ctx context.Context, event models.KeptnContextExtendedCE
}
}

// automatic response of events is enabled if it is turned on globally, and not disabled for the specific handler
autoResponse := k.automaticEventResponse && !k.taskRegistry.Get(*event.Type).taskHandlerOpts.SkipAutomaticResponse

// only respond with .started event if the incoming event is a task.triggered event
if keptnv2.IsTaskEventType(*event.Type) && keptnv2.IsTriggeredEventType(*event.Type) && k.automaticEventResponse {
if keptnv2.IsTaskEventType(*event.Type) && keptnv2.IsTriggeredEventType(*event.Type) && autoResponse {
startedEvent, err := createStartedEvent(k.source, event)
if err != nil {
k.logger.Errorf("Unable to create '.started' event from '.triggered' event: %v", err)
Expand All @@ -267,7 +290,7 @@ func (k *Keptn) OnEvent(ctx context.Context, event models.KeptnContextExtendedCE
result, err := handler.taskHandler.Execute(k, *keptnEvent)
if err != nil {
k.logger.Errorf("Error during task execution %v", err.Err)
if k.automaticEventResponse {
if autoResponse {
errorEvent, err := createErrorEvent(k.source, event, result, err)
if err != nil {
k.logger.Errorf("Unable to create '.error' event: %v", err)
Expand All @@ -282,7 +305,7 @@ func (k *Keptn) OnEvent(ctx context.Context, event models.KeptnContextExtendedCE
}
if result == nil {
k.logger.Infof("no finished data set by task executor for event %s. Skipping sending finished event", *event.Type)
} else if keptnv2.IsTaskEventType(*event.Type) && keptnv2.IsTriggeredEventType(*event.Type) && k.automaticEventResponse {
} else if keptnv2.IsTaskEventType(*event.Type) && keptnv2.IsTriggeredEventType(*event.Type) && autoResponse {
finishedEvent, err := createFinishedEvent(k.source, event, result)
if err != nil {
k.logger.Errorf("Unable to create '.finished' event: %v", err)
Expand Down
6 changes: 6 additions & 0 deletions pkg/sdk/keptn_fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ func (f *FakeKeptn) SetAPI(api api.KeptnInterface) {
f.Keptn.api = api
}

func (f *FakeKeptn) AddTaskEventHandler(eventType string, handler TaskHandler, options TaskHandlerOptions) {
f.Keptn.taskRegistry.Add(eventType, taskEntry{taskHandler: handler, eventFilters: options.Filters, taskHandlerOpts: options})
}

// AddTaskEventHandler registers a TaskHandler
// Deprecated: use AddTaskEventHandler
func (f *FakeKeptn) AddTaskHandler(eventType string, handler TaskHandler, filters ...func(keptnHandle IKeptn, event KeptnEvent) bool) {
f.AddTaskHandlerWithSubscriptionID(eventType, handler, "", filters...)
}
Expand Down
59 changes: 59 additions & 0 deletions pkg/sdk/keptn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,65 @@ func Test_WhenReceivingAnEvent_StartedEventAndFinishedEventsAreSent(t *testing.T
fakeKeptn.AssertSentEventType(t, 1, "sh.keptn.event.faketask.finished")
}

func Test_WhenReceivingAnEvent_AndAutomaticEventResponseIsGloballyDiabled_StartedEventAndFinishedEventsAreNotSent(t *testing.T) {
taskHandler := &TaskHandlerMock{}
taskHandler.ExecuteFunc = func(keptnHandle IKeptn, event KeptnEvent) (interface{}, *Error) { return FakeTaskData{}, nil }
fakeKeptn := NewFakeKeptn("fake")
fakeKeptn.Keptn.automaticEventResponse = false
fakeKeptn.AddTaskEventHandler("sh.keptn.event.faketask.triggered", taskHandler, TaskHandlerOptions{})
fakeKeptn.AddTaskEventHandler("sh.keptn.event.faketask2.triggered", taskHandler, TaskHandlerOptions{})
fakeKeptn.NewEvent(models.KeptnContextExtendedCE{
Data: v0_2_0.EventData{Project: "prj", Stage: "stg", Service: "svc"},
ID: "id",
Shkeptncontext: "context",
Source: strutils.Stringp("source"),
Type: strutils.Stringp("sh.keptn.event.faketask.triggered"),
})

fakeKeptn.NewEvent(models.KeptnContextExtendedCE{
Data: v0_2_0.EventData{Project: "prj", Stage: "stg", Service: "svc"},
ID: "id",
Shkeptncontext: "context",
Source: strutils.Stringp("source"),
Type: strutils.Stringp("sh.keptn.event.faketask2.triggered"),
})

fakeKeptn.AssertNumberOfEventSent(t, 0)
}

func Test_WhenReceivingAnEvent_AndAutomaticEventResponseIsDisabledOnTaskHandler_StartedEventAndFinishedEventsAreNotSent(t *testing.T) {
taskHandler := &TaskHandlerMock{}
taskHandler.ExecuteFunc = func(keptnHandle IKeptn, event KeptnEvent) (interface{}, *Error) { return FakeTaskData{}, nil }
fakeKeptn := NewFakeKeptn("fake")
fakeKeptn.AddTaskEventHandler("sh.keptn.event.faketask.triggered", taskHandler, TaskHandlerOptions{
Filters: nil,
SkipAutomaticResponse: true,
})
fakeKeptn.AddTaskEventHandler("sh.keptn.event.faketask2.triggered", taskHandler, TaskHandlerOptions{
Filters: nil,
SkipAutomaticResponse: false,
})
fakeKeptn.NewEvent(models.KeptnContextExtendedCE{
Data: v0_2_0.EventData{Project: "prj", Stage: "stg", Service: "svc"},
ID: "id",
Shkeptncontext: "context",
Source: strutils.Stringp("source"),
Type: strutils.Stringp("sh.keptn.event.faketask.triggered"),
})

fakeKeptn.NewEvent(models.KeptnContextExtendedCE{
Data: v0_2_0.EventData{Project: "prj", Stage: "stg", Service: "svc"},
ID: "id",
Shkeptncontext: "context",
Source: strutils.Stringp("source"),
Type: strutils.Stringp("sh.keptn.event.faketask2.triggered"),
})

fakeKeptn.AssertNumberOfEventSent(t, 2)
fakeKeptn.AssertSentEventType(t, 0, "sh.keptn.event.faketask2.started")
fakeKeptn.AssertSentEventType(t, 1, "sh.keptn.event.faketask2.finished")
}

func Test_WhenReceivingAnEvent_TaskHandlerFails(t *testing.T) {
taskHandler := &TaskHandlerMock{}
taskHandler.ExecuteFunc = func(keptnHandle IKeptn, event KeptnEvent) (interface{}, *Error) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/sdk/taskregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ type taskEntry struct {
taskHandler TaskHandler
// eventFilters is a list of functions that are executed before a task is handled by the taskHandler. Only if all functions return 'true', the task will be handled
eventFilters []func(keptnHandle IKeptn, event KeptnEvent) bool
// taskHandlerOpts are the options for the handler
taskHandlerOpts TaskHandlerOptions
}

func newTaskMap() *taskRegistry {
Expand Down

0 comments on commit 278ad8b

Please sign in to comment.