Skip to content

Commit

Permalink
valid gcp-pubsub gateway type plus validation of parameter (#492)
Browse files Browse the repository at this point in the history
  • Loading branch information
borgoat authored Feb 14, 2020
1 parent 21cb5b5 commit 2ac99a4
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 6 deletions.
2 changes: 1 addition & 1 deletion examples/gateways/gcp-pubsub.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ metadata:
gateways.argoproj.io/gateway-controller-instanceid: argo-events
spec:
replica: 1
type: gcp-pubsub
type: pubsub
eventSourceRef:
name: gcp-pubsub-event-source
template:
Expand Down
3 changes: 2 additions & 1 deletion gateways/client/event-source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ func TestInitEventSourceContexts(t *testing.T) {
fmt.Println("server is stopped")
}()

contexts := gatewayContext.initEventSourceContexts(eventSource)
contexts, err := gatewayContext.initEventSourceContexts(eventSource)
assert.NoError(t, err)
assert.NotNil(t, contexts)
for _, esContext := range contexts {
assert.Equal(t, "first-webhook", esContext.source.Name)
Expand Down
17 changes: 13 additions & 4 deletions gateways/client/event-sources.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package main
import (
"context"
"fmt"
"io"

"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/gateways"
apicommon "github.com/argoproj/argo-events/pkg/apis/common"
Expand All @@ -28,7 +30,6 @@ import (
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"io"
)

// populateEventSourceContexts sets up the contexts for event sources
Expand Down Expand Up @@ -266,7 +267,10 @@ func (gatewayContext *GatewayContext) deactivateEventSources(eventSourceNames []

// syncEventSources syncs active event-sources and the updated ones
func (gatewayContext *GatewayContext) syncEventSources(eventSource *eventSourceV1Alpha1.EventSource) error {
eventSourceContexts := gatewayContext.initEventSourceContexts(eventSource)
eventSourceContexts, err := gatewayContext.initEventSourceContexts(eventSource)
if err != nil {
return err
}

staleEventSources, newEventSources := gatewayContext.diffEventSources(eventSourceContexts)
gatewayContext.logger.WithField(common.LabelEventSource, staleEventSources).Infoln("deleted event sources")
Expand All @@ -284,8 +288,11 @@ func (gatewayContext *GatewayContext) syncEventSources(eventSource *eventSourceV
}

// initEventSourceContext creates an internal representation of event sources.
func (gatewayContext *GatewayContext) initEventSourceContexts(eventSource *eventSourceV1Alpha1.EventSource) map[string]*EventSourceContext {
// It returns an error if the Gateway is set in such a way
// that it wouldn't pick up any known Event Source.
func (gatewayContext *GatewayContext) initEventSourceContexts(eventSource *eventSourceV1Alpha1.EventSource) (map[string]*EventSourceContext, error) {
eventSourceContexts := make(map[string]*EventSourceContext)
var err error

switch gatewayContext.gateway.Spec.Type {
case apicommon.SNSEvent:
Expand Down Expand Up @@ -380,7 +387,9 @@ func (gatewayContext *GatewayContext) initEventSourceContexts(eventSource *event
for key, value := range eventSource.Spec.Generic {
gatewayContext.populateEventSourceContexts(key, value, eventSourceContexts)
}
default:
err = fmt.Errorf("gateway with type %s is invalid", gatewayContext.gateway.Spec.Type)
}

return eventSourceContexts
return eventSourceContexts, err
}

0 comments on commit 2ac99a4

Please sign in to comment.