From 8655d7329e7c5008d7acd95411a8dd00e2c456e4 Mon Sep 17 00:00:00 2001 From: Bernd Warmuth <72415058+warber@users.noreply.github.com> Date: Thu, 6 Oct 2022 13:36:01 +0200 Subject: [PATCH] fix: Fix event handling for duplicated subscriptions in `go-sdk` remote execution plane use case (#542) * fixed issue and improved logging Signed-off-by: warber * set logger for eventsource correctly Signed-off-by: warber * review Signed-off-by: warber Signed-off-by: warber --- .../connector/controlplane/controlplane.go | 10 +- .../controlplane/controlplane_test.go | 111 ++++++++++++++++++ pkg/sdk/connector/eventsource/http/http.go | 16 ++- pkg/sdk/connector/eventsource/http/utils.go | 17 +++ .../connector/eventsource/http/utils_test.go | 83 +++++++++++++ pkg/sdk/connector/types/types.go | 5 +- pkg/sdk/internal/api/initializer.go | 2 +- 7 files changed, 236 insertions(+), 8 deletions(-) diff --git a/pkg/sdk/connector/controlplane/controlplane.go b/pkg/sdk/connector/controlplane/controlplane.go index d51a7463..d55c9538 100644 --- a/pkg/sdk/connector/controlplane/controlplane.go +++ b/pkg/sdk/connector/controlplane/controlplane.go @@ -199,13 +199,17 @@ func (cp *ControlPlane) stopComponents() { func (cp *ControlPlane) handle(ctx context.Context, eventUpdate types.EventUpdate, integration Integration) error { cp.logger.Debugf("Received an event of type: %s", *eventUpdate.KeptnEvent.Type) + // if we already know the subscription ID we can just forward the event to be handled + if eventUpdate.SubscriptionID != "" { + return cp.forwardMatchedEvent(ctx, eventUpdate, integration, eventUpdate.SubscriptionID) + } for _, subscription := range cp.currentSubscriptions { if subscription.Event == eventUpdate.MetaData.Subject { cp.logger.Debugf("Check if event matches subscription %s", subscription.ID) matcher := eventmatcher.New(subscription) if matcher.Matches(eventUpdate.KeptnEvent) { cp.logger.Info("Forwarding matched event update: ", eventUpdate.KeptnEvent.ID) - if err := cp.forwardMatchedEvent(ctx, eventUpdate, integration, subscription); err != nil { + if err := cp.forwardMatchedEvent(ctx, eventUpdate, integration, subscription.ID); err != nil { return err } } @@ -228,7 +232,7 @@ func (cp *ControlPlane) getSender(sender types.EventSender) types.EventSender { } } -func (cp *ControlPlane) forwardMatchedEvent(ctx context.Context, eventUpdate types.EventUpdate, integration Integration, subscription models.EventSubscription) error { +func (cp *ControlPlane) forwardMatchedEvent(ctx context.Context, eventUpdate types.EventUpdate, integration Integration, subscriptionID string) error { // increase the eventHandler WaitGroup cp.eventHandlerWaitGroup.Add(1) // when the event handler is done, decrease the WaitGroup again @@ -237,7 +241,7 @@ func (cp *ControlPlane) forwardMatchedEvent(ctx context.Context, eventUpdate typ err := eventUpdate.KeptnEvent.AddTemporaryData( tmpDataDistributorKey, types.AdditionalSubscriptionData{ - SubscriptionID: subscription.ID, + SubscriptionID: subscriptionID, }, models.AddTemporaryDataOptions{ OverwriteIfExisting: true, diff --git a/pkg/sdk/connector/controlplane/controlplane_test.go b/pkg/sdk/connector/controlplane/controlplane_test.go index 15c00a5a..b9b3934d 100644 --- a/pkg/sdk/connector/controlplane/controlplane_test.go +++ b/pkg/sdk/connector/controlplane/controlplane_test.go @@ -215,6 +215,117 @@ func TestControlPlaneInboundEventIsForwardedToIntegration(t *testing.T) { }, 5*time.Second, 100*time.Millisecond) } +func TestControlPlaneInboundEventWithSubscriptionIDIsForwardedToIntegration(t *testing.T) { + var eventChan chan types.EventUpdate + var subsChan chan []models.EventSubscription + var integrationReceivedEvent models.KeptnContextExtendedCE + var subscriptionSourceStopCalled bool + var eventSourceStopCalled bool + + mtx := sync.RWMutex{} + + callBackSender := func(ce models.KeptnContextExtendedCE) error { return nil } + + ssm := &fake.SubscriptionSourceMock{ + StartFn: func(ctx context.Context, data types.RegistrationData, c chan []models.EventSubscription, errC chan error, wg *sync.WaitGroup) error { + mtx.Lock() + defer mtx.Unlock() + subsChan = c + wg.Done() + return nil + }, + RegisterFn: func(integration models.Integration) (string, error) { + return "some-id", nil + }, + StopFn: func() error { + mtx.Lock() + defer mtx.Unlock() + subscriptionSourceStopCalled = true + return nil + }, + } + esm := &fake.EventSourceMock{ + StartFn: func(ctx context.Context, data types.RegistrationData, ces chan types.EventUpdate, errC chan error, wg *sync.WaitGroup) error { + mtx.Lock() + defer mtx.Unlock() + eventChan = ces + wg.Done() + return nil + }, + OnSubscriptionUpdateFn: func(strings []models.EventSubscription) {}, + SenderFn: func() types.EventSender { return callBackSender }, + StopFn: func() error { + mtx.Lock() + defer mtx.Unlock() + eventSourceStopCalled = true + return nil + }, + CleanupFn: func() error { + return nil + }, + } + fm := &LogForwarderMock{ + ForwardFn: func(keptnEvent models.KeptnContextExtendedCE, integrationID string) error { + return nil + }, + } + + controlPlane := New(ssm, esm, fm) + + integration := ExampleIntegration{ + RegistrationDataFn: func() types.RegistrationData { return types.RegistrationData{} }, + OnEventFn: func(ctx context.Context, ce models.KeptnContextExtendedCE) error { + mtx.Lock() + defer mtx.Unlock() + integrationReceivedEvent = ce + return nil + }, + } + ctx, cancel := context.WithCancel(context.TODO()) + go controlPlane.Register(ctx, integration) + require.Eventually(t, func() bool { + mtx.RLock() + defer mtx.RUnlock() + return subsChan != nil + }, time.Second, time.Millisecond*100) + require.Eventually(t, func() bool { + mtx.RLock() + defer mtx.RUnlock() + return eventChan != nil + }, time.Second, time.Millisecond*100) + + // event update containing explicit subscriptions ID + eventUpdate2 := types.EventUpdate{KeptnEvent: models.KeptnContextExtendedCE{ID: "EVENT_ID2", Type: strutils.Stringp("sh.keptn.event.echo.triggered")}, SubscriptionID: "SUBSCRIPTION_ID"} + eventChan <- eventUpdate2 + + require.Eventually(t, func() bool { + mtx.Lock() + defer mtx.Unlock() + eventUpdate2.KeptnEvent.Data = integrationReceivedEvent.Data + return reflect.DeepEqual(eventUpdate2.KeptnEvent, integrationReceivedEvent) + }, time.Second, time.Millisecond*100) + + eventData := map[string]interface{}{} + err := integrationReceivedEvent.DataAs(&eventData) + require.Nil(t, err) + + require.Equal(t, map[string]interface{}{ + "temporaryData": map[string]interface{}{ + "distributor": map[string]interface{}{ + "subscriptionID": "SUBSCRIPTION_ID", + }, + }, + }, eventData) + + cancel() + + require.Eventually(t, func() bool { + mtx.RLock() + defer mtx.RUnlock() + return subscriptionSourceStopCalled && eventSourceStopCalled + }, 5*time.Second, 100*time.Millisecond) +} + func TestControlPlaneInboundEventIsForwardedToIntegrationWithoutLogForwarder(t *testing.T) { var eventChan chan types.EventUpdate var subsChan chan []models.EventSubscription diff --git a/pkg/sdk/connector/eventsource/http/http.go b/pkg/sdk/connector/eventsource/http/http.go index b9c22e42..710d2bbc 100644 --- a/pkg/sdk/connector/eventsource/http/http.go +++ b/pkg/sdk/connector/eventsource/http/http.go @@ -104,6 +104,9 @@ func (hes *HTTPEventSource) Start(ctx context.Context, data types.RegistrationDa func (hes *HTTPEventSource) OnSubscriptionUpdate(subscriptions []models.EventSubscription) { hes.mutex.Lock() defer hes.mutex.Unlock() + if subscriptionDiffer(subscriptions, hes.currentSubscriptions) { + hes.logger.Infof("Got new subscriptions: %v", getEvents(subscriptions)) + } hes.currentSubscriptions = subscriptions } @@ -135,8 +138,9 @@ func (hes *HTTPEventSource) doPoll(eventUpdates chan types.EventUpdate) error { continue } eventUpdates <- types.EventUpdate{ - KeptnEvent: *e, - MetaData: types.EventUpdateMetaData{Subject: sub.Event}, + KeptnEvent: *e, + MetaData: types.EventUpdateMetaData{Subject: sub.Event}, + SubscriptionID: sub.ID, } hes.cache.Add(sub.ID, e.ID) } @@ -166,3 +170,11 @@ func getEventFilterForSubscription(subscription models.EventSubscription) api.Ev return eventFilter } + +func getEvents(subscriptions []models.EventSubscription) []string { + events := []string{} + for _, s := range subscriptions { + events = append(events, s.Event) + } + return events +} diff --git a/pkg/sdk/connector/eventsource/http/utils.go b/pkg/sdk/connector/eventsource/http/utils.go index 28efabf4..cd2487d0 100644 --- a/pkg/sdk/connector/eventsource/http/utils.go +++ b/pkg/sdk/connector/eventsource/http/utils.go @@ -141,6 +141,23 @@ func dedup(elements []string) []string { return result } +// subscriptionDiffer checks whether two lists of event subscriptions are different based on the +// event type they are targeting +func subscriptionDiffer(s1 []models.EventSubscription, s2 []models.EventSubscription) bool { + for _, ns := range s1 { + found := false + for _, os := range s2 { + if os.Event == ns.Event { + found = true + } + } + if !found { + return true + } + } + return len(s1) != len(s2) +} + func ToIds(events []*models.KeptnContextExtendedCE) []string { ids := []string{} for _, e := range events { diff --git a/pkg/sdk/connector/eventsource/http/utils_test.go b/pkg/sdk/connector/eventsource/http/utils_test.go index a45ff8e1..c012bc27 100644 --- a/pkg/sdk/connector/eventsource/http/utils_test.go +++ b/pkg/sdk/connector/eventsource/http/utils_test.go @@ -1,6 +1,7 @@ package http import ( + "github.com/keptn/go-utils/pkg/api/models" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "testing" @@ -95,3 +96,85 @@ func TestKeep(t *testing.T) { assert.False(t, cache.Contains("t2", "e4")) assert.True(t, cache.Contains("t2", "e5")) } + +func Test_subscriptionDiffer(t *testing.T) { + type args struct { + new []models.EventSubscription + old []models.EventSubscription + } + tests := []struct { + name string + args args + want bool + }{ + { + name: "equal empty", + args: args{ + new: []models.EventSubscription{}, + old: []models.EventSubscription{}, + }, + want: false, + }, + { + name: "equal", + args: args{ + new: []models.EventSubscription{{Event: "e1"}}, + old: []models.EventSubscription{{Event: "e1"}}, + }, + want: false, + }, + { + name: "differ1", + args: args{ + new: []models.EventSubscription{{Event: "e1"}}, + old: []models.EventSubscription{{Event: "e2"}}, + }, + want: true, + }, + { + name: "differ2", + args: args{ + new: []models.EventSubscription{{Event: "e2"}}, + old: []models.EventSubscription{{Event: "e1"}}, + }, + want: true, + }, + { + name: "differ3", + args: args{ + new: []models.EventSubscription{{Event: "e2"}, {Event: "e1"}}, + old: []models.EventSubscription{{Event: "e1"}}, + }, + want: true, + }, + { + name: "differ4", + args: args{ + new: []models.EventSubscription{}, + old: []models.EventSubscription{{Event: "e1"}}, + }, + want: true, + }, + { + name: "differ5", + args: args{ + new: []models.EventSubscription{{Event: "e1"}}, + old: []models.EventSubscription{}, + }, + want: true, + }, + { + name: "differ6", + args: args{ + new: []models.EventSubscription{{Event: "e2"}}, + old: []models.EventSubscription{{Event: "e1"}, {Event: "e1"}}, + }, + want: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equalf(t, tt.want, subscriptionDiffer(tt.args.new, tt.args.old), "subscriptionDiffer(%v, %v)", tt.args.new, tt.args.old) + }) + } +} diff --git a/pkg/sdk/connector/types/types.go b/pkg/sdk/connector/types/types.go index a4ef9d60..3404d0d6 100644 --- a/pkg/sdk/connector/types/types.go +++ b/pkg/sdk/connector/types/types.go @@ -11,8 +11,9 @@ type AdditionalSubscriptionData struct { } type EventUpdate struct { - KeptnEvent models.KeptnContextExtendedCE - MetaData EventUpdateMetaData + KeptnEvent models.KeptnContextExtendedCE + MetaData EventUpdateMetaData + SubscriptionID string // optional } type EventUpdateMetaData struct { diff --git a/pkg/sdk/internal/api/initializer.go b/pkg/sdk/internal/api/initializer.go index 93288529..6595256b 100644 --- a/pkg/sdk/internal/api/initializer.go +++ b/pkg/sdk/internal/api/initializer.go @@ -104,7 +104,7 @@ func getHttpScheme(env config.EnvConfig) (string, error) { func eventSource(apiSet keptnapi.KeptnInterface, logger logger.Logger, env config.EnvConfig) eventsource.EventSource { if env.PubSubConnectionType() == config.ConnectionTypeHTTP { - return eventsourceHttp.New(clock.New(), eventsourceHttp.NewEventAPI(apiSet.ShipyardControlV1(), apiSet.APIV1())) + return eventsourceHttp.New(clock.New(), eventsourceHttp.NewEventAPI(apiSet.ShipyardControlV1(), apiSet.APIV1()), eventsourceHttp.WithLogger(logger)) } natsConnector := nats.New(env.EventBrokerURL, nats.WithLogger(logger)) return eventsourceNats.New(natsConnector, eventsourceNats.WithLogger(logger))