Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(command): command request from external MQTT to internal MessageBus #4153

Merged
merged 3 commits into from
Sep 21, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 84 additions & 2 deletions internal/core/command/controller/messaging/external/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package external

import (
"context"
"encoding/json"
"fmt"
"strconv"
Expand All @@ -25,8 +26,12 @@ import (
)

const (
RequestQueryTopic = "RequestQueryTopic"
ResponseQueryTopic = "ResponseQueryTopic"
RequestQueryTopic = "RequestQueryTopic"
ResponseQueryTopic = "ResponseQueryTopic"
RequestCommandTopic = "RequestCommandTopic"
ResponseCommandTopicPrefix = "ResponseCommandTopicPrefix"
RequestTopicPrefix = "RequestTopicPrefix"
ResponseTopic = "ResponseTopic"
)

func OnConnectHandler(dic *di.Container) mqtt.OnConnectHandler {
Expand All @@ -41,7 +46,12 @@ func OnConnectHandler(dic *di.Container) mqtt.OnConnectHandler {
responseQueryTopic := externalTopics[ResponseQueryTopic]
if token := client.Subscribe(requestQueryTopic, qos, commandQueryHandler(responseQueryTopic, qos, retain, dic)); token.Wait() && token.Error() != nil {
lc.Errorf("could not subscribe to topic '%s': %s", responseQueryTopic, token.Error().Error())
return
}

requestCommandTopic := externalTopics[RequestCommandTopic]
if token := client.Subscribe(requestCommandTopic, qos, commandRequestHandler(dic)); token.Wait() && token.Error() != nil {
lc.Errorf("could not subscribe to topic '%s': %s", responseQueryTopic, token.Error().Error())
return
}
}
Expand Down Expand Up @@ -124,6 +134,78 @@ func commandQueryHandler(responseTopic string, qos byte, retain bool, dic *di.Co
}
}

func commandRequestHandler(dic *di.Container) mqtt.MessageHandler {
return func(client mqtt.Client, message mqtt.Message) {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
messageBusInfo := container.ConfigurationFrom(dic.Get).MessageQueue
qos := messageBusInfo.External.QoS
retain := messageBusInfo.External.Retain

// expected command request topic scheme: #/<device>/<command-name>/<method>
topicLevels := strings.Split(message.Topic(), "/")
length := len(topicLevels)
deviceName := topicLevels[length-3]
lenny-goodell marked this conversation as resolved.
Show resolved Hide resolved
commandName := topicLevels[length-2]
method := topicLevels[length-1]
// expected command response topic scheme: #/<device>/<command-name>/<method>
externalResponseTopic := messageBusInfo.External.Topics[ResponseCommandTopicPrefix] + strings.Join([]string{deviceName, commandName, method}, "/")
chr1shung marked this conversation as resolved.
Show resolved Hide resolved

requestEnvelope, err := types.NewMessageEnvelopeFromJSON(message.Payload())
if err != nil {
responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, err.Error())
publishMessage(client, externalResponseTopic, qos, retain, responseEnvelope, lc)
return
}

if !strings.EqualFold(method, "get") && !strings.EqualFold(method, "set") {
responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, "unknown command method")
chr1shung marked this conversation as resolved.
Show resolved Hide resolved
publishMessage(client, externalResponseTopic, qos, retain, responseEnvelope, lc)
return
}

// retrieve device information through Metadata DeviceClient
dc := bootstrapContainer.DeviceClientFrom(dic.Get)
if dc == nil {
responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, "nil Device Client")
publishMessage(client, externalResponseTopic, qos, retain, responseEnvelope, lc)
return
}
deviceResponse, err := dc.DeviceByName(context.Background(), deviceName)
if err != nil {
errorMessage := fmt.Sprintf("Failed to get Device by name %s: %v", deviceName, err)
responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, errorMessage)
publishMessage(client, externalResponseTopic, qos, retain, responseEnvelope, lc)
return
}

// retrieve device service information through Metadata DeviceClient
dsc := bootstrapContainer.DeviceServiceClientFrom(dic.Get)
if dsc == nil {
responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, "nil DeviceService Client")
publishMessage(client, externalResponseTopic, qos, retain, responseEnvelope, lc)
return
}
deviceServiceResponse, err := dsc.DeviceServiceByName(context.Background(), deviceResponse.Device.ServiceName)
if err != nil {
errorMessage := fmt.Sprintf("Failed to get DeviceService by name %s: %v", deviceResponse.Device.ServiceName, err)
responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, errorMessage)
publishMessage(client, externalResponseTopic, qos, retain, responseEnvelope, lc)
return
}

// expected internal command request topic scheme: #/<device-service>/<device>/<command-name>/<method>
internalRequestTopic := messageBusInfo.Internal.Topics[RequestTopicPrefix] + strings.Join([]string{deviceServiceResponse.Service.Name, deviceName, commandName, method}, "/")
chr1shung marked this conversation as resolved.
Show resolved Hide resolved
internalMessageBus := bootstrapContainer.MessagingClientFrom(dic.Get)
err = internalMessageBus.Publish(requestEnvelope, internalRequestTopic)
if err != nil {
errorMessage := fmt.Sprintf("Failed to send DeviceCommand request with internal MessageBus: %v", err)
responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, errorMessage)
publishMessage(client, externalResponseTopic, qos, retain, responseEnvelope, lc)
return
}
}
}

func publishMessage(client mqtt.Client, responseTopic string, qos byte, retain bool, message types.MessageEnvelope, lc logger.LoggingClient) {
if message.ErrorCode == 1 {
lc.Error(string(message.Payload))
Expand Down
183 changes: 154 additions & 29 deletions internal/core/command/controller/messaging/external/external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,33 @@ import (
"github.com/edgexfoundry/go-mod-core-contracts/v2/dtos"
commonDTO "github.com/edgexfoundry/go-mod-core-contracts/v2/dtos/common"
"github.com/edgexfoundry/go-mod-core-contracts/v2/dtos/responses"
"github.com/edgexfoundry/go-mod-messaging/v2/messaging/mocks"
"github.com/edgexfoundry/go-mod-messaging/v2/pkg/types"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/edgexfoundry/edgex-go/internal/core/command/config"
"github.com/edgexfoundry/edgex-go/internal/core/command/container"
"github.com/edgexfoundry/edgex-go/internal/core/command/controller/messaging/mocks"
mqttMocks "github.com/edgexfoundry/edgex-go/internal/core/command/controller/messaging/mocks"
)

const (
mockHost = "127.0.0.1"
mockPort = 66666

testProfileName = "testProfile"
testResourceName = "testResource"
testDeviceName = "testDevice"
testProfileName = "testProfile"
testResourceName = "testResource"
testDeviceName = "testDevice"
testDeviceServiceName = "testService"

testRequestQueryTopic = "unittest/#"
testRequestQueryAllTopic = "unittest/all"
testRequestQueryByDeviceNameTopic = "unittest/testDevice"
testResponseTopic = "unittest/response"
testQueryRequestTopic = "unittest/#"
testQueryAllExample = "unittest/all"
testQueryByDeviceNameExample = "unittest/testDevice"
testQueryResponseTopic = "unittest/response"

testCommandRequestTopic = "unittest/external/request/#"
testCommandRequestExample = "unittest/external/request/testDevice/testCommand/get"
testCommandResponseTopicPrefix = "unittest/external/response/"
)

func TestOnConnectHandler(t *testing.T) {
Expand All @@ -54,8 +60,10 @@ func TestOnConnectHandler(t *testing.T) {
Required: false,
External: bootstrapConfig.ExternalMQTTInfo{
Topics: map[string]string{
RequestQueryTopic: testRequestQueryTopic,
ResponseQueryTopic: testResponseTopic,
RequestQueryTopic: testQueryRequestTopic,
ResponseQueryTopic: testQueryResponseTopic,
RequestCommandTopic: testCommandRequestTopic,
ResponseCommandTopicPrefix: testCommandResponseTopicPrefix,
},
QoS: 0,
Retain: true,
Expand All @@ -77,24 +85,27 @@ func TestOnConnectHandler(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
token := &mocks.Token{}
token := &mqttMocks.Token{}
token.On("Wait").Return(true)
if tt.expectedSucceed {
token.On("Error").Return(nil)
} else {
token.On("Error").Return(errors.New("error"))
}

client := &mocks.Client{}
client.On("Subscribe", testRequestQueryTopic, byte(0), mock.Anything).Return(token)
client := &mqttMocks.Client{}
client.On("Subscribe", testQueryRequestTopic, byte(0), mock.Anything).Return(token)
client.On("Subscribe", testCommandRequestTopic, byte(0), mock.Anything).Return(token)

fn := OnConnectHandler(dic)
fn(client)

client.AssertCalled(t, "Subscribe", testRequestQueryTopic, byte(0), mock.Anything)
if !tt.expectedSucceed {
lc.AssertCalled(t, "Errorf", mock.Anything, mock.Anything, mock.Anything)
if tt.expectedSucceed {
client.AssertNumberOfCalls(t, "Subscribe", 2)
return
}

lc.AssertCalled(t, "Errorf", mock.Anything, mock.Anything, mock.Anything)
})
}
}
Expand Down Expand Up @@ -163,10 +174,10 @@ func Test_commandQueryHandler(t *testing.T) {
},
})

validPayload := testPayload()
invalidRequestPayload := testPayload()
validPayload := testCommandQueryPayload()
invalidRequestPayload := testCommandQueryPayload()
invalidRequestPayload.ApiVersion = "v1"
invalidQueryParamsPayload := testPayload()
invalidQueryParamsPayload := testCommandQueryPayload()
invalidQueryParamsPayload.QueryParams[common.Offset] = "invalid"

tests := []struct {
Expand All @@ -175,28 +186,28 @@ func Test_commandQueryHandler(t *testing.T) {
payload types.MessageEnvelope
expectedError bool
}{
{"valid - query all", testRequestQueryAllTopic, validPayload, false},
{"valid - query by device name", testRequestQueryByDeviceNameTopic, validPayload, false},
{"invalid - invalid request json payload", testRequestQueryByDeviceNameTopic, invalidRequestPayload, true},
{"invalid - invalid query parameters", testRequestQueryAllTopic, invalidQueryParamsPayload, true},
{"valid - query all", testQueryAllExample, validPayload, false},
{"valid - query by device name", testQueryByDeviceNameExample, validPayload, false},
{"invalid - invalid request json payload", testQueryByDeviceNameExample, invalidRequestPayload, true},
{"invalid - invalid query parameters", testQueryAllExample, invalidQueryParamsPayload, true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
payloadBytes, err := json.Marshal(tt.payload)
require.NoError(t, err)

message := &mocks.Message{}
message := &mqttMocks.Message{}
message.On("Payload").Return(payloadBytes)
message.On("Topic").Return(tt.requestQueryTopic)

token := &mocks.Token{}
token := &mqttMocks.Token{}
token.On("Wait").Return(true)
token.On("Error").Return(nil)

client := &mocks.Client{}
client.On("Publish", testResponseTopic, byte(0), true, mock.Anything).Return(token)
client := &mqttMocks.Client{}
client.On("Publish", testQueryResponseTopic, byte(0), true, mock.Anything).Return(token)

fn := commandQueryHandler(testResponseTopic, 0, true, dic)
fn := commandQueryHandler(testQueryResponseTopic, 0, true, dic)
fn(client, message)
lc.AssertCalled(t, "Debugf", mock.Anything, mock.Anything, mock.Anything)
if tt.expectedError {
Expand All @@ -206,8 +217,122 @@ func Test_commandQueryHandler(t *testing.T) {
}
}

func testPayload() types.MessageEnvelope {
func Test_commandRequestHandler(t *testing.T) {
deviceResponse := responses.DeviceResponse{
BaseResponse: commonDTO.NewBaseResponse("", "", http.StatusOK),
Device: dtos.Device{
Name: testDeviceName,
ProfileName: testProfileName,
ServiceName: testDeviceServiceName,
},
}
deviceServiceResponse := responses.DeviceServiceResponse{
BaseResponse: commonDTO.NewBaseResponse("", "", http.StatusOK),
Service: dtos.DeviceService{
Name: testDeviceServiceName,
},
}

lc := &lcMocks.LoggingClient{}
lc.On("Error", mock.Anything).Return(nil)
lc.On("Debugf", mock.Anything, mock.Anything, mock.Anything).Return(nil)
dc := &clientMocks.DeviceClient{}
dc.On("DeviceByName", context.Background(), testDeviceName).Return(deviceResponse, nil)
dsc := &clientMocks.DeviceServiceClient{}
dsc.On("DeviceServiceByName", context.Background(), testDeviceServiceName).Return(deviceServiceResponse, nil)
client := &mocks.MessageClient{}
client.On("Publish", mock.Anything, mock.Anything).Return(nil)
dic := di.NewContainer(di.ServiceConstructorMap{
container.ConfigurationName: func(get di.Get) interface{} {
return &config.ConfigurationStruct{
Service: bootstrapConfig.ServiceInfo{
Host: mockHost,
Port: mockPort,
MaxResultCount: 20,
},
MessageQueue: config.MessageQueue{
Required: true,
Internal: bootstrapConfig.MessageBusInfo{
Topics: map[string]string{
RequestTopicPrefix: "unittest/internal/request/",
lenny-goodell marked this conversation as resolved.
Show resolved Hide resolved
},
},
External: bootstrapConfig.ExternalMQTTInfo{
QoS: 0,
Retain: true,
Topics: map[string]string{
ResponseCommandTopicPrefix: testCommandResponseTopicPrefix,
lenny-goodell marked this conversation as resolved.
Show resolved Hide resolved
},
},
},
}
},
bootstrapContainer.LoggingClientInterfaceName: func(get di.Get) interface{} {
return lc
},
bootstrapContainer.DeviceClientName: func(get di.Get) interface{} {
return dc
},
bootstrapContainer.DeviceServiceClientName: func(get di.Get) interface{} {
return dsc
},
bootstrapContainer.MessagingClientName: func(get di.Get) interface{} {
return client
},
})

validPayload := testCommandRequestPayload()
invalidRequestPayload := testCommandRequestPayload()
invalidRequestPayload.ApiVersion = "v1"

tests := []struct {
name string
commandRequestTopic string
payload types.MessageEnvelope
expectedError bool
}{
{"valid", testCommandRequestExample, validPayload, false},
{"invalid - invalid request json payload", testCommandRequestExample, invalidRequestPayload, true},
{"invalid - unknown command method", "unittest/request/testDevice/testCommand/invalid", validPayload, true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
payloadBytes, err := json.Marshal(tt.payload)
require.NoError(t, err)

message := &mqttMocks.Message{}
message.On("Payload").Return(payloadBytes)
message.On("Topic").Return(tt.commandRequestTopic)

token := &mqttMocks.Token{}
token.On("Wait").Return(true)
token.On("Error").Return(nil)

mqttClient := &mqttMocks.Client{}
mqttClient.On("Publish", mock.Anything, byte(0), true, mock.Anything).Return(token)

fn := commandRequestHandler(dic)
fn(mqttClient, message)
if tt.expectedError {
lc.AssertCalled(t, "Error", mock.Anything)
} else {
client.AssertCalled(t, "Publish", tt.payload, mock.Anything)
}
})
}
}

func testCommandQueryPayload() types.MessageEnvelope {
payload := types.NewMessageEnvelopeForRequest(nil, nil)

return payload
}

func testCommandRequestPayload() types.MessageEnvelope {
payload := types.NewMessageEnvelopeForRequest(nil, map[string]string{
"ds-pushevent": "yes",
"ds-returnevent": "yes",
})

return payload
}