Skip to content

Commit

Permalink
feat: Add common Messaging bootstrap handler
Browse files Browse the repository at this point in the history
This is needed as other core/support service start using the MessageBus
for Service Metrics and System Events.

Signed-off-by: Leonard Goodell <[email protected]>
  • Loading branch information
Leonard Goodell committed Jul 21, 2022
1 parent 0d265ec commit 3e442f3
Show file tree
Hide file tree
Showing 3 changed files with 252 additions and 5 deletions.
139 changes: 139 additions & 0 deletions bootstrap/handlers/messaging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*******************************************************************************
* Copyright 2022 Intel Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*******************************************************************************/

package handlers

import (
"context"
"strings"
"sync"

"github.com/edgexfoundry/go-mod-messaging/v2/messaging"
"github.com/edgexfoundry/go-mod-messaging/v2/pkg/types"

"github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container"
boostrapMessaging "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/messaging"
"github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/startup"
"github.com/edgexfoundry/go-mod-bootstrap/v2/config"
"github.com/edgexfoundry/go-mod-bootstrap/v2/di"
)

// MessagingBootstrapHandler fulfills the BootstrapHandler contract. If creates and initializes the Messaging client
// and adds it to the DIC
func MessagingBootstrapHandler(ctx context.Context, wg *sync.WaitGroup, startupTimer startup.Timer, dic *di.Container) bool {
lc := container.LoggingClientFrom(dic.Get)
configuration := container.ConfigurationFrom(dic.Get)

messageQueue := configuration.GetBootstrap().MessageQueue
if len(messageQueue.Host) == 0 {
lc.Error("MessageQueue configuration not set or missing from service's GetBootstrap() implementation")
return false
}

// Make sure the MessageBus password is not leaked into the Service Config that can be retrieved via the /config endpoint
messageBusInfo := deepCopy(messageQueue)

messageBusInfo.AuthMode = strings.ToLower(strings.TrimSpace(messageBusInfo.AuthMode))
if len(messageBusInfo.AuthMode) > 0 && messageBusInfo.AuthMode != boostrapMessaging.AuthModeNone {
if err := boostrapMessaging.SetOptionsAuthData(&messageBusInfo, lc, dic); err != nil {
lc.Error(err.Error())
return false
}
}

msgClient, err := messaging.NewMessageClient(
types.MessageBusConfig{
PublishHost: types.HostInfo{
Host: messageBusInfo.Host,
Port: messageBusInfo.Port,
Protocol: messageBusInfo.Protocol,
},
SubscribeHost: types.HostInfo{
Host: messageBusInfo.Host,
Port: messageBusInfo.Port,
Protocol: messageBusInfo.Protocol,
},
Type: messageBusInfo.Type,
Optional: messageBusInfo.Optional,
})

if err != nil {
lc.Errorf("Failed to create MessageClient: %v", err)
return false
}

for startupTimer.HasNotElapsed() {
select {
case <-ctx.Done():
return false
default:
err = msgClient.Connect()
if err != nil {
lc.Warnf("Unable to connect MessageBus: %w", err)
startupTimer.SleepForInterval()
continue
}

wg.Add(1)
go func() {
defer wg.Done()
<-ctx.Done()
if msgClient != nil {
_ = msgClient.Disconnect()
}
lc.Infof("Disconnected from MessageBus")
}()

dic.Update(di.ServiceConstructorMap{
container.MessagingClientName: func(get di.Get) interface{} {
return msgClient
},
})

lc.Infof(
"Connected to %s Message Bus @ %s://%s:%d publishing on '%s' prefix topic with AuthMode='%s'",
messageBusInfo.Type,
messageBusInfo.Protocol,
messageBusInfo.Host,
messageBusInfo.Port,
messageBusInfo.PublishTopicPrefix,
messageBusInfo.AuthMode)

return true
}
}

lc.Error("Connecting to MessageBus time out")
return false
}

func deepCopy(target config.MessageBusInfo) config.MessageBusInfo {
result := config.MessageBusInfo{
Type: target.Type,
Protocol: target.Protocol,
Host: target.Host,
Port: target.Port,
PublishTopicPrefix: target.PublishTopicPrefix,
SubscribeTopic: target.SubscribeTopic,
AuthMode: target.AuthMode,
SecretName: target.SecretName,
SubscribeEnabled: target.SubscribeEnabled,
}
result.Optional = make(map[string]string)
for key, value := range target.Optional {
result.Optional[key] = value
}

return result
}
107 changes: 107 additions & 0 deletions bootstrap/handlers/messaging_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package handlers

import (
"context"
"os"
"sync"
"testing"

"github.com/edgexfoundry/go-mod-core-contracts/v2/clients/logger"
"github.com/edgexfoundry/go-mod-messaging/v2/messaging"
"github.com/stretchr/testify/assert"

"github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container"
"github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/interfaces/mocks"
boostrapMessaging "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/messaging"
"github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/startup"
"github.com/edgexfoundry/go-mod-bootstrap/v2/config"
"github.com/edgexfoundry/go-mod-bootstrap/v2/di"
)

var lc logger.LoggingClient
var dic *di.Container
var usernameSecretData = map[string]string{
boostrapMessaging.SecretUsernameKey: "username",
boostrapMessaging.SecretPasswordKey: "password",
}

func TestMain(m *testing.M) {
lc = logger.NewMockClient()

dic = di.NewContainer(di.ServiceConstructorMap{
container.LoggingClientInterfaceName: func(get di.Get) interface{} {
return lc
},
})

os.Exit(m.Run())
}

func TestBootstrapHandler(t *testing.T) {
validCreateClient := config.MessageBusInfo{
Type: messaging.Redis,
Protocol: "redis",
Host: "localhost",
Port: 6379,
PublishTopicPrefix: "edgex/events/#",
AuthMode: boostrapMessaging.AuthModeUsernamePassword,
SecretName: "redisdb",
}

invalidSecrets := config.MessageBusInfo{
AuthMode: boostrapMessaging.AuthModeCert,
SecretName: "redisdb",
}

invalidNoConnect := config.MessageBusInfo{
Type: messaging.MQTT, // This will cause no connection since broker not available
Protocol: "tcp",
Host: "localhost",
Port: 8765,
AuthMode: boostrapMessaging.AuthModeUsernamePassword,
SecretName: "redisdb",
}

tests := []struct {
Name string
MessageQueue config.MessageBusInfo
ExpectedResult bool
ExpectClient bool
}{
{"Valid - creates client", validCreateClient, true, true},
{"Invalid - secrets error", invalidSecrets, false, false},
{"Invalid - can't connect", invalidNoConnect, false, false},
}

for _, test := range tests {
t.Run(test.Name, func(t *testing.T) {
provider := &mocks.SecretProvider{}
provider.On("GetSecret", test.MessageQueue.SecretName).Return(usernameSecretData, nil)
configMock := &mocks.Configuration{}
configMock.On("GetBootstrap").Return(config.BootstrapConfiguration{
MessageQueue: test.MessageQueue,
})

dic.Update(di.ServiceConstructorMap{
container.ConfigurationInterfaceName: func(get di.Get) interface{} {
return configMock
},
container.SecretProviderName: func(get di.Get) interface{} {
return provider
},
container.MessagingClientName: func(get di.Get) interface{} {
return nil
},
})

actual := MessagingBootstrapHandler(context.Background(), &sync.WaitGroup{}, startup.NewTimer(1, 1), dic)
assert.Equal(t, test.ExpectedResult, actual)
assert.Empty(t, test.MessageQueue.Optional)
if test.ExpectClient {
assert.NotNil(t, container.MessagingClientFrom(dic.Get))
} else {
assert.Nil(t, container.MessagingClientFrom(dic.Get))
}
})
}
}
11 changes: 6 additions & 5 deletions config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,12 @@ type InsecureSecretsInfo struct {

// BootstrapConfiguration defines the configuration elements required by the bootstrap.
type BootstrapConfiguration struct {
Clients map[string]ClientInfo
Service ServiceInfo
Config ConfigProviderInfo
Registry RegistryInfo
SecretStore SecretStoreInfo
Clients map[string]ClientInfo
Service ServiceInfo
Config ConfigProviderInfo
Registry RegistryInfo
SecretStore SecretStoreInfo
MessageQueue MessageBusInfo
}

// MessageBusInfo provides parameters related to connecting to a message bus as a publisher
Expand Down

0 comments on commit 3e442f3

Please sign in to comment.