Skip to content

Commit

Permalink
refactor!: Rework Core Commands via messaging to use new MessageBus R…
Browse files Browse the repository at this point in the history
…equst API

BREAKING CHANGE: Topics configuration for Core Command has changed. Also the internal response topic is always edgex/response/<responding-service-name>/<request-id>. The prefix for this is now part of the standard MessageBus configuration.

closes edgexfoundry#4309

Signed-off-by: Leonard Goodell <[email protected]>
  • Loading branch information
Leonard Goodell committed Jan 25, 2023
1 parent 228e514 commit a657cc1
Show file tree
Hide file tree
Showing 11 changed files with 227 additions and 327 deletions.
14 changes: 6 additions & 8 deletions cmd/core-command/res/configuration.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,10 @@ Port = 6379
AuthMode = "usernamepassword" # required for redis messagebus (secure or insecure).
SecretName = "redisdb"
[MessageBus.Topics]
DeviceRequestTopicPrefix = "edgex/device/command/request" # for publishing requests to the device service; <device-service>/<device-name>/<command-name>/<method> will be added to this publish topic prefix
DeviceResponseTopic = "edgex/device/command/response/#" # for subscribing to device service responses
CommandRequestTopic = "edgex/core/command/request/#" # for subscribing to internal command requests
CommandResponseTopicPrefix = "edgex/core/command/response" # for publishing responses back to internal service /<device-name>/<command-name>/<method> will be added to this publish topic prefix
QueryRequestTopic = "edgex/core/commandquery/request/#" # for subscribing to internal command query requests
QueryResponseTopic = "edgex/core/commandquery/response" # for publishing reponsses back to internal service
DeviceCommandRequestTopicPrefix = "edgex/device/command/request" # for publishing requests to the device service; <device-service>/<device-name>/<command-name>/<method> will be added to this publish topic prefix
CommandRequestTopic = "edgex/core/command/request/#" # for subscribing to internal command requests
CommandQueryRequestTopic = "edgex/core/commandquery/request/#" # for subscribing to internal command query requests
ResponseTopicPrefix="edgex/response" # for subscribing/publishing internal responses (used by MessageBus Request API)
[MessageBus.Optional]
# Default MQTT Specific options that need to be here to enable evnironment variable overrides of them
ClientId ="core-command"
Expand Down Expand Up @@ -103,5 +101,5 @@ AuthMode = "none"
[ExternalMQTT.Topics]
CommandRequestTopic = "edgex/command/request/#" # for subscribing to 3rd party command requests
CommandResponseTopicPrefix = "edgex/command/response" # for publishing responses back to 3rd party systems /<device-name>/<command-name>/<method> will be added to this publish topic prefix
QueryRequestTopic = "edgex/commandquery/request/#" # for subscribing to 3rd party command query request
QueryResponseTopic = "edgex/commandquery/response" # for publishing responses back to 3rd party systems
CommandQueryRequestTopic = "edgex/commandquery/request/#" # for subscribing to 3rd party command query request
CommandQueryResponseTopic = "edgex/commandquery/response" # for publishing responses back to 3rd party systems
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
module github.com/edgexfoundry/edgex-go

replace (
github.com/edgexfoundry/go-mod-bootstrap/v3 => ../MODS/go-mod-bootstrap
github.com/edgexfoundry/go-mod-messaging/v3 => ../MODS/go-mod-messaging
)

require (
bitbucket.org/bertimus9/systemstat v0.0.0-20180207000608-0eeff89b0690
github.com/eclipse/paho.mqtt.golang v1.4.2
Expand Down
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,10 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eclipse/paho.mqtt.golang v1.4.2 h1:66wOzfUHSSI1zamx7jR6yMEI5EuHnT1G6rNA5PM12m4=
github.com/eclipse/paho.mqtt.golang v1.4.2/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA=
github.com/edgexfoundry/go-mod-bootstrap/v3 v3.0.0-dev.16 h1:MN6dOZHbYkW8JRDlEin/7T9nInOwpikZhthH0QaMksQ=
github.com/edgexfoundry/go-mod-bootstrap/v3 v3.0.0-dev.16/go.mod h1:G2C3aUWZ96nZU03XRvCBntU13eUjXGIB9KqRKrhI8h8=
github.com/edgexfoundry/go-mod-configuration/v3 v3.0.0-dev.2 h1:xp5MsP+qf/fuJxy8fT7k1N+c4j4C6w04qMCBXm6id7o=
github.com/edgexfoundry/go-mod-configuration/v3 v3.0.0-dev.2/go.mod h1:1Vv4uWAo6r7k6jUlqVJW8JOL6YKVBc6sRL8Al3DrMck=
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.0.0-dev.6 h1:RQFs/HjVOi1X3YxJ8sm4vuX8nhKgH0caSf9RtjQvdeI=
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.0.0-dev.6/go.mod h1:7RwSq896VqelvSU7zYKs2tpZhgELVFECkiGf6XGLKfQ=
github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.4 h1:swPZOjoQ/IUIWSJpZCmQENtP/plFRx5tgiCEZgnfxFU=
github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.4/go.mod h1:8pxuYvh2zcq1GuKqmk1MAuH1yuN40iOMmL0g2myIfwk=
github.com/edgexfoundry/go-mod-registry/v3 v3.0.0-dev.3 h1:QgZF9f70Cwpvkjw3tP1aiVGHc+yNFJNzW6hO8pDs3fg=
github.com/edgexfoundry/go-mod-registry/v3 v3.0.0-dev.3/go.mod h1:2w8v0sv+i21nY+DY6JV4PFxsNTuxpGAjlNFlFMTfZkk=
github.com/edgexfoundry/go-mod-secrets/v3 v3.0.0-dev.5 h1:tEo8BVH4OZuJ/q9ii1H4PdtxlXLh/kOKpRuWFTHOcBc=
Expand Down
41 changes: 24 additions & 17 deletions internal/core/command/controller/messaging/external.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//
// Copyright (C) 2022 IOTech Ltd
// Copyright (C) 2023 Intel Inc.
//
// SPDX-License-Identifier: Apache-2.0

Expand All @@ -9,6 +10,7 @@ import (
"encoding/json"
"fmt"
"strings"
"time"

mqtt "github.com/eclipse/paho.mqtt.golang"
bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/container"
Expand All @@ -21,30 +23,29 @@ import (
)

const (
QueryRequestTopic = "QueryRequestTopic"
QueryResponseTopic = "QueryResponseTopic"
CommandRequestTopic = "CommandRequestTopic"
CommandResponseTopicPrefix = "CommandResponseTopicPrefix"
DeviceRequestTopicPrefix = "DeviceRequestTopicPrefix"
DeviceResponseTopic = "DeviceResponseTopic"
CommandQueryRequestTopicKey = "CommandQueryRequestTopic"
CommandRequestTopicKey = "CommandRequestTopic"
DeviceCommandRequestTopicPrefixKey = "DeviceCommandRequestTopicPrefix"
ExternalCommandQueryResponseTopicKey = "CommandQueryResponseTopic"
ExternalCommandResponseTopicPrefixKey = "CommandResponseTopicPrefix"
)

func OnConnectHandler(router MessagingRouter, dic *di.Container) mqtt.OnConnectHandler {
func OnConnectHandler(requestTimeout time.Duration, dic *di.Container) mqtt.OnConnectHandler {
return func(client mqtt.Client) {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
config := container.ConfigurationFrom(dic.Get)
externalTopics := config.ExternalMQTT.Topics
qos := config.ExternalMQTT.QoS

requestQueryTopic := externalTopics[QueryRequestTopic]
requestQueryTopic := externalTopics[CommandQueryRequestTopicKey]
if token := client.Subscribe(requestQueryTopic, qos, commandQueryHandler(dic)); token.Wait() && token.Error() != nil {
lc.Errorf("could not subscribe to topic '%s': %s", requestQueryTopic, token.Error().Error())
} else {
lc.Debugf("Subscribed to topic '%s' on external MQTT broker", requestQueryTopic)
}

requestCommandTopic := externalTopics[CommandRequestTopic]
if token := client.Subscribe(requestCommandTopic, qos, commandRequestHandler(router, dic)); token.Wait() && token.Error() != nil {
requestCommandTopic := externalTopics[CommandRequestTopicKey]
if token := client.Subscribe(requestCommandTopic, qos, commandRequestHandler(requestTimeout, dic)); token.Wait() && token.Error() != nil {
lc.Errorf("could not subscribe to topic '%s': %s", requestCommandTopic, token.Error().Error())
} else {
lc.Debugf("Subscribed to topic '%s' on external MQTT broker", requestCommandTopic)
Expand All @@ -65,7 +66,7 @@ func commandQueryHandler(dic *di.Container) mqtt.MessageHandler {
}

externalMQTTInfo := container.ConfigurationFrom(dic.Get).ExternalMQTT
responseTopic := externalMQTTInfo.Topics[QueryResponseTopic]
responseTopic := externalMQTTInfo.Topics[ExternalCommandQueryResponseTopicKey]
if responseTopic == "" {
lc.Error("QueryResponseTopic not provided in External.Topics")
lc.Warn("Not publishing error message back due to insufficient information on response topic")
Expand All @@ -91,7 +92,7 @@ func commandQueryHandler(dic *di.Container) mqtt.MessageHandler {
}
}

func commandRequestHandler(router MessagingRouter, dic *di.Container) mqtt.MessageHandler {
func commandRequestHandler(requestTimeout time.Duration, dic *di.Container) mqtt.MessageHandler {
return func(client mqtt.Client, message mqtt.Message) {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
lc.Debugf("Received command request from external message broker on topic '%s' with %d bytes", message.Topic(), len(message.Payload()))
Expand Down Expand Up @@ -124,10 +125,11 @@ func commandRequestHandler(router MessagingRouter, dic *di.Container) mqtt.Messa
lc.Warn("Not publishing error message back due to insufficient information on response topic")
return
}
externalResponseTopic := strings.Join([]string{externalMQTTInfo.Topics[CommandResponseTopicPrefix], deviceName, commandName, method}, "/")

externalResponseTopic := strings.Join([]string{externalMQTTInfo.Topics[ExternalCommandResponseTopicPrefixKey], deviceName, commandName, method}, "/")

internalMessageBusInfo := container.ConfigurationFrom(dic.Get).MessageBus
deviceRequestTopic, err := validateRequestTopic(internalMessageBusInfo.Topics[DeviceRequestTopicPrefix], deviceName, commandName, method, dic)
deviceServiceName, deviceRequestTopic, err := validateRequestTopic(internalMessageBusInfo.Topics[DeviceCommandRequestTopicPrefixKey], deviceName, commandName, method, dic)
if err != nil {
responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, err.Error())
publishMessage(client, externalResponseTopic, qos, retain, responseEnvelope, lc)
Expand All @@ -142,16 +144,21 @@ func commandRequestHandler(router MessagingRouter, dic *di.Container) mqtt.Messa
}

internalMessageBus := bootstrapContainer.MessagingClientFrom(dic.Get)
err = internalMessageBus.Publish(requestEnvelope, deviceRequestTopic)

lc.Debugf("Sending Command request to internal MessageBus. Topic: %s, Request-id: %s Correlation-id: %s", deviceRequestTopic, requestEnvelope.RequestID, requestEnvelope.CorrelationID)

// Request waits for the response and returns it.
response, err := internalMessageBus.Request(requestEnvelope, deviceServiceName, deviceRequestTopic, requestTimeout)
if err != nil {
errorMessage := fmt.Sprintf("Failed to send DeviceCommand request with internal MessageBus: %v", err)
responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, errorMessage)
publishMessage(client, externalResponseTopic, qos, retain, responseEnvelope, lc)
return
}

lc.Debugf("Command request sent to internal MessageBus. Topic: %s, Correlation-id: %s", deviceRequestTopic, requestEnvelope.CorrelationID)
router.SetResponseTopic(requestEnvelope.RequestID, externalResponseTopic, true)
lc.Debugf("Command response received from internal MessageBus. Topic: %s, Request-id: %s Correlation-id: %s", response.RequestID, response.CorrelationID)

publishMessage(client, externalResponseTopic, qos, retain, *response, lc)
}
}

Expand Down
38 changes: 19 additions & 19 deletions internal/core/command/controller/messaging/external_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//
// Copyright (C) 2022-2023 IOTech Ltd
// Copyright (C) 2023 Intel Inc.
//
// SPDX-License-Identifier: Apache-2.0

Expand All @@ -12,6 +13,7 @@ import (
"net/http"
"strings"
"testing"
"time"

clientMocks "github.com/edgexfoundry/go-mod-core-contracts/v3/clients/interfaces/mocks"
lcMocks "github.com/edgexfoundry/go-mod-core-contracts/v3/clients/logger/mocks"
Expand Down Expand Up @@ -53,12 +55,10 @@ const (
testExternalCommandRequestTopic = "unittest/external/request/#"
testExternalCommandRequestTopicExample = "unittest/external/request/testDevice/testCommand/get"
testExternalCommandResponseTopicPrefix = "unittest/external/response"

testInternalCommandRequestTopicPrefix = "unittest/internal/request"
testInternalCommandRequestTopicPrefix = "unittest/internal/request"
)

func TestOnConnectHandler(t *testing.T) {
mockRouter := &mocks.MessagingRouter{}
lc := &lcMocks.LoggingClient{}
lc.On("Errorf", mock.Anything, mock.Anything, mock.Anything).Return(nil)
lc.On("Debugf", mock.Anything, mock.Anything).Return(nil)
Expand All @@ -67,10 +67,10 @@ func TestOnConnectHandler(t *testing.T) {
return &config.ConfigurationStruct{
ExternalMQTT: bootstrapConfig.ExternalMQTTInfo{
Topics: map[string]string{
QueryRequestTopic: testQueryRequestTopic,
QueryResponseTopic: testQueryResponseTopic,
CommandRequestTopic: testExternalCommandRequestTopic,
CommandResponseTopicPrefix: testExternalCommandResponseTopicPrefix,
CommandRequestTopicKey: testExternalCommandRequestTopic,
ExternalCommandResponseTopicPrefixKey: testExternalCommandResponseTopicPrefix,
CommandQueryRequestTopicKey: testQueryRequestTopic,
ExternalCommandQueryResponseTopicKey: testQueryResponseTopic,
},
QoS: 0,
Retain: true,
Expand Down Expand Up @@ -103,7 +103,7 @@ func TestOnConnectHandler(t *testing.T) {
client.On("Subscribe", testQueryRequestTopic, byte(0), mock.Anything).Return(token)
client.On("Subscribe", testExternalCommandRequestTopic, byte(0), mock.Anything).Return(token)

fn := OnConnectHandler(mockRouter, dic)
fn := OnConnectHandler(time.Second*10, dic)
fn(client)

if tt.expectedSucceed {
Expand All @@ -124,7 +124,7 @@ func Test_commandQueryHandler(t *testing.T) {
Name: testProfileName,
},
DeviceResources: []dtos.DeviceResource{
dtos.DeviceResource{
{
Name: testResourceName,
Properties: dtos.ResourceProperties{
ValueType: common.ValueTypeString,
Expand All @@ -144,7 +144,7 @@ func Test_commandQueryHandler(t *testing.T) {
allDevicesResponse := responses.MultiDevicesResponse{
BaseWithTotalCountResponse: commonDTO.NewBaseWithTotalCountResponse("", "", http.StatusOK, 1),
Devices: []dtos.Device{
dtos.Device{
{
Name: testDeviceName,
ProfileName: testProfileName,
},
Expand Down Expand Up @@ -173,7 +173,7 @@ func Test_commandQueryHandler(t *testing.T) {
QoS: 0,
Retain: true,
Topics: map[string]string{
QueryResponseTopic: testQueryResponseTopic,
ExternalCommandQueryResponseTopicKey: testQueryResponseTopic,
},
},
}
Expand Down Expand Up @@ -269,12 +269,12 @@ func Test_commandRequestHandler(t *testing.T) {
},
}

mockRouter := &mocks.MessagingRouter{}
mockRouter.On("SetResponseTopic", mock.Anything, mock.Anything, mock.Anything).Return(nil)
expectedResponse := &types.MessageEnvelope{}

lc := &lcMocks.LoggingClient{}
lc.On("Error", mock.Anything).Return(nil)
lc.On("Errorf", mock.Anything, mock.Anything).Return(nil)
lc.On("Debugf", mock.Anything, mock.Anything, mock.Anything).Return(nil)
lc.On("Debugf", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
lc.On("Warn", mock.Anything).Return(nil)
dc := &clientMocks.DeviceClient{}
dc.On("DeviceByName", context.Background(), testDeviceName).Return(deviceResponse, nil)
Expand All @@ -284,7 +284,7 @@ func Test_commandRequestHandler(t *testing.T) {
dsc.On("DeviceServiceByName", context.Background(), testDeviceServiceName).Return(deviceServiceResponse, nil)
dsc.On("DeviceServiceByName", context.Background(), unknownService).Return(responses.DeviceServiceResponse{}, edgexErr.NewCommonEdgeX(edgexErr.KindEntityDoesNotExist, "unknown device service", nil))
client := &internalMessagingMocks.MessageClient{}
client.On("Publish", mock.Anything, mock.Anything).Return(nil)
client.On("Request", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(expectedResponse, nil)
dic := di.NewContainer(di.ServiceConstructorMap{
container.ConfigurationName: func(get di.Get) interface{} {
return &config.ConfigurationStruct{
Expand All @@ -295,14 +295,14 @@ func Test_commandRequestHandler(t *testing.T) {
},
MessageBus: bootstrapConfig.MessageBusInfo{
Topics: map[string]string{
DeviceRequestTopicPrefix: testInternalCommandRequestTopicPrefix,
DeviceCommandRequestTopicPrefixKey: testInternalCommandRequestTopicPrefix,
},
},
ExternalMQTT: bootstrapConfig.ExternalMQTTInfo{
QoS: 0,
Retain: true,
Topics: map[string]string{
CommandResponseTopicPrefix: testExternalCommandResponseTopicPrefix,
ExternalCommandResponseTopicPrefixKey: testExternalCommandResponseTopicPrefix,
},
},
}
Expand Down Expand Up @@ -359,7 +359,7 @@ func Test_commandRequestHandler(t *testing.T) {
mqttClient := &mocks.Client{}
mqttClient.On("Publish", mock.Anything, byte(0), true, mock.Anything).Return(token)

fn := commandRequestHandler(mockRouter, dic)
fn := commandRequestHandler(time.Second*10, dic)
fn(mqttClient, message)
if tt.expectedError {
if tt.expectedPublishError {
Expand All @@ -372,7 +372,7 @@ func Test_commandRequestHandler(t *testing.T) {
}

expectedInternalRequestTopic := strings.Join([]string{testInternalCommandRequestTopicPrefix, testDeviceServiceName, testDeviceName, testCommandName, testMethod}, "/")
client.AssertCalled(t, "Publish", tt.payload, expectedInternalRequestTopic)
client.AssertCalled(t, "Request", tt.payload, testDeviceServiceName, expectedInternalRequestTopic, mock.Anything)
})
}
}
Expand Down
Loading

0 comments on commit a657cc1

Please sign in to comment.