Skip to content

Commit

Permalink
feat: Add capability to use messaging based Command Client (#384)
Browse files Browse the repository at this point in the history
* feat: Add capability to use messaging based Command Client

closes #382

Signed-off-by: Leonard Goodell <[email protected]>
  • Loading branch information
Lenny Goodell authored Oct 21, 2022
1 parent a43e448 commit 9ad12a8
Show file tree
Hide file tree
Showing 5 changed files with 308 additions and 75 deletions.
48 changes: 42 additions & 6 deletions bootstrap/handlers/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ import (
"context"
"fmt"
"sync"
"time"

clients "github.com/edgexfoundry/go-mod-core-contracts/v2/clients/http"
"github.com/edgexfoundry/go-mod-core-contracts/v2/clients/interfaces"
"github.com/edgexfoundry/go-mod-core-contracts/v2/clients/logger"
"github.com/edgexfoundry/go-mod-core-contracts/v2/common"
clientsMessaging "github.com/edgexfoundry/go-mod-messaging/v2/clients"
"github.com/edgexfoundry/go-mod-registry/v2/pkg/types"
"github.com/edgexfoundry/go-mod-registry/v2/registry"

Expand Down Expand Up @@ -55,10 +58,15 @@ func (cb *ClientsBootstrap) BootstrapHandler(
cb.registry = container.RegistryFrom(dic.Get)

for serviceKey, serviceInfo := range config.GetBootstrap().Clients {
url, err := cb.getClientUrl(serviceKey, serviceInfo.Url(), startupTimer, lc)
if err != nil {
lc.Error(err.Error())
return false
var url string
var err error

if !serviceInfo.UseMessageBus {
url, err = cb.getClientUrl(serviceKey, serviceInfo.Url(), startupTimer, lc)
if err != nil {
lc.Error(err.Error())
return false
}
}

switch serviceKey {
Expand All @@ -85,9 +93,37 @@ func (cb *ClientsBootstrap) BootstrapHandler(
})

case common.CoreCommandServiceKey:
var client interfaces.CommandClient

if serviceInfo.UseMessageBus {
// TODO: Move following outside loop when multiple messaging based clients exist
messageClient := container.MessagingClientFrom(dic.Get)
if messageClient == nil {
lc.Errorf("Unable to create Command client using MessageBus: %s", "MessageBus Client was not created")
return false
}

// TODO: Move following outside loop when multiple messaging based clients exist
timeout, err := time.ParseDuration(config.GetBootstrap().Service.RequestTimeout)
if err != nil {
lc.Errorf("Unable to parse Service.RequestTimeout as a time duration: %v", err)
return false
}

client, err = clientsMessaging.NewCommandClient(messageClient, serviceInfo.Topics, timeout)
if err != nil {
lc.Errorf("Unable to create messaging based Command Client: %v", err)
return false
}

lc.Infof("Using messaging for '%s' clients", serviceKey)
} else {
client = clients.NewCommandClient(url)
}

dic.Update(di.ServiceConstructorMap{
container.CommandClientName: func(get di.Get) interface{} {
return clients.NewCommandClient(url)
return client
},
})

Expand Down Expand Up @@ -121,7 +157,7 @@ func (cb *ClientsBootstrap) BootstrapHandler(

func (cb *ClientsBootstrap) getClientUrl(serviceKey string, defaultUrl string, startupTimer startup.Timer, lc logger.LoggingClient) (string, error) {
if cb.registry == nil {
lc.Infof("Using configuration for URL for '%s': %s", serviceKey, defaultUrl)
lc.Infof("Using REST for '%s' clients @ %s", serviceKey, defaultUrl)
return defaultUrl, nil
}

Expand Down
124 changes: 116 additions & 8 deletions bootstrap/handlers/clients_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@ import (
"testing"

"github.com/edgexfoundry/go-mod-core-contracts/v2/clients/logger"
loggerMocks "github.com/edgexfoundry/go-mod-core-contracts/v2/clients/logger/mocks"
"github.com/edgexfoundry/go-mod-core-contracts/v2/common"
messagingMocks "github.com/edgexfoundry/go-mod-messaging/v2/messaging/mocks"
"github.com/edgexfoundry/go-mod-registry/v2/pkg/types"
"github.com/edgexfoundry/go-mod-registry/v2/registry"
registryMocks "github.com/edgexfoundry/go-mod-registry/v2/registry/mocks"
"github.com/stretchr/testify/mock"

"github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container"
"github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/interfaces/mocks"
Expand All @@ -52,12 +55,22 @@ func TestClientsBootstrapHandler(t *testing.T) {
Protocol: "http",
}

commandClientInfo := config.ClientInfo{
commandHttpClientInfo := config.ClientInfo{
Host: "localhost",
Port: 59882,
Protocol: "http",
}

commandMessagingClientInfo := config.ClientInfo{
UseMessageBus: true,
Topics: map[string]string{
"QueryRequestTopic": "queryRequest",
"QueryResponseTopic": "queryResponse",
"CommandRequestTopicPrefix": "commandRequest",
"CommandResponseTopic": "commandResponse",
},
}

notificationClientInfo := config.ClientInfo{
Host: "localhost",
Port: 59860,
Expand Down Expand Up @@ -95,7 +108,7 @@ func TestClientsBootstrapHandler(t *testing.T) {
{
Name: "All ClientsBootstrap",
CoreDataClientInfo: &coreDataClientInfo,
CommandClientInfo: &commandClientInfo,
CommandClientInfo: &commandHttpClientInfo,
MetadataClientInfo: &metadataClientInfo,
NotificationClientInfo: &notificationClientInfo,
SchedulerClientInfo: &subscriberClientInfo,
Expand All @@ -105,7 +118,7 @@ func TestClientsBootstrapHandler(t *testing.T) {
{
Name: "All ClientsBootstrap using registry",
CoreDataClientInfo: &coreDataClientInfo,
CommandClientInfo: &commandClientInfo,
CommandClientInfo: &commandHttpClientInfo,
MetadataClientInfo: &metadataClientInfo,
NotificationClientInfo: &notificationClientInfo,
SchedulerClientInfo: &subscriberClientInfo,
Expand Down Expand Up @@ -152,39 +165,55 @@ func TestClientsBootstrapHandler(t *testing.T) {
Registry: nil,
ExpectedResult: true,
},
{
Name: "Only Messaging based Command ClientsBootstrap",
CoreDataClientInfo: nil,
CommandClientInfo: &commandMessagingClientInfo,
MetadataClientInfo: nil,
NotificationClientInfo: nil,
SchedulerClientInfo: nil,
Registry: nil,
ExpectedResult: true,
},
}

for _, test := range tests {
t.Run(test.Name, func(t *testing.T) {
clients := make(map[string]config.ClientInfo)

if test.CoreDataClientInfo != nil {
clients[common.CoreDataServiceKey] = coreDataClientInfo
clients[common.CoreDataServiceKey] = *test.CoreDataClientInfo
}

if test.CommandClientInfo != nil {
clients[common.CoreCommandServiceKey] = commandClientInfo
clients[common.CoreCommandServiceKey] = *test.CommandClientInfo
}

if test.MetadataClientInfo != nil {
clients[common.CoreMetaDataServiceKey] = metadataClientInfo
clients[common.CoreMetaDataServiceKey] = *test.MetadataClientInfo
}

if test.NotificationClientInfo != nil {
clients[common.SupportNotificationsServiceKey] = notificationClientInfo
clients[common.SupportNotificationsServiceKey] = *test.NotificationClientInfo
}

if test.SchedulerClientInfo != nil {
clients[common.SupportSchedulerServiceKey] = subscriberClientInfo
clients[common.SupportSchedulerServiceKey] = *test.SchedulerClientInfo
}

bootstrapConfig := config.BootstrapConfiguration{
Service: config.ServiceInfo{
RequestTimeout: "30s",
},
Clients: clients,
}

configMock := &mocks.Configuration{}
configMock.On("GetBootstrap").Return(bootstrapConfig)

messageClient := &messagingMocks.MessageClient{}
messageClient.On("Subscribe", mock.Anything, mock.Anything).Return(nil)

dic := di.NewContainer(di.ServiceConstructorMap{
container.LoggingClientInterfaceName: func(get di.Get) interface{} {
return lc
Expand All @@ -195,6 +224,9 @@ func TestClientsBootstrapHandler(t *testing.T) {
container.ConfigurationInterfaceName: func(get di.Get) interface{} {
return configMock
},
container.MessagingClientName: func(get di.Get) interface{} {
return messageClient
},
})

actualResult := NewClientsBootstrap().BootstrapHandler(context.Background(), &sync.WaitGroup{}, startupTimer, dic)
Expand Down Expand Up @@ -260,3 +292,79 @@ func TestClientsBootstrapHandler(t *testing.T) {
})
}
}

func TestCommandMessagingClientErrors(t *testing.T) {
validDuration := "30s"
invalidDuration := "xyz"

validTopics := map[string]string{
"QueryRequestTopic": "queryRequest",
"QueryResponseTopic": "queryResponse",
"CommandRequestTopicPrefix": "commandRequest",
"CommandResponseTopic": "commandResponse",
}

tests := []struct {
Name string
MessagingClientPresent bool
TimeoutDuration string
SubscribeError bool
}{
{"Missing Messaging Client", false, validDuration, false},
{"Error creating Messaging Client", true, validDuration, true},
{"Bad Timeout duration", true, invalidDuration, false},
}

for _, test := range tests {
t.Run(test.Name, func(t *testing.T) {
mockLogger := &loggerMocks.LoggingClient{}
mockLogger.On("Errorf", mock.Anything)
mockLogger.On("Errorf", mock.Anything, mock.Anything)

mockMessaging := &messagingMocks.MessageClient{}
if test.SubscribeError {
mockMessaging.On("Subscribe", mock.Anything, mock.Anything).Return(errors.New("failed"))
} else {
mockMessaging.On("Subscribe", mock.Anything, mock.Anything).Return(nil)
}

clients := make(map[string]config.ClientInfo)
clients[common.CoreCommandServiceKey] = config.ClientInfo{
UseMessageBus: true,
Topics: validTopics,
}

bootstrapConfig := config.BootstrapConfiguration{
Service: config.ServiceInfo{
RequestTimeout: test.TimeoutDuration,
},
Clients: clients,
}

configMock := &mocks.Configuration{}
configMock.On("GetBootstrap").Return(bootstrapConfig)

dic := di.NewContainer(di.ServiceConstructorMap{
container.LoggingClientInterfaceName: func(get di.Get) interface{} {
return mockLogger
},
container.ConfigurationInterfaceName: func(get di.Get) interface{} {
return configMock
},
container.MessagingClientName: func(get di.Get) interface{} {
if test.MessagingClientPresent {
return mockMessaging
} else {
return nil
}
},
})

startupTimer := startup.NewTimer(1, 1)
actualResult := NewClientsBootstrap().BootstrapHandler(context.Background(), &sync.WaitGroup{}, startupTimer, dic)
require.False(t, actualResult)

mockLogger.AssertNumberOfCalls(t, "Errorf", 1)
})
}
}
4 changes: 4 additions & 0 deletions config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ type ClientInfo struct {
Port int
// Protocol indicates the protocol to use when accessing a given service
Protocol string
// UseMessageBus indicates weather to use Messaging version of client
UseMessageBus bool
// Topics holds the MessageBus topics used by the client to communicate to the service
Topics map[string]string
}

func (c ClientInfo) Url() string {
Expand Down
34 changes: 18 additions & 16 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ go 1.18

require (
github.com/eclipse/paho.mqtt.golang v1.4.1
github.com/edgexfoundry/go-mod-configuration/v2 v2.2.0
github.com/edgexfoundry/go-mod-core-contracts/v2 v2.2.0
github.com/edgexfoundry/go-mod-messaging/v2 v2.2.0
github.com/edgexfoundry/go-mod-registry/v2 v2.2.0
github.com/edgexfoundry/go-mod-configuration/v2 v2.3.0-dev.14
github.com/edgexfoundry/go-mod-core-contracts/v2 v2.3.0-dev.18
github.com/edgexfoundry/go-mod-messaging/v2 v2.3.0-dev.23
github.com/edgexfoundry/go-mod-registry/v2 v2.3.0-dev.10
github.com/edgexfoundry/go-mod-secrets/v2 v2.3.0-dev.8
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.0
Expand All @@ -19,27 +19,27 @@ require (

require (
github.com/Microsoft/go-winio v0.5.2 // indirect
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da // indirect
github.com/armon/go-metrics v0.3.10 // indirect
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fatih/color v1.9.0 // indirect
github.com/fxamacker/cbor/v2 v2.4.0 // indirect
github.com/go-kit/log v0.2.0 // indirect
github.com/go-kit/log v0.2.1 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/go-playground/locales v0.14.0 // indirect
github.com/go-playground/universal-translator v0.18.0 // indirect
github.com/go-playground/validator/v10 v10.10.1 // indirect
github.com/go-playground/validator/v10 v10.11.1 // indirect
github.com/go-redis/redis/v7 v7.3.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/hashicorp/consul/api v1.9.1 // indirect
github.com/hashicorp/consul/api v1.15.2 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.1 // indirect
github.com/hashicorp/go-hclog v0.12.0 // indirect
github.com/hashicorp/go-immutable-radix v1.0.0 // indirect
github.com/hashicorp/go-hclog v0.14.1 // indirect
github.com/hashicorp/go-immutable-radix v1.3.0 // indirect
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
github.com/hashicorp/golang-lru v0.5.0 // indirect
github.com/hashicorp/serf v0.9.5 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/serf v0.9.7 // indirect
github.com/leodido/go-urn v1.2.1 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
Expand All @@ -48,17 +48,19 @@ require (
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mitchellh/reflectwalk v1.0.0 // indirect
github.com/nats-io/nats.go v1.18.0 // indirect
github.com/nats-io/nkeys v0.3.0 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pebbe/zmq4 v1.2.7 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/spiffe/go-spiffe/v2 v2.1.1 // indirect
github.com/stretchr/objx v0.4.0 // indirect
github.com/x448/float16 v0.8.4 // indirect
github.com/zeebo/errs v1.2.2 // indirect
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e // indirect
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 // indirect
golang.org/x/crypto v0.0.0-20221010152910-d6f0a8c073c2 // indirect
golang.org/x/net v0.0.0-20211216030914-fe4d6282115f // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20220610221304-9f5ed59c137d // indirect
golang.org/x/sys v0.0.0-20221010170243-090e33056c14 // indirect
golang.org/x/text v0.3.7 // indirect
google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect
google.golang.org/grpc v1.46.0 // indirect
Expand Down
Loading

0 comments on commit 9ad12a8

Please sign in to comment.