Skip to content
This repository has been archived by the owner on Dec 21, 2023. It is now read-only.

Commit

Permalink
fix: Fix event handling for duplicated subscriptions in go-sdk remo…
Browse files Browse the repository at this point in the history
…te execution plane use case (#542)

* fixed issue and improved logging

Signed-off-by: warber <[email protected]>

* set logger for eventsource correctly

Signed-off-by: warber <[email protected]>

* review

Signed-off-by: warber <[email protected]>

Signed-off-by: warber <[email protected]>
  • Loading branch information
warber authored Oct 6, 2022
1 parent cc6d13c commit 8655d73
Show file tree
Hide file tree
Showing 7 changed files with 236 additions and 8 deletions.
10 changes: 7 additions & 3 deletions pkg/sdk/connector/controlplane/controlplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,17 @@ func (cp *ControlPlane) stopComponents() {

func (cp *ControlPlane) handle(ctx context.Context, eventUpdate types.EventUpdate, integration Integration) error {
cp.logger.Debugf("Received an event of type: %s", *eventUpdate.KeptnEvent.Type)
// if we already know the subscription ID we can just forward the event to be handled
if eventUpdate.SubscriptionID != "" {
return cp.forwardMatchedEvent(ctx, eventUpdate, integration, eventUpdate.SubscriptionID)
}
for _, subscription := range cp.currentSubscriptions {
if subscription.Event == eventUpdate.MetaData.Subject {
cp.logger.Debugf("Check if event matches subscription %s", subscription.ID)
matcher := eventmatcher.New(subscription)
if matcher.Matches(eventUpdate.KeptnEvent) {
cp.logger.Info("Forwarding matched event update: ", eventUpdate.KeptnEvent.ID)
if err := cp.forwardMatchedEvent(ctx, eventUpdate, integration, subscription); err != nil {
if err := cp.forwardMatchedEvent(ctx, eventUpdate, integration, subscription.ID); err != nil {
return err
}
}
Expand All @@ -228,7 +232,7 @@ func (cp *ControlPlane) getSender(sender types.EventSender) types.EventSender {
}
}

func (cp *ControlPlane) forwardMatchedEvent(ctx context.Context, eventUpdate types.EventUpdate, integration Integration, subscription models.EventSubscription) error {
func (cp *ControlPlane) forwardMatchedEvent(ctx context.Context, eventUpdate types.EventUpdate, integration Integration, subscriptionID string) error {
// increase the eventHandler WaitGroup
cp.eventHandlerWaitGroup.Add(1)
// when the event handler is done, decrease the WaitGroup again
Expand All @@ -237,7 +241,7 @@ func (cp *ControlPlane) forwardMatchedEvent(ctx context.Context, eventUpdate typ
err := eventUpdate.KeptnEvent.AddTemporaryData(
tmpDataDistributorKey,
types.AdditionalSubscriptionData{
SubscriptionID: subscription.ID,
SubscriptionID: subscriptionID,
},
models.AddTemporaryDataOptions{
OverwriteIfExisting: true,
Expand Down
111 changes: 111 additions & 0 deletions pkg/sdk/connector/controlplane/controlplane_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,117 @@ func TestControlPlaneInboundEventIsForwardedToIntegration(t *testing.T) {
}, 5*time.Second, 100*time.Millisecond)
}

func TestControlPlaneInboundEventWithSubscriptionIDIsForwardedToIntegration(t *testing.T) {
var eventChan chan types.EventUpdate
var subsChan chan []models.EventSubscription
var integrationReceivedEvent models.KeptnContextExtendedCE
var subscriptionSourceStopCalled bool
var eventSourceStopCalled bool

mtx := sync.RWMutex{}

callBackSender := func(ce models.KeptnContextExtendedCE) error { return nil }

ssm := &fake.SubscriptionSourceMock{
StartFn: func(ctx context.Context, data types.RegistrationData, c chan []models.EventSubscription, errC chan error, wg *sync.WaitGroup) error {
mtx.Lock()
defer mtx.Unlock()
subsChan = c
wg.Done()
return nil
},
RegisterFn: func(integration models.Integration) (string, error) {
return "some-id", nil
},
StopFn: func() error {
mtx.Lock()
defer mtx.Unlock()
subscriptionSourceStopCalled = true
return nil
},
}
esm := &fake.EventSourceMock{
StartFn: func(ctx context.Context, data types.RegistrationData, ces chan types.EventUpdate, errC chan error, wg *sync.WaitGroup) error {
mtx.Lock()
defer mtx.Unlock()
eventChan = ces
wg.Done()
return nil
},
OnSubscriptionUpdateFn: func(strings []models.EventSubscription) {},
SenderFn: func() types.EventSender { return callBackSender },
StopFn: func() error {
mtx.Lock()
defer mtx.Unlock()
eventSourceStopCalled = true
return nil
},
CleanupFn: func() error {
return nil
},
}
fm := &LogForwarderMock{
ForwardFn: func(keptnEvent models.KeptnContextExtendedCE, integrationID string) error {
return nil
},
}

controlPlane := New(ssm, esm, fm)

integration := ExampleIntegration{
RegistrationDataFn: func() types.RegistrationData { return types.RegistrationData{} },
OnEventFn: func(ctx context.Context, ce models.KeptnContextExtendedCE) error {
mtx.Lock()
defer mtx.Unlock()
integrationReceivedEvent = ce
return nil
},
}
ctx, cancel := context.WithCancel(context.TODO())
go controlPlane.Register(ctx, integration)
require.Eventually(t, func() bool {
mtx.RLock()
defer mtx.RUnlock()
return subsChan != nil
}, time.Second, time.Millisecond*100)
require.Eventually(t, func() bool {
mtx.RLock()
defer mtx.RUnlock()
return eventChan != nil
}, time.Second, time.Millisecond*100)

// event update containing explicit subscriptions ID
eventUpdate2 := types.EventUpdate{KeptnEvent: models.KeptnContextExtendedCE{ID: "EVENT_ID2", Type: strutils.Stringp("sh.keptn.event.echo.triggered")}, SubscriptionID: "SUBSCRIPTION_ID"}
eventChan <- eventUpdate2

require.Eventually(t, func() bool {
mtx.Lock()
defer mtx.Unlock()
eventUpdate2.KeptnEvent.Data = integrationReceivedEvent.Data
return reflect.DeepEqual(eventUpdate2.KeptnEvent, integrationReceivedEvent)
}, time.Second, time.Millisecond*100)

eventData := map[string]interface{}{}
err := integrationReceivedEvent.DataAs(&eventData)
require.Nil(t, err)

require.Equal(t, map[string]interface{}{
"temporaryData": map[string]interface{}{
"distributor": map[string]interface{}{
"subscriptionID": "SUBSCRIPTION_ID",
},
},
}, eventData)

cancel()

require.Eventually(t, func() bool {
mtx.RLock()
defer mtx.RUnlock()
return subscriptionSourceStopCalled && eventSourceStopCalled
}, 5*time.Second, 100*time.Millisecond)
}

func TestControlPlaneInboundEventIsForwardedToIntegrationWithoutLogForwarder(t *testing.T) {
var eventChan chan types.EventUpdate
var subsChan chan []models.EventSubscription
Expand Down
16 changes: 14 additions & 2 deletions pkg/sdk/connector/eventsource/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ func (hes *HTTPEventSource) Start(ctx context.Context, data types.RegistrationDa
func (hes *HTTPEventSource) OnSubscriptionUpdate(subscriptions []models.EventSubscription) {
hes.mutex.Lock()
defer hes.mutex.Unlock()
if subscriptionDiffer(subscriptions, hes.currentSubscriptions) {
hes.logger.Infof("Got new subscriptions: %v", getEvents(subscriptions))
}
hes.currentSubscriptions = subscriptions
}

Expand Down Expand Up @@ -135,8 +138,9 @@ func (hes *HTTPEventSource) doPoll(eventUpdates chan types.EventUpdate) error {
continue
}
eventUpdates <- types.EventUpdate{
KeptnEvent: *e,
MetaData: types.EventUpdateMetaData{Subject: sub.Event},
KeptnEvent: *e,
MetaData: types.EventUpdateMetaData{Subject: sub.Event},
SubscriptionID: sub.ID,
}
hes.cache.Add(sub.ID, e.ID)
}
Expand Down Expand Up @@ -166,3 +170,11 @@ func getEventFilterForSubscription(subscription models.EventSubscription) api.Ev

return eventFilter
}

func getEvents(subscriptions []models.EventSubscription) []string {
events := []string{}
for _, s := range subscriptions {
events = append(events, s.Event)
}
return events
}
17 changes: 17 additions & 0 deletions pkg/sdk/connector/eventsource/http/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,23 @@ func dedup(elements []string) []string {
return result
}

// subscriptionDiffer checks whether two lists of event subscriptions are different based on the
// event type they are targeting
func subscriptionDiffer(s1 []models.EventSubscription, s2 []models.EventSubscription) bool {
for _, ns := range s1 {
found := false
for _, os := range s2 {
if os.Event == ns.Event {
found = true
}
}
if !found {
return true
}
}
return len(s1) != len(s2)
}

func ToIds(events []*models.KeptnContextExtendedCE) []string {
ids := []string{}
for _, e := range events {
Expand Down
83 changes: 83 additions & 0 deletions pkg/sdk/connector/eventsource/http/utils_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package http

import (
"github.com/keptn/go-utils/pkg/api/models"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"testing"
Expand Down Expand Up @@ -95,3 +96,85 @@ func TestKeep(t *testing.T) {
assert.False(t, cache.Contains("t2", "e4"))
assert.True(t, cache.Contains("t2", "e5"))
}

func Test_subscriptionDiffer(t *testing.T) {
type args struct {
new []models.EventSubscription
old []models.EventSubscription
}
tests := []struct {
name string
args args
want bool
}{
{
name: "equal empty",
args: args{
new: []models.EventSubscription{},
old: []models.EventSubscription{},
},
want: false,
},
{
name: "equal",
args: args{
new: []models.EventSubscription{{Event: "e1"}},
old: []models.EventSubscription{{Event: "e1"}},
},
want: false,
},
{
name: "differ1",
args: args{
new: []models.EventSubscription{{Event: "e1"}},
old: []models.EventSubscription{{Event: "e2"}},
},
want: true,
},
{
name: "differ2",
args: args{
new: []models.EventSubscription{{Event: "e2"}},
old: []models.EventSubscription{{Event: "e1"}},
},
want: true,
},
{
name: "differ3",
args: args{
new: []models.EventSubscription{{Event: "e2"}, {Event: "e1"}},
old: []models.EventSubscription{{Event: "e1"}},
},
want: true,
},
{
name: "differ4",
args: args{
new: []models.EventSubscription{},
old: []models.EventSubscription{{Event: "e1"}},
},
want: true,
},
{
name: "differ5",
args: args{
new: []models.EventSubscription{{Event: "e1"}},
old: []models.EventSubscription{},
},
want: true,
},
{
name: "differ6",
args: args{
new: []models.EventSubscription{{Event: "e2"}},
old: []models.EventSubscription{{Event: "e1"}, {Event: "e1"}},
},
want: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equalf(t, tt.want, subscriptionDiffer(tt.args.new, tt.args.old), "subscriptionDiffer(%v, %v)", tt.args.new, tt.args.old)
})
}
}
5 changes: 3 additions & 2 deletions pkg/sdk/connector/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ type AdditionalSubscriptionData struct {
}

type EventUpdate struct {
KeptnEvent models.KeptnContextExtendedCE
MetaData EventUpdateMetaData
KeptnEvent models.KeptnContextExtendedCE
MetaData EventUpdateMetaData
SubscriptionID string // optional
}

type EventUpdateMetaData struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sdk/internal/api/initializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func getHttpScheme(env config.EnvConfig) (string, error) {

func eventSource(apiSet keptnapi.KeptnInterface, logger logger.Logger, env config.EnvConfig) eventsource.EventSource {
if env.PubSubConnectionType() == config.ConnectionTypeHTTP {
return eventsourceHttp.New(clock.New(), eventsourceHttp.NewEventAPI(apiSet.ShipyardControlV1(), apiSet.APIV1()))
return eventsourceHttp.New(clock.New(), eventsourceHttp.NewEventAPI(apiSet.ShipyardControlV1(), apiSet.APIV1()), eventsourceHttp.WithLogger(logger))
}
natsConnector := nats.New(env.EventBrokerURL, nats.WithLogger(logger))
return eventsourceNats.New(natsConnector, eventsourceNats.WithLogger(logger))
Expand Down

0 comments on commit 8655d73

Please sign in to comment.