Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(command): command query via internal messaging #4182

Merged
merged 1 commit into from
Oct 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/core-command/res/configuration.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ Type = "consul"
DeviceResponseTopic = "edgex/command/response/#" # for subscribing to device service responses
CommandRequestTopic = "/command/request/#" # for subscribing to internal command requests
CommandResponseTopicPrefix = "/command/response" # for publishing responses back to internal service /<device-name>/<command-name>/<method> will be added to this publish topic prefix
QueryRequestTopic = "/commandquery/request" # for subscribing to internal command query requests
QueryRequestTopic = "/commandquery/request/#" # for subscribing to internal command query requests
QueryResponseTopic = "/commandquery/response" # for publishing reponsses back to internal service
[MessageQueue.Internal.Optional]
# Default MQTT Specific options that need to be here to enable evnironment variable overrides of them
Expand Down
89 changes: 3 additions & 86 deletions internal/core/command/controller/messaging/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,17 @@
package messaging

import (
"context"
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"

mqtt "github.com/eclipse/paho.mqtt.golang"
bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container"
"github.com/edgexfoundry/go-mod-bootstrap/v2/di"
"github.com/edgexfoundry/go-mod-core-contracts/v2/clients/logger"
"github.com/edgexfoundry/go-mod-core-contracts/v2/common"
edgexErr "github.com/edgexfoundry/go-mod-core-contracts/v2/errors"

"github.com/edgexfoundry/go-mod-messaging/v2/pkg/types"

"github.com/edgexfoundry/edgex-go/internal/core/command/application"
"github.com/edgexfoundry/edgex-go/internal/core/command/container"
)

Expand Down Expand Up @@ -60,8 +54,6 @@ func OnConnectHandler(router MessagingRouter, dic *di.Container) mqtt.OnConnectH

func commandQueryHandler(responseTopic string, qos byte, retain bool, dic *di.Container) mqtt.MessageHandler {
return func(client mqtt.Client, message mqtt.Message) {
var errorMessage string
var responseEnvelope types.MessageEnvelope
lc := bootstrapContainer.LoggingClientFrom(dic.Get)

requestEnvelope, err := types.NewMessageEnvelopeFromJSON(message.Payload())
Expand All @@ -79,58 +71,11 @@ func commandQueryHandler(responseTopic string, qos byte, retain bool, dic *di.Co
deviceName = common.All
}

var commands any
var edgexErr edgexErr.EdgeX
switch deviceName {
case common.All:
offset, limit := common.DefaultOffset, common.DefaultLimit
if requestEnvelope.QueryParams != nil {
if offsetRaw, ok := requestEnvelope.QueryParams[common.Offset]; ok {
offset, err = strconv.Atoi(offsetRaw)
if err != nil {
errorMessage = fmt.Sprintf("Failed to convert 'offset' query parameter to intger: %s", err.Error())
responseEnvelope = types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, errorMessage)
publishMessage(client, responseTopic, qos, retain, responseEnvelope, lc)
return
}
}
if limitRaw, ok := requestEnvelope.QueryParams[common.Limit]; ok {
limit, err = strconv.Atoi(limitRaw)
if err != nil {
errorMessage = fmt.Sprintf("Failed to convert 'limit' query parameter to integer: %s", err.Error())
responseEnvelope = types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, errorMessage)
publishMessage(client, responseTopic, qos, retain, responseEnvelope, lc)
return
}
}
}

commands, _, edgexErr = application.AllCommands(offset, limit, dic)
if edgexErr != nil {
errorMessage = fmt.Sprintf("Failed to get all commands: %s", edgexErr.Error())
responseEnvelope = types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, errorMessage)
publishMessage(client, responseTopic, qos, retain, responseEnvelope, lc)
return
}
default:
commands, edgexErr = application.CommandsByDeviceName(deviceName, dic)
if edgexErr != nil {
errorMessage = fmt.Sprintf("Failed to get commands by device name '%s': %s", deviceName, edgexErr.Error())
responseEnvelope = types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, errorMessage)
publishMessage(client, responseTopic, qos, retain, responseEnvelope, lc)
return
}
responseEnvelope, edgexErr := getCommandQueryResponseEnvelope(requestEnvelope, deviceName, dic)
if edgexErr != nil {
responseEnvelope = types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, edgexErr.Error())
}

payloadBytes, err := json.Marshal(commands)
if err != nil {
errorMessage = fmt.Sprintf("Failed to json encoding deviceCommands payload: %s", err.Error())
responseEnvelope = types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, errorMessage)
publishMessage(client, responseTopic, qos, retain, responseEnvelope, lc)
return
}

responseEnvelope, _ = types.NewMessageEnvelopeForResponse(payloadBytes, requestEnvelope.RequestID, requestEnvelope.CorrelationID, common.ContentTypeJSON)
publishMessage(client, responseTopic, qos, retain, responseEnvelope, lc)
}
}
Expand Down Expand Up @@ -203,31 +148,3 @@ func publishMessage(client mqtt.Client, responseTopic string, qos byte, retain b
lc.Debugf("Published response message to external message queue on topic '%s' with %d bytes", responseTopic, len(envelopeBytes))
}
}

// validateRequestTopic validates the request topic by checking the existence of device and device service,
// returns the internal device request topic to which the command request will be sent.
func validateRequestTopic(prefix string, deviceName string, commandName string, method string, dic *di.Container) (string, error) {
// retrieve device information through Metadata DeviceClient
dc := bootstrapContainer.DeviceClientFrom(dic.Get)
if dc == nil {
return "", errors.New("nil Device Client")
}
deviceResponse, err := dc.DeviceByName(context.Background(), deviceName)
if err != nil {
return "", fmt.Errorf("failed to get Device by name %s: %v", deviceName, err)
}

// retrieve device service information through Metadata DeviceClient
dsc := bootstrapContainer.DeviceServiceClientFrom(dic.Get)
if dsc == nil {
return "", errors.New("nil DeviceService Client")
}
deviceServiceResponse, err := dsc.DeviceServiceByName(context.Background(), deviceResponse.Device.ServiceName)
if err != nil {
return "", fmt.Errorf("failed to get DeviceService by name %s: %v", deviceResponse.Device.ServiceName, err)
}

// expected internal command request topic scheme: #/<device-service>/<device>/<command-name>/<method>
return strings.Join([]string{prefix, deviceServiceResponse.Service.Name, deviceName, commandName, method}, "/"), nil

}
63 changes: 63 additions & 0 deletions internal/core/command/controller/messaging/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container"
"github.com/edgexfoundry/go-mod-bootstrap/v2/di"
"github.com/edgexfoundry/go-mod-core-contracts/v2/common"
"github.com/edgexfoundry/go-mod-core-contracts/v2/errors"
"github.com/edgexfoundry/go-mod-messaging/v2/pkg/types"

Expand Down Expand Up @@ -155,3 +156,65 @@ func SubscribeCommandRequests(ctx context.Context, router MessagingRouter, dic *

return nil
}

// SubscribeCommandQueryRequests subscribes command query requests from EdgeX service (e.g., Application Service)
// via internal MessageBus
func SubscribeCommandQueryRequests(ctx context.Context, dic *di.Container) errors.EdgeX {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
messageBusInfo := container.ConfigurationFrom(dic.Get).MessageQueue
internalQueryRequestTopic := messageBusInfo.Internal.Topics[QueryRequestTopic]
internalQueryResponseTopic := messageBusInfo.Internal.Topics[QueryResponseTopic]

messages := make(chan types.MessageEnvelope)
messageErrors := make(chan error)
topics := []types.TopicChannel{
{
Topic: internalQueryRequestTopic,
Messages: messages,
},
}

messageBus := bootstrapContainer.MessagingClientFrom(dic.Get)
err := messageBus.Subscribe(topics, messageErrors)
if err != nil {
return errors.NewCommonEdgeXWrapper(err)
}

go func() {
for {
select {
case <-ctx.Done():
lc.Infof("Exiting waiting for MessageBus '%s' topic messages", internalQueryRequestTopic)
return
case err = <-messageErrors:
lc.Error(err.Error())
case requestEnvelope := <-messages:
lc.Debugf("Command query request received on internal MessageBus. Topic: %s, Correlation-id: %s ", requestEnvelope.ReceivedTopic, requestEnvelope.CorrelationID)

// example topic scheme: /commandquery/request/<device>
// deviceName is expected to be at last topic level.
topicLevels := strings.Split(requestEnvelope.ReceivedTopic, "/")
deviceName := topicLevels[len(topicLevels)-1]
if strings.EqualFold(deviceName, common.All) {
deviceName = common.All
}

responseEnvelope, edgexErr := getCommandQueryResponseEnvelope(requestEnvelope, deviceName, dic)
if edgexErr != nil {
lc.Error(edgexErr.Error())
responseEnvelope = types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, edgexErr.Error())
}

err = messageBus.Publish(responseEnvelope, internalQueryResponseTopic)
if err != nil {
lc.Errorf("Could not publish to topic '%s': %s", internalQueryResponseTopic, err.Error())
continue
}

lc.Debugf("Command query response sent to internal MessageBus. Topic: %s, Correlation-id: %s", internalQueryResponseTopic, requestEnvelope.CorrelationID)
}
}
}()

return nil
}
99 changes: 99 additions & 0 deletions internal/core/command/controller/messaging/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
//
// Copyright (C) 2022 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

package messaging

import (
"context"
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"

bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container"
"github.com/edgexfoundry/go-mod-bootstrap/v2/di"
"github.com/edgexfoundry/go-mod-core-contracts/v2/common"
edgexErr "github.com/edgexfoundry/go-mod-core-contracts/v2/errors"
"github.com/edgexfoundry/go-mod-messaging/v2/pkg/types"

"github.com/edgexfoundry/edgex-go/internal/core/command/application"
)

// validateRequestTopic validates the request topic by checking the existence of device and device service,
// returns the internal device request topic to which the command request will be sent.
func validateRequestTopic(prefix string, deviceName string, commandName string, method string, dic *di.Container) (string, error) {
// retrieve device information through Metadata DeviceClient
dc := bootstrapContainer.DeviceClientFrom(dic.Get)
if dc == nil {
return "", errors.New("nil Device Client")
}
deviceResponse, err := dc.DeviceByName(context.Background(), deviceName)
if err != nil {
return "", fmt.Errorf("failed to get Device by name %s: %v", deviceName, err)
}

// retrieve device service information through Metadata DeviceClient
dsc := bootstrapContainer.DeviceServiceClientFrom(dic.Get)
if dsc == nil {
return "", errors.New("nil DeviceService Client")
}
deviceServiceResponse, err := dsc.DeviceServiceByName(context.Background(), deviceResponse.Device.ServiceName)
if err != nil {
return "", fmt.Errorf("failed to get DeviceService by name %s: %v", deviceResponse.Device.ServiceName, err)
}

// expected internal command request topic scheme: #/<device-service>/<device>/<command-name>/<method>
return strings.Join([]string{prefix, deviceServiceResponse.Service.Name, deviceName, commandName, method}, "/"), nil

}

// getCommandQueryResponseEnvelope returns the MessageEnvelope containing the DeviceCoreCommand payload bytes
func getCommandQueryResponseEnvelope(requestEnvelope types.MessageEnvelope, deviceName string, dic *di.Container) (types.MessageEnvelope, edgexErr.EdgeX) {
var err error
var commands any
var edgexError edgexErr.EdgeX

switch deviceName {
case common.All:
offset, limit := common.DefaultOffset, common.DefaultLimit
if requestEnvelope.QueryParams != nil {
if offsetRaw, ok := requestEnvelope.QueryParams[common.Offset]; ok {
offset, err = strconv.Atoi(offsetRaw)
if err != nil {
return types.MessageEnvelope{}, edgexErr.NewCommonEdgeX(edgexErr.KindContractInvalid, fmt.Sprintf("Failed to convert 'offset' query parameter to intger: %s", err.Error()), err)
}
}
if limitRaw, ok := requestEnvelope.QueryParams[common.Limit]; ok {
limit, err = strconv.Atoi(limitRaw)
if err != nil {
return types.MessageEnvelope{}, edgexErr.NewCommonEdgeX(edgexErr.KindContractInvalid, fmt.Sprintf("Failed to convert 'limit' query parameter to integer: %s", err.Error()), err)
}
}
}

commands, _, edgexError = application.AllCommands(offset, limit, dic)
if edgexError != nil {
return types.MessageEnvelope{}, edgexErr.NewCommonEdgeX(edgexErr.KindServerError, fmt.Sprintf("Failed to get all commands: %s", edgexError.Error()), edgexError)
}
default:
commands, edgexError = application.CommandsByDeviceName(deviceName, dic)
if edgexError != nil {
return types.MessageEnvelope{}, edgexErr.NewCommonEdgeX(edgexErr.KindServerError, fmt.Sprintf("Failed to get commands by device name '%s': %s", deviceName, edgexError.Error()), edgexError)
}
}

responseBytes, err := json.Marshal(commands)
if err != nil {
return types.MessageEnvelope{}, edgexErr.NewCommonEdgeX(edgexErr.KindServerError, fmt.Sprintf("Failed to json encoding device commands payload: %s", err.Error()), err)
}

responseEnvelope, err := types.NewMessageEnvelopeForResponse(responseBytes, requestEnvelope.RequestID, requestEnvelope.CorrelationID, common.ContentTypeJSON)
if err != nil {
return types.MessageEnvelope{}, edgexErr.NewCommonEdgeX(edgexErr.KindServerError, fmt.Sprintf("Failed to create response MessageEnvelope: %s", err.Error()), err)
}

return responseEnvelope, nil
}
7 changes: 5 additions & 2 deletions internal/core/command/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func Main(ctx context.Context, cancel context.CancelFunc, router *mux.Router) {
// This is required for backwards compatability with older versions of 2.x configuration
// TODO: Remove in EdgeX 3.0
func MessageBusBootstrapHandler(ctx context.Context, wg *sync.WaitGroup, startupTimer startup.Timer, dic *di.Container) bool {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
configuration := container.ConfigurationFrom(dic.Get)
if configuration.MessageQueue.Required {
router := messaging.NewMessagingRouter()
Expand All @@ -98,15 +99,17 @@ func MessageBusBootstrapHandler(ctx context.Context, wg *sync.WaitGroup, startup
return false
}
if err := messaging.SubscribeCommandRequests(ctx, router, dic); err != nil {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
lc.Errorf("Failed to subscribe commands request from internal message bus, %v", err)
return false
}
if err := messaging.SubscribeCommandResponses(ctx, router, dic); err != nil {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
lc.Errorf("Failed to subscribe commands response from internal message bus, %v", err)
return false
}
if err := messaging.SubscribeCommandQueryRequests(ctx, dic); err != nil {
lc.Errorf("Failed to subscribe command query request from internal message bus, %v", err)
return false
}
}

// Not required so do nothing
Expand Down