From 53ea08458c088211ea97e9c7c75ec93fecbf0733 Mon Sep 17 00:00:00 2001 From: Chris Hung Date: Fri, 23 Sep 2022 10:51:30 +0800 Subject: [PATCH 1/3] refactor(command): move external package to messaging Signed-off-by: Chris Hung --- .../command/controller/messaging/{external => }/external.go | 2 +- .../controller/messaging/{external => }/external_test.go | 2 +- internal/core/command/main.go | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) rename internal/core/command/controller/messaging/{external => }/external.go (99%) rename internal/core/command/controller/messaging/{external => }/external_test.go (99%) diff --git a/internal/core/command/controller/messaging/external/external.go b/internal/core/command/controller/messaging/external.go similarity index 99% rename from internal/core/command/controller/messaging/external/external.go rename to internal/core/command/controller/messaging/external.go index 90d5cc872b..e49462e3f8 100644 --- a/internal/core/command/controller/messaging/external/external.go +++ b/internal/core/command/controller/messaging/external.go @@ -3,7 +3,7 @@ // // SPDX-License-Identifier: Apache-2.0 -package external +package messaging import ( "context" diff --git a/internal/core/command/controller/messaging/external/external_test.go b/internal/core/command/controller/messaging/external_test.go similarity index 99% rename from internal/core/command/controller/messaging/external/external_test.go rename to internal/core/command/controller/messaging/external_test.go index 89881c9cd1..7a50539589 100644 --- a/internal/core/command/controller/messaging/external/external_test.go +++ b/internal/core/command/controller/messaging/external_test.go @@ -3,7 +3,7 @@ // // SPDX-License-Identifier: Apache-2.0 -package external +package messaging import ( "context" diff --git a/internal/core/command/main.go b/internal/core/command/main.go index d10a21281b..4cd78fb242 100644 --- a/internal/core/command/main.go +++ b/internal/core/command/main.go @@ -31,7 +31,7 @@ import ( "github.com/edgexfoundry/edgex-go/internal" "github.com/edgexfoundry/edgex-go/internal/core/command/config" "github.com/edgexfoundry/edgex-go/internal/core/command/container" - "github.com/edgexfoundry/edgex-go/internal/core/command/controller/messaging/external" + "github.com/edgexfoundry/edgex-go/internal/core/command/controller/messaging" "github.com/edgexfoundry/edgex-go/internal/pkg/telemetry" "github.com/edgexfoundry/go-mod-core-contracts/v2/common" @@ -92,7 +92,7 @@ func MessageBusBootstrapHandler(ctx context.Context, wg *sync.WaitGroup, startup if !handlers.MessagingBootstrapHandler(ctx, wg, startupTimer, dic) { return false } - if !handlers.NewExternalMQTT(external.OnConnectHandler(dic)).BootstrapHandler(ctx, wg, startupTimer, dic) { + if !handlers.NewExternalMQTT(messaging.OnConnectHandler(dic)).BootstrapHandler(ctx, wg, startupTimer, dic) { return false } } From 2588d96d47e90008de298c9702d0e0fb9468b1e8 Mon Sep 17 00:00:00 2001 From: Chris Hung Date: Fri, 23 Sep 2022 12:49:28 +0800 Subject: [PATCH 2/3] feat(command): publish device service response to external MQTT subscribe to device command response from device service via internal message bus and publish response back to 3rd-party requester Signed-off-by: Chris Hung --- .../command/controller/messaging/internal.go | 74 +++++++++++++++++++ internal/core/command/main.go | 6 ++ 2 files changed, 80 insertions(+) create mode 100644 internal/core/command/controller/messaging/internal.go diff --git a/internal/core/command/controller/messaging/internal.go b/internal/core/command/controller/messaging/internal.go new file mode 100644 index 0000000000..1cb7e955d7 --- /dev/null +++ b/internal/core/command/controller/messaging/internal.go @@ -0,0 +1,74 @@ +// +// Copyright (C) 2022 IOTech Ltd +// +// SPDX-License-Identifier: Apache-2.0 + +package messaging + +import ( + "context" + "strings" + + bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container" + "github.com/edgexfoundry/go-mod-bootstrap/v2/di" + "github.com/edgexfoundry/go-mod-core-contracts/v2/errors" + "github.com/edgexfoundry/go-mod-messaging/v2/pkg/types" + + "github.com/edgexfoundry/edgex-go/internal/core/command/container" +) + +func SubscribeCommandResponses(ctx context.Context, dic *di.Container) errors.EdgeX { + lc := bootstrapContainer.LoggingClientFrom(dic.Get) + messageBusInfo := container.ConfigurationFrom(dic.Get).MessageQueue + internalResponseTopic := messageBusInfo.Internal.Topics[ResponseTopic] + externalResponseTopicPrefix := messageBusInfo.External.Topics[ResponseCommandTopicPrefix] + + messages := make(chan types.MessageEnvelope) + messageErrors := make(chan error) + topics := []types.TopicChannel{ + { + Topic: internalResponseTopic, + Messages: messages, + }, + } + + messageBus := bootstrapContainer.MessagingClientFrom(dic.Get) + err := messageBus.Subscribe(topics, messageErrors) + if err != nil { + return errors.NewCommonEdgeXWrapper(err) + } + + qos := messageBusInfo.External.QoS + retain := messageBusInfo.External.Retain + externalMQTT := bootstrapContainer.ExternalMQTTMessagingClientFrom(dic.Get) + go func() { + for { + select { + case <-ctx.Done(): + lc.Infof("Exiting waiting for MessageBus '%s' topic messages", internalResponseTopic) + return + case err := <-messageErrors: + lc.Error(err.Error()) + case msgEnvelope := <-messages: + lc.Debugf("Command response received on message queue. Topic: %s, Correlation-id: %s ", internalResponseTopic, msgEnvelope.CorrelationID) + + // expected internal command response topic scheme: #//// + topicLevels := strings.Split(msgEnvelope.ReceivedTopic, "/") + length := len(topicLevels) + if length < 4 { + lc.Error("Failed to parse and construct command response topic scheme, expected request topic scheme: '#////'") + continue + } + + // expected external command response topic scheme: #/// + deviceName := topicLevels[length-3] + commandName := topicLevels[length-2] + method := topicLevels[length-1] + externalResponseTopic := strings.Join([]string{externalResponseTopicPrefix, deviceName, commandName, method}, "/") + publishMessage(externalMQTT, externalResponseTopic, qos, retain, msgEnvelope, lc) + } + } + }() + + return nil +} diff --git a/internal/core/command/main.go b/internal/core/command/main.go index 4cd78fb242..39bee18522 100644 --- a/internal/core/command/main.go +++ b/internal/core/command/main.go @@ -21,6 +21,7 @@ import ( "sync" "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap" + bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container" "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/flags" "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/handlers" "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/interfaces" @@ -95,6 +96,11 @@ func MessageBusBootstrapHandler(ctx context.Context, wg *sync.WaitGroup, startup if !handlers.NewExternalMQTT(messaging.OnConnectHandler(dic)).BootstrapHandler(ctx, wg, startupTimer, dic) { return false } + if err := messaging.SubscribeCommandResponses(ctx, dic); err != nil { + lc := bootstrapContainer.LoggingClientFrom(dic.Get) + lc.Errorf("Failed to subscribe commands from message bus, %v", err) + return false + } } // Not required so do nothing From 91c61b82ac015b80697b61be136491ee1ef0ae34 Mon Sep 17 00:00:00 2001 From: Chris Hung Date: Thu, 29 Sep 2022 11:27:25 +0800 Subject: [PATCH 3/3] feat(command): implement MessagingRouter Save active requests in two maps using requestId as keys. This allows command service to know where to route the response (to external MQTT or internal MessageBus) Signed-off-by: Chris Hung --- .../command/controller/messaging/external.go | 10 +-- .../controller/messaging/external_test.go | 29 +++++---- .../command/controller/messaging/internal.go | 25 +++---- .../messaging/mocks/MessagingRouter.go | 58 +++++++++++++++++ .../command/controller/messaging/router.go | 65 +++++++++++++++++++ internal/core/command/main.go | 5 +- 6 files changed, 157 insertions(+), 35 deletions(-) create mode 100644 internal/core/command/controller/messaging/mocks/MessagingRouter.go create mode 100644 internal/core/command/controller/messaging/router.go diff --git a/internal/core/command/controller/messaging/external.go b/internal/core/command/controller/messaging/external.go index e49462e3f8..694223ea8d 100644 --- a/internal/core/command/controller/messaging/external.go +++ b/internal/core/command/controller/messaging/external.go @@ -34,7 +34,7 @@ const ( ResponseTopic = "ResponseTopic" ) -func OnConnectHandler(dic *di.Container) mqtt.OnConnectHandler { +func OnConnectHandler(router MessagingRouter, dic *di.Container) mqtt.OnConnectHandler { return func(client mqtt.Client) { lc := bootstrapContainer.LoggingClientFrom(dic.Get) config := container.ConfigurationFrom(dic.Get) @@ -50,7 +50,7 @@ func OnConnectHandler(dic *di.Container) mqtt.OnConnectHandler { } requestCommandTopic := externalTopics[RequestCommandTopic] - if token := client.Subscribe(requestCommandTopic, qos, commandRequestHandler(dic)); token.Wait() && token.Error() != nil { + if token := client.Subscribe(requestCommandTopic, qos, commandRequestHandler(router, dic)); token.Wait() && token.Error() != nil { lc.Errorf("could not subscribe to topic '%s': %s", responseQueryTopic, token.Error().Error()) return } @@ -134,7 +134,7 @@ func commandQueryHandler(responseTopic string, qos byte, retain bool, dic *di.Co } } -func commandRequestHandler(dic *di.Container) mqtt.MessageHandler { +func commandRequestHandler(router MessagingRouter, dic *di.Container) mqtt.MessageHandler { return func(client mqtt.Client, message mqtt.Message) { lc := bootstrapContainer.LoggingClientFrom(dic.Get) messageBusInfo := container.ConfigurationFrom(dic.Get).MessageQueue @@ -151,7 +151,7 @@ func commandRequestHandler(dic *di.Container) mqtt.MessageHandler { // expected command request topic scheme: #/// topicLevels := strings.Split(message.Topic(), "/") length := len(topicLevels) - if length <= 3 { + if length < 3 { lc.Error("Failed to parse and construct response topic scheme, expected request topic scheme: '#///") lc.Warn("Not publishing error message back due to insufficient information on response topic") return @@ -207,6 +207,8 @@ func commandRequestHandler(dic *di.Container) mqtt.MessageHandler { publishMessage(client, externalResponseTopic, qos, retain, responseEnvelope, lc) return } + + router.SetResponseTopic(requestEnvelope.RequestID, externalResponseTopic, true) } } diff --git a/internal/core/command/controller/messaging/external_test.go b/internal/core/command/controller/messaging/external_test.go index 7a50539589..0afffb7839 100644 --- a/internal/core/command/controller/messaging/external_test.go +++ b/internal/core/command/controller/messaging/external_test.go @@ -23,14 +23,14 @@ import ( commonDTO "github.com/edgexfoundry/go-mod-core-contracts/v2/dtos/common" "github.com/edgexfoundry/go-mod-core-contracts/v2/dtos/responses" edgexErr "github.com/edgexfoundry/go-mod-core-contracts/v2/errors" - "github.com/edgexfoundry/go-mod-messaging/v2/messaging/mocks" + internalMessagingMocks "github.com/edgexfoundry/go-mod-messaging/v2/messaging/mocks" "github.com/edgexfoundry/go-mod-messaging/v2/pkg/types" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/edgexfoundry/edgex-go/internal/core/command/config" "github.com/edgexfoundry/edgex-go/internal/core/command/container" - mqttMocks "github.com/edgexfoundry/edgex-go/internal/core/command/controller/messaging/mocks" + "github.com/edgexfoundry/edgex-go/internal/core/command/controller/messaging/mocks" ) const ( @@ -57,6 +57,7 @@ const ( ) func TestOnConnectHandler(t *testing.T) { + mockRouter := &mocks.MessagingRouter{} lc := &lcMocks.LoggingClient{} lc.On("Errorf", mock.Anything, mock.Anything, mock.Anything).Return(nil) dic := di.NewContainer(di.ServiceConstructorMap{ @@ -91,7 +92,7 @@ func TestOnConnectHandler(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - token := &mqttMocks.Token{} + token := &mocks.Token{} token.On("Wait").Return(true) if tt.expectedSucceed { token.On("Error").Return(nil) @@ -99,11 +100,11 @@ func TestOnConnectHandler(t *testing.T) { token.On("Error").Return(errors.New("error")) } - client := &mqttMocks.Client{} + client := &mocks.Client{} client.On("Subscribe", testQueryRequestTopic, byte(0), mock.Anything).Return(token) client.On("Subscribe", testExternalCommandRequestTopic, byte(0), mock.Anything).Return(token) - fn := OnConnectHandler(dic) + fn := OnConnectHandler(mockRouter, dic) fn(client) if tt.expectedSucceed { @@ -205,15 +206,15 @@ func Test_commandQueryHandler(t *testing.T) { payloadBytes, err := json.Marshal(tt.payload) require.NoError(t, err) - message := &mqttMocks.Message{} + message := &mocks.Message{} message.On("Payload").Return(payloadBytes) message.On("Topic").Return(tt.requestQueryTopic) - token := &mqttMocks.Token{} + token := &mocks.Token{} token.On("Wait").Return(true) token.On("Error").Return(nil) - mqttClient := &mqttMocks.Client{} + mqttClient := &mocks.Client{} mqttClient.On("Publish", testQueryResponseTopic, byte(0), true, mock.Anything).Return(token) fn := commandQueryHandler(testQueryResponseTopic, 0, true, dic) @@ -262,6 +263,8 @@ func Test_commandRequestHandler(t *testing.T) { }, } + mockRouter := &mocks.MessagingRouter{} + mockRouter.On("SetResponseTopic", mock.Anything, mock.Anything, mock.Anything).Return(nil) lc := &lcMocks.LoggingClient{} lc.On("Error", mock.Anything).Return(nil) lc.On("Errorf", mock.Anything, mock.Anything).Return(nil) @@ -274,7 +277,7 @@ func Test_commandRequestHandler(t *testing.T) { dsc := &clientMocks.DeviceServiceClient{} dsc.On("DeviceServiceByName", context.Background(), testDeviceServiceName).Return(deviceServiceResponse, nil) dsc.On("DeviceServiceByName", context.Background(), unknownService).Return(responses.DeviceServiceResponse{}, edgexErr.NewCommonEdgeX(edgexErr.KindEntityDoesNotExist, "unknown device service", nil)) - client := &mocks.MessageClient{} + client := &internalMessagingMocks.MessageClient{} client.On("Publish", mock.Anything, mock.Anything).Return(nil) dic := di.NewContainer(di.ServiceConstructorMap{ container.ConfigurationName: func(get di.Get) interface{} { @@ -338,18 +341,18 @@ func Test_commandRequestHandler(t *testing.T) { payloadBytes, err := json.Marshal(tt.payload) require.NoError(t, err) - message := &mqttMocks.Message{} + message := &mocks.Message{} message.On("Payload").Return(payloadBytes) message.On("Topic").Return(tt.commandRequestTopic) - token := &mqttMocks.Token{} + token := &mocks.Token{} token.On("Wait").Return(true) token.On("Error").Return(nil) - mqttClient := &mqttMocks.Client{} + mqttClient := &mocks.Client{} mqttClient.On("Publish", mock.Anything, byte(0), true, mock.Anything).Return(token) - fn := commandRequestHandler(dic) + fn := commandRequestHandler(mockRouter, dic) fn(mqttClient, message) if tt.expectedError { if tt.expectedPublishError { diff --git a/internal/core/command/controller/messaging/internal.go b/internal/core/command/controller/messaging/internal.go index 1cb7e955d7..9490f83e4c 100644 --- a/internal/core/command/controller/messaging/internal.go +++ b/internal/core/command/controller/messaging/internal.go @@ -7,7 +7,6 @@ package messaging import ( "context" - "strings" bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container" "github.com/edgexfoundry/go-mod-bootstrap/v2/di" @@ -17,11 +16,10 @@ import ( "github.com/edgexfoundry/edgex-go/internal/core/command/container" ) -func SubscribeCommandResponses(ctx context.Context, dic *di.Container) errors.EdgeX { +func SubscribeCommandResponses(ctx context.Context, router MessagingRouter, dic *di.Container) errors.EdgeX { lc := bootstrapContainer.LoggingClientFrom(dic.Get) messageBusInfo := container.ConfigurationFrom(dic.Get).MessageQueue internalResponseTopic := messageBusInfo.Internal.Topics[ResponseTopic] - externalResponseTopicPrefix := messageBusInfo.External.Topics[ResponseCommandTopicPrefix] messages := make(chan types.MessageEnvelope) messageErrors := make(chan error) @@ -47,25 +45,20 @@ func SubscribeCommandResponses(ctx context.Context, dic *di.Container) errors.Ed case <-ctx.Done(): lc.Infof("Exiting waiting for MessageBus '%s' topic messages", internalResponseTopic) return - case err := <-messageErrors: + case err = <-messageErrors: lc.Error(err.Error()) case msgEnvelope := <-messages: - lc.Debugf("Command response received on message queue. Topic: %s, Correlation-id: %s ", internalResponseTopic, msgEnvelope.CorrelationID) + lc.Debugf("Command response received on message queue. Topic: %s, Correlation-id: %s ", msgEnvelope.ReceivedTopic, msgEnvelope.CorrelationID) - // expected internal command response topic scheme: #//// - topicLevels := strings.Split(msgEnvelope.ReceivedTopic, "/") - length := len(topicLevels) - if length < 4 { - lc.Error("Failed to parse and construct command response topic scheme, expected request topic scheme: '#////'") + responseTopic, external, err := router.ResponseTopic(msgEnvelope.RequestID) + if err != nil { + lc.Errorf("Received RequestEnvelope with unknown RequestId %s", msgEnvelope.RequestID) continue } - // expected external command response topic scheme: #/// - deviceName := topicLevels[length-3] - commandName := topicLevels[length-2] - method := topicLevels[length-1] - externalResponseTopic := strings.Join([]string{externalResponseTopicPrefix, deviceName, commandName, method}, "/") - publishMessage(externalMQTT, externalResponseTopic, qos, retain, msgEnvelope, lc) + if external { + publishMessage(externalMQTT, responseTopic, qos, retain, msgEnvelope, lc) + } } } }() diff --git a/internal/core/command/controller/messaging/mocks/MessagingRouter.go b/internal/core/command/controller/messaging/mocks/MessagingRouter.go new file mode 100644 index 0000000000..e55831937f --- /dev/null +++ b/internal/core/command/controller/messaging/mocks/MessagingRouter.go @@ -0,0 +1,58 @@ +// Code generated by mockery v2.14.0. DO NOT EDIT. + +package mocks + +import mock "github.com/stretchr/testify/mock" + +// MessagingRouter is an autogenerated mock type for the MessagingRouter type +type MessagingRouter struct { + mock.Mock +} + +// ResponseTopic provides a mock function with given fields: requestId +func (_m *MessagingRouter) ResponseTopic(requestId string) (string, bool, error) { + ret := _m.Called(requestId) + + var r0 string + if rf, ok := ret.Get(0).(func(string) string); ok { + r0 = rf(requestId) + } else { + r0 = ret.Get(0).(string) + } + + var r1 bool + if rf, ok := ret.Get(1).(func(string) bool); ok { + r1 = rf(requestId) + } else { + r1 = ret.Get(1).(bool) + } + + var r2 error + if rf, ok := ret.Get(2).(func(string) error); ok { + r2 = rf(requestId) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// SetResponseTopic provides a mock function with given fields: requestId, topic, external +func (_m *MessagingRouter) SetResponseTopic(requestId string, topic string, external bool) { + _m.Called(requestId, topic, external) +} + +type mockConstructorTestingTNewMessagingRouter interface { + mock.TestingT + Cleanup(func()) +} + +// NewMessagingRouter creates a new instance of MessagingRouter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMessagingRouter(t mockConstructorTestingTNewMessagingRouter) *MessagingRouter { + mock := &MessagingRouter{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/core/command/controller/messaging/router.go b/internal/core/command/controller/messaging/router.go new file mode 100644 index 0000000000..2f57c04888 --- /dev/null +++ b/internal/core/command/controller/messaging/router.go @@ -0,0 +1,65 @@ +// +// Copyright (C) 2022 IOTech Ltd +// +// SPDX-License-Identifier: Apache-2.0 + +package messaging + +import ( + "errors" + "sync" +) + +// MessagingRouter defines interface for Command Service to know +// where to route the receiving device command response. +type MessagingRouter interface { + // ResponseTopic returns the responseTopicPrefix by requestId, and a boolean value + // indicates its original source(external MQTT or internal MessageBus). + ResponseTopic(requestId string) (string, bool, error) + // SetResponseTopic sets the responseTopicPrefix with RequestId as the key + SetResponseTopic(requestId string, topic string, external bool) +} + +func NewMessagingRouter() MessagingRouter { + return &router{ + internalCommandRequestMap: make(map[string]string), + externalCommandRequestMap: make(map[string]string), + } +} + +type router struct { + mutex sync.Mutex + internalCommandRequestMap map[string]string + externalCommandRequestMap map[string]string +} + +func (r *router) ResponseTopic(requestId string) (string, bool, error) { + r.mutex.Lock() + defer r.mutex.Unlock() + + topic, ok := r.externalCommandRequestMap[requestId] + if ok { + delete(r.externalCommandRequestMap, requestId) + return topic, true, nil + } + + topic, ok = r.internalCommandRequestMap[requestId] + if ok { + delete(r.internalCommandRequestMap, requestId) + return topic, false, nil + } + + return "", false, errors.New("requestId not found") +} + +func (r *router) SetResponseTopic(requestId string, topic string, external bool) { + r.mutex.Lock() + defer r.mutex.Unlock() + + if external { + r.externalCommandRequestMap[requestId] = topic + return + } + + r.internalCommandRequestMap[requestId] = topic +} diff --git a/internal/core/command/main.go b/internal/core/command/main.go index 39bee18522..d8300d42be 100644 --- a/internal/core/command/main.go +++ b/internal/core/command/main.go @@ -90,13 +90,14 @@ func Main(ctx context.Context, cancel context.CancelFunc, router *mux.Router) { func MessageBusBootstrapHandler(ctx context.Context, wg *sync.WaitGroup, startupTimer startup.Timer, dic *di.Container) bool { configuration := container.ConfigurationFrom(dic.Get) if configuration.MessageQueue.Required { + router := messaging.NewMessagingRouter() if !handlers.MessagingBootstrapHandler(ctx, wg, startupTimer, dic) { return false } - if !handlers.NewExternalMQTT(messaging.OnConnectHandler(dic)).BootstrapHandler(ctx, wg, startupTimer, dic) { + if !handlers.NewExternalMQTT(messaging.OnConnectHandler(router, dic)).BootstrapHandler(ctx, wg, startupTimer, dic) { return false } - if err := messaging.SubscribeCommandResponses(ctx, dic); err != nil { + if err := messaging.SubscribeCommandResponses(ctx, router, dic); err != nil { lc := bootstrapContainer.LoggingClientFrom(dic.Get) lc.Errorf("Failed to subscribe commands from message bus, %v", err) return false