diff --git a/cmd/core-command/res/configuration.toml b/cmd/core-command/res/configuration.toml index 39209618ec..8ea87cef46 100644 --- a/cmd/core-command/res/configuration.toml +++ b/cmd/core-command/res/configuration.toml @@ -58,7 +58,7 @@ Type = "consul" DeviceResponseTopic = "edgex/command/response/#" # for subscribing to device service responses CommandRequestTopic = "/command/request/#" # for subscribing to internal command requests CommandResponseTopicPrefix = "/command/response" # for publishing responses back to internal service /// will be added to this publish topic prefix - QueryRequestTopic = "/commandquery/request" # for subscribing to internal command query requests + QueryRequestTopic = "/commandquery/request/#" # for subscribing to internal command query requests QueryResponseTopic = "/commandquery/response" # for publishing reponsses back to internal service [MessageQueue.Internal.Optional] # Default MQTT Specific options that need to be here to enable evnironment variable overrides of them diff --git a/internal/core/command/controller/messaging/external.go b/internal/core/command/controller/messaging/external.go index 79cca3f763..121ed033c4 100644 --- a/internal/core/command/controller/messaging/external.go +++ b/internal/core/command/controller/messaging/external.go @@ -6,11 +6,8 @@ package messaging import ( - "context" "encoding/json" - "errors" "fmt" - "strconv" "strings" mqtt "github.com/eclipse/paho.mqtt.golang" @@ -18,11 +15,8 @@ import ( "github.com/edgexfoundry/go-mod-bootstrap/v2/di" "github.com/edgexfoundry/go-mod-core-contracts/v2/clients/logger" "github.com/edgexfoundry/go-mod-core-contracts/v2/common" - edgexErr "github.com/edgexfoundry/go-mod-core-contracts/v2/errors" - "github.com/edgexfoundry/go-mod-messaging/v2/pkg/types" - "github.com/edgexfoundry/edgex-go/internal/core/command/application" "github.com/edgexfoundry/edgex-go/internal/core/command/container" ) @@ -60,8 +54,6 @@ func OnConnectHandler(router MessagingRouter, dic *di.Container) mqtt.OnConnectH func commandQueryHandler(responseTopic string, qos byte, retain bool, dic *di.Container) mqtt.MessageHandler { return func(client mqtt.Client, message mqtt.Message) { - var errorMessage string - var responseEnvelope types.MessageEnvelope lc := bootstrapContainer.LoggingClientFrom(dic.Get) requestEnvelope, err := types.NewMessageEnvelopeFromJSON(message.Payload()) @@ -79,58 +71,11 @@ func commandQueryHandler(responseTopic string, qos byte, retain bool, dic *di.Co deviceName = common.All } - var commands any - var edgexErr edgexErr.EdgeX - switch deviceName { - case common.All: - offset, limit := common.DefaultOffset, common.DefaultLimit - if requestEnvelope.QueryParams != nil { - if offsetRaw, ok := requestEnvelope.QueryParams[common.Offset]; ok { - offset, err = strconv.Atoi(offsetRaw) - if err != nil { - errorMessage = fmt.Sprintf("Failed to convert 'offset' query parameter to intger: %s", err.Error()) - responseEnvelope = types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, errorMessage) - publishMessage(client, responseTopic, qos, retain, responseEnvelope, lc) - return - } - } - if limitRaw, ok := requestEnvelope.QueryParams[common.Limit]; ok { - limit, err = strconv.Atoi(limitRaw) - if err != nil { - errorMessage = fmt.Sprintf("Failed to convert 'limit' query parameter to integer: %s", err.Error()) - responseEnvelope = types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, errorMessage) - publishMessage(client, responseTopic, qos, retain, responseEnvelope, lc) - return - } - } - } - - commands, _, edgexErr = application.AllCommands(offset, limit, dic) - if edgexErr != nil { - errorMessage = fmt.Sprintf("Failed to get all commands: %s", edgexErr.Error()) - responseEnvelope = types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, errorMessage) - publishMessage(client, responseTopic, qos, retain, responseEnvelope, lc) - return - } - default: - commands, edgexErr = application.CommandsByDeviceName(deviceName, dic) - if edgexErr != nil { - errorMessage = fmt.Sprintf("Failed to get commands by device name '%s': %s", deviceName, edgexErr.Error()) - responseEnvelope = types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, errorMessage) - publishMessage(client, responseTopic, qos, retain, responseEnvelope, lc) - return - } + responseEnvelope, edgexErr := getCommandQueryResponseEnvelope(requestEnvelope, deviceName, dic) + if edgexErr != nil { + responseEnvelope = types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, edgexErr.Error()) } - payloadBytes, err := json.Marshal(commands) - if err != nil { - errorMessage = fmt.Sprintf("Failed to json encoding deviceCommands payload: %s", err.Error()) - responseEnvelope = types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, errorMessage) - publishMessage(client, responseTopic, qos, retain, responseEnvelope, lc) - return - } - - responseEnvelope, _ = types.NewMessageEnvelopeForResponse(payloadBytes, requestEnvelope.RequestID, requestEnvelope.CorrelationID, common.ContentTypeJSON) publishMessage(client, responseTopic, qos, retain, responseEnvelope, lc) } } @@ -203,31 +148,3 @@ func publishMessage(client mqtt.Client, responseTopic string, qos byte, retain b lc.Debugf("Published response message to external message queue on topic '%s' with %d bytes", responseTopic, len(envelopeBytes)) } } - -// validateRequestTopic validates the request topic by checking the existence of device and device service, -// returns the internal device request topic to which the command request will be sent. -func validateRequestTopic(prefix string, deviceName string, commandName string, method string, dic *di.Container) (string, error) { - // retrieve device information through Metadata DeviceClient - dc := bootstrapContainer.DeviceClientFrom(dic.Get) - if dc == nil { - return "", errors.New("nil Device Client") - } - deviceResponse, err := dc.DeviceByName(context.Background(), deviceName) - if err != nil { - return "", fmt.Errorf("failed to get Device by name %s: %v", deviceName, err) - } - - // retrieve device service information through Metadata DeviceClient - dsc := bootstrapContainer.DeviceServiceClientFrom(dic.Get) - if dsc == nil { - return "", errors.New("nil DeviceService Client") - } - deviceServiceResponse, err := dsc.DeviceServiceByName(context.Background(), deviceResponse.Device.ServiceName) - if err != nil { - return "", fmt.Errorf("failed to get DeviceService by name %s: %v", deviceResponse.Device.ServiceName, err) - } - - // expected internal command request topic scheme: #//// - return strings.Join([]string{prefix, deviceServiceResponse.Service.Name, deviceName, commandName, method}, "/"), nil - -} diff --git a/internal/core/command/controller/messaging/internal.go b/internal/core/command/controller/messaging/internal.go index 29ab1c3a5f..2cc514fac0 100644 --- a/internal/core/command/controller/messaging/internal.go +++ b/internal/core/command/controller/messaging/internal.go @@ -11,6 +11,7 @@ import ( bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container" "github.com/edgexfoundry/go-mod-bootstrap/v2/di" + "github.com/edgexfoundry/go-mod-core-contracts/v2/common" "github.com/edgexfoundry/go-mod-core-contracts/v2/errors" "github.com/edgexfoundry/go-mod-messaging/v2/pkg/types" @@ -155,3 +156,65 @@ func SubscribeCommandRequests(ctx context.Context, router MessagingRouter, dic * return nil } + +// SubscribeCommandQueryRequests subscribes command query requests from EdgeX service (e.g., Application Service) +// via internal MessageBus +func SubscribeCommandQueryRequests(ctx context.Context, dic *di.Container) errors.EdgeX { + lc := bootstrapContainer.LoggingClientFrom(dic.Get) + messageBusInfo := container.ConfigurationFrom(dic.Get).MessageQueue + internalQueryRequestTopic := messageBusInfo.Internal.Topics[QueryRequestTopic] + internalQueryResponseTopic := messageBusInfo.Internal.Topics[QueryResponseTopic] + + messages := make(chan types.MessageEnvelope) + messageErrors := make(chan error) + topics := []types.TopicChannel{ + { + Topic: internalQueryRequestTopic, + Messages: messages, + }, + } + + messageBus := bootstrapContainer.MessagingClientFrom(dic.Get) + err := messageBus.Subscribe(topics, messageErrors) + if err != nil { + return errors.NewCommonEdgeXWrapper(err) + } + + go func() { + for { + select { + case <-ctx.Done(): + lc.Infof("Exiting waiting for MessageBus '%s' topic messages", internalQueryRequestTopic) + return + case err = <-messageErrors: + lc.Error(err.Error()) + case requestEnvelope := <-messages: + lc.Debugf("Command query request received on internal MessageBus. Topic: %s, Correlation-id: %s ", requestEnvelope.ReceivedTopic, requestEnvelope.CorrelationID) + + // example topic scheme: /commandquery/request/ + // deviceName is expected to be at last topic level. + topicLevels := strings.Split(requestEnvelope.ReceivedTopic, "/") + deviceName := topicLevels[len(topicLevels)-1] + if strings.EqualFold(deviceName, common.All) { + deviceName = common.All + } + + responseEnvelope, edgexErr := getCommandQueryResponseEnvelope(requestEnvelope, deviceName, dic) + if edgexErr != nil { + lc.Error(edgexErr.Error()) + responseEnvelope = types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, edgexErr.Error()) + } + + err = messageBus.Publish(responseEnvelope, internalQueryResponseTopic) + if err != nil { + lc.Errorf("Could not publish to topic '%s': %s", internalQueryResponseTopic, err.Error()) + continue + } + + lc.Debugf("Command query response sent to internal MessageBus. Topic: %s, Correlation-id: %s", internalQueryResponseTopic, requestEnvelope.CorrelationID) + } + } + }() + + return nil +} diff --git a/internal/core/command/controller/messaging/utils.go b/internal/core/command/controller/messaging/utils.go new file mode 100644 index 0000000000..e0ecd1ecfd --- /dev/null +++ b/internal/core/command/controller/messaging/utils.go @@ -0,0 +1,99 @@ +// +// Copyright (C) 2022 IOTech Ltd +// +// SPDX-License-Identifier: Apache-2.0 + +package messaging + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "strconv" + "strings" + + bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container" + "github.com/edgexfoundry/go-mod-bootstrap/v2/di" + "github.com/edgexfoundry/go-mod-core-contracts/v2/common" + edgexErr "github.com/edgexfoundry/go-mod-core-contracts/v2/errors" + "github.com/edgexfoundry/go-mod-messaging/v2/pkg/types" + + "github.com/edgexfoundry/edgex-go/internal/core/command/application" +) + +// validateRequestTopic validates the request topic by checking the existence of device and device service, +// returns the internal device request topic to which the command request will be sent. +func validateRequestTopic(prefix string, deviceName string, commandName string, method string, dic *di.Container) (string, error) { + // retrieve device information through Metadata DeviceClient + dc := bootstrapContainer.DeviceClientFrom(dic.Get) + if dc == nil { + return "", errors.New("nil Device Client") + } + deviceResponse, err := dc.DeviceByName(context.Background(), deviceName) + if err != nil { + return "", fmt.Errorf("failed to get Device by name %s: %v", deviceName, err) + } + + // retrieve device service information through Metadata DeviceClient + dsc := bootstrapContainer.DeviceServiceClientFrom(dic.Get) + if dsc == nil { + return "", errors.New("nil DeviceService Client") + } + deviceServiceResponse, err := dsc.DeviceServiceByName(context.Background(), deviceResponse.Device.ServiceName) + if err != nil { + return "", fmt.Errorf("failed to get DeviceService by name %s: %v", deviceResponse.Device.ServiceName, err) + } + + // expected internal command request topic scheme: #//// + return strings.Join([]string{prefix, deviceServiceResponse.Service.Name, deviceName, commandName, method}, "/"), nil + +} + +// getCommandQueryResponseEnvelope returns the MessageEnvelope containing the DeviceCoreCommand payload bytes +func getCommandQueryResponseEnvelope(requestEnvelope types.MessageEnvelope, deviceName string, dic *di.Container) (types.MessageEnvelope, edgexErr.EdgeX) { + var err error + var commands any + var edgexError edgexErr.EdgeX + + switch deviceName { + case common.All: + offset, limit := common.DefaultOffset, common.DefaultLimit + if requestEnvelope.QueryParams != nil { + if offsetRaw, ok := requestEnvelope.QueryParams[common.Offset]; ok { + offset, err = strconv.Atoi(offsetRaw) + if err != nil { + return types.MessageEnvelope{}, edgexErr.NewCommonEdgeX(edgexErr.KindContractInvalid, fmt.Sprintf("Failed to convert 'offset' query parameter to intger: %s", err.Error()), err) + } + } + if limitRaw, ok := requestEnvelope.QueryParams[common.Limit]; ok { + limit, err = strconv.Atoi(limitRaw) + if err != nil { + return types.MessageEnvelope{}, edgexErr.NewCommonEdgeX(edgexErr.KindContractInvalid, fmt.Sprintf("Failed to convert 'limit' query parameter to integer: %s", err.Error()), err) + } + } + } + + commands, _, edgexError = application.AllCommands(offset, limit, dic) + if edgexError != nil { + return types.MessageEnvelope{}, edgexErr.NewCommonEdgeX(edgexErr.KindServerError, fmt.Sprintf("Failed to get all commands: %s", edgexError.Error()), edgexError) + } + default: + commands, edgexError = application.CommandsByDeviceName(deviceName, dic) + if edgexError != nil { + return types.MessageEnvelope{}, edgexErr.NewCommonEdgeX(edgexErr.KindServerError, fmt.Sprintf("Failed to get commands by device name '%s': %s", deviceName, edgexError.Error()), edgexError) + } + } + + responseBytes, err := json.Marshal(commands) + if err != nil { + return types.MessageEnvelope{}, edgexErr.NewCommonEdgeX(edgexErr.KindServerError, fmt.Sprintf("Failed to json encoding device commands payload: %s", err.Error()), err) + } + + responseEnvelope, err := types.NewMessageEnvelopeForResponse(responseBytes, requestEnvelope.RequestID, requestEnvelope.CorrelationID, common.ContentTypeJSON) + if err != nil { + return types.MessageEnvelope{}, edgexErr.NewCommonEdgeX(edgexErr.KindServerError, fmt.Sprintf("Failed to create response MessageEnvelope: %s", err.Error()), err) + } + + return responseEnvelope, nil +} diff --git a/internal/core/command/main.go b/internal/core/command/main.go index 539856454e..ce480f3757 100644 --- a/internal/core/command/main.go +++ b/internal/core/command/main.go @@ -88,6 +88,7 @@ func Main(ctx context.Context, cancel context.CancelFunc, router *mux.Router) { // This is required for backwards compatability with older versions of 2.x configuration // TODO: Remove in EdgeX 3.0 func MessageBusBootstrapHandler(ctx context.Context, wg *sync.WaitGroup, startupTimer startup.Timer, dic *di.Container) bool { + lc := bootstrapContainer.LoggingClientFrom(dic.Get) configuration := container.ConfigurationFrom(dic.Get) if configuration.MessageQueue.Required { router := messaging.NewMessagingRouter() @@ -98,15 +99,17 @@ func MessageBusBootstrapHandler(ctx context.Context, wg *sync.WaitGroup, startup return false } if err := messaging.SubscribeCommandRequests(ctx, router, dic); err != nil { - lc := bootstrapContainer.LoggingClientFrom(dic.Get) lc.Errorf("Failed to subscribe commands request from internal message bus, %v", err) return false } if err := messaging.SubscribeCommandResponses(ctx, router, dic); err != nil { - lc := bootstrapContainer.LoggingClientFrom(dic.Get) lc.Errorf("Failed to subscribe commands response from internal message bus, %v", err) return false } + if err := messaging.SubscribeCommandQueryRequests(ctx, dic); err != nil { + lc.Errorf("Failed to subscribe command query request from internal message bus, %v", err) + return false + } } // Not required so do nothing