diff --git a/eventsources/common/common.go b/eventsources/common/common.go new file mode 100644 index 0000000000..d4d23327e0 --- /dev/null +++ b/eventsources/common/common.go @@ -0,0 +1,13 @@ +package common + +import "github.com/cloudevents/sdk-go/v2/event" + +type Options func(*event.Event) error + +// Option to set different ID for event +func WithID(id string) Options { + return func(e *event.Event) error { + e.SetID(id) + return nil + } +} diff --git a/eventsources/common/webhook/webhook.go b/eventsources/common/webhook/webhook.go index fa33fa23a5..25ee684a8d 100644 --- a/eventsources/common/webhook/webhook.go +++ b/eventsources/common/webhook/webhook.go @@ -27,6 +27,7 @@ import ( "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/common/logging" + eventsourcecommon "github.com/argoproj/argo-events/eventsources/common" metrics "github.com/argoproj/argo-events/metrics" "github.com/argoproj/argo-events/pkg/apis/eventsource/v1alpha1" ) @@ -177,7 +178,7 @@ func activateRoute(router Router, controller *Controller) { } // manageRouteChannels consumes data from route's data channel and stops the processing when the event source is stopped/removed -func manageRouteChannels(router Router, dispatch func([]byte) error) { +func manageRouteChannels(router Router, dispatch func([]byte, ...eventsourcecommon.Options) error) { route := router.GetRoute() logger := route.Logger for { @@ -198,7 +199,7 @@ func manageRouteChannels(router Router, dispatch func([]byte) error) { } // ManagerRoute manages the lifecycle of a route -func ManageRoute(ctx context.Context, router Router, controller *Controller, dispatch func([]byte) error) error { +func ManageRoute(ctx context.Context, router Router, controller *Controller, dispatch func([]byte, ...eventsourcecommon.Options) error) error { route := router.GetRoute() logger := route.Logger diff --git a/eventsources/eventing.go b/eventsources/eventing.go index a7adc4b257..0cf17a35f9 100644 --- a/eventsources/eventing.go +++ b/eventsources/eventing.go @@ -19,6 +19,7 @@ import ( "github.com/argoproj/argo-events/common/logging" "github.com/argoproj/argo-events/eventbus" eventbusdriver "github.com/argoproj/argo-events/eventbus/driver" + eventsourcecommon "github.com/argoproj/argo-events/eventsources/common" "github.com/argoproj/argo-events/eventsources/sources/amqp" "github.com/argoproj/argo-events/eventsources/sources/awssns" "github.com/argoproj/argo-events/eventsources/sources/awssqs" @@ -63,7 +64,7 @@ type EventingServer interface { GetEventSourceType() apicommon.EventSourceType // Function to start listening events. - StartListening(ctx context.Context, dispatch func([]byte) error) error + StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error } // GetEventingServers returns the mapping of event source type and list of eventing servers @@ -392,13 +393,19 @@ func (e *EventSourceAdaptor) run(ctx context.Context, servers map[apicommon.Even Jitter: &jitter, } if err = common.Connect(&backoff, func() error { - return s.StartListening(ctx, func(data []byte) error { + return s.StartListening(ctx, func(data []byte, opts ...eventsourcecommon.Options) error { event := cloudevents.NewEvent() event.SetID(fmt.Sprintf("%x", uuid.New())) event.SetType(string(s.GetEventSourceType())) event.SetSource(s.GetEventSourceName()) event.SetSubject(s.GetEventName()) event.SetTime(time.Now()) + for _, opt := range opts { + err := opt(&event) + if err != nil { + return err + } + } err := event.SetData(cloudevents.ApplicationJSON, data) if err != nil { return err diff --git a/eventsources/sources/amqp/start.go b/eventsources/sources/amqp/start.go index 639c0f2bf2..67fac11daf 100644 --- a/eventsources/sources/amqp/start.go +++ b/eventsources/sources/amqp/start.go @@ -27,6 +27,7 @@ import ( "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/common/logging" + eventsourcecommon "github.com/argoproj/argo-events/eventsources/common" "github.com/argoproj/argo-events/eventsources/sources" metrics "github.com/argoproj/argo-events/metrics" apicommon "github.com/argoproj/argo-events/pkg/apis/common" @@ -58,7 +59,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) @@ -153,7 +154,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt } } -func (el *EventListener) handleOne(amqpEventSource *v1alpha1.AMQPEventSource, msg amqplib.Delivery, dispatch func([]byte) error, log *zap.SugaredLogger) error { +func (el *EventListener) handleOne(amqpEventSource *v1alpha1.AMQPEventSource, msg amqplib.Delivery, dispatch func([]byte, ...eventsourcecommon.Options) error, log *zap.SugaredLogger) error { defer func(start time.Time) { el.Metrics.EventProcessingDuration(el.GetEventSourceName(), el.GetEventName(), float64(time.Since(start)/time.Millisecond)) }(time.Now()) diff --git a/eventsources/sources/awssns/start.go b/eventsources/sources/awssns/start.go index 5dcadb9aca..e54c561653 100644 --- a/eventsources/sources/awssns/start.go +++ b/eventsources/sources/awssns/start.go @@ -36,6 +36,7 @@ import ( "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/common/logging" + eventsourcecommon "github.com/argoproj/argo-events/eventsources/common" commonaws "github.com/argoproj/argo-events/eventsources/common/aws" "github.com/argoproj/argo-events/eventsources/common/webhook" "github.com/argoproj/argo-events/eventsources/sources" @@ -267,7 +268,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening starts an SNS event source -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error { logger := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) diff --git a/eventsources/sources/awssqs/start.go b/eventsources/sources/awssqs/start.go index 26411bf138..5b5458bb26 100644 --- a/eventsources/sources/awssqs/start.go +++ b/eventsources/sources/awssqs/start.go @@ -28,6 +28,7 @@ import ( "go.uber.org/zap" "github.com/argoproj/argo-events/common/logging" + eventsourcecommon "github.com/argoproj/argo-events/eventsources/common" awscommon "github.com/argoproj/argo-events/eventsources/common/aws" "github.com/argoproj/argo-events/eventsources/sources" metrics "github.com/argoproj/argo-events/metrics" @@ -60,7 +61,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) log.Info("started processing the AWS SQS event source...") @@ -122,7 +123,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt } } -func (el *EventListener) processMessage(ctx context.Context, message *sqslib.Message, dispatch func([]byte) error, ack func(), log *zap.SugaredLogger) { +func (el *EventListener) processMessage(ctx context.Context, message *sqslib.Message, dispatch func([]byte, ...eventsourcecommon.Options) error, ack func(), log *zap.SugaredLogger) { defer func(start time.Time) { el.Metrics.EventProcessingDuration(el.GetEventSourceName(), el.GetEventName(), float64(time.Since(start)/time.Millisecond)) }(time.Now()) diff --git a/eventsources/sources/azureeventshub/start.go b/eventsources/sources/azureeventshub/start.go index 0ff867dee9..540c572ee8 100644 --- a/eventsources/sources/azureeventshub/start.go +++ b/eventsources/sources/azureeventshub/start.go @@ -28,6 +28,7 @@ import ( "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/common/logging" + eventsourcecommon "github.com/argoproj/argo-events/eventsources/common" "github.com/argoproj/argo-events/eventsources/sources" metrics "github.com/argoproj/argo-events/metrics" apicommon "github.com/argoproj/argo-events/pkg/apis/common" @@ -59,7 +60,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) log.Info("started processing the Azure Events Hub event source...") diff --git a/eventsources/sources/bitbucketserver/start.go b/eventsources/sources/bitbucketserver/start.go index 2c657fcc87..1315daac0e 100644 --- a/eventsources/sources/bitbucketserver/start.go +++ b/eventsources/sources/bitbucketserver/start.go @@ -29,6 +29,7 @@ import ( "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/common/logging" + eventsourcecommon "github.com/argoproj/argo-events/eventsources/common" "github.com/argoproj/argo-events/eventsources/common/webhook" "github.com/argoproj/argo-events/eventsources/sources" "github.com/argoproj/argo-events/pkg/apis/events" @@ -157,7 +158,7 @@ func (router *Router) PostInactivate() error { } // StartListening starts an event source -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error { defer sources.Recover(el.GetEventName()) bitbucketserverEventSource := &el.BitbucketServerEventSource diff --git a/eventsources/sources/calendar/start.go b/eventsources/sources/calendar/start.go index 60c0e099b4..4fceb8f4a7 100644 --- a/eventsources/sources/calendar/start.go +++ b/eventsources/sources/calendar/start.go @@ -31,6 +31,7 @@ import ( "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/common/logging" + eventsourcecommon "github.com/argoproj/argo-events/eventsources/common" "github.com/argoproj/argo-events/eventsources/persist" metrics "github.com/argoproj/argo-events/metrics" apicommon "github.com/argoproj/argo-events/pkg/apis/common" @@ -134,7 +135,7 @@ func (el *EventListener) getExecutionTime() (time.Time, error) { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error { el.log = logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) el.log.Info("started processing the calendar event source...") diff --git a/eventsources/sources/emitter/start.go b/eventsources/sources/emitter/start.go index ed7fe046fc..fb8ebe8032 100644 --- a/eventsources/sources/emitter/start.go +++ b/eventsources/sources/emitter/start.go @@ -27,6 +27,7 @@ import ( "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/common/logging" + eventsourcecommon "github.com/argoproj/argo-events/eventsources/common" "github.com/argoproj/argo-events/eventsources/sources" metrics "github.com/argoproj/argo-events/metrics" apicommon "github.com/argoproj/argo-events/pkg/apis/common" @@ -58,7 +59,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) log.Info("started processing the Emitter event source...") diff --git a/eventsources/sources/file/start.go b/eventsources/sources/file/start.go index b0ea5e5a23..a0d0b8350b 100644 --- a/eventsources/sources/file/start.go +++ b/eventsources/sources/file/start.go @@ -29,6 +29,7 @@ import ( "go.uber.org/zap" "github.com/argoproj/argo-events/common/logging" + eventsourcecommon "github.com/argoproj/argo-events/eventsources/common" "github.com/argoproj/argo-events/eventsources/common/fsevent" "github.com/argoproj/argo-events/eventsources/sources" metrics "github.com/argoproj/argo-events/metrics" @@ -60,7 +61,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) defer sources.Recover(el.GetEventName()) @@ -81,7 +82,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt } // listenEvents listen to file related events. -func (el *EventListener) listenEvents(ctx context.Context, dispatch func([]byte) error, log *zap.SugaredLogger) error { +func (el *EventListener) listenEvents(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error, log *zap.SugaredLogger) error { fileEventSource := &el.FileEventSource // create new fs watcher @@ -161,7 +162,7 @@ func (el *EventListener) listenEvents(ctx context.Context, dispatch func([]byte) } // listenEvents listen to file related events using polling. -func (el *EventListener) listenEventsPolling(ctx context.Context, dispatch func([]byte) error, log *zap.SugaredLogger) error { +func (el *EventListener) listenEventsPolling(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error, log *zap.SugaredLogger) error { fileEventSource := &el.FileEventSource // create new fs watcher diff --git a/eventsources/sources/gcppubsub/start.go b/eventsources/sources/gcppubsub/start.go index 4f05b14351..69af5d6c5a 100644 --- a/eventsources/sources/gcppubsub/start.go +++ b/eventsources/sources/gcppubsub/start.go @@ -34,6 +34,7 @@ import ( "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/common/logging" + eventsourcecommon "github.com/argoproj/argo-events/eventsources/common" "github.com/argoproj/argo-events/eventsources/sources" metrics "github.com/argoproj/argo-events/metrics" apicommon "github.com/argoproj/argo-events/pkg/apis/common" @@ -65,7 +66,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening listens to GCP PubSub events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error { // In order to listen events from GCP PubSub, // 1. Parse the event source that contains configuration to connect to GCP PubSub // 2. Create a new PubSub client diff --git a/eventsources/sources/generic/start.go b/eventsources/sources/generic/start.go index 24ec184e75..ea3b783260 100644 --- a/eventsources/sources/generic/start.go +++ b/eventsources/sources/generic/start.go @@ -14,6 +14,7 @@ import ( "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/common/logging" + eventsourcecommon "github.com/argoproj/argo-events/eventsources/common" "github.com/argoproj/argo-events/eventsources/sources" metrics "github.com/argoproj/argo-events/metrics" apicommon "github.com/argoproj/argo-events/pkg/apis/common" @@ -47,7 +48,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening listens to generic events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error { logger := logging.FromContext(ctx). With(zap.String(logging.LabelEventSourceType, string(el.GetEventSourceType())), zap.String(logging.LabelEventName, el.GetEventName()), @@ -93,7 +94,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt } } -func (el *EventListener) handleOne(event *Event, dispatch func([]byte) error, logger *zap.SugaredLogger) error { +func (el *EventListener) handleOne(event *Event, dispatch func([]byte, ...eventsourcecommon.Options) error, logger *zap.SugaredLogger) error { defer func(start time.Time) { el.Metrics.EventProcessingDuration(el.GetEventSourceName(), el.GetEventName(), float64(time.Since(start)/time.Millisecond)) }(time.Now()) diff --git a/eventsources/sources/github/start.go b/eventsources/sources/github/start.go index bc1125159f..27e9e470e1 100644 --- a/eventsources/sources/github/start.go +++ b/eventsources/sources/github/start.go @@ -31,6 +31,7 @@ import ( "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/common/logging" + eventsourcecommon "github.com/argoproj/argo-events/eventsources/common" "github.com/argoproj/argo-events/eventsources/common/webhook" "github.com/argoproj/argo-events/pkg/apis/events" ) @@ -213,7 +214,7 @@ func (router *Router) PostInactivate() error { } // StartListening starts an event source -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error { logger := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) logger.Info("started processing the Github event source...") diff --git a/eventsources/sources/gitlab/start.go b/eventsources/sources/gitlab/start.go index 491872ee11..719f5a584c 100644 --- a/eventsources/sources/gitlab/start.go +++ b/eventsources/sources/gitlab/start.go @@ -27,6 +27,7 @@ import ( "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/common/logging" + eventsourcecommon "github.com/argoproj/argo-events/eventsources/common" "github.com/argoproj/argo-events/eventsources/common/webhook" "github.com/argoproj/argo-events/eventsources/sources" "github.com/argoproj/argo-events/pkg/apis/events" @@ -143,7 +144,7 @@ func (router *Router) PostInactivate() error { } // StartListening starts an event source -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error { logger := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) logger.Info("started processing the Gitlab event source...") diff --git a/eventsources/sources/hdfs/start.go b/eventsources/sources/hdfs/start.go index a271336026..0d0698761b 100644 --- a/eventsources/sources/hdfs/start.go +++ b/eventsources/sources/hdfs/start.go @@ -14,6 +14,7 @@ import ( "go.uber.org/zap" "github.com/argoproj/argo-events/common/logging" + eventsourcecommon "github.com/argoproj/argo-events/eventsources/common" "github.com/argoproj/argo-events/eventsources/common/fsevent" "github.com/argoproj/argo-events/eventsources/common/naivewatcher" "github.com/argoproj/argo-events/eventsources/sources" @@ -64,7 +65,7 @@ func (w *WatchableHDFS) GetFileID(fi os.FileInfo) interface{} { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) log.Info("started processing the Emitter event source...") @@ -156,7 +157,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt } } -func (el *EventListener) handleOne(event fsevent.Event, dispatch func([]byte) error, log *zap.SugaredLogger) error { +func (el *EventListener) handleOne(event fsevent.Event, dispatch func([]byte, ...eventsourcecommon.Options) error, log *zap.SugaredLogger) error { defer func(start time.Time) { el.Metrics.EventProcessingDuration(el.GetEventSourceName(), el.GetEventName(), float64(time.Since(start)/time.Millisecond)) }(time.Now()) diff --git a/eventsources/sources/kafka/start.go b/eventsources/sources/kafka/start.go index 6d18fd44c5..d7b13b265f 100644 --- a/eventsources/sources/kafka/start.go +++ b/eventsources/sources/kafka/start.go @@ -19,6 +19,7 @@ package kafka import ( "context" "encoding/json" + "fmt" "strconv" "strings" "sync" @@ -30,6 +31,7 @@ import ( "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/common/logging" + eventsourcecommon "github.com/argoproj/argo-events/eventsources/common" "github.com/argoproj/argo-events/eventsources/sources" metrics "github.com/argoproj/argo-events/metrics" apicommon "github.com/argoproj/argo-events/pkg/apis/common" @@ -70,7 +72,7 @@ func verifyPartitionAvailable(part int32, partitions []int32) bool { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) defer sources.Recover(el.GetEventName()) @@ -85,7 +87,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt } } -func (el *EventListener) consumerGroupConsumer(ctx context.Context, log *zap.SugaredLogger, kafkaEventSource *v1alpha1.KafkaEventSource, dispatch func([]byte) error) error { +func (el *EventListener) consumerGroupConsumer(ctx context.Context, log *zap.SugaredLogger, kafkaEventSource *v1alpha1.KafkaEventSource, dispatch func([]byte, ...eventsourcecommon.Options) error) error { config, err := getSaramaConfig(kafkaEventSource, log) if err != nil { return err @@ -155,7 +157,7 @@ func (el *EventListener) consumerGroupConsumer(ctx context.Context, log *zap.Sug return nil } -func (el *EventListener) partitionConsumer(ctx context.Context, log *zap.SugaredLogger, kafkaEventSource *v1alpha1.KafkaEventSource, dispatch func([]byte) error) error { +func (el *EventListener) partitionConsumer(ctx context.Context, log *zap.SugaredLogger, kafkaEventSource *v1alpha1.KafkaEventSource, dispatch func([]byte, ...eventsourcecommon.Options) error) error { defer sources.Recover(el.GetEventName()) log.Info("start kafka event source...") @@ -228,7 +230,10 @@ func (el *EventListener) partitionConsumer(ctx context.Context, log *zap.Sugared if err != nil { return errors.Wrap(err, "failed to marshal the event data, rejecting the event...") } - if err = dispatch(eventBody); err != nil { + + kafkaID := genUniqueID(el.GetEventSourceName(), el.GetEventName(), kafkaEventSource.URL, msg.Topic, msg.Partition, msg.Offset) + + if err = dispatch(eventBody, eventsourcecommon.SetCustomID(kafkaID)); err != nil { return errors.Wrap(err, "failed to dispatch a Kafka event...") } return nil @@ -310,7 +315,7 @@ func getSaramaConfig(kafkaEventSource *v1alpha1.KafkaEventSource, log *zap.Sugar // Consumer represents a Sarama consumer group consumer type Consumer struct { ready chan bool - dispatch func([]byte) error + dispatch func([]byte, ...eventsourcecommon.Options) error logger *zap.SugaredLogger kafkaEventSource *v1alpha1.KafkaEventSource eventSourceName string @@ -375,9 +380,20 @@ func (consumer *Consumer) processOne(session sarama.ConsumerGroupSession, messag return errors.Wrap(err, "failed to marshal the event data, rejecting the event...") } - if err = consumer.dispatch(eventBody); err != nil { + messageID := genUniqueID(consumer.eventSourceName, consumer.eventName, consumer.kafkaEventSource.URL, message.Topic, message.Partition, message.Offset) + + if err = consumer.dispatch(eventBody, eventsourcecommon.WithID(messageID)); err != nil { return errors.Wrap(err, "failed to dispatch a kafka event...") } session.MarkMessage(message, "") return nil } + +// Function can be passed as Option to generate unique id for kafka event +// eventSourceName:eventName:kafka-url:topic:partition:offset +func genUniqueID(eventSourceName, eventName, kafkaURL, topic string, partition int32, offset int64) string { + + kafkaID := fmt.Sprintf("%s:%s:%s:%s:%d:%d", eventSourceName, eventName, strings.Split(kafkaURL, ",")[0], topic, partition, offset) + + return kafkaID +} diff --git a/eventsources/sources/minio/start.go b/eventsources/sources/minio/start.go index 7e47b53b98..b787cbb7ed 100644 --- a/eventsources/sources/minio/start.go +++ b/eventsources/sources/minio/start.go @@ -29,6 +29,7 @@ import ( "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/common/logging" + eventsourcecommon "github.com/argoproj/argo-events/eventsources/common" "github.com/argoproj/argo-events/eventsources/sources" metrics "github.com/argoproj/argo-events/metrics" apicommon "github.com/argoproj/argo-events/pkg/apis/common" @@ -59,7 +60,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName(), zap.String("bucketName", el.MinioEventSource.Bucket.Name)) @@ -105,7 +106,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt return nil } -func (el *EventListener) handleOne(notification notification.Info, dispatch func([]byte) error, log *zap.SugaredLogger) error { +func (el *EventListener) handleOne(notification notification.Info, dispatch func([]byte, ...eventsourcecommon.Options) error, log *zap.SugaredLogger) error { defer func(start time.Time) { el.Metrics.EventProcessingDuration(el.GetEventSourceName(), el.GetEventName(), float64(time.Since(start)/time.Millisecond)) }(time.Now()) diff --git a/eventsources/sources/mqtt/start.go b/eventsources/sources/mqtt/start.go index a218b4a52c..d2b3a43b7b 100644 --- a/eventsources/sources/mqtt/start.go +++ b/eventsources/sources/mqtt/start.go @@ -27,6 +27,7 @@ import ( "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/common/logging" + eventsourcecommon "github.com/argoproj/argo-events/eventsources/common" "github.com/argoproj/argo-events/eventsources/sources" metrics "github.com/argoproj/argo-events/metrics" apicommon "github.com/argoproj/argo-events/pkg/apis/common" @@ -58,7 +59,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) defer sources.Recover(el.GetEventName()) diff --git a/eventsources/sources/nats/start.go b/eventsources/sources/nats/start.go index 568d0ee174..d7677648c0 100644 --- a/eventsources/sources/nats/start.go +++ b/eventsources/sources/nats/start.go @@ -27,6 +27,7 @@ import ( "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/common/logging" + eventsourcecommon "github.com/argoproj/argo-events/eventsources/common" "github.com/argoproj/argo-events/eventsources/sources" metrics "github.com/argoproj/argo-events/metrics" apicommon "github.com/argoproj/argo-events/pkg/apis/common" @@ -58,7 +59,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) defer sources.Recover(el.GetEventName()) diff --git a/eventsources/sources/nsq/start.go b/eventsources/sources/nsq/start.go index 91c60ddfb5..63676d7bb0 100644 --- a/eventsources/sources/nsq/start.go +++ b/eventsources/sources/nsq/start.go @@ -28,6 +28,7 @@ import ( "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/common/logging" + eventsourcecommon "github.com/argoproj/argo-events/eventsources/common" "github.com/argoproj/argo-events/eventsources/sources" metrics "github.com/argoproj/argo-events/metrics" apicommon "github.com/argoproj/argo-events/pkg/apis/common" @@ -62,14 +63,14 @@ type messageHandler struct { eventSourceName string eventName string metrics *metrics.Metrics - dispatch func([]byte) error + dispatch func([]byte, ...eventsourcecommon.Options) error logger *zap.SugaredLogger isJSON bool metadata map[string]string } // StartListening listens NSQ events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) log.Info("started processing the NSQ event source...") diff --git a/eventsources/sources/pulsar/start.go b/eventsources/sources/pulsar/start.go index 9019c58be7..6f411a4b45 100644 --- a/eventsources/sources/pulsar/start.go +++ b/eventsources/sources/pulsar/start.go @@ -26,6 +26,7 @@ import ( "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/common/logging" + eventsourcecommon "github.com/argoproj/argo-events/eventsources/common" "github.com/argoproj/argo-events/eventsources/sources" metrics "github.com/argoproj/argo-events/metrics" apicommon "github.com/argoproj/argo-events/pkg/apis/common" @@ -57,7 +58,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening listens Pulsar events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) log.Info("started processing the Pulsar event source...") @@ -169,7 +170,7 @@ consumeMessages: return nil } -func (el *EventListener) handleOne(msg pulsar.Message, dispatch func([]byte) error, log *zap.SugaredLogger) error { +func (el *EventListener) handleOne(msg pulsar.Message, dispatch func([]byte, ...eventsourcecommon.Options) error, log *zap.SugaredLogger) error { defer func(start time.Time) { el.Metrics.EventProcessingDuration(el.GetEventSourceName(), el.GetEventName(), float64(time.Since(start)/time.Millisecond)) }(time.Now()) diff --git a/eventsources/sources/redis/start.go b/eventsources/sources/redis/start.go index 8e9a2dcf47..51b3529dc5 100644 --- a/eventsources/sources/redis/start.go +++ b/eventsources/sources/redis/start.go @@ -27,6 +27,7 @@ import ( "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/common/logging" + eventsourcecommon "github.com/argoproj/argo-events/eventsources/common" "github.com/argoproj/argo-events/eventsources/sources" metrics "github.com/argoproj/argo-events/metrics" apicommon "github.com/argoproj/argo-events/pkg/apis/common" @@ -58,7 +59,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening listens events published by redis -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) log.Info("started processing the Redis event source...") @@ -125,7 +126,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt } } -func (el *EventListener) handleOne(message *redis.Message, dispatch func([]byte) error, log *zap.SugaredLogger) error { +func (el *EventListener) handleOne(message *redis.Message, dispatch func([]byte, ...eventsourcecommon.Options) error, log *zap.SugaredLogger) error { defer func(start time.Time) { el.Metrics.EventProcessingDuration(el.GetEventSourceName(), el.GetEventName(), float64(time.Since(start)/time.Millisecond)) }(time.Now()) diff --git a/eventsources/sources/resource/start.go b/eventsources/sources/resource/start.go index d3ba05439d..f1bf0257e8 100644 --- a/eventsources/sources/resource/start.go +++ b/eventsources/sources/resource/start.go @@ -38,6 +38,7 @@ import ( "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/common/logging" + eventsourcecommon "github.com/argoproj/argo-events/eventsources/common" "github.com/argoproj/argo-events/eventsources/sources" metrics "github.com/argoproj/argo-events/metrics" apicommon "github.com/argoproj/argo-events/pkg/apis/common" @@ -76,7 +77,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening watches resource updates and consume those events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) defer sources.Recover(el.GetEventName()) diff --git a/eventsources/sources/slack/start.go b/eventsources/sources/slack/start.go index 996f81b228..75b40de13c 100644 --- a/eventsources/sources/slack/start.go +++ b/eventsources/sources/slack/start.go @@ -31,6 +31,7 @@ import ( "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/common/logging" + eventsourcecommon "github.com/argoproj/argo-events/eventsources/common" "github.com/argoproj/argo-events/eventsources/common/webhook" "github.com/argoproj/argo-events/eventsources/sources" metrics "github.com/argoproj/argo-events/metrics" @@ -311,7 +312,7 @@ func (rc *Router) verifyRequest(request *http.Request) error { } // StartListening starts an event source -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) diff --git a/eventsources/sources/storagegrid/start.go b/eventsources/sources/storagegrid/start.go index a37d774690..4a6e6ff1a9 100644 --- a/eventsources/sources/storagegrid/start.go +++ b/eventsources/sources/storagegrid/start.go @@ -35,6 +35,7 @@ import ( "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/common/logging" + eventsourcecommon "github.com/argoproj/argo-events/eventsources/common" "github.com/argoproj/argo-events/eventsources/common/webhook" "github.com/argoproj/argo-events/eventsources/sources" apicommon "github.com/argoproj/argo-events/pkg/apis/common" @@ -336,7 +337,7 @@ func (router *Router) PostInactivate() error { } // StartListening starts an event source -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) log.Info("started processing the Storage Grid event source...") diff --git a/eventsources/sources/stripe/start.go b/eventsources/sources/stripe/start.go index 1faa7de41f..1d4d0bc7f5 100644 --- a/eventsources/sources/stripe/start.go +++ b/eventsources/sources/stripe/start.go @@ -30,6 +30,7 @@ import ( "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/common/logging" + eventsourcecommon "github.com/argoproj/argo-events/eventsources/common" "github.com/argoproj/argo-events/eventsources/common/webhook" "github.com/argoproj/argo-events/eventsources/sources" apicommon "github.com/argoproj/argo-events/pkg/apis/common" @@ -190,7 +191,7 @@ func filterEvent(event *stripe.Event, filters []string) bool { } // StartListening starts an event source -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) log.Info("started processing the Stripe event source...") diff --git a/eventsources/sources/webhook/start.go b/eventsources/sources/webhook/start.go index 90d49b600b..e81ea57ec0 100644 --- a/eventsources/sources/webhook/start.go +++ b/eventsources/sources/webhook/start.go @@ -27,6 +27,7 @@ import ( "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/common/logging" + eventsourcecommon "github.com/argoproj/argo-events/eventsources/common" "github.com/argoproj/argo-events/eventsources/common/webhook" metrics "github.com/argoproj/argo-events/metrics" apicommon "github.com/argoproj/argo-events/pkg/apis/common" @@ -143,7 +144,7 @@ func (router *Router) PostInactivate() error { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Options) error) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) log.Info("started processing the webhook event source...")