Skip to content

Commit

Permalink
fix(eventbus): NATS eventbus auto reconnect and code refactory (#749)
Browse files Browse the repository at this point in the history
  • Loading branch information
whynowy authored Jul 17, 2020
1 parent 210aefb commit a8afcb7
Show file tree
Hide file tree
Showing 27 changed files with 142 additions and 83 deletions.
3 changes: 2 additions & 1 deletion eventbus/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ type Driver interface {
// SubscribeEventSources is used to subscribe multiple event source dependencies
// Parameter - ctx, context
// Parameter - conn, eventbus connection
// Parameter - closeCh, channel to indicate to close the subscription
// Parameter - dependencyExpr, example: "(dep1 || dep2) && dep3"
// Parameter - dependencies, array of dependencies information
// Parameter - filter, a function used to filter the message
// Parameter - action, a function to be triggered after all conditions meet
SubscribeEventSources(ctx context.Context, conn Connection, dependencyExpr string, dependencies []Dependency, filter func(string, cloudevents.Event) bool, action func(map[string]cloudevents.Event)) error
SubscribeEventSources(ctx context.Context, conn Connection, closeCh <-chan struct{}, dependencyExpr string, dependencies []Dependency, filter func(string, cloudevents.Event) bool, action func(map[string]cloudevents.Event)) error

// Publish a message
Publish(conn Connection, message []byte) error
Expand Down
40 changes: 23 additions & 17 deletions eventbus/driver/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package driver
import (
"context"
"encoding/json"
"os"
"os/signal"
"strings"
"time"

Expand Down Expand Up @@ -73,7 +71,16 @@ func NewNATSStreaming(url, clusterID, subject, clientID string, auth *Auth, logg

func (n *natsStreaming) Connect() (Connection, error) {
log := n.logger.WithField("clientID", n.clientID)
opts := []nats.Option{}
opts := []nats.Option{
nats.MaxReconnects(-1),
nats.ReconnectWait(3 * time.Second),
nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
log.Errorf("NATS connection lost, reason: %v", err)
}),
nats.ReconnectHandler(func(nnc *nats.Conn) {
log.Info("Reconnected to NATS server")
}),
}
switch n.auth.Strategy {
case eventbusv1alpha1.AuthStrategyToken:
log.Info("NATS auth strategy: Token")
Expand All @@ -89,9 +96,10 @@ func (n *natsStreaming) Connect() (Connection, error) {
return nil, err
}
log.Info("Connected to NATS server.")
sc, err := stan.Connect(n.clusterID, n.clientID, stan.NatsConn(nc),

sc, err := stan.Connect(n.clusterID, n.clientID, stan.NatsConn(nc), stan.Pings(5, 60),
stan.SetConnectionLostHandler(func(_ stan.Conn, reason error) {
log.Fatalf("Connection lost, reason: %v", reason)
log.Errorf("NATS streaming connection lost, reason: %v", reason)
}))
if err != nil {
log.Errorf("Failed to connect to NATS streaming server, %v", err)
Expand All @@ -114,7 +122,7 @@ func (n *natsStreaming) Publish(conn Connection, message []byte) error {
// Parameter - dependencies, array of dependencies information
// Parameter - filter, a function used to filter the message
// Parameter - action, a function to be triggered after all conditions meet
func (n *natsStreaming) SubscribeEventSources(ctx context.Context, conn Connection, dependencyExpr string, dependencies []Dependency, filter func(string, cloudevents.Event) bool, action func(map[string]cloudevents.Event)) error {
func (n *natsStreaming) SubscribeEventSources(ctx context.Context, conn Connection, closeCh <-chan struct{}, dependencyExpr string, dependencies []Dependency, filter func(string, cloudevents.Event) bool, action func(map[string]cloudevents.Event)) error {
log := n.logger.WithField("clientID", n.clientID)
msgHolder, err := newEventSourceMessageHolder(dependencyExpr, dependencies)
if err != nil {
Expand All @@ -138,20 +146,18 @@ func (n *natsStreaming) SubscribeEventSources(ctx context.Context, conn Connecti
return err
}
log.Infof("Subscribed to subject %s ...", n.subject)

signalChan := make(chan os.Signal, 1)
cleanupDone := make(chan bool)
signal.Notify(signalChan, os.Interrupt)
go func() {
for range signalChan {
log.Info("Received an interrupt, unsubscribing and closing connection...")
for {
select {
case <-ctx.Done():
log.Info("existing, unsubscribing and closing connection...")
_ = sub.Close()
log.Infof("subscription on subject %s closed", n.subject)
case <-closeCh:
log.Info("closing subscription...")
_ = sub.Close()
log.Infof("subscription on subject %s closed", n.subject)
cleanupDone <- true
}
}()
<-cleanupDone
return nil
}
}

func (n *natsStreaming) processEventSourceMsg(m *stan.Msg, msgHolder *eventSourceMessageHolder, filter func(dependencyName string, event cloudevents.Event) bool, action func(map[string]cloudevents.Event), log *logrus.Entry) {
Expand Down
1 change: 1 addition & 0 deletions eventbus/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func GetDriver(ctx context.Context, eventBusConfig eventbusv1alpha1.BusConfig, s
}
v.WatchConfig()
v.OnConfigChange(func(e fsnotify.Event) {
logger.Info("eventbus auth config file changed.")
err = v.Unmarshal(cred)
if err != nil {
logger.Errorf("failed to unmarshal auth.yaml after reloading, err: %v", err)
Expand Down
30 changes: 27 additions & 3 deletions eventsources/eventing.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type EventingServer interface {
GetEventSourceType() apicommon.EventSourceType

// Function to start listening events.
StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error
StartListening(ctx context.Context, dispatch func([]byte) error) error
}

// GetEventingServers returns the mapping of event source type and list of eventing servers
Expand Down Expand Up @@ -253,6 +253,31 @@ func (e *EventSourceAdaptor) Start(ctx context.Context, stopCh <-chan struct{})
logger.WithError(err).Errorln("failed to connect to eventbus")
return err
}
defer e.eventBusConn.Close()

// Daemon to reconnect
go func(ctx context.Context) {
logger.Info("starting eventbus connection daemon...")
for {
select {
case <-ctx.Done():
logger.Info("exiting eventbus connection daemon...")
return
default:
time.Sleep(3 * time.Second)
}
if e.eventBusConn == nil || e.eventBusConn.IsClosed() {
logger.Info("NATS connection lost, reconnecting...")
e.eventBusConn, err = driver.Connect()
if err != nil {
logger.WithError(err).Errorln("failed to reconnect to eventbus")
continue
}
logger.Info("reconnected the NATS streaming server...")
}
}
}(cctx)

for _, ss := range servers {
for _, server := range ss {
err := server.ValidateEventSource(cctx)
Expand All @@ -261,7 +286,7 @@ func (e *EventSourceAdaptor) Start(ctx context.Context, stopCh <-chan struct{})
return err
}
go func(s EventingServer) {
err := s.StartListening(cctx, stopCh, func(data []byte) error {
err := s.StartListening(cctx, func(data []byte) error {
event := cloudevents.NewEvent()
event.SetID(fmt.Sprintf("%x", uuid.New()))
event.SetType(string(s.GetEventSourceType()))
Expand All @@ -280,7 +305,6 @@ func (e *EventSourceAdaptor) Start(ctx context.Context, stopCh <-chan struct{})
})
logger.WithField(logging.LabelEventSourceName, s.GetEventSourceName()).
WithField(logging.LabelEventName, s.GetEventName()).WithError(err).Errorln("failed to start service.")
// TODO: Need retry.
}(server)
}
}
Expand Down
4 changes: 2 additions & 2 deletions eventsources/sources/amqp/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
log := logging.FromContext(ctx).WithFields(map[string]interface{}{
logging.LabelEventSourceType: el.GetEventSourceType(),
logging.LabelEventSourceName: el.GetEventSourceName(),
Expand Down Expand Up @@ -141,7 +141,7 @@ func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struc
if err != nil {
log.WithError(err).Errorln("failed to dispatch event")
}
case <-stopCh:
case <-ctx.Done():
err = conn.Close()
if err != nil {
log.WithError(err).Info("failed to close connection")
Expand Down
2 changes: 1 addition & 1 deletion eventsources/sources/awssns/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening starts an SNS event source
func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
logger := logging.FromContext(ctx)
log := logging.FromContext(ctx).WithFields(map[string]interface{}{
logging.LabelEventSourceType: el.GetEventSourceType(),
Expand Down
4 changes: 2 additions & 2 deletions eventsources/sources/awssqs/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
log := logging.FromContext(ctx).WithFields(map[string]interface{}{
logging.LabelEventSourceType: el.GetEventSourceType(),
logging.LabelEventSourceName: el.GetEventSourceName(),
Expand Down Expand Up @@ -96,7 +96,7 @@ func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struc
log.Infoln("listening for messages on the queue...")
for {
select {
case <-stopCh:
case <-ctx.Done():
log.Info("exiting SQS event listener...")
return nil
default:
Expand Down
4 changes: 2 additions & 2 deletions eventsources/sources/azureeventshub/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
log := logging.FromContext(ctx).WithFields(map[string]interface{}{
logging.LabelEventSourceType: el.GetEventSourceType(),
logging.LabelEventSourceName: el.GetEventSourceName(),
Expand Down Expand Up @@ -123,7 +123,7 @@ func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struc
listenerHandles = append(listenerHandles, listenerHandle)
}

<-stopCh
<-ctx.Done()
log.Infoln("stopping listener handlers")

for _, handler := range listenerHandles {
Expand Down
4 changes: 2 additions & 2 deletions eventsources/sources/calendar/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
log := logging.FromContext(ctx).WithFields(map[string]interface{}{
logging.LabelEventSourceType: el.GetEventSourceType(),
logging.LabelEventSourceName: el.GetEventSourceName(),
Expand Down Expand Up @@ -126,7 +126,7 @@ func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struc
if err != nil {
log.WithError(err).Errorln("failed to dispatch event")
}
case <-stopCh:
case <-ctx.Done():
log.Info("exiting calendar event listener...")
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions eventsources/sources/emitter/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
log := logging.FromContext(ctx).WithFields(map[string]interface{}{
logging.LabelEventSourceType: el.GetEventSourceType(),
logging.LabelEventSourceName: el.GetEventSourceName(),
Expand Down Expand Up @@ -130,7 +130,7 @@ func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struc
return errors.Wrapf(err, "failed to subscribe to channel %s", emitterEventSource.ChannelName)
}

<-stopCh
<-ctx.Done()

log.WithField("channel-name", emitterEventSource.ChannelName).Infoln("event source stopped, unsubscribe the channel")

Expand Down
14 changes: 7 additions & 7 deletions eventsources/sources/file/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
log := logging.FromContext(ctx).WithFields(map[string]interface{}{
logging.LabelEventSourceType: el.GetEventSourceType(),
logging.LabelEventSourceName: el.GetEventSourceName(),
Expand All @@ -68,12 +68,12 @@ func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struc

fileEventSource := &el.FileEventSource
if fileEventSource.Polling {
if err := el.listenEventsPolling(ctx, stopCh, dispatch, log); err != nil {
if err := el.listenEventsPolling(ctx, dispatch, log); err != nil {
log.WithError(err).Errorln("failed to listen to events")
return err
}
} else {
if err := el.listenEvents(ctx, stopCh, dispatch, log); err != nil {
if err := el.listenEvents(ctx, dispatch, log); err != nil {
log.WithError(err).Errorln("failed to listen to events")
return err
}
Expand All @@ -82,7 +82,7 @@ func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struc
}

// listenEvents listen to file related events.
func (el *EventListener) listenEvents(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error, log *logrus.Entry) error {
func (el *EventListener) listenEvents(ctx context.Context, dispatch func([]byte) error, log *logrus.Entry) error {
fileEventSource := &el.FileEventSource

// create new fs watcher
Expand Down Expand Up @@ -154,15 +154,15 @@ func (el *EventListener) listenEvents(ctx context.Context, stopCh <-chan struct{
}
case err := <-watcher.Errors:
return errors.Wrapf(err, "failed to process %s", el.GetEventName())
case <-stopCh:
case <-ctx.Done():
log.Infoln("event source has been stopped")
return nil
}
}
}

// listenEvents listen to file related events using polling.
func (el *EventListener) listenEventsPolling(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error, log *logrus.Entry) error {
func (el *EventListener) listenEventsPolling(ctx context.Context, dispatch func([]byte) error, log *logrus.Entry) error {
fileEventSource := &el.FileEventSource

// create new fs watcher
Expand Down Expand Up @@ -234,7 +234,7 @@ func (el *EventListener) listenEventsPolling(ctx context.Context, stopCh <-chan
case err := <-watcher.Error:
log.WithError(err).Errorf("failed to process %s", el.GetEventName())
return
case <-stopCh:
case <-ctx.Done():
log.Infoln("event source has been stopped")
return
}
Expand Down
4 changes: 2 additions & 2 deletions eventsources/sources/gcppubsub/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening listens to GCP PubSub events
func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
// In order to listen events from GCP PubSub,
// 1. Parse the event source that contains configuration to connect to GCP PubSub
// 2. Create a new PubSub client
Expand Down Expand Up @@ -169,7 +169,7 @@ func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struc
return errors.Wrapf(err, "failed to receive the messages for subscription %s and topic %s for %s", subscriptionName, pubsubEventSource.Topic, el.GetEventName())
}

<-stopCh
<-ctx.Done()

log.Infoln("event source has been stopped")

Expand Down
4 changes: 2 additions & 2 deletions eventsources/sources/github/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,8 @@ func (router *Router) PostInactivate() error {
return nil
}

// StartListening starts an SNS event source
func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error {
// StartListening starts an event source
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
logger := logging.FromContext(ctx)
log := logging.FromContext(ctx).WithFields(map[string]interface{}{
logging.LabelEventSourceType: el.GetEventSourceType(),
Expand Down
4 changes: 2 additions & 2 deletions eventsources/sources/gitlab/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,8 @@ func (router *Router) PostInactivate() error {
return nil
}

// StartListening starts an SNS event source
func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error {
// StartListening starts an event source
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
logger := logging.FromContext(ctx)
log := logging.FromContext(ctx).WithFields(map[string]interface{}{
logging.LabelEventSourceType: el.GetEventSourceType(),
Expand Down
4 changes: 2 additions & 2 deletions eventsources/sources/hdfs/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (w *WatchableHDFS) GetFileID(fi os.FileInfo) interface{} {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
log := logging.FromContext(ctx).WithFields(map[string]interface{}{
logging.LabelEventSourceType: el.GetEventSourceType(),
logging.LabelEventSourceName: el.GetEventSourceName(),
Expand Down Expand Up @@ -164,7 +164,7 @@ func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struc
}
case err := <-watcher.Errors:
return errors.Wrapf(err, "failed to watch events for %s", el.GetEventName())
case <-stopCh:
case <-ctx.Done():
return nil
}
}
Expand Down
4 changes: 2 additions & 2 deletions eventsources/sources/kafka/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func verifyPartitionAvailable(part int32, partitions []int32) bool {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
log := logging.FromContext(ctx).WithFields(map[string]interface{}{
logging.LabelEventSourceType: el.GetEventSourceType(),
logging.LabelEventSourceName: el.GetEventSourceName(),
Expand Down Expand Up @@ -157,7 +157,7 @@ func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struc
case err := <-partitionConsumer.Errors():
return errors.Wrapf(err, "failed to consume messages for event source %s", el.GetEventName())

case <-stopCh:
case <-ctx.Done():
log.Infoln("event source is stopped, closing partition consumer")
err = partitionConsumer.Close()
if err != nil {
Expand Down
Loading

0 comments on commit a8afcb7

Please sign in to comment.