From aa964d3e44bebd8684ac83297f4df5e5a5ce4272 Mon Sep 17 00:00:00 2001 From: lenny Date: Fri, 3 Apr 2020 13:55:22 -0700 Subject: [PATCH] fix: Add call to Message Bus Connect() Core data was missing call to connect to message bus and needed latest go-mod-messging for MQTT Message Bus to work. closes #2466 Signed-off-by: lenny --- go.mod | 2 +- internal/core/data/init.go | 33 ++++++++++++++++++++++++++++++++- 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index 85e49f7364..1625fa08b1 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/edgexfoundry/go-mod-bootstrap v0.0.26 github.com/edgexfoundry/go-mod-configuration v0.0.3 github.com/edgexfoundry/go-mod-core-contracts v0.1.52 - github.com/edgexfoundry/go-mod-messaging v0.1.14 + github.com/edgexfoundry/go-mod-messaging v0.1.16 github.com/edgexfoundry/go-mod-registry v0.1.17 github.com/edgexfoundry/go-mod-secrets v0.0.17 github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8 diff --git a/internal/core/data/init.go b/internal/core/data/init.go index 6ee35ccab4..d59235e178 100644 --- a/internal/core/data/init.go +++ b/internal/core/data/init.go @@ -54,6 +54,7 @@ func (b *Bootstrap) BootstrapHandler(ctx context.Context, wg *sync.WaitGroup, _ configuration := dataContainer.ConfigurationFrom(dic.Get) registryClient := container.RegistryFrom(dic.Get) + lc := container.LoggingClientFrom(dic.Get) mdc := metadata.NewDeviceClient( urlclient.New( @@ -90,12 +91,42 @@ func (b *Bootstrap) BootstrapHandler(ctx context.Context, wg *sync.WaitGroup, _ Optional: configuration.MessageQueue.Optional, }) - lc := container.LoggingClientFrom(dic.Get) if err != nil { lc.Error(fmt.Sprintf("failed to create messaging client: %s", err.Error())) return false } + err = msgClient.Connect() + if err != nil { + lc.Error(fmt.Sprintf("failed to connect to message bus: %s", err.Error())) + return false + } + + // Setup special "defer" go func that will disconnect from the message bus when the service is exiting + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + if err := msgClient.Disconnect(); err != nil { + lc.Error("failed to disconnect from the Message Bus") + return + } + lc.Info("Message Bus disconnected") + return + } + } + }() + + lc.Info(fmt.Sprintf( + "Connected to %s Message Bus @ %s://%s:%d publishing on '%s' topic", + configuration.MessageQueue.Type, + configuration.MessageQueue.Protocol, + configuration.MessageQueue.Host, + configuration.MessageQueue.Port, + configuration.MessageQueue.Topic)) + chEvents := make(chan interface{}, 100) // initialize event handlers initEventHandlers(lc, chEvents, mdc, msc, configuration)