Skip to content

Commit

Permalink
feat(command): 3rd party command query via messaging (#4135)
Browse files Browse the repository at this point in the history
* feat(command): implement 3rd party command query via messaging

allow core command to subscribe query request and publish
commands response to 3rd party via external MQTT

Signed-off-by: Chris Hung <[email protected]>
  • Loading branch information
Chris Hung authored Sep 9, 2022
1 parent 21503d9 commit a0e04e5
Show file tree
Hide file tree
Showing 7 changed files with 724 additions and 2 deletions.
2 changes: 1 addition & 1 deletion cmd/core-command/res/configuration.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ Type = "consul"
[MessageQueue.External.Topics]
RequestCommandTopic = "edgex/command/request/#" # for subscribing to 3rd party command requests
ResponseCommandTopicPrefix = "edgex/command/response/" # for publishing responses back to 3rd party systems /<device-name>/<command-name>/<method> will be added to this publish topic prefix
RequestQueryTopic = "edgex/commandquery/request" # for subscribing to 3rd party command query request
RequestQueryTopic = "edgex/commandquery/request/#" # for subscribing to 3rd party command query request
ResponseQueryTopic = "edgex/commandquery/response" # for publishing responses back to 3rd party systems

[SecretStore]
Expand Down
139 changes: 139 additions & 0 deletions internal/core/command/controller/messaging/external/external.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
//
// Copyright (C) 2022 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

package external

import (
"encoding/json"
"fmt"
"strconv"
"strings"

mqtt "github.com/eclipse/paho.mqtt.golang"
bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container"
"github.com/edgexfoundry/go-mod-bootstrap/v2/di"
"github.com/edgexfoundry/go-mod-core-contracts/v2/clients/logger"
"github.com/edgexfoundry/go-mod-core-contracts/v2/common"
"github.com/edgexfoundry/go-mod-core-contracts/v2/errors"

"github.com/edgexfoundry/go-mod-messaging/v2/pkg/types"

"github.com/edgexfoundry/edgex-go/internal/core/command/application"
"github.com/edgexfoundry/edgex-go/internal/core/command/container"
)

const (
RequestQueryTopic = "RequestQueryTopic"
ResponseQueryTopic = "ResponseQueryTopic"
)

func OnConnectHandler(dic *di.Container) mqtt.OnConnectHandler {
return func(client mqtt.Client) {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
config := container.ConfigurationFrom(dic.Get)
externalTopics := config.MessageQueue.External.Topics
qos := config.MessageQueue.External.QoS
retain := config.MessageQueue.External.Retain

requestQueryTopic := externalTopics[RequestQueryTopic]
responseQueryTopic := externalTopics[ResponseQueryTopic]
if token := client.Subscribe(requestQueryTopic, qos, commandQueryHandler(responseQueryTopic, qos, retain, dic)); token.Wait() && token.Error() != nil {
lc.Errorf("could not subscribe to topic '%s': %s", responseQueryTopic, token.Error().Error())

return
}
}
}

func commandQueryHandler(responseTopic string, qos byte, retain bool, dic *di.Container) mqtt.MessageHandler {
return func(client mqtt.Client, message mqtt.Message) {
var errorMessage string
var responseEnvelope types.MessageEnvelope
lc := bootstrapContainer.LoggingClientFrom(dic.Get)

requestEnvelope, err := types.NewMessageEnvelopeFromJSON(message.Payload())
if err != nil {
responseEnvelope = types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, err.Error())
publishMessage(client, responseTopic, qos, retain, responseEnvelope, lc)
return
}

// example topic scheme: edgex/commandquery/request/<device>
// deviceName is expected to be at last topic level.
topicLevels := strings.Split(message.Topic(), "/")
deviceName := topicLevels[len(topicLevels)-1]
if strings.EqualFold(deviceName, common.All) {
deviceName = common.All
}

var commands any
var edgexErr errors.EdgeX
switch deviceName {
case common.All:
offset, limit := common.DefaultOffset, common.DefaultLimit
if requestEnvelope.QueryParams != nil {
if offsetRaw, ok := requestEnvelope.QueryParams[common.Offset]; ok {
offset, err = strconv.Atoi(offsetRaw)
if err != nil {
errorMessage = fmt.Sprintf("Failed to convert 'offset' query parameter to intger: %s", err.Error())
responseEnvelope = types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, errorMessage)
publishMessage(client, responseTopic, qos, retain, responseEnvelope, lc)
return
}
}
if limitRaw, ok := requestEnvelope.QueryParams[common.Limit]; ok {
limit, err = strconv.Atoi(limitRaw)
if err != nil {
errorMessage = fmt.Sprintf("Failed to convert 'limit' query parameter to integer: %s", err.Error())
responseEnvelope = types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, errorMessage)
publishMessage(client, responseTopic, qos, retain, responseEnvelope, lc)
return
}
}
}

commands, _, edgexErr = application.AllCommands(offset, limit, dic)
if edgexErr != nil {
errorMessage = fmt.Sprintf("Failed to get all commands: %s", edgexErr.Error())
responseEnvelope = types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, errorMessage)
publishMessage(client, responseTopic, qos, retain, responseEnvelope, lc)
return
}
default:
commands, edgexErr = application.CommandsByDeviceName(deviceName, dic)
if edgexErr != nil {
errorMessage = fmt.Sprintf("Failed to get commands by device name '%s': %s", deviceName, edgexErr.Error())
responseEnvelope = types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, errorMessage)
publishMessage(client, responseTopic, qos, retain, responseEnvelope, lc)
return
}
}

payloadBytes, err := json.Marshal(commands)
if err != nil {
errorMessage = fmt.Sprintf("Failed to json encoding deviceCommands payload: %s", err.Error())
responseEnvelope = types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, errorMessage)
publishMessage(client, responseTopic, qos, retain, responseEnvelope, lc)
return
}

responseEnvelope, _ = types.NewMessageEnvelopeForResponse(payloadBytes, requestEnvelope.RequestID, requestEnvelope.CorrelationID, common.ContentTypeJSON)
publishMessage(client, responseTopic, qos, retain, responseEnvelope, lc)
}
}

func publishMessage(client mqtt.Client, responseTopic string, qos byte, retain bool, message types.MessageEnvelope, lc logger.LoggingClient) {
if message.ErrorCode == 1 {
lc.Error(string(message.Payload))
}

envelopeBytes, _ := json.Marshal(&message)

if token := client.Publish(responseTopic, qos, retain, envelopeBytes); token.Wait() && token.Error() != nil {
lc.Errorf("Could not publish to topic '%s': %s", responseTopic, token.Error())
} else {
lc.Debugf("Published response message on topic '%s' with %d bytes", responseTopic, len(envelopeBytes))
}
}
213 changes: 213 additions & 0 deletions internal/core/command/controller/messaging/external/external_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
//
// Copyright (C) 2022 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

package external

import (
"context"
"encoding/json"
"errors"
"net/http"
"testing"

bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container"
bootstrapConfig "github.com/edgexfoundry/go-mod-bootstrap/v2/config"
"github.com/edgexfoundry/go-mod-bootstrap/v2/di"
clientMocks "github.com/edgexfoundry/go-mod-core-contracts/v2/clients/interfaces/mocks"
lcMocks "github.com/edgexfoundry/go-mod-core-contracts/v2/clients/logger/mocks"
"github.com/edgexfoundry/go-mod-core-contracts/v2/common"
"github.com/edgexfoundry/go-mod-core-contracts/v2/dtos"
commonDTO "github.com/edgexfoundry/go-mod-core-contracts/v2/dtos/common"
"github.com/edgexfoundry/go-mod-core-contracts/v2/dtos/responses"
"github.com/edgexfoundry/go-mod-messaging/v2/pkg/types"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/edgexfoundry/edgex-go/internal/core/command/config"
"github.com/edgexfoundry/edgex-go/internal/core/command/container"
"github.com/edgexfoundry/edgex-go/internal/core/command/controller/messaging/mocks"
)

const (
mockHost = "127.0.0.1"
mockPort = 66666

testProfileName = "testProfile"
testResourceName = "testResource"
testDeviceName = "testDevice"

testRequestQueryTopic = "unittest/#"
testRequestQueryAllTopic = "unittest/all"
testRequestQueryByDeviceNameTopic = "unittest/testDevice"
testResponseTopic = "unittest/response"
)

func TestOnConnectHandler(t *testing.T) {
lc := &lcMocks.LoggingClient{}
lc.On("Errorf", mock.Anything, mock.Anything, mock.Anything).Return(nil)
dic := di.NewContainer(di.ServiceConstructorMap{
container.ConfigurationName: func(get di.Get) interface{} {
return &config.ConfigurationStruct{
MessageQueue: config.MessageQueue{
Required: false,
External: bootstrapConfig.ExternalMQTTInfo{
Topics: map[string]string{
RequestQueryTopic: testRequestQueryTopic,
ResponseQueryTopic: testResponseTopic,
},
QoS: 0,
Retain: true,
},
},
}
},
bootstrapContainer.LoggingClientInterfaceName: func(get di.Get) interface{} {
return lc
},
})

tests := []struct {
name string
expectedSucceed bool
}{
{"subscribe succeed", true},
{"subscribe fail", false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
token := &mocks.Token{}
token.On("Wait").Return(true)
if tt.expectedSucceed {
token.On("Error").Return(nil)
} else {
token.On("Error").Return(errors.New("error"))
}

client := &mocks.Client{}
client.On("Subscribe", testRequestQueryTopic, byte(0), mock.Anything).Return(token)

fn := OnConnectHandler(dic)
fn(client)

client.AssertCalled(t, "Subscribe", testRequestQueryTopic, byte(0), mock.Anything)
if !tt.expectedSucceed {
lc.AssertCalled(t, "Errorf", mock.Anything, mock.Anything, mock.Anything)
}
})
}
}

func Test_commandQueryHandler(t *testing.T) {
profileResponse := responses.DeviceProfileResponse{
BaseResponse: commonDTO.NewBaseResponse("", "", http.StatusOK),
Profile: dtos.DeviceProfile{
DeviceProfileBasicInfo: dtos.DeviceProfileBasicInfo{
Name: testProfileName,
},
DeviceResources: []dtos.DeviceResource{
dtos.DeviceResource{
Name: testResourceName,
Properties: dtos.ResourceProperties{
ValueType: common.ValueTypeString,
ReadWrite: common.ReadWrite_RW,
},
},
},
},
}
deviceResponse := responses.DeviceResponse{
BaseResponse: commonDTO.NewBaseResponse("", "", http.StatusOK),
Device: dtos.Device{
Name: testDeviceName,
ProfileName: testProfileName,
},
}
allDevicesResponse := responses.MultiDevicesResponse{
BaseWithTotalCountResponse: commonDTO.NewBaseWithTotalCountResponse("", "", http.StatusOK, 1),
Devices: []dtos.Device{
dtos.Device{
Name: testDeviceName,
ProfileName: testProfileName,
},
},
}

lc := &lcMocks.LoggingClient{}
lc.On("Error", mock.Anything).Return(nil)
lc.On("Debugf", mock.Anything, mock.Anything, mock.Anything).Return(nil)
dc := &clientMocks.DeviceClient{}
dc.On("AllDevices", context.Background(), []string(nil), common.DefaultOffset, common.DefaultLimit).Return(allDevicesResponse, nil)
dc.On("DeviceByName", context.Background(), testDeviceName).Return(deviceResponse, nil)
dpc := &clientMocks.DeviceProfileClient{}
dpc.On("DeviceProfileByName", context.Background(), testProfileName).Return(profileResponse, nil)
dic := di.NewContainer(di.ServiceConstructorMap{
container.ConfigurationName: func(get di.Get) interface{} {
return &config.ConfigurationStruct{
Service: bootstrapConfig.ServiceInfo{
Host: mockHost,
Port: mockPort,
MaxResultCount: 20,
},
}
},
bootstrapContainer.LoggingClientInterfaceName: func(get di.Get) interface{} {
return lc
},
bootstrapContainer.DeviceClientName: func(get di.Get) interface{} {
return dc
},
bootstrapContainer.DeviceProfileClientName: func(get di.Get) interface{} {
return dpc
},
})

validPayload := testPayload()
invalidRequestPayload := testPayload()
invalidRequestPayload.ApiVersion = "v1"
invalidQueryParamsPayload := testPayload()
invalidQueryParamsPayload.QueryParams[common.Offset] = "invalid"

tests := []struct {
name string
requestQueryTopic string
payload types.MessageEnvelope
expectedError bool
}{
{"valid - query all", testRequestQueryAllTopic, validPayload, false},
{"valid - query by device name", testRequestQueryByDeviceNameTopic, validPayload, false},
{"invalid - invalid request json payload", testRequestQueryByDeviceNameTopic, invalidRequestPayload, true},
{"invalid - invalid query parameters", testRequestQueryAllTopic, invalidQueryParamsPayload, true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
payloadBytes, err := json.Marshal(tt.payload)
require.NoError(t, err)

message := &mocks.Message{}
message.On("Payload").Return(payloadBytes)
message.On("Topic").Return(tt.requestQueryTopic)

token := &mocks.Token{}
token.On("Wait").Return(true)
token.On("Error").Return(nil)

client := &mocks.Client{}
client.On("Publish", testResponseTopic, byte(0), true, mock.Anything).Return(token)

fn := commandQueryHandler(testResponseTopic, 0, true, dic)
fn(client, message)
lc.AssertCalled(t, "Debugf", mock.Anything, mock.Anything, mock.Anything)
if tt.expectedError {
lc.AssertCalled(t, "Error", mock.Anything)
}
})
}
}

func testPayload() types.MessageEnvelope {
payload := types.NewMessageEnvelopeForRequest(nil, nil)

return payload
}
Loading

0 comments on commit a0e04e5

Please sign in to comment.