Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(command): command request from external MQTT to internal MessageBus #4153

Merged
merged 3 commits into from
Sep 21, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/core-command/res/configuration.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ Type = "consul"
AuthMode = "usernamepassword" # required for redis messagebus (secure or insecure).
SecretName = "redisdb"
[MessageQueue.Internal.Topics]
RequestTopicPrefix = "edgex/command/request/" # for publishing requests to the device service; <device-service>/<device-name>/<command-name>/<method> will be added to this publish topic prefix
RequestTopicPrefix = "edgex/command/request" # for publishing requests to the device service; <device-service>/<device-name>/<command-name>/<method> will be added to this publish topic prefix
ResponseTopic = "edgex/command/response/#" # for subscribing to device service responses
InternalRequestCommandTopic = "/command/request/#" # for subscribing to internal command requests
InternalResponseCommandTopicPrefix = "/command/response/" # for publishing responses back to internal service /<device-name>/<command-name>/<method> will be added to this publish topic prefix
Expand Down Expand Up @@ -90,7 +90,7 @@ Type = "consul"
AuthMode = "none"
[MessageQueue.External.Topics]
RequestCommandTopic = "edgex/command/request/#" # for subscribing to 3rd party command requests
ResponseCommandTopicPrefix = "edgex/command/response/" # for publishing responses back to 3rd party systems /<device-name>/<command-name>/<method> will be added to this publish topic prefix
ResponseCommandTopicPrefix = "edgex/command/response" # for publishing responses back to 3rd party systems /<device-name>/<command-name>/<method> will be added to this publish topic prefix
RequestQueryTopic = "edgex/commandquery/request/#" # for subscribing to 3rd party command query request
ResponseQueryTopic = "edgex/commandquery/response" # for publishing responses back to 3rd party systems

Expand Down
86 changes: 84 additions & 2 deletions internal/core/command/controller/messaging/external/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package external

import (
"context"
"encoding/json"
"fmt"
"strconv"
Expand All @@ -25,8 +26,12 @@ import (
)

const (
RequestQueryTopic = "RequestQueryTopic"
ResponseQueryTopic = "ResponseQueryTopic"
RequestQueryTopic = "RequestQueryTopic"
ResponseQueryTopic = "ResponseQueryTopic"
RequestCommandTopic = "RequestCommandTopic"
ResponseCommandTopicPrefix = "ResponseCommandTopicPrefix"
RequestTopicPrefix = "RequestTopicPrefix"
ResponseTopic = "ResponseTopic"
)

func OnConnectHandler(dic *di.Container) mqtt.OnConnectHandler {
Expand All @@ -41,7 +46,12 @@ func OnConnectHandler(dic *di.Container) mqtt.OnConnectHandler {
responseQueryTopic := externalTopics[ResponseQueryTopic]
if token := client.Subscribe(requestQueryTopic, qos, commandQueryHandler(responseQueryTopic, qos, retain, dic)); token.Wait() && token.Error() != nil {
lc.Errorf("could not subscribe to topic '%s': %s", responseQueryTopic, token.Error().Error())
return
}

requestCommandTopic := externalTopics[RequestCommandTopic]
if token := client.Subscribe(requestCommandTopic, qos, commandRequestHandler(dic)); token.Wait() && token.Error() != nil {
lc.Errorf("could not subscribe to topic '%s': %s", responseQueryTopic, token.Error().Error())
return
}
}
Expand Down Expand Up @@ -124,6 +134,78 @@ func commandQueryHandler(responseTopic string, qos byte, retain bool, dic *di.Co
}
}

func commandRequestHandler(dic *di.Container) mqtt.MessageHandler {
return func(client mqtt.Client, message mqtt.Message) {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
messageBusInfo := container.ConfigurationFrom(dic.Get).MessageQueue
qos := messageBusInfo.External.QoS
retain := messageBusInfo.External.Retain

// expected command request topic scheme: #/<device>/<command-name>/<method>
topicLevels := strings.Split(message.Topic(), "/")
length := len(topicLevels)
deviceName := topicLevels[length-3]
lenny-goodell marked this conversation as resolved.
Show resolved Hide resolved
commandName := topicLevels[length-2]
method := topicLevels[length-1]
// expected command response topic scheme: #/<device>/<command-name>/<method>
externalResponseTopic := strings.Join([]string{messageBusInfo.External.Topics[ResponseCommandTopicPrefix], deviceName, commandName, method}, "/")

requestEnvelope, err := types.NewMessageEnvelopeFromJSON(message.Payload())
if err != nil {
responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, err.Error())
publishMessage(client, externalResponseTopic, qos, retain, responseEnvelope, lc)
return
}

if !strings.EqualFold(method, "get") && !strings.EqualFold(method, "set") {
responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, fmt.Sprintf("unknown command method %s received", method))
publishMessage(client, externalResponseTopic, qos, retain, responseEnvelope, lc)
return
}

// retrieve device information through Metadata DeviceClient
dc := bootstrapContainer.DeviceClientFrom(dic.Get)
if dc == nil {
responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, "nil Device Client")
publishMessage(client, externalResponseTopic, qos, retain, responseEnvelope, lc)
return
}
deviceResponse, err := dc.DeviceByName(context.Background(), deviceName)
if err != nil {
errorMessage := fmt.Sprintf("Failed to get Device by name %s: %v", deviceName, err)
responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, errorMessage)
publishMessage(client, externalResponseTopic, qos, retain, responseEnvelope, lc)
return
}

// retrieve device service information through Metadata DeviceClient
dsc := bootstrapContainer.DeviceServiceClientFrom(dic.Get)
if dsc == nil {
responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, "nil DeviceService Client")
publishMessage(client, externalResponseTopic, qos, retain, responseEnvelope, lc)
return
}
deviceServiceResponse, err := dsc.DeviceServiceByName(context.Background(), deviceResponse.Device.ServiceName)
if err != nil {
errorMessage := fmt.Sprintf("Failed to get DeviceService by name %s: %v", deviceResponse.Device.ServiceName, err)
responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, errorMessage)
publishMessage(client, externalResponseTopic, qos, retain, responseEnvelope, lc)
return
}

// expected internal command request topic scheme: #/<device-service>/<device>/<command-name>/<method>
internalRequestTopic := strings.Join([]string{messageBusInfo.Internal.Topics[RequestTopicPrefix], deviceServiceResponse.Service.Name, deviceName, commandName, method}, "/")
internalMessageBus := bootstrapContainer.MessagingClientFrom(dic.Get)
err = internalMessageBus.Publish(requestEnvelope, internalRequestTopic)
if err != nil {
errorMessage := fmt.Sprintf("Failed to send DeviceCommand request with internal MessageBus: %v", err)
responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, errorMessage)
publishMessage(client, externalResponseTopic, qos, retain, responseEnvelope, lc)
return
}
}
}

func publishMessage(client mqtt.Client, responseTopic string, qos byte, retain bool, message types.MessageEnvelope, lc logger.LoggingClient) {
if message.ErrorCode == 1 {
lc.Error(string(message.Payload))
Expand Down
Loading