From a657cc166d42a029b1a7c213c33c8d9dd53e4fcf Mon Sep 17 00:00:00 2001 From: Leonard Goodell Date: Tue, 24 Jan 2023 17:22:15 -0700 Subject: [PATCH] refactor!: Rework Core Commands via messaging to use new MessageBus Requst API BREAKING CHANGE: Topics configuration for Core Command has changed. Also the internal response topic is always edgex/response//. The prefix for this is now part of the standard MessageBus configuration. closes #4309 Signed-off-by: Leonard Goodell --- cmd/core-command/res/configuration.toml | 14 +- go.mod | 5 + go.sum | 4 - .../command/controller/messaging/external.go | 41 +-- .../controller/messaging/external_test.go | 38 +-- .../command/controller/messaging/internal.go | 283 +++++++++--------- .../controller/messaging/internal_test.go | 13 + .../messaging/mocks/MessagingRouter.go | 58 ---- .../command/controller/messaging/router.go | 65 ---- .../command/controller/messaging/utils.go | 13 +- internal/core/command/main.go | 20 +- 11 files changed, 227 insertions(+), 327 deletions(-) create mode 100644 internal/core/command/controller/messaging/internal_test.go delete mode 100644 internal/core/command/controller/messaging/mocks/MessagingRouter.go delete mode 100644 internal/core/command/controller/messaging/router.go diff --git a/cmd/core-command/res/configuration.toml b/cmd/core-command/res/configuration.toml index f244669714..ca2d887ea0 100644 --- a/cmd/core-command/res/configuration.toml +++ b/cmd/core-command/res/configuration.toml @@ -63,12 +63,10 @@ Port = 6379 AuthMode = "usernamepassword" # required for redis messagebus (secure or insecure). SecretName = "redisdb" [MessageBus.Topics] - DeviceRequestTopicPrefix = "edgex/device/command/request" # for publishing requests to the device service; /// will be added to this publish topic prefix - DeviceResponseTopic = "edgex/device/command/response/#" # for subscribing to device service responses - CommandRequestTopic = "edgex/core/command/request/#" # for subscribing to internal command requests - CommandResponseTopicPrefix = "edgex/core/command/response" # for publishing responses back to internal service /// will be added to this publish topic prefix - QueryRequestTopic = "edgex/core/commandquery/request/#" # for subscribing to internal command query requests - QueryResponseTopic = "edgex/core/commandquery/response" # for publishing reponsses back to internal service + DeviceCommandRequestTopicPrefix = "edgex/device/command/request" # for publishing requests to the device service; /// will be added to this publish topic prefix + CommandRequestTopic = "edgex/core/command/request/#" # for subscribing to internal command requests + CommandQueryRequestTopic = "edgex/core/commandquery/request/#" # for subscribing to internal command query requests + ResponseTopicPrefix="edgex/response" # for subscribing/publishing internal responses (used by MessageBus Request API) [MessageBus.Optional] # Default MQTT Specific options that need to be here to enable evnironment variable overrides of them ClientId ="core-command" @@ -103,5 +101,5 @@ AuthMode = "none" [ExternalMQTT.Topics] CommandRequestTopic = "edgex/command/request/#" # for subscribing to 3rd party command requests CommandResponseTopicPrefix = "edgex/command/response" # for publishing responses back to 3rd party systems /// will be added to this publish topic prefix - QueryRequestTopic = "edgex/commandquery/request/#" # for subscribing to 3rd party command query request - QueryResponseTopic = "edgex/commandquery/response" # for publishing responses back to 3rd party systems + CommandQueryRequestTopic = "edgex/commandquery/request/#" # for subscribing to 3rd party command query request + CommandQueryResponseTopic = "edgex/commandquery/response" # for publishing responses back to 3rd party systems diff --git a/go.mod b/go.mod index 88bd22d569..5b71476d69 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,10 @@ module github.com/edgexfoundry/edgex-go +replace ( + github.com/edgexfoundry/go-mod-bootstrap/v3 => ../MODS/go-mod-bootstrap + github.com/edgexfoundry/go-mod-messaging/v3 => ../MODS/go-mod-messaging +) + require ( bitbucket.org/bertimus9/systemstat v0.0.0-20180207000608-0eeff89b0690 github.com/eclipse/paho.mqtt.golang v1.4.2 diff --git a/go.sum b/go.sum index b4c149014b..c020b147de 100644 --- a/go.sum +++ b/go.sum @@ -28,14 +28,10 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/eclipse/paho.mqtt.golang v1.4.2 h1:66wOzfUHSSI1zamx7jR6yMEI5EuHnT1G6rNA5PM12m4= github.com/eclipse/paho.mqtt.golang v1.4.2/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA= -github.com/edgexfoundry/go-mod-bootstrap/v3 v3.0.0-dev.16 h1:MN6dOZHbYkW8JRDlEin/7T9nInOwpikZhthH0QaMksQ= -github.com/edgexfoundry/go-mod-bootstrap/v3 v3.0.0-dev.16/go.mod h1:G2C3aUWZ96nZU03XRvCBntU13eUjXGIB9KqRKrhI8h8= github.com/edgexfoundry/go-mod-configuration/v3 v3.0.0-dev.2 h1:xp5MsP+qf/fuJxy8fT7k1N+c4j4C6w04qMCBXm6id7o= github.com/edgexfoundry/go-mod-configuration/v3 v3.0.0-dev.2/go.mod h1:1Vv4uWAo6r7k6jUlqVJW8JOL6YKVBc6sRL8Al3DrMck= github.com/edgexfoundry/go-mod-core-contracts/v3 v3.0.0-dev.6 h1:RQFs/HjVOi1X3YxJ8sm4vuX8nhKgH0caSf9RtjQvdeI= github.com/edgexfoundry/go-mod-core-contracts/v3 v3.0.0-dev.6/go.mod h1:7RwSq896VqelvSU7zYKs2tpZhgELVFECkiGf6XGLKfQ= -github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.4 h1:swPZOjoQ/IUIWSJpZCmQENtP/plFRx5tgiCEZgnfxFU= -github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.4/go.mod h1:8pxuYvh2zcq1GuKqmk1MAuH1yuN40iOMmL0g2myIfwk= github.com/edgexfoundry/go-mod-registry/v3 v3.0.0-dev.3 h1:QgZF9f70Cwpvkjw3tP1aiVGHc+yNFJNzW6hO8pDs3fg= github.com/edgexfoundry/go-mod-registry/v3 v3.0.0-dev.3/go.mod h1:2w8v0sv+i21nY+DY6JV4PFxsNTuxpGAjlNFlFMTfZkk= github.com/edgexfoundry/go-mod-secrets/v3 v3.0.0-dev.5 h1:tEo8BVH4OZuJ/q9ii1H4PdtxlXLh/kOKpRuWFTHOcBc= diff --git a/internal/core/command/controller/messaging/external.go b/internal/core/command/controller/messaging/external.go index fc47a72783..d1a0b5c6bc 100644 --- a/internal/core/command/controller/messaging/external.go +++ b/internal/core/command/controller/messaging/external.go @@ -1,5 +1,6 @@ // // Copyright (C) 2022 IOTech Ltd +// Copyright (C) 2023 Intel Inc. // // SPDX-License-Identifier: Apache-2.0 @@ -9,6 +10,7 @@ import ( "encoding/json" "fmt" "strings" + "time" mqtt "github.com/eclipse/paho.mqtt.golang" bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/container" @@ -21,30 +23,29 @@ import ( ) const ( - QueryRequestTopic = "QueryRequestTopic" - QueryResponseTopic = "QueryResponseTopic" - CommandRequestTopic = "CommandRequestTopic" - CommandResponseTopicPrefix = "CommandResponseTopicPrefix" - DeviceRequestTopicPrefix = "DeviceRequestTopicPrefix" - DeviceResponseTopic = "DeviceResponseTopic" + CommandQueryRequestTopicKey = "CommandQueryRequestTopic" + CommandRequestTopicKey = "CommandRequestTopic" + DeviceCommandRequestTopicPrefixKey = "DeviceCommandRequestTopicPrefix" + ExternalCommandQueryResponseTopicKey = "CommandQueryResponseTopic" + ExternalCommandResponseTopicPrefixKey = "CommandResponseTopicPrefix" ) -func OnConnectHandler(router MessagingRouter, dic *di.Container) mqtt.OnConnectHandler { +func OnConnectHandler(requestTimeout time.Duration, dic *di.Container) mqtt.OnConnectHandler { return func(client mqtt.Client) { lc := bootstrapContainer.LoggingClientFrom(dic.Get) config := container.ConfigurationFrom(dic.Get) externalTopics := config.ExternalMQTT.Topics qos := config.ExternalMQTT.QoS - requestQueryTopic := externalTopics[QueryRequestTopic] + requestQueryTopic := externalTopics[CommandQueryRequestTopicKey] if token := client.Subscribe(requestQueryTopic, qos, commandQueryHandler(dic)); token.Wait() && token.Error() != nil { lc.Errorf("could not subscribe to topic '%s': %s", requestQueryTopic, token.Error().Error()) } else { lc.Debugf("Subscribed to topic '%s' on external MQTT broker", requestQueryTopic) } - requestCommandTopic := externalTopics[CommandRequestTopic] - if token := client.Subscribe(requestCommandTopic, qos, commandRequestHandler(router, dic)); token.Wait() && token.Error() != nil { + requestCommandTopic := externalTopics[CommandRequestTopicKey] + if token := client.Subscribe(requestCommandTopic, qos, commandRequestHandler(requestTimeout, dic)); token.Wait() && token.Error() != nil { lc.Errorf("could not subscribe to topic '%s': %s", requestCommandTopic, token.Error().Error()) } else { lc.Debugf("Subscribed to topic '%s' on external MQTT broker", requestCommandTopic) @@ -65,7 +66,7 @@ func commandQueryHandler(dic *di.Container) mqtt.MessageHandler { } externalMQTTInfo := container.ConfigurationFrom(dic.Get).ExternalMQTT - responseTopic := externalMQTTInfo.Topics[QueryResponseTopic] + responseTopic := externalMQTTInfo.Topics[ExternalCommandQueryResponseTopicKey] if responseTopic == "" { lc.Error("QueryResponseTopic not provided in External.Topics") lc.Warn("Not publishing error message back due to insufficient information on response topic") @@ -91,7 +92,7 @@ func commandQueryHandler(dic *di.Container) mqtt.MessageHandler { } } -func commandRequestHandler(router MessagingRouter, dic *di.Container) mqtt.MessageHandler { +func commandRequestHandler(requestTimeout time.Duration, dic *di.Container) mqtt.MessageHandler { return func(client mqtt.Client, message mqtt.Message) { lc := bootstrapContainer.LoggingClientFrom(dic.Get) lc.Debugf("Received command request from external message broker on topic '%s' with %d bytes", message.Topic(), len(message.Payload())) @@ -124,10 +125,11 @@ func commandRequestHandler(router MessagingRouter, dic *di.Container) mqtt.Messa lc.Warn("Not publishing error message back due to insufficient information on response topic") return } - externalResponseTopic := strings.Join([]string{externalMQTTInfo.Topics[CommandResponseTopicPrefix], deviceName, commandName, method}, "/") + + externalResponseTopic := strings.Join([]string{externalMQTTInfo.Topics[ExternalCommandResponseTopicPrefixKey], deviceName, commandName, method}, "/") internalMessageBusInfo := container.ConfigurationFrom(dic.Get).MessageBus - deviceRequestTopic, err := validateRequestTopic(internalMessageBusInfo.Topics[DeviceRequestTopicPrefix], deviceName, commandName, method, dic) + deviceServiceName, deviceRequestTopic, err := validateRequestTopic(internalMessageBusInfo.Topics[DeviceCommandRequestTopicPrefixKey], deviceName, commandName, method, dic) if err != nil { responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, err.Error()) publishMessage(client, externalResponseTopic, qos, retain, responseEnvelope, lc) @@ -142,7 +144,11 @@ func commandRequestHandler(router MessagingRouter, dic *di.Container) mqtt.Messa } internalMessageBus := bootstrapContainer.MessagingClientFrom(dic.Get) - err = internalMessageBus.Publish(requestEnvelope, deviceRequestTopic) + + lc.Debugf("Sending Command request to internal MessageBus. Topic: %s, Request-id: %s Correlation-id: %s", deviceRequestTopic, requestEnvelope.RequestID, requestEnvelope.CorrelationID) + + // Request waits for the response and returns it. + response, err := internalMessageBus.Request(requestEnvelope, deviceServiceName, deviceRequestTopic, requestTimeout) if err != nil { errorMessage := fmt.Sprintf("Failed to send DeviceCommand request with internal MessageBus: %v", err) responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, errorMessage) @@ -150,8 +156,9 @@ func commandRequestHandler(router MessagingRouter, dic *di.Container) mqtt.Messa return } - lc.Debugf("Command request sent to internal MessageBus. Topic: %s, Correlation-id: %s", deviceRequestTopic, requestEnvelope.CorrelationID) - router.SetResponseTopic(requestEnvelope.RequestID, externalResponseTopic, true) + lc.Debugf("Command response received from internal MessageBus. Topic: %s, Request-id: %s Correlation-id: %s", response.RequestID, response.CorrelationID) + + publishMessage(client, externalResponseTopic, qos, retain, *response, lc) } } diff --git a/internal/core/command/controller/messaging/external_test.go b/internal/core/command/controller/messaging/external_test.go index 0541818a53..7fc599f49f 100644 --- a/internal/core/command/controller/messaging/external_test.go +++ b/internal/core/command/controller/messaging/external_test.go @@ -1,5 +1,6 @@ // // Copyright (C) 2022-2023 IOTech Ltd +// Copyright (C) 2023 Intel Inc. // // SPDX-License-Identifier: Apache-2.0 @@ -12,6 +13,7 @@ import ( "net/http" "strings" "testing" + "time" clientMocks "github.com/edgexfoundry/go-mod-core-contracts/v3/clients/interfaces/mocks" lcMocks "github.com/edgexfoundry/go-mod-core-contracts/v3/clients/logger/mocks" @@ -53,12 +55,10 @@ const ( testExternalCommandRequestTopic = "unittest/external/request/#" testExternalCommandRequestTopicExample = "unittest/external/request/testDevice/testCommand/get" testExternalCommandResponseTopicPrefix = "unittest/external/response" - - testInternalCommandRequestTopicPrefix = "unittest/internal/request" + testInternalCommandRequestTopicPrefix = "unittest/internal/request" ) func TestOnConnectHandler(t *testing.T) { - mockRouter := &mocks.MessagingRouter{} lc := &lcMocks.LoggingClient{} lc.On("Errorf", mock.Anything, mock.Anything, mock.Anything).Return(nil) lc.On("Debugf", mock.Anything, mock.Anything).Return(nil) @@ -67,10 +67,10 @@ func TestOnConnectHandler(t *testing.T) { return &config.ConfigurationStruct{ ExternalMQTT: bootstrapConfig.ExternalMQTTInfo{ Topics: map[string]string{ - QueryRequestTopic: testQueryRequestTopic, - QueryResponseTopic: testQueryResponseTopic, - CommandRequestTopic: testExternalCommandRequestTopic, - CommandResponseTopicPrefix: testExternalCommandResponseTopicPrefix, + CommandRequestTopicKey: testExternalCommandRequestTopic, + ExternalCommandResponseTopicPrefixKey: testExternalCommandResponseTopicPrefix, + CommandQueryRequestTopicKey: testQueryRequestTopic, + ExternalCommandQueryResponseTopicKey: testQueryResponseTopic, }, QoS: 0, Retain: true, @@ -103,7 +103,7 @@ func TestOnConnectHandler(t *testing.T) { client.On("Subscribe", testQueryRequestTopic, byte(0), mock.Anything).Return(token) client.On("Subscribe", testExternalCommandRequestTopic, byte(0), mock.Anything).Return(token) - fn := OnConnectHandler(mockRouter, dic) + fn := OnConnectHandler(time.Second*10, dic) fn(client) if tt.expectedSucceed { @@ -124,7 +124,7 @@ func Test_commandQueryHandler(t *testing.T) { Name: testProfileName, }, DeviceResources: []dtos.DeviceResource{ - dtos.DeviceResource{ + { Name: testResourceName, Properties: dtos.ResourceProperties{ ValueType: common.ValueTypeString, @@ -144,7 +144,7 @@ func Test_commandQueryHandler(t *testing.T) { allDevicesResponse := responses.MultiDevicesResponse{ BaseWithTotalCountResponse: commonDTO.NewBaseWithTotalCountResponse("", "", http.StatusOK, 1), Devices: []dtos.Device{ - dtos.Device{ + { Name: testDeviceName, ProfileName: testProfileName, }, @@ -173,7 +173,7 @@ func Test_commandQueryHandler(t *testing.T) { QoS: 0, Retain: true, Topics: map[string]string{ - QueryResponseTopic: testQueryResponseTopic, + ExternalCommandQueryResponseTopicKey: testQueryResponseTopic, }, }, } @@ -269,12 +269,12 @@ func Test_commandRequestHandler(t *testing.T) { }, } - mockRouter := &mocks.MessagingRouter{} - mockRouter.On("SetResponseTopic", mock.Anything, mock.Anything, mock.Anything).Return(nil) + expectedResponse := &types.MessageEnvelope{} + lc := &lcMocks.LoggingClient{} lc.On("Error", mock.Anything).Return(nil) lc.On("Errorf", mock.Anything, mock.Anything).Return(nil) - lc.On("Debugf", mock.Anything, mock.Anything, mock.Anything).Return(nil) + lc.On("Debugf", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) lc.On("Warn", mock.Anything).Return(nil) dc := &clientMocks.DeviceClient{} dc.On("DeviceByName", context.Background(), testDeviceName).Return(deviceResponse, nil) @@ -284,7 +284,7 @@ func Test_commandRequestHandler(t *testing.T) { dsc.On("DeviceServiceByName", context.Background(), testDeviceServiceName).Return(deviceServiceResponse, nil) dsc.On("DeviceServiceByName", context.Background(), unknownService).Return(responses.DeviceServiceResponse{}, edgexErr.NewCommonEdgeX(edgexErr.KindEntityDoesNotExist, "unknown device service", nil)) client := &internalMessagingMocks.MessageClient{} - client.On("Publish", mock.Anything, mock.Anything).Return(nil) + client.On("Request", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(expectedResponse, nil) dic := di.NewContainer(di.ServiceConstructorMap{ container.ConfigurationName: func(get di.Get) interface{} { return &config.ConfigurationStruct{ @@ -295,14 +295,14 @@ func Test_commandRequestHandler(t *testing.T) { }, MessageBus: bootstrapConfig.MessageBusInfo{ Topics: map[string]string{ - DeviceRequestTopicPrefix: testInternalCommandRequestTopicPrefix, + DeviceCommandRequestTopicPrefixKey: testInternalCommandRequestTopicPrefix, }, }, ExternalMQTT: bootstrapConfig.ExternalMQTTInfo{ QoS: 0, Retain: true, Topics: map[string]string{ - CommandResponseTopicPrefix: testExternalCommandResponseTopicPrefix, + ExternalCommandResponseTopicPrefixKey: testExternalCommandResponseTopicPrefix, }, }, } @@ -359,7 +359,7 @@ func Test_commandRequestHandler(t *testing.T) { mqttClient := &mocks.Client{} mqttClient.On("Publish", mock.Anything, byte(0), true, mock.Anything).Return(token) - fn := commandRequestHandler(mockRouter, dic) + fn := commandRequestHandler(time.Second*10, dic) fn(mqttClient, message) if tt.expectedError { if tt.expectedPublishError { @@ -372,7 +372,7 @@ func Test_commandRequestHandler(t *testing.T) { } expectedInternalRequestTopic := strings.Join([]string{testInternalCommandRequestTopicPrefix, testDeviceServiceName, testDeviceName, testCommandName, testMethod}, "/") - client.AssertCalled(t, "Publish", tt.payload, expectedInternalRequestTopic) + client.AssertCalled(t, "Request", tt.payload, testDeviceServiceName, expectedInternalRequestTopic, mock.Anything) }) } } diff --git a/internal/core/command/controller/messaging/internal.go b/internal/core/command/controller/messaging/internal.go index 2f8a590a5a..1d4b554878 100644 --- a/internal/core/command/controller/messaging/internal.go +++ b/internal/core/command/controller/messaging/internal.go @@ -1,5 +1,6 @@ // // Copyright (C) 2022-2023 IOTech Ltd +// Copyright (C) 2023 Intel Inc. // // SPDX-License-Identifier: Apache-2.0 @@ -7,10 +8,15 @@ package messaging import ( "context" + "fmt" "strings" + "time" + bootstrapMessaging "github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/messaging" + "github.com/edgexfoundry/go-mod-core-contracts/v3/clients/logger" "github.com/edgexfoundry/go-mod-core-contracts/v3/common" "github.com/edgexfoundry/go-mod-core-contracts/v3/errors" + "github.com/edgexfoundry/go-mod-messaging/v3/messaging" bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/container" "github.com/edgexfoundry/go-mod-bootstrap/v3/di" @@ -20,17 +26,18 @@ import ( "github.com/edgexfoundry/edgex-go/internal/core/command/container" ) -// SubscribeCommandResponses subscribes command responses from device services via internal MessageBus -func SubscribeCommandResponses(ctx context.Context, router MessagingRouter, dic *di.Container) errors.EdgeX { +// SubscribeCommandRequests subscribes command requests from EdgeX service (e.g., Application Service) +// and forwards them to the appropriate Device Service via internal MessageBus +func SubscribeCommandRequests(ctx context.Context, requestTimeout time.Duration, dic *di.Container) errors.EdgeX { lc := bootstrapContainer.LoggingClientFrom(dic.Get) - internalMessageBusInfo := container.ConfigurationFrom(dic.Get).MessageBus - internalResponseTopic := internalMessageBusInfo.Topics[DeviceResponseTopic] + messageBusTopics := container.ConfigurationFrom(dic.Get).MessageBus.Topics + requestCommandTopic := messageBusTopics[CommandRequestTopicKey] messages := make(chan types.MessageEnvelope) messageErrors := make(chan error) topics := []types.TopicChannel{ { - Topic: internalResponseTopic, + Topic: requestCommandTopic, Messages: messages, }, } @@ -41,40 +48,16 @@ func SubscribeCommandResponses(ctx context.Context, router MessagingRouter, dic return errors.NewCommonEdgeXWrapper(err) } - externalMQTTInfo := container.ConfigurationFrom(dic.Get).ExternalMQTT - qos := externalMQTTInfo.QoS - retain := externalMQTTInfo.Retain - externalMQTT := bootstrapContainer.ExternalMQTTMessagingClientFrom(dic.Get) go func() { for { select { case <-ctx.Done(): - lc.Infof("Exiting waiting for MessageBus '%s' topic messages", internalResponseTopic) + lc.Infof("Exiting waiting for MessageBus '%s' topic messages", requestCommandTopic) return case err = <-messageErrors: lc.Error(err.Error()) - case msgEnvelope := <-messages: - lc.Debugf("Command response received on internal MessageBus. Topic: %s, Correlation-id: %s", msgEnvelope.ReceivedTopic, msgEnvelope.CorrelationID) - - responseTopic, external, err := router.ResponseTopic(msgEnvelope.RequestID) - if err != nil { - lc.Errorf("Received RequestEnvelope with unknown RequestId %s", msgEnvelope.RequestID) - continue - } - - // original request is from external MQTT - if external { - publishMessage(externalMQTT, responseTopic, qos, retain, msgEnvelope, lc) - continue - } - - // original request is from internal MessageBus - err = messageBus.Publish(msgEnvelope, responseTopic) - if err != nil { - lc.Errorf("Could not publish to internal MessageBus topic '%s': %s", responseTopic, err.Error()) - continue - } - lc.Debugf("Command response sent to internal MessageBus. Topic: %s, Correlation-id: %s", responseTopic, msgEnvelope.CorrelationID) + case requestEnvelope := <-messages: + processDeviceCommandRequest(messageBus, requestEnvelope, messageBusTopics, requestTimeout, lc, dic) } } }() @@ -82,111 +65,108 @@ func SubscribeCommandResponses(ctx context.Context, router MessagingRouter, dic return nil } -// SubscribeCommandRequests subscribes command requests from EdgeX service (e.g., Application Service) -// via internal MessageBus -func SubscribeCommandRequests(ctx context.Context, router MessagingRouter, dic *di.Container) errors.EdgeX { - lc := bootstrapContainer.LoggingClientFrom(dic.Get) - internalMessageBusInfo := container.ConfigurationFrom(dic.Get).MessageBus - internalRequestCommandTopic := internalMessageBusInfo.Topics[CommandRequestTopic] +func processDeviceCommandRequest( + messageBus messaging.MessageClient, + requestEnvelope types.MessageEnvelope, + messageBusTopics map[string]string, + requestTimeout time.Duration, + lc logger.LoggingClient, + dic *di.Container) { + var err error + + lc.Debugf("Command device request received on internal MessageBus. Topic: %s, Request-id: %s, Correlation-id: %s", requestEnvelope.ReceivedTopic, requestEnvelope.RequestID, requestEnvelope.CorrelationID) + + if len(strings.TrimSpace(requestEnvelope.RequestID)) == 0 { + lc.Errorf("RequestId not set in Command request received on internal MessageBus") + lc.Warn("Not publishing error message back due to insufficient information to publish on response topic") + return + } - messages := make(chan types.MessageEnvelope) - messageErrors := make(chan error) - topics := []types.TopicChannel{ - { - Topic: internalRequestCommandTopic, - Messages: messages, - }, + // internal response topic scheme: // + internalResponseTopic := strings.Join([]string{messageBusTopics[bootstrapMessaging.ResponseTopicPrefixKey], common.CoreCommandServiceKey, requestEnvelope.RequestID}, "/") + + topicLevels := strings.Split(requestEnvelope.ReceivedTopic, "/") + length := len(topicLevels) + if length < 3 { + err = fmt.Errorf("invalid internal command request topic scheme. Expected request topic scheme with >=3 levels: '///'") + lc.Error(err.Error()) + responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, err.Error()) + err = messageBus.Publish(responseEnvelope, internalResponseTopic) + if err != nil { + lc.Errorf("Could not publish to topic '%s': %s", internalResponseTopic, err.Error()) + } + return } - messageBus := bootstrapContainer.MessagingClientFrom(dic.Get) - err := messageBus.Subscribe(topics, messageErrors) + // expected internal command request/response topic scheme: #/// + deviceName := topicLevels[length-3] + commandName := topicLevels[length-2] + method := topicLevels[length-1] + if !strings.EqualFold(method, "get") && !strings.EqualFold(method, "set") { + err = fmt.Errorf("unknown request method: %s, only 'get' or 'set' is allowed", method) + lc.Error(err.Error()) + responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, err.Error()) + err = messageBus.Publish(responseEnvelope, internalResponseTopic) + if err != nil { + lc.Errorf("Could not publish to topic '%s': %s", internalResponseTopic, err.Error()) + } + return + } + + // internal command request topic scheme: //// + deviceServiceName, deviceRequestTopic, err := validateRequestTopic(messageBusTopics[DeviceCommandRequestTopicPrefixKey], deviceName, commandName, method, dic) if err != nil { - return errors.NewCommonEdgeXWrapper(err) + err = fmt.Errorf("invalid request topic: %s", err.Error()) + lc.Error(err.Error()) + responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, err.Error()) + err = messageBus.Publish(responseEnvelope, internalResponseTopic) + if err != nil { + lc.Errorf("Could not publish to topic '%s': %s", internalResponseTopic, err.Error()) + } + return } - go func() { - for { - select { - case <-ctx.Done(): - lc.Infof("Exiting waiting for MessageBus '%s' topic messages", internalRequestCommandTopic) - return - case err = <-messageErrors: - lc.Error(err.Error()) - case requestEnvelope := <-messages: - lc.Debugf("Command request received on internal MessageBus. Topic: %s, Correlation-id: %s", requestEnvelope.ReceivedTopic, requestEnvelope.CorrelationID) - - topicLevels := strings.Split(requestEnvelope.ReceivedTopic, "/") - length := len(topicLevels) - if length < 3 { - lc.Error("Failed to parse and construct internal command response topic scheme, expected request topic scheme: '#///'") - lc.Warn("Not publishing error message back due to insufficient information on response topic") - continue - } - - // expected internal command request/response topic scheme: #/// - deviceName := topicLevels[length-3] - commandName := topicLevels[length-2] - method := topicLevels[length-1] - if !strings.EqualFold(method, "get") && !strings.EqualFold(method, "set") { - lc.Errorf("Unknown request method: %s, only 'get' or 'set' is allowed", method) - lc.Warn("Not publishing error message back due to insufficient information on response topic") - continue - } - internalResponseTopic := strings.Join([]string{internalMessageBusInfo.Topics[CommandResponseTopicPrefix], deviceName, commandName, method}, "/") - - deviceRequestTopic, err := validateRequestTopic(internalMessageBusInfo.Topics[DeviceRequestTopicPrefix], deviceName, commandName, method, dic) - if err != nil { - lc.Errorf("invalid request topic: %s", err.Error()) - responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, err.Error()) - err = messageBus.Publish(responseEnvelope, internalResponseTopic) - if err != nil { - lc.Errorf("Could not publish to topic '%s': %s", internalResponseTopic, err.Error()) - } - - continue - } - - err = validateGetCommandQueryParameters(requestEnvelope.QueryParams) - if err != nil { - lc.Errorf(err.Error()) - responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, err.Error()) - err = messageBus.Publish(responseEnvelope, internalResponseTopic) - if err != nil { - lc.Errorf("Could not publish to topic '%s': %s", internalResponseTopic, err.Error()) - } - - continue - } - - // expected internal command request topic scheme: #//// - err = messageBus.Publish(requestEnvelope, deviceRequestTopic) - if err != nil { - lc.Errorf("Could not publish to topic '%s': %s", deviceRequestTopic, err.Error()) - continue - } - - lc.Debugf("Command request sent to internal MessageBus. Topic: %s, Correlation-id: %s", deviceRequestTopic, requestEnvelope.CorrelationID) - router.SetResponseTopic(requestEnvelope.RequestID, internalResponseTopic, false) - } + err = validateGetCommandQueryParameters(requestEnvelope.QueryParams) + if err != nil { + lc.Errorf(err.Error()) + responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, err.Error()) + err = messageBus.Publish(responseEnvelope, internalResponseTopic) + if err != nil { + lc.Errorf("Could not publish to topic '%s': %s", internalResponseTopic, err.Error()) } - }() + return + } - return nil + lc.Debugf("Sending Command Device Request to internal MessageBus. Topic: %s, Correlation-id: %s", deviceRequestTopic, requestEnvelope.CorrelationID) + + response, err := messageBus.Request(requestEnvelope, deviceServiceName, deviceRequestTopic, requestTimeout) + if err != nil { + lc.Errorf("Request to topic '%s' failed: %s", deviceRequestTopic, err.Error()) + return + } + + // original request is from internal MessageBus + err = messageBus.Publish(*response, internalResponseTopic) + if err != nil { + lc.Errorf("Could not publish to internal MessageBus topic '%s': %s", internalResponseTopic, err.Error()) + return + } + + lc.Debugf("Command response sent to internal MessageBus. Topic: %s, Correlation-id: %s", internalResponseTopic, response.CorrelationID) } // SubscribeCommandQueryRequests subscribes command query requests from EdgeX service (e.g., Application Service) // via internal MessageBus func SubscribeCommandQueryRequests(ctx context.Context, dic *di.Container) errors.EdgeX { lc := bootstrapContainer.LoggingClientFrom(dic.Get) - internalMessageBusInfo := container.ConfigurationFrom(dic.Get).MessageBus - internalQueryRequestTopic := internalMessageBusInfo.Topics[QueryRequestTopic] - internalQueryResponseTopic := internalMessageBusInfo.Topics[QueryResponseTopic] + messageBusTopics := container.ConfigurationFrom(dic.Get).MessageBus.Topics + queryRequestTopic := messageBusTopics[CommandQueryRequestTopicKey] messages := make(chan types.MessageEnvelope) messageErrors := make(chan error) topics := []types.TopicChannel{ { - Topic: internalQueryRequestTopic, + Topic: queryRequestTopic, Messages: messages, }, } @@ -201,37 +181,56 @@ func SubscribeCommandQueryRequests(ctx context.Context, dic *di.Container) error for { select { case <-ctx.Done(): - lc.Infof("Exiting waiting for MessageBus '%s' topic messages", internalQueryRequestTopic) + lc.Infof("Exiting waiting for MessageBus '%s' topic messages", queryRequestTopic) return case err = <-messageErrors: lc.Error(err.Error()) case requestEnvelope := <-messages: - lc.Debugf("Command query request received on internal MessageBus. Topic: %s, Correlation-id: %s", requestEnvelope.ReceivedTopic, requestEnvelope.CorrelationID) - - // example topic scheme: /commandquery/request/ - // deviceName is expected to be at last topic level. - topicLevels := strings.Split(requestEnvelope.ReceivedTopic, "/") - deviceName := topicLevels[len(topicLevels)-1] - if strings.EqualFold(deviceName, common.All) { - deviceName = common.All - } - - responseEnvelope, err := getCommandQueryResponseEnvelope(requestEnvelope, deviceName, dic) - if err != nil { - lc.Error(err.Error()) - responseEnvelope = types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, err.Error()) - } - - err = messageBus.Publish(responseEnvelope, internalQueryResponseTopic) - if err != nil { - lc.Errorf("Could not publish to topic '%s': %s", internalQueryResponseTopic, err.Error()) - continue - } - - lc.Debugf("Command query response sent to internal MessageBus. Topic: %s, Correlation-id: %s", internalQueryResponseTopic, requestEnvelope.CorrelationID) + processCommandQueryRequest(messageBus, requestEnvelope, messageBusTopics, lc, dic) } } }() return nil } + +func processCommandQueryRequest( + messageBus messaging.MessageClient, + requestEnvelope types.MessageEnvelope, + messageBusTopics map[string]string, + lc logger.LoggingClient, + dic *di.Container, +) { + lc.Debugf("Command query request received on internal MessageBus. Topic: %s, Request-id: %s, Correlation-id: %s", requestEnvelope.ReceivedTopic, requestEnvelope.RequestID, requestEnvelope.CorrelationID) + + if len(strings.TrimSpace(requestEnvelope.RequestID)) == 0 { + lc.Errorf("RequestId not set in Command request received on internal MessageBus") + lc.Warn("Not publishing error message back due to insufficient information to publish on response topic") + return + } + + // internal response topic scheme: // + internalQueryResponseTopic := strings.Join([]string{messageBusTopics[bootstrapMessaging.ResponseTopicPrefixKey], common.CoreCommandServiceKey, requestEnvelope.RequestID}, "/") + + // example topic scheme: /commandquery/request/ + // deviceName is expected to be at last topic level. + topicLevels := strings.Split(requestEnvelope.ReceivedTopic, "/") + deviceName := topicLevels[len(topicLevels)-1] + if strings.EqualFold(deviceName, common.All) { + deviceName = common.All + } + + responseEnvelope, err := getCommandQueryResponseEnvelope(requestEnvelope, deviceName, dic) + if err != nil { + lc.Error(err.Error()) + responseEnvelope = types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, err.Error()) + } + + err = messageBus.Publish(responseEnvelope, internalQueryResponseTopic) + if err != nil { + lc.Errorf("Could not publish to topic '%s': %s", internalQueryResponseTopic, err.Error()) + return + } + + lc.Debugf("Command query response sent to internal MessageBus. Topic: %s, Correlation-id: %s", internalQueryResponseTopic, requestEnvelope.CorrelationID) +} diff --git a/internal/core/command/controller/messaging/internal_test.go b/internal/core/command/controller/messaging/internal_test.go new file mode 100644 index 0000000000..d8e6165ac5 --- /dev/null +++ b/internal/core/command/controller/messaging/internal_test.go @@ -0,0 +1,13 @@ +// Copyright (C) 2023 Intel Inc. +// +// SPDX-License-Identifier: Apache-2.0 + +package messaging + +import "testing" + +func TestSubscribeCommandRequests(t *testing.T) { +} + +func TestSubscribeCommandQueryRequests(t *testing.T) { +} diff --git a/internal/core/command/controller/messaging/mocks/MessagingRouter.go b/internal/core/command/controller/messaging/mocks/MessagingRouter.go deleted file mode 100644 index e55831937f..0000000000 --- a/internal/core/command/controller/messaging/mocks/MessagingRouter.go +++ /dev/null @@ -1,58 +0,0 @@ -// Code generated by mockery v2.14.0. DO NOT EDIT. - -package mocks - -import mock "github.com/stretchr/testify/mock" - -// MessagingRouter is an autogenerated mock type for the MessagingRouter type -type MessagingRouter struct { - mock.Mock -} - -// ResponseTopic provides a mock function with given fields: requestId -func (_m *MessagingRouter) ResponseTopic(requestId string) (string, bool, error) { - ret := _m.Called(requestId) - - var r0 string - if rf, ok := ret.Get(0).(func(string) string); ok { - r0 = rf(requestId) - } else { - r0 = ret.Get(0).(string) - } - - var r1 bool - if rf, ok := ret.Get(1).(func(string) bool); ok { - r1 = rf(requestId) - } else { - r1 = ret.Get(1).(bool) - } - - var r2 error - if rf, ok := ret.Get(2).(func(string) error); ok { - r2 = rf(requestId) - } else { - r2 = ret.Error(2) - } - - return r0, r1, r2 -} - -// SetResponseTopic provides a mock function with given fields: requestId, topic, external -func (_m *MessagingRouter) SetResponseTopic(requestId string, topic string, external bool) { - _m.Called(requestId, topic, external) -} - -type mockConstructorTestingTNewMessagingRouter interface { - mock.TestingT - Cleanup(func()) -} - -// NewMessagingRouter creates a new instance of MessagingRouter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewMessagingRouter(t mockConstructorTestingTNewMessagingRouter) *MessagingRouter { - mock := &MessagingRouter{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/internal/core/command/controller/messaging/router.go b/internal/core/command/controller/messaging/router.go deleted file mode 100644 index 2f57c04888..0000000000 --- a/internal/core/command/controller/messaging/router.go +++ /dev/null @@ -1,65 +0,0 @@ -// -// Copyright (C) 2022 IOTech Ltd -// -// SPDX-License-Identifier: Apache-2.0 - -package messaging - -import ( - "errors" - "sync" -) - -// MessagingRouter defines interface for Command Service to know -// where to route the receiving device command response. -type MessagingRouter interface { - // ResponseTopic returns the responseTopicPrefix by requestId, and a boolean value - // indicates its original source(external MQTT or internal MessageBus). - ResponseTopic(requestId string) (string, bool, error) - // SetResponseTopic sets the responseTopicPrefix with RequestId as the key - SetResponseTopic(requestId string, topic string, external bool) -} - -func NewMessagingRouter() MessagingRouter { - return &router{ - internalCommandRequestMap: make(map[string]string), - externalCommandRequestMap: make(map[string]string), - } -} - -type router struct { - mutex sync.Mutex - internalCommandRequestMap map[string]string - externalCommandRequestMap map[string]string -} - -func (r *router) ResponseTopic(requestId string) (string, bool, error) { - r.mutex.Lock() - defer r.mutex.Unlock() - - topic, ok := r.externalCommandRequestMap[requestId] - if ok { - delete(r.externalCommandRequestMap, requestId) - return topic, true, nil - } - - topic, ok = r.internalCommandRequestMap[requestId] - if ok { - delete(r.internalCommandRequestMap, requestId) - return topic, false, nil - } - - return "", false, errors.New("requestId not found") -} - -func (r *router) SetResponseTopic(requestId string, topic string, external bool) { - r.mutex.Lock() - defer r.mutex.Unlock() - - if external { - r.externalCommandRequestMap[requestId] = topic - return - } - - r.internalCommandRequestMap[requestId] = topic -} diff --git a/internal/core/command/controller/messaging/utils.go b/internal/core/command/controller/messaging/utils.go index 03fa82da96..5372975e8b 100644 --- a/internal/core/command/controller/messaging/utils.go +++ b/internal/core/command/controller/messaging/utils.go @@ -1,5 +1,6 @@ // // Copyright (C) 2022 IOTech Ltd +// Copyright (C) 2023 Intel Inc. // // SPDX-License-Identifier: Apache-2.0 @@ -25,29 +26,29 @@ import ( // validateRequestTopic validates the request topic by checking the existence of device and device service, // returns the internal device request topic to which the command request will be sent. -func validateRequestTopic(prefix string, deviceName string, commandName string, method string, dic *di.Container) (string, error) { +func validateRequestTopic(prefix string, deviceName string, commandName string, method string, dic *di.Container) (string, string, error) { // retrieve device information through Metadata DeviceClient dc := bootstrapContainer.DeviceClientFrom(dic.Get) if dc == nil { - return "", errors.New("nil Device Client") + return "", "", errors.New("nil Device Client") } deviceResponse, err := dc.DeviceByName(context.Background(), deviceName) if err != nil { - return "", fmt.Errorf("failed to get Device by name %s: %v", deviceName, err) + return "", "", fmt.Errorf("failed to get Device by name %s: %v", deviceName, err) } // retrieve device service information through Metadata DeviceClient dsc := bootstrapContainer.DeviceServiceClientFrom(dic.Get) if dsc == nil { - return "", errors.New("nil DeviceService Client") + return "", "", errors.New("nil DeviceService Client") } deviceServiceResponse, err := dsc.DeviceServiceByName(context.Background(), deviceResponse.Device.ServiceName) if err != nil { - return "", fmt.Errorf("failed to get DeviceService by name %s: %v", deviceResponse.Device.ServiceName, err) + return "", "", fmt.Errorf("failed to get DeviceService by name %s: %v", deviceResponse.Device.ServiceName, err) } // expected internal command request topic scheme: #//// - return strings.Join([]string{prefix, deviceServiceResponse.Service.Name, deviceName, commandName, method}, "/"), nil + return deviceServiceResponse.Service.Name, strings.Join([]string{prefix, deviceServiceResponse.Service.Name, deviceName, commandName, method}, "/"), nil } diff --git a/internal/core/command/main.go b/internal/core/command/main.go index 0b7827817a..66a3f38de1 100644 --- a/internal/core/command/main.go +++ b/internal/core/command/main.go @@ -1,6 +1,7 @@ /******************************************************************************* * Copyright 2020 Dell Inc. * Copyright 2022-2023 IOTech Ltd. + * Copyright 2023 Intel Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -19,6 +20,7 @@ import ( "context" "os" "sync" + "time" "github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap" bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/container" @@ -42,7 +44,7 @@ import ( func Main(ctx context.Context, cancel context.CancelFunc, router *mux.Router) { startupTimer := startup.NewStartUpTimer(common.CoreCommandServiceKey) - // All common command-line flags have been moved to DefaultCommonFlags. Service specific flags can be add here, + // All common command-line flags have been moved to DefaultCommonFlags. Service specific flags can be added here, // by inserting service specific flag prior to call to commonFlags.Parse(). // Example: // flags.FlagSet.StringVar(&myvar, "m", "", "Specify a ....") @@ -88,10 +90,15 @@ func Main(ctx context.Context, cancel context.CancelFunc, router *mux.Router) { func MessagingBootstrapHandler(ctx context.Context, wg *sync.WaitGroup, startupTimer startup.Timer, dic *di.Container) bool { lc := bootstrapContainer.LoggingClientFrom(dic.Get) configuration := container.ConfigurationFrom(dic.Get) - router := messaging.NewMessagingRouter() + + requestTimeout, err := time.ParseDuration(configuration.Service.RequestTimeout) + if err != nil { + lc.Errorf("Failed to parse Service.RequestTimeout configuration value: %v", err) + return false + } if configuration.ExternalMQTT.Enabled { - if !handlers.NewExternalMQTT(messaging.OnConnectHandler(router, dic)).BootstrapHandler(ctx, wg, startupTimer, dic) { + if !handlers.NewExternalMQTT(messaging.OnConnectHandler(requestTimeout, dic)).BootstrapHandler(ctx, wg, startupTimer, dic) { return false } } @@ -99,14 +106,11 @@ func MessagingBootstrapHandler(ctx context.Context, wg *sync.WaitGroup, startupT if !handlers.MessagingBootstrapHandler(ctx, wg, startupTimer, dic) { return false } - if err := messaging.SubscribeCommandRequests(ctx, router, dic); err != nil { + if err := messaging.SubscribeCommandRequests(ctx, requestTimeout, dic); err != nil { lc.Errorf("Failed to subscribe commands request from internal message bus, %v", err) return false } - if err := messaging.SubscribeCommandResponses(ctx, router, dic); err != nil { - lc.Errorf("Failed to subscribe commands response from internal message bus, %v", err) - return false - } + if err := messaging.SubscribeCommandQueryRequests(ctx, dic); err != nil { lc.Errorf("Failed to subscribe command query request from internal message bus, %v", err) return false