Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(eventbus): NATS eventbus auto reconnect and code refactoring #749

Merged
merged 4 commits into from
Jul 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion eventbus/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ type Driver interface {
// SubscribeEventSources is used to subscribe multiple event source dependencies
// Parameter - ctx, context
// Parameter - conn, eventbus connection
// Parameter - closeCh, channel to indicate to close the subscription
// Parameter - dependencyExpr, example: "(dep1 || dep2) && dep3"
// Parameter - dependencies, array of dependencies information
// Parameter - filter, a function used to filter the message
// Parameter - action, a function to be triggered after all conditions are met
SubscribeEventSources(ctx context.Context, conn Connection, dependencyExpr string, dependencies []Dependency, filter func(string, cloudevents.Event) bool, action func(map[string]cloudevents.Event)) error
SubscribeEventSources(ctx context.Context, conn Connection, closeCh <-chan struct{}, dependencyExpr string, dependencies []Dependency, filter func(string, cloudevents.Event) bool, action func(map[string]cloudevents.Event)) error

// Publish a message
Publish(conn Connection, message []byte) error
Expand Down
40 changes: 23 additions & 17 deletions eventbus/driver/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package driver
import (
"context"
"encoding/json"
"os"
"os/signal"
"strings"
"time"

Expand Down Expand Up @@ -73,7 +71,16 @@ func NewNATSStreaming(url, clusterID, subject, clientID string, auth *Auth, logg

func (n *natsStreaming) Connect() (Connection, error) {
log := n.logger.WithField("clientID", n.clientID)
opts := []nats.Option{}
opts := []nats.Option{
nats.MaxReconnects(-1),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the significance of -1?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

-1 means retry forever.

nats.ReconnectWait(3 * time.Second),
nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
log.Errorf("NATS connection lost, reason: %v", err)
}),
nats.ReconnectHandler(func(nnc *nats.Conn) {
log.Info("Reconnected to NATS server")
}),
}
switch n.auth.Strategy {
case eventbusv1alpha1.AuthStrategyToken:
log.Info("NATS auth strategy: Token")
Expand All @@ -89,9 +96,10 @@ func (n *natsStreaming) Connect() (Connection, error) {
return nil, err
}
log.Info("Connected to NATS server.")
sc, err := stan.Connect(n.clusterID, n.clientID, stan.NatsConn(nc),

sc, err := stan.Connect(n.clusterID, n.clientID, stan.NatsConn(nc), stan.Pings(5, 60),
stan.SetConnectionLostHandler(func(_ stan.Conn, reason error) {
log.Fatalf("Connection lost, reason: %v", reason)
log.Errorf("NATS streaming connection lost, reason: %v", reason)
}))
if err != nil {
log.Errorf("Failed to connect to NATS streaming server, %v", err)
Expand All @@ -114,7 +122,7 @@ func (n *natsStreaming) Publish(conn Connection, message []byte) error {
// Parameter - dependencies, array of dependencies information
// Parameter - filter, a function used to filter the message
// Parameter - action, a function to be triggered after all conditions are met
func (n *natsStreaming) SubscribeEventSources(ctx context.Context, conn Connection, dependencyExpr string, dependencies []Dependency, filter func(string, cloudevents.Event) bool, action func(map[string]cloudevents.Event)) error {
func (n *natsStreaming) SubscribeEventSources(ctx context.Context, conn Connection, closeCh <-chan struct{}, dependencyExpr string, dependencies []Dependency, filter func(string, cloudevents.Event) bool, action func(map[string]cloudevents.Event)) error {
log := n.logger.WithField("clientID", n.clientID)
msgHolder, err := newEventSourceMessageHolder(dependencyExpr, dependencies)
if err != nil {
Expand All @@ -138,20 +146,18 @@ func (n *natsStreaming) SubscribeEventSources(ctx context.Context, conn Connecti
return err
}
log.Infof("Subscribed to subject %s ...", n.subject)

signalChan := make(chan os.Signal, 1)
cleanupDone := make(chan bool)
signal.Notify(signalChan, os.Interrupt)
go func() {
for range signalChan {
log.Info("Received an interrupt, unsubscribing and closing connection...")
for {
select {
case <-ctx.Done():
log.Info("existing, unsubscribing and closing connection...")
_ = sub.Close()
log.Infof("subscription on subject %s closed", n.subject)
case <-closeCh:
log.Info("closing subscription...")
_ = sub.Close()
log.Infof("subscription on subject %s closed", n.subject)
cleanupDone <- true
}
}()
<-cleanupDone
return nil
}
}

func (n *natsStreaming) processEventSourceMsg(m *stan.Msg, msgHolder *eventSourceMessageHolder, filter func(dependencyName string, event cloudevents.Event) bool, action func(map[string]cloudevents.Event), log *logrus.Entry) {
Expand Down
1 change: 1 addition & 0 deletions eventbus/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func GetDriver(ctx context.Context, eventBusConfig eventbusv1alpha1.BusConfig, s
}
v.WatchConfig()
v.OnConfigChange(func(e fsnotify.Event) {
logger.Info("eventbus auth config file changed.")
err = v.Unmarshal(cred)
if err != nil {
logger.Errorf("failed to unmarshal auth.yaml after reloading, err: %v", err)
Expand Down
30 changes: 27 additions & 3 deletions eventsources/eventing.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type EventingServer interface {
GetEventSourceType() apicommon.EventSourceType

// Function to start listening events.
StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error
StartListening(ctx context.Context, dispatch func([]byte) error) error
}

// GetEventingServers returns the mapping of event source type and list of eventing servers
Expand Down Expand Up @@ -253,6 +253,31 @@ func (e *EventSourceAdaptor) Start(ctx context.Context, stopCh <-chan struct{})
logger.WithError(err).Errorln("failed to connect to eventbus")
return err
}
defer e.eventBusConn.Close()

// Daemon to reconnect
go func(ctx context.Context) {
logger.Info("starting eventbus connection daemon...")
for {
select {
case <-ctx.Done():
logger.Info("exiting eventbus connection daemon...")
return
default:
time.Sleep(3 * time.Second)
}
if e.eventBusConn == nil || e.eventBusConn.IsClosed() {
logger.Info("NATS connection lost, reconnecting...")
e.eventBusConn, err = driver.Connect()
if err != nil {
logger.WithError(err).Errorln("failed to reconnect to eventbus")
continue
}
logger.Info("reconnected the NATS streaming server...")
}
}
}(cctx)

for _, ss := range servers {
for _, server := range ss {
err := server.ValidateEventSource(cctx)
Expand All @@ -261,7 +286,7 @@ func (e *EventSourceAdaptor) Start(ctx context.Context, stopCh <-chan struct{})
return err
}
go func(s EventingServer) {
err := s.StartListening(cctx, stopCh, func(data []byte) error {
err := s.StartListening(cctx, func(data []byte) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can now remove // TODO: Need Retry

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

event := cloudevents.NewEvent()
event.SetID(fmt.Sprintf("%x", uuid.New()))
event.SetType(string(s.GetEventSourceType()))
Expand All @@ -280,7 +305,6 @@ func (e *EventSourceAdaptor) Start(ctx context.Context, stopCh <-chan struct{})
})
logger.WithField(logging.LabelEventSourceName, s.GetEventSourceName()).
WithField(logging.LabelEventName, s.GetEventName()).WithError(err).Errorln("failed to start service.")
// TODO: Need retry.
}(server)
}
}
Expand Down
4 changes: 2 additions & 2 deletions eventsources/sources/amqp/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
log := logging.FromContext(ctx).WithFields(map[string]interface{}{
logging.LabelEventSourceType: el.GetEventSourceType(),
logging.LabelEventSourceName: el.GetEventSourceName(),
Expand Down Expand Up @@ -141,7 +141,7 @@ func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struc
if err != nil {
log.WithError(err).Errorln("failed to dispatch event")
}
case <-stopCh:
case <-ctx.Done():
err = conn.Close()
if err != nil {
log.WithError(err).Info("failed to close connection")
Expand Down
2 changes: 1 addition & 1 deletion eventsources/sources/awssns/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening starts an SNS event source
func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
logger := logging.FromContext(ctx)
log := logging.FromContext(ctx).WithFields(map[string]interface{}{
logging.LabelEventSourceType: el.GetEventSourceType(),
Expand Down
4 changes: 2 additions & 2 deletions eventsources/sources/awssqs/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
log := logging.FromContext(ctx).WithFields(map[string]interface{}{
logging.LabelEventSourceType: el.GetEventSourceType(),
logging.LabelEventSourceName: el.GetEventSourceName(),
Expand Down Expand Up @@ -96,7 +96,7 @@ func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struc
log.Infoln("listening for messages on the queue...")
for {
select {
case <-stopCh:
case <-ctx.Done():
log.Info("exiting SQS event listener...")
return nil
default:
Expand Down
4 changes: 2 additions & 2 deletions eventsources/sources/azureeventshub/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
log := logging.FromContext(ctx).WithFields(map[string]interface{}{
logging.LabelEventSourceType: el.GetEventSourceType(),
logging.LabelEventSourceName: el.GetEventSourceName(),
Expand Down Expand Up @@ -123,7 +123,7 @@ func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struc
listenerHandles = append(listenerHandles, listenerHandle)
}

<-stopCh
<-ctx.Done()
log.Infoln("stopping listener handlers")

for _, handler := range listenerHandles {
Expand Down
4 changes: 2 additions & 2 deletions eventsources/sources/calendar/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
log := logging.FromContext(ctx).WithFields(map[string]interface{}{
logging.LabelEventSourceType: el.GetEventSourceType(),
logging.LabelEventSourceName: el.GetEventSourceName(),
Expand Down Expand Up @@ -126,7 +126,7 @@ func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struc
if err != nil {
log.WithError(err).Errorln("failed to dispatch event")
}
case <-stopCh:
case <-ctx.Done():
log.Info("exiting calendar event listener...")
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions eventsources/sources/emitter/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
log := logging.FromContext(ctx).WithFields(map[string]interface{}{
logging.LabelEventSourceType: el.GetEventSourceType(),
logging.LabelEventSourceName: el.GetEventSourceName(),
Expand Down Expand Up @@ -130,7 +130,7 @@ func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struc
return errors.Wrapf(err, "failed to subscribe to channel %s", emitterEventSource.ChannelName)
}

<-stopCh
<-ctx.Done()

log.WithField("channel-name", emitterEventSource.ChannelName).Infoln("event source stopped, unsubscribe the channel")

Expand Down
14 changes: 7 additions & 7 deletions eventsources/sources/file/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
log := logging.FromContext(ctx).WithFields(map[string]interface{}{
logging.LabelEventSourceType: el.GetEventSourceType(),
logging.LabelEventSourceName: el.GetEventSourceName(),
Expand All @@ -68,12 +68,12 @@ func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struc

fileEventSource := &el.FileEventSource
if fileEventSource.Polling {
if err := el.listenEventsPolling(ctx, stopCh, dispatch, log); err != nil {
if err := el.listenEventsPolling(ctx, dispatch, log); err != nil {
log.WithError(err).Errorln("failed to listen to events")
return err
}
} else {
if err := el.listenEvents(ctx, stopCh, dispatch, log); err != nil {
if err := el.listenEvents(ctx, dispatch, log); err != nil {
log.WithError(err).Errorln("failed to listen to events")
return err
}
Expand All @@ -82,7 +82,7 @@ func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struc
}

// listenEvents listen to file related events.
func (el *EventListener) listenEvents(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error, log *logrus.Entry) error {
func (el *EventListener) listenEvents(ctx context.Context, dispatch func([]byte) error, log *logrus.Entry) error {
fileEventSource := &el.FileEventSource

// create new fs watcher
Expand Down Expand Up @@ -154,15 +154,15 @@ func (el *EventListener) listenEvents(ctx context.Context, stopCh <-chan struct{
}
case err := <-watcher.Errors:
return errors.Wrapf(err, "failed to process %s", el.GetEventName())
case <-stopCh:
case <-ctx.Done():
log.Infoln("event source has been stopped")
return nil
}
}
}

// listenEventsPolling listens to file related events using polling.
func (el *EventListener) listenEventsPolling(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error, log *logrus.Entry) error {
func (el *EventListener) listenEventsPolling(ctx context.Context, dispatch func([]byte) error, log *logrus.Entry) error {
fileEventSource := &el.FileEventSource

// create new fs watcher
Expand Down Expand Up @@ -234,7 +234,7 @@ func (el *EventListener) listenEventsPolling(ctx context.Context, stopCh <-chan
case err := <-watcher.Error:
log.WithError(err).Errorf("failed to process %s", el.GetEventName())
return
case <-stopCh:
case <-ctx.Done():
log.Infoln("event source has been stopped")
return
}
Expand Down
4 changes: 2 additions & 2 deletions eventsources/sources/gcppubsub/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening listens to GCP PubSub events
func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
// In order to listen events from GCP PubSub,
// 1. Parse the event source that contains configuration to connect to GCP PubSub
// 2. Create a new PubSub client
Expand Down Expand Up @@ -169,7 +169,7 @@ func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struc
return errors.Wrapf(err, "failed to receive the messages for subscription %s and topic %s for %s", subscriptionName, pubsubEventSource.Topic, el.GetEventName())
}

<-stopCh
<-ctx.Done()

log.Infoln("event source has been stopped")

Expand Down
4 changes: 2 additions & 2 deletions eventsources/sources/github/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,8 @@ func (router *Router) PostInactivate() error {
return nil
}

// StartListening starts an SNS event source
func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error {
// StartListening starts an event source
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
logger := logging.FromContext(ctx)
log := logging.FromContext(ctx).WithFields(map[string]interface{}{
logging.LabelEventSourceType: el.GetEventSourceType(),
Expand Down
4 changes: 2 additions & 2 deletions eventsources/sources/gitlab/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,8 @@ func (router *Router) PostInactivate() error {
return nil
}

// StartListening starts an SNS event source
func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error {
// StartListening starts an event source
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
logger := logging.FromContext(ctx)
log := logging.FromContext(ctx).WithFields(map[string]interface{}{
logging.LabelEventSourceType: el.GetEventSourceType(),
Expand Down
4 changes: 2 additions & 2 deletions eventsources/sources/hdfs/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (w *WatchableHDFS) GetFileID(fi os.FileInfo) interface{} {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
log := logging.FromContext(ctx).WithFields(map[string]interface{}{
logging.LabelEventSourceType: el.GetEventSourceType(),
logging.LabelEventSourceName: el.GetEventSourceName(),
Expand Down Expand Up @@ -164,7 +164,7 @@ func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struc
}
case err := <-watcher.Errors:
return errors.Wrapf(err, "failed to watch events for %s", el.GetEventName())
case <-stopCh:
case <-ctx.Done():
return nil
}
}
Expand Down
4 changes: 2 additions & 2 deletions eventsources/sources/kafka/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func verifyPartitionAvailable(part int32, partitions []int32) bool {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struct{}, dispatch func([]byte) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte) error) error {
log := logging.FromContext(ctx).WithFields(map[string]interface{}{
logging.LabelEventSourceType: el.GetEventSourceType(),
logging.LabelEventSourceName: el.GetEventSourceName(),
Expand Down Expand Up @@ -157,7 +157,7 @@ func (el *EventListener) StartListening(ctx context.Context, stopCh <-chan struc
case err := <-partitionConsumer.Errors():
return errors.Wrapf(err, "failed to consume messages for event source %s", el.GetEventName())

case <-stopCh:
case <-ctx.Done():
log.Infoln("event source is stopped, closing partition consumer")
err = partitionConsumer.Close()
if err != nil {
Expand Down
Loading