diff --git a/eventbus/driver/driver.go b/eventbus/driver/driver.go index 85607a584c..31c62bc7b4 100644 --- a/eventbus/driver/driver.go +++ b/eventbus/driver/driver.go @@ -14,11 +14,12 @@ type Driver interface { // SubscribeEventSources is used to subscribe multiple event source dependencies // Parameter - ctx, context // Parameter - conn, eventbus connection + // Parameter - closeCh, channel to indicate to close the subscription // Parameter - dependencyExpr, example: "(dep1 || dep2) && dep3" // Parameter - dependencies, array of dependencies information // Parameter - filter, a function used to filter the message // Parameter - action, a function to be triggered after all conditions meet - SubscribeEventSources(ctx context.Context, conn Connection, dependencyExpr string, dependencies []Dependency, filter func(string, cloudevents.Event) bool, action func(map[string]cloudevents.Event)) error + SubscribeEventSources(ctx context.Context, conn Connection, closeCh <-chan struct{}, dependencyExpr string, dependencies []Dependency, filter func(string, cloudevents.Event) bool, action func(map[string]cloudevents.Event)) error // Publish a message Publish(conn Connection, message []byte) error diff --git a/eventbus/driver/nats.go b/eventbus/driver/nats.go index f6bdb1a938..5b5c84132b 100644 --- a/eventbus/driver/nats.go +++ b/eventbus/driver/nats.go @@ -3,8 +3,6 @@ package driver import ( "context" "encoding/json" - "os" - "os/signal" "strings" "time" @@ -73,7 +71,16 @@ func NewNATSStreaming(url, clusterID, subject, clientID string, auth *Auth, logg func (n *natsStreaming) Connect() (Connection, error) { log := n.logger.WithField("clientID", n.clientID) - opts := []nats.Option{} + opts := []nats.Option{ + nats.MaxReconnects(-1), + nats.ReconnectWait(3 * time.Second), + nats.DisconnectErrHandler(func(nc *nats.Conn, err error) { + log.Errorf("NATS connection lost, reason: %v", err) + }), + nats.ReconnectHandler(func(nnc *nats.Conn) { + log.Info("Reconnected to NATS server") + }), + } switch n.auth.Strategy { case eventbusv1alpha1.AuthStrategyToken: log.Info("NATS auth strategy: Token") @@ -89,9 +96,10 @@ func (n *natsStreaming) Connect() (Connection, error) { return nil, err } log.Info("Connected to NATS server.") - sc, err := stan.Connect(n.clusterID, n.clientID, stan.NatsConn(nc), + + sc, err := stan.Connect(n.clusterID, n.clientID, stan.NatsConn(nc), stan.Pings(5, 60), stan.SetConnectionLostHandler(func(_ stan.Conn, reason error) { - log.Fatalf("Connection lost, reason: %v", reason) + log.Errorf("NATS streaming connection lost, reason: %v", reason) })) if err != nil { log.Errorf("Failed to connect to NATS streaming server, %v", err) @@ -114,7 +122,7 @@ func (n *natsStreaming) Publish(conn Connection, message []byte) error { // Parameter - dependencies, array of dependencies information // Parameter - filter, a function used to filter the message // Parameter - action, a function to be triggered after all conditions meet -func (n *natsStreaming) SubscribeEventSources(ctx context.Context, conn Connection, dependencyExpr string, dependencies []Dependency, filter func(string, cloudevents.Event) bool, action func(map[string]cloudevents.Event)) error { +func (n *natsStreaming) SubscribeEventSources(ctx context.Context, conn Connection, closeCh <-chan struct{}, dependencyExpr string, dependencies []Dependency, filter func(string, cloudevents.Event) bool, action func(map[string]cloudevents.Event)) error { log := n.logger.WithField("clientID", n.clientID) msgHolder, err := newEventSourceMessageHolder(dependencyExpr, dependencies) if err != nil { @@ -138,20 +146,18 @@ func (n *natsStreaming) SubscribeEventSources(ctx context.Context, conn Connecti return err } log.Infof("Subscribed to subject %s ...", n.subject) - - signalChan := make(chan os.Signal, 1) - cleanupDone := make(chan bool) - signal.Notify(signalChan, os.Interrupt) - go func() { - for range signalChan { - log.Info("Received an interrupt, unsubscribing and closing connection...") + for { + select { + case <-ctx.Done(): + log.Info("existing, unsubscribing and closing connection...") + _ = sub.Close() + log.Infof("subscription on subject %s closed", n.subject) + case <-closeCh: + log.Info("closing subscription...") _ = sub.Close() log.Infof("subscription on subject %s closed", n.subject) - cleanupDone <- true } - }() - <-cleanupDone - return nil + } } func (n *natsStreaming) processEventSourceMsg(m *stan.Msg, msgHolder *eventSourceMessageHolder, filter func(dependencyName string, event cloudevents.Event) bool, action func(map[string]cloudevents.Event), log *logrus.Entry) { diff --git a/eventbus/eventbus.go b/eventbus/eventbus.go index 119c78f87b..9eb96ded35 100644 --- a/eventbus/eventbus.go +++ b/eventbus/eventbus.go @@ -47,6 +47,7 @@ func GetDriver(ctx context.Context, eventBusConfig eventbusv1alpha1.BusConfig, s } v.WatchConfig() v.OnConfigChange(func(e fsnotify.Event) { + logger.Info("eventbus auth config file changed.") err = v.Unmarshal(cred) if err != nil { logger.Errorf("failed to unmarshal auth.yaml after reloading, err: %v", err) diff --git a/eventsources/eventing.go b/eventsources/eventing.go index 27e100b898..e66f14fdec 100644 --- a/eventsources/eventing.go +++ b/eventsources/eventing.go @@ -53,7 +53,7 @@ type EventingServer interface { GetEventSourceType() apicommon.EventSourceType // Function to start listening events. - StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error + StartListening(ctx context.Context, dispatch func([]byte) error) error } // GetEventingServers returns the mapping of event source type and list of eventing servers @@ -253,6 +253,31 @@ func (e *EventSourceAdaptor) Start(ctx context.Context, stopCh <-chan struct{}) logger.WithError(err).Errorln("failed to connect to eventbus") return err } + defer e.eventBusConn.Close() + + // Daemon to reconnect + go func(ctx context.Context) { + logger.Info("starting eventbus connection daemon...") + for { + select { + case <-ctx.Done(): + logger.Info("exiting eventbus connection daemon...") + return + default: + time.Sleep(3 * time.Second) + } + if e.eventBusConn == nil || e.eventBusConn.IsClosed() { + logger.Info("NATS connection lost, reconnecting...") + e.eventBusConn, err = driver.Connect() + if err != nil { + logger.WithError(err).Errorln("failed to reconnect to eventbus") + continue + } + logger.Info("reconnected the NATS streaming server...") + } + } + }(cctx) + for _, ss := range servers { for _, server := range ss { err := server.ValidateEventSource(cctx) @@ -261,7 +286,7 @@ func (e *EventSourceAdaptor) Start(ctx context.Context, stopCh <-chan struct{}) return err } go func(s EventingServer) { - err := s.StartListening(cctx, stopCh, func(data []byte) error { + err := s.StartListening(cctx, func(data []byte) error { event := cloudevents.NewEvent() event.SetID(fmt.Sprintf("%x", uuid.New())) event.SetType(string(s.GetEventSourceType())) @@ -280,7 +305,6 @@ func (e *EventSourceAdaptor) Start(ctx context.Context, stopCh <-chan struct{}) }) logger.WithField(logging.LabelEventSourceName, s.GetEventSourceName()). WithField(logging.LabelEventName, s.GetEventName()).WithError(err).Errorln("failed to start service.") - // TODO: Need retry. }(server) } } diff --git a/eventsources/sources/amqp/start.go b/eventsources/sources/amqp/start.go index 9700c8bd26..c73af01b75 100644 --- a/eventsources/sources/amqp/start.go +++ b/eventsources/sources/amqp/start.go @@ -54,7 +54,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { log := logging.FromContext(ctx).WithFields(map[string]interface{}{ logging.LabelEventSourceType: el.GetEventSourceType(), logging.LabelEventSourceName: el.GetEventSourceName(), @@ -141,7 +141,7 @@ func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struc if err != nil { log.WithError(err).Errorln("failed to dispatch event") } - case <-stopCh: + case <-ctx.Done(): err = conn.Close() if err != nil { log.WithError(err).Info("failed to close connection") diff --git a/eventsources/sources/awssns/start.go b/eventsources/sources/awssns/start.go index bcf2eb9c36..0664d8af1d 100644 --- a/eventsources/sources/awssns/start.go +++ b/eventsources/sources/awssns/start.go @@ -212,7 +212,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening starts an SNS event source -func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { logger := logging.FromContext(ctx) log := logging.FromContext(ctx).WithFields(map[string]interface{}{ logging.LabelEventSourceType: el.GetEventSourceType(), diff --git a/eventsources/sources/awssqs/start.go b/eventsources/sources/awssqs/start.go index 4c69095aea..355b7ebd35 100644 --- a/eventsources/sources/awssqs/start.go +++ b/eventsources/sources/awssqs/start.go @@ -58,7 +58,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { log := logging.FromContext(ctx).WithFields(map[string]interface{}{ logging.LabelEventSourceType: el.GetEventSourceType(), logging.LabelEventSourceName: el.GetEventSourceName(), @@ -96,7 +96,7 @@ func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struc log.Infoln("listening for messages on the queue...") for { select { - case <-stopCh: + case <-ctx.Done(): log.Info("exiting SQS event listener...") return nil default: diff --git a/eventsources/sources/azureeventshub/start.go b/eventsources/sources/azureeventshub/start.go index 168a323f65..0908568298 100644 --- a/eventsources/sources/azureeventshub/start.go +++ b/eventsources/sources/azureeventshub/start.go @@ -55,7 +55,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { log := logging.FromContext(ctx).WithFields(map[string]interface{}{ logging.LabelEventSourceType: el.GetEventSourceType(), logging.LabelEventSourceName: el.GetEventSourceName(), @@ -123,7 +123,7 @@ func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struc listenerHandles = append(listenerHandles, listenerHandle) } - <-stopCh + <-ctx.Done() log.Infoln("stopping listener handlers") for _, handler := range listenerHandles { diff --git a/eventsources/sources/calendar/start.go b/eventsources/sources/calendar/start.go index b68ecf9d3d..84164a09e1 100644 --- a/eventsources/sources/calendar/start.go +++ b/eventsources/sources/calendar/start.go @@ -54,7 +54,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { log := logging.FromContext(ctx).WithFields(map[string]interface{}{ logging.LabelEventSourceType: el.GetEventSourceType(), logging.LabelEventSourceName: el.GetEventSourceName(), @@ -126,7 +126,7 @@ func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struc if err != nil { log.WithError(err).Errorln("failed to dispatch event") } - case <-stopCh: + case <-ctx.Done(): log.Info("exiting calendar event listener...") return nil } diff --git a/eventsources/sources/emitter/start.go b/eventsources/sources/emitter/start.go index b192ff8f5f..ac76601f25 100644 --- a/eventsources/sources/emitter/start.go +++ b/eventsources/sources/emitter/start.go @@ -53,7 +53,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { log := logging.FromContext(ctx).WithFields(map[string]interface{}{ logging.LabelEventSourceType: el.GetEventSourceType(), logging.LabelEventSourceName: el.GetEventSourceName(), @@ -130,7 +130,7 @@ func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struc return errors.Wrapf(err, "failed to subscribe to channel %s", emitterEventSource.ChannelName) } - <-stopCh + <-ctx.Done() log.WithField("channel-name", emitterEventSource.ChannelName).Infoln("event source stopped, unsubscribe the channel") diff --git a/eventsources/sources/file/start.go b/eventsources/sources/file/start.go index 77911b4e28..72d9524d60 100644 --- a/eventsources/sources/file/start.go +++ b/eventsources/sources/file/start.go @@ -58,7 +58,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { log := logging.FromContext(ctx).WithFields(map[string]interface{}{ logging.LabelEventSourceType: el.GetEventSourceType(), logging.LabelEventSourceName: el.GetEventSourceName(), @@ -68,12 +68,12 @@ func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struc fileEventSource := &el.FileEventSource if fileEventSource.Polling { - if err := el.listenEventsPolling(ctx, stopCh, dispatch, log); err != nil { + if err := el.listenEventsPolling(ctx, dispatch, log); err != nil { log.WithError(err).Errorln("failed to listen to events") return err } } else { - if err := el.listenEvents(ctx, stopCh, dispatch, log); err != nil { + if err := el.listenEvents(ctx, dispatch, log); err != nil { log.WithError(err).Errorln("failed to listen to events") return err } @@ -82,7 +82,7 @@ func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struc } // listenEvents listen to file related events. -func (el *EventListener) listenEvents(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error, log *logrus.Entry) error { +func (el *EventListener) listenEvents(ctx context.Context, dispatch func([]byte) error, log *logrus.Entry) error { fileEventSource := &el.FileEventSource // create new fs watcher @@ -154,7 +154,7 @@ func (el *EventListener) listenEvents(ctx context.Context, stopCh <-chan struct{ } case err := <-watcher.Errors: return errors.Wrapf(err, "failed to process %s", el.GetEventName()) - case <-stopCh: + case <-ctx.Done(): log.Infoln("event source has been stopped") return nil } @@ -162,7 +162,7 @@ func (el *EventListener) listenEvents(ctx context.Context, stopCh <-chan struct{ } // listenEvents listen to file related events using polling. -func (el *EventListener) listenEventsPolling(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error, log *logrus.Entry) error { +func (el *EventListener) listenEventsPolling(ctx context.Context, dispatch func([]byte) error, log *logrus.Entry) error { fileEventSource := &el.FileEventSource // create new fs watcher @@ -234,7 +234,7 @@ func (el *EventListener) listenEventsPolling(ctx context.Context, stopCh <-chan case err := <-watcher.Error: log.WithError(err).Errorf("failed to process %s", el.GetEventName()) return - case <-stopCh: + case <-ctx.Done(): log.Infoln("event source has been stopped") return } diff --git a/eventsources/sources/gcppubsub/start.go b/eventsources/sources/gcppubsub/start.go index ed727af3cd..74167ea957 100644 --- a/eventsources/sources/gcppubsub/start.go +++ b/eventsources/sources/gcppubsub/start.go @@ -55,7 +55,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening listens to GCP PubSub events -func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { // In order to listen events from GCP PubSub, // 1. Parse the event source that contains configuration to connect to GCP PubSub // 2. Create a new PubSub client @@ -169,7 +169,7 @@ func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struc return errors.Wrapf(err, "failed to receive the messages for subscription %s and topic %s for %s", subscriptionName, pubsubEventSource.Topic, el.GetEventName()) } - <-stopCh + <-ctx.Done() log.Infoln("event source has been stopped") diff --git a/eventsources/sources/github/start.go b/eventsources/sources/github/start.go index 8363c46b75..fb902f711e 100644 --- a/eventsources/sources/github/start.go +++ b/eventsources/sources/github/start.go @@ -274,8 +274,8 @@ func (router *Router) PostInactivate() error { return nil } -// StartListening starts an SNS event source -func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error { +// StartListening starts an event source +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { logger := logging.FromContext(ctx) log := logging.FromContext(ctx).WithFields(map[string]interface{}{ logging.LabelEventSourceType: el.GetEventSourceType(), diff --git a/eventsources/sources/gitlab/start.go b/eventsources/sources/gitlab/start.go index 63df6840d6..86154b4cdf 100644 --- a/eventsources/sources/gitlab/start.go +++ b/eventsources/sources/gitlab/start.go @@ -262,8 +262,8 @@ func (router *Router) PostInactivate() error { return nil } -// StartListening starts an SNS event source -func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error { +// StartListening starts an event source +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { logger := logging.FromContext(ctx) log := logging.FromContext(ctx).WithFields(map[string]interface{}{ logging.LabelEventSourceType: el.GetEventSourceType(), diff --git a/eventsources/sources/hdfs/start.go b/eventsources/sources/hdfs/start.go index 430327bc45..cb8ffcbbdf 100644 --- a/eventsources/sources/hdfs/start.go +++ b/eventsources/sources/hdfs/start.go @@ -61,7 +61,7 @@ func (w *WatchableHDFS) GetFileID(fi os.FileInfo) interface{} { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { log := logging.FromContext(ctx).WithFields(map[string]interface{}{ logging.LabelEventSourceType: el.GetEventSourceType(), logging.LabelEventSourceName: el.GetEventSourceName(), @@ -164,7 +164,7 @@ func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struc } case err := <-watcher.Errors: return errors.Wrapf(err, "failed to watch events for %s", el.GetEventName()) - case <-stopCh: + case <-ctx.Done(): return nil } } diff --git a/eventsources/sources/kafka/start.go b/eventsources/sources/kafka/start.go index d323fcbdc9..83a4cd67e5 100644 --- a/eventsources/sources/kafka/start.go +++ b/eventsources/sources/kafka/start.go @@ -63,7 +63,7 @@ func verifyPartitionAvailable(part int32, partitions []int32) bool { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { log := logging.FromContext(ctx).WithFields(map[string]interface{}{ logging.LabelEventSourceType: el.GetEventSourceType(), logging.LabelEventSourceName: el.GetEventSourceName(), @@ -157,7 +157,7 @@ func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struc case err := <-partitionConsumer.Errors(): return errors.Wrapf(err, "failed to consume messages for event source %s", el.GetEventName()) - case <-stopCh: + case <-ctx.Done(): log.Infoln("event source is stopped, closing partition consumer") err = partitionConsumer.Close() if err != nil { diff --git a/eventsources/sources/minio/start.go b/eventsources/sources/minio/start.go index 99a5d6f8ca..44ab326c3d 100644 --- a/eventsources/sources/minio/start.go +++ b/eventsources/sources/minio/start.go @@ -52,7 +52,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { log := logging.FromContext(ctx).WithFields(map[string]interface{}{ logging.LabelEventSourceType: el.GetEventSourceType(), logging.LabelEventSourceName: el.GetEventSourceName(), @@ -104,7 +104,7 @@ func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struc } } - <-stopCh + <-ctx.Done() doneCh <- struct{}{} log.Infoln("event source is stopped") diff --git a/eventsources/sources/mqtt/start.go b/eventsources/sources/mqtt/start.go index abd43c0929..b662b9d9ba 100644 --- a/eventsources/sources/mqtt/start.go +++ b/eventsources/sources/mqtt/start.go @@ -54,7 +54,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { log := logging.FromContext(ctx).WithFields(map[string]interface{}{ logging.LabelEventSourceType: el.GetEventSourceType(), logging.LabelEventSourceName: el.GetEventSourceName(), @@ -122,7 +122,7 @@ func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struc return errors.Wrapf(token.Error(), "failed to subscribe to the topic %s for event source %s", mqttEventSource.Topic, el.GetEventName()) } - <-stopCh + <-ctx.Done() log.Infoln("event source is stopped, unsubscribing the client...") token := client.Unsubscribe(mqttEventSource.Topic) diff --git a/eventsources/sources/nats/start.go b/eventsources/sources/nats/start.go index 0a464c7c87..c9988a67d4 100644 --- a/eventsources/sources/nats/start.go +++ b/eventsources/sources/nats/start.go @@ -54,7 +54,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { log := logging.FromContext(ctx).WithFields(map[string]interface{}{ logging.LabelEventSourceType: el.GetEventSourceType(), logging.LabelEventSourceName: el.GetEventSourceName(), @@ -122,7 +122,7 @@ func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struc return errors.Wrapf(err, "connection failure for event source %s", el.GetEventName()) } - <-stopCh + <-ctx.Done() log.Infoln("event source is stopped") return nil } diff --git a/eventsources/sources/nsq/start.go b/eventsources/sources/nsq/start.go index 6f84b8e589..0976cf71b0 100644 --- a/eventsources/sources/nsq/start.go +++ b/eventsources/sources/nsq/start.go @@ -61,7 +61,7 @@ type messageHandler struct { } // StartListening listens NSQ events -func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { log := logging.FromContext(ctx).WithFields(map[string]interface{}{ logging.LabelEventSourceType: el.GetEventSourceType(), logging.LabelEventSourceName: el.GetEventSourceName(), @@ -107,7 +107,7 @@ func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struc return errors.Wrapf(err, "lookup failed for host %s for event source %s", nsqEventSource.HostAddress, el.GetEventName()) } - <-stopCh + <-ctx.Done() log.Infoln("event source has stopped") consumer.Stop() return nil diff --git a/eventsources/sources/redis/start.go b/eventsources/sources/redis/start.go index f1594a3975..fb498d00ba 100644 --- a/eventsources/sources/redis/start.go +++ b/eventsources/sources/redis/start.go @@ -53,7 +53,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening listens events published by redis -func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { log := logging.FromContext(ctx).WithFields(map[string]interface{}{ logging.LabelEventSourceType: el.GetEventSourceType(), logging.LabelEventSourceName: el.GetEventSourceName(), @@ -121,7 +121,7 @@ func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struc if err != nil { log.WithError(err).Errorln("failed to dispatch event") } - case <-stopCh: + case <-ctx.Done(): log.Infoln("event source is stopped. unsubscribing the subscription") if err := pubsub.Unsubscribe(redisEventSource.Channels...); err != nil { log.WithError(err).Errorln("failed to unsubscribe") diff --git a/eventsources/sources/slack/start.go b/eventsources/sources/slack/start.go index 05d25bcf56..8e87778cc5 100644 --- a/eventsources/sources/slack/start.go +++ b/eventsources/sources/slack/start.go @@ -268,8 +268,8 @@ func (rc *Router) verifyRequest(request *http.Request) error { return nil } -// StartListening starts an SNS event source -func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error { +// StartListening starts an event source +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { logger := logging.FromContext(ctx) log := logging.FromContext(ctx).WithFields(map[string]interface{}{ logging.LabelEventSourceType: el.GetEventSourceType(), diff --git a/eventsources/sources/storagegrid/start.go b/eventsources/sources/storagegrid/start.go index 14962c0d92..39e06be8d6 100644 --- a/eventsources/sources/storagegrid/start.go +++ b/eventsources/sources/storagegrid/start.go @@ -318,8 +318,8 @@ func (router *Router) PostInactivate() error { return nil } -// StartListening starts an SNS event source -func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error { +// StartListening starts an event source +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { logger := logging.FromContext(ctx) log := logging.FromContext(ctx).WithFields(map[string]interface{}{ logging.LabelEventSourceType: el.GetEventSourceType(), diff --git a/eventsources/sources/stripe/start.go b/eventsources/sources/stripe/start.go index be731452af..0ce278f0ef 100644 --- a/eventsources/sources/stripe/start.go +++ b/eventsources/sources/stripe/start.go @@ -182,8 +182,8 @@ func filterEvent(event *stripe.Event, filters []string) bool { return false } -// StartListening starts an SNS event source -func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error { +// StartListening starts an event source +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { logger := logging.FromContext(ctx) log := logging.FromContext(ctx).WithFields(map[string]interface{}{ logging.LabelEventSourceType: el.GetEventSourceType(), diff --git a/eventsources/sources/webhook/start.go b/eventsources/sources/webhook/start.go index 1439c8b0e2..72e748d3ae 100644 --- a/eventsources/sources/webhook/start.go +++ b/eventsources/sources/webhook/start.go @@ -134,7 +134,7 @@ func (router *Router) PostInactivate() error { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error { logger := logging.FromContext(ctx) log := logging.FromContext(ctx).WithFields(map[string]interface{}{ logging.LabelEventSourceType: el.GetEventSourceType(), diff --git a/sensors/listener.go b/sensors/listener.go index e91656c5af..70d9fb5830 100644 --- a/sensors/listener.go +++ b/sensors/listener.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "strings" + "time" "github.com/Knetic/govaluate" "github.com/antonmedv/expr" @@ -41,21 +42,11 @@ func (sensorCtx *SensorContext) ListenEvents(ctx context.Context, stopCh <-chan cctx, cancel := context.WithCancel(ctx) defer cancel() logger := logging.FromContext(cctx) - err := sensorCtx.listenEventsOverEventBus(cctx, stopCh) - if err != nil { - logger.WithError(err).Errorln("subscription failure. stopping sensor operations") - return err - } - return nil -} - -func (sensorCtx *SensorContext) listenEventsOverEventBus(ctx context.Context, stopCh <-chan struct{}) error { - logger := logging.FromContext(ctx) sensor := sensorCtx.Sensor // Get a mapping of dependencyExpression: []triggers triggerMapping := make(map[string][]v1alpha1.Trigger) for _, trigger := range sensor.Spec.Triggers { - depExpr, err := sensorCtx.getDependencyExpression(ctx, trigger) + depExpr, err := sensorCtx.getDependencyExpression(cctx, trigger) if err != nil { logger.WithError(err).Errorln("failed to get dependency expression") return err @@ -120,8 +111,8 @@ func (sensorCtx *SensorContext) listenEventsOverEventBus(ctx context.Context, st return } defer conn.Close() - logger.Infof("Started to subscribe events for triggers %s with client %s", fmt.Sprintf("[%s]", strings.Join(triggerNames, " ")), clientID) - err = ebDriver.SubscribeEventSources(ctx, conn, depExpression, deps, func(depName string, event cloudevents.Event) bool { + + filterFunc := func(depName string, event cloudevents.Event) bool { dep, ok := depMapping[depName] if !ok { return false @@ -136,17 +127,49 @@ func (sensorCtx *SensorContext) listenEventsOverEventBus(ctx context.Context, st return false } return result - }, func(events map[string]cloudevents.Event) { + } + + actionFunc := func(events map[string]cloudevents.Event) { err := sensorCtx.triggerActions(ctx, events, triggers) if err != nil { logger.WithError(err).Errorln("Failed to trigger actions") } - }) + } + + // Attempt to reconnect + closeSubCh := make(chan struct{}, 1) + go func(ctx context.Context, dvr eventbusdriver.Driver) { + logger.Infof("starting eventbus connection daemon for client %s...", clientID) + for { + select { + case <-ctx.Done(): + logger.Infof("exiting eventbus connection daemon for client %s...", clientID) + return + default: + time.Sleep(3 * time.Second) + } + if conn == nil || conn.IsClosed() { + logger.Info("NATS connection lost, reconnecting...") + conn, err = dvr.Connect() + if err != nil { + logger.WithError(err).Errorf("failed to reconnect to eventbus, client: %s", clientID) + continue + } + logger.Infof("reconnected the NATS streaming server for client %s...", clientID) + closeSubCh <- struct{}{} + time.Sleep(2 * time.Second) + err = ebDriver.SubscribeEventSources(ctx, conn, closeSubCh, depExpression, deps, filterFunc, actionFunc) + } + } + }(ctx, ebDriver) + + logger.Infof("Started to subscribe events for triggers %s with client %s", fmt.Sprintf("[%s]", strings.Join(triggerNames, " ")), clientID) + err = ebDriver.SubscribeEventSources(ctx, conn, closeSubCh, depExpression, deps, filterFunc, actionFunc) if err != nil { logger.WithError(err).Errorln("Failed to subscribe to event bus") return } - }(ctx, k, v) + }(cctx, k, v) } logger.Info("Sensor started.") <-stopCh @@ -230,6 +253,8 @@ func (sensorCtx *SensorContext) getDependencyExpression(ctx context.Context, tri groupDepExpr = strings.ReplaceAll(groupDepExpr, "&&", " + \"&&\" + ") groupDepExpr = strings.ReplaceAll(groupDepExpr, "||", " + \"||\" + ") groupDepExpr = strings.ReplaceAll(groupDepExpr, "-", "_") + groupDepExpr = strings.ReplaceAll(groupDepExpr, "(", "\"(\"+") + groupDepExpr = strings.ReplaceAll(groupDepExpr, ")", "+\")\"") program, err := expr.Compile(groupDepExpr, expr.Env(depGroupMapping)) if err != nil { logger.WithError(err).Errorln("Failed to compile group dependency expression") @@ -241,6 +266,8 @@ func (sensorCtx *SensorContext) getDependencyExpression(ctx context.Context, tri return "", err } depExpression = fmt.Sprintf("%v", result) + depExpression = strings.ReplaceAll(depExpression, "\"(\"", "(") + depExpression = strings.ReplaceAll(depExpression, "\")\"", ")") } else { deps := []string{} for _, dep := range sensor.Spec.Dependencies { diff --git a/sensors/listener_test.go b/sensors/listener_test.go index 6f813081fb..caabc19551 100644 --- a/sensors/listener_test.go +++ b/sensors/listener_test.go @@ -119,7 +119,7 @@ func TestGetDependencyExpression(t *testing.T) { {Name: "group-1", Dependencies: []string{"dep1", "dep1a"}}, {Name: "group-2", Dependencies: []string{"dep2"}}, } - obj.Spec.Circuit = "group-1 || group-2" + obj.Spec.Circuit = "((group-2) || group-1)" trig := fakeTrigger.DeepCopy() trig.Template.Switch = &v1alpha1.TriggerSwitch{ Any: []string{"group-1"},