Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(command): publish device service response to external MQTT #4166

Merged
merged 3 commits into from
Oct 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//
// SPDX-License-Identifier: Apache-2.0

package external
package messaging

import (
"context"
Expand Down Expand Up @@ -34,7 +34,7 @@ const (
ResponseTopic = "ResponseTopic"
)

func OnConnectHandler(dic *di.Container) mqtt.OnConnectHandler {
func OnConnectHandler(router MessagingRouter, dic *di.Container) mqtt.OnConnectHandler {
return func(client mqtt.Client) {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
config := container.ConfigurationFrom(dic.Get)
Expand All @@ -50,7 +50,7 @@ func OnConnectHandler(dic *di.Container) mqtt.OnConnectHandler {
}

requestCommandTopic := externalTopics[RequestCommandTopic]
if token := client.Subscribe(requestCommandTopic, qos, commandRequestHandler(dic)); token.Wait() && token.Error() != nil {
if token := client.Subscribe(requestCommandTopic, qos, commandRequestHandler(router, dic)); token.Wait() && token.Error() != nil {
lc.Errorf("could not subscribe to topic '%s': %s", responseQueryTopic, token.Error().Error())
return
}
Expand Down Expand Up @@ -134,7 +134,7 @@ func commandQueryHandler(responseTopic string, qos byte, retain bool, dic *di.Co
}
}

func commandRequestHandler(dic *di.Container) mqtt.MessageHandler {
func commandRequestHandler(router MessagingRouter, dic *di.Container) mqtt.MessageHandler {
return func(client mqtt.Client, message mqtt.Message) {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
messageBusInfo := container.ConfigurationFrom(dic.Get).MessageQueue
Expand All @@ -151,7 +151,7 @@ func commandRequestHandler(dic *di.Container) mqtt.MessageHandler {
// expected command request topic scheme: #/<device>/<command-name>/<method>
topicLevels := strings.Split(message.Topic(), "/")
length := len(topicLevels)
if length <= 3 {
if length < 3 {
lc.Error("Failed to parse and construct response topic scheme, expected request topic scheme: '#/<device-name>/<command-name>/<method>")
lc.Warn("Not publishing error message back due to insufficient information on response topic")
return
Expand Down Expand Up @@ -207,6 +207,8 @@ func commandRequestHandler(dic *di.Container) mqtt.MessageHandler {
publishMessage(client, externalResponseTopic, qos, retain, responseEnvelope, lc)
return
}

router.SetResponseTopic(requestEnvelope.RequestID, externalResponseTopic, true)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//
// SPDX-License-Identifier: Apache-2.0

package external
package messaging

import (
"context"
Expand All @@ -23,14 +23,14 @@ import (
commonDTO "github.com/edgexfoundry/go-mod-core-contracts/v2/dtos/common"
"github.com/edgexfoundry/go-mod-core-contracts/v2/dtos/responses"
edgexErr "github.com/edgexfoundry/go-mod-core-contracts/v2/errors"
"github.com/edgexfoundry/go-mod-messaging/v2/messaging/mocks"
internalMessagingMocks "github.com/edgexfoundry/go-mod-messaging/v2/messaging/mocks"
"github.com/edgexfoundry/go-mod-messaging/v2/pkg/types"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/edgexfoundry/edgex-go/internal/core/command/config"
"github.com/edgexfoundry/edgex-go/internal/core/command/container"
mqttMocks "github.com/edgexfoundry/edgex-go/internal/core/command/controller/messaging/mocks"
"github.com/edgexfoundry/edgex-go/internal/core/command/controller/messaging/mocks"
)

const (
Expand All @@ -57,6 +57,7 @@ const (
)

func TestOnConnectHandler(t *testing.T) {
mockRouter := &mocks.MessagingRouter{}
lc := &lcMocks.LoggingClient{}
lc.On("Errorf", mock.Anything, mock.Anything, mock.Anything).Return(nil)
dic := di.NewContainer(di.ServiceConstructorMap{
Expand Down Expand Up @@ -91,19 +92,19 @@ func TestOnConnectHandler(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
token := &mqttMocks.Token{}
token := &mocks.Token{}
token.On("Wait").Return(true)
if tt.expectedSucceed {
token.On("Error").Return(nil)
} else {
token.On("Error").Return(errors.New("error"))
}

client := &mqttMocks.Client{}
client := &mocks.Client{}
client.On("Subscribe", testQueryRequestTopic, byte(0), mock.Anything).Return(token)
client.On("Subscribe", testExternalCommandRequestTopic, byte(0), mock.Anything).Return(token)

fn := OnConnectHandler(dic)
fn := OnConnectHandler(mockRouter, dic)
fn(client)

if tt.expectedSucceed {
Expand Down Expand Up @@ -205,15 +206,15 @@ func Test_commandQueryHandler(t *testing.T) {
payloadBytes, err := json.Marshal(tt.payload)
require.NoError(t, err)

message := &mqttMocks.Message{}
message := &mocks.Message{}
message.On("Payload").Return(payloadBytes)
message.On("Topic").Return(tt.requestQueryTopic)

token := &mqttMocks.Token{}
token := &mocks.Token{}
token.On("Wait").Return(true)
token.On("Error").Return(nil)

mqttClient := &mqttMocks.Client{}
mqttClient := &mocks.Client{}
mqttClient.On("Publish", testQueryResponseTopic, byte(0), true, mock.Anything).Return(token)

fn := commandQueryHandler(testQueryResponseTopic, 0, true, dic)
Expand Down Expand Up @@ -262,6 +263,8 @@ func Test_commandRequestHandler(t *testing.T) {
},
}

mockRouter := &mocks.MessagingRouter{}
mockRouter.On("SetResponseTopic", mock.Anything, mock.Anything, mock.Anything).Return(nil)
lc := &lcMocks.LoggingClient{}
lc.On("Error", mock.Anything).Return(nil)
lc.On("Errorf", mock.Anything, mock.Anything).Return(nil)
Expand All @@ -274,7 +277,7 @@ func Test_commandRequestHandler(t *testing.T) {
dsc := &clientMocks.DeviceServiceClient{}
dsc.On("DeviceServiceByName", context.Background(), testDeviceServiceName).Return(deviceServiceResponse, nil)
dsc.On("DeviceServiceByName", context.Background(), unknownService).Return(responses.DeviceServiceResponse{}, edgexErr.NewCommonEdgeX(edgexErr.KindEntityDoesNotExist, "unknown device service", nil))
client := &mocks.MessageClient{}
client := &internalMessagingMocks.MessageClient{}
client.On("Publish", mock.Anything, mock.Anything).Return(nil)
dic := di.NewContainer(di.ServiceConstructorMap{
container.ConfigurationName: func(get di.Get) interface{} {
Expand Down Expand Up @@ -338,18 +341,18 @@ func Test_commandRequestHandler(t *testing.T) {
payloadBytes, err := json.Marshal(tt.payload)
require.NoError(t, err)

message := &mqttMocks.Message{}
message := &mocks.Message{}
message.On("Payload").Return(payloadBytes)
message.On("Topic").Return(tt.commandRequestTopic)

token := &mqttMocks.Token{}
token := &mocks.Token{}
token.On("Wait").Return(true)
token.On("Error").Return(nil)

mqttClient := &mqttMocks.Client{}
mqttClient := &mocks.Client{}
mqttClient.On("Publish", mock.Anything, byte(0), true, mock.Anything).Return(token)

fn := commandRequestHandler(dic)
fn := commandRequestHandler(mockRouter, dic)
fn(mqttClient, message)
if tt.expectedError {
if tt.expectedPublishError {
Expand Down
67 changes: 67 additions & 0 deletions internal/core/command/controller/messaging/internal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
//
// Copyright (C) 2022 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

package messaging

import (
"context"

bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container"
"github.com/edgexfoundry/go-mod-bootstrap/v2/di"
"github.com/edgexfoundry/go-mod-core-contracts/v2/errors"
"github.com/edgexfoundry/go-mod-messaging/v2/pkg/types"

"github.com/edgexfoundry/edgex-go/internal/core/command/container"
)

func SubscribeCommandResponses(ctx context.Context, router MessagingRouter, dic *di.Container) errors.EdgeX {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
messageBusInfo := container.ConfigurationFrom(dic.Get).MessageQueue
internalResponseTopic := messageBusInfo.Internal.Topics[ResponseTopic]

messages := make(chan types.MessageEnvelope)
messageErrors := make(chan error)
topics := []types.TopicChannel{
{
Topic: internalResponseTopic,
Messages: messages,
},
}

messageBus := bootstrapContainer.MessagingClientFrom(dic.Get)
err := messageBus.Subscribe(topics, messageErrors)
if err != nil {
return errors.NewCommonEdgeXWrapper(err)
}

qos := messageBusInfo.External.QoS
retain := messageBusInfo.External.Retain
externalMQTT := bootstrapContainer.ExternalMQTTMessagingClientFrom(dic.Get)
go func() {
for {
select {
case <-ctx.Done():
lc.Infof("Exiting waiting for MessageBus '%s' topic messages", internalResponseTopic)
return
case err = <-messageErrors:
lc.Error(err.Error())
case msgEnvelope := <-messages:
lc.Debugf("Command response received on message queue. Topic: %s, Correlation-id: %s ", msgEnvelope.ReceivedTopic, msgEnvelope.CorrelationID)

responseTopic, external, err := router.ResponseTopic(msgEnvelope.RequestID)
if err != nil {
lc.Errorf("Received RequestEnvelope with unknown RequestId %s", msgEnvelope.RequestID)
continue
}

if external {
publishMessage(externalMQTT, responseTopic, qos, retain, msgEnvelope, lc)
}
cloudxxx8 marked this conversation as resolved.
Show resolved Hide resolved
}
}
}()

return nil
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

65 changes: 65 additions & 0 deletions internal/core/command/controller/messaging/router.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
//
// Copyright (C) 2022 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

package messaging

import (
"errors"
"sync"
)

// MessagingRouter defines interface for Command Service to know
// where to route the receiving device command response.
type MessagingRouter interface {
// ResponseTopic returns the responseTopicPrefix by requestId, and a boolean value
// indicates its original source(external MQTT or internal MessageBus).
ResponseTopic(requestId string) (string, bool, error)
// SetResponseTopic sets the responseTopicPrefix with RequestId as the key
SetResponseTopic(requestId string, topic string, external bool)
}

func NewMessagingRouter() MessagingRouter {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is this Router determining which bus the response message goes to?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad I forgot the most cirtical problem ...

A quick thought came to my mind is to maintain 2 maps internally :

func (r *MessagingRouter) Get(id string) (string, bool) {
    topic, ok := r.ExternalMap[id]
    if ok {
        return topic, true
    }

    topic, _ = r.InternalMap[id]
    return topic, false
}

func (r *MessagingRouter) Set(id string, topic string, source string) {
    switch source {
    case "internal":
        r.InternalMap[id] = topic
    case "external":
        r.ExternalMap[id] = topic
    }
}

Will try to improve it or find if there's better solution tomorrow ...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that was what I had in mind.

return &router{
internalCommandRequestMap: make(map[string]string),
externalCommandRequestMap: make(map[string]string),
}
}

type router struct {
mutex sync.Mutex
internalCommandRequestMap map[string]string
externalCommandRequestMap map[string]string
}

func (r *router) ResponseTopic(requestId string) (string, bool, error) {
r.mutex.Lock()
defer r.mutex.Unlock()

topic, ok := r.externalCommandRequestMap[requestId]
if ok {
delete(r.externalCommandRequestMap, requestId)
return topic, true, nil
}

topic, ok = r.internalCommandRequestMap[requestId]
if ok {
delete(r.internalCommandRequestMap, requestId)
cloudxxx8 marked this conversation as resolved.
Show resolved Hide resolved
return topic, false, nil
}

return "", false, errors.New("requestId not found")
}

func (r *router) SetResponseTopic(requestId string, topic string, external bool) {
r.mutex.Lock()
defer r.mutex.Unlock()

if external {
r.externalCommandRequestMap[requestId] = topic
return
}

r.internalCommandRequestMap[requestId] = topic
}
11 changes: 9 additions & 2 deletions internal/core/command/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"sync"

"github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap"
bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container"
"github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/flags"
"github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/handlers"
"github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/interfaces"
Expand All @@ -31,7 +32,7 @@ import (
"github.com/edgexfoundry/edgex-go/internal"
"github.com/edgexfoundry/edgex-go/internal/core/command/config"
"github.com/edgexfoundry/edgex-go/internal/core/command/container"
"github.com/edgexfoundry/edgex-go/internal/core/command/controller/messaging/external"
"github.com/edgexfoundry/edgex-go/internal/core/command/controller/messaging"
"github.com/edgexfoundry/edgex-go/internal/pkg/telemetry"

"github.com/edgexfoundry/go-mod-core-contracts/v2/common"
Expand Down Expand Up @@ -89,10 +90,16 @@ func Main(ctx context.Context, cancel context.CancelFunc, router *mux.Router) {
func MessageBusBootstrapHandler(ctx context.Context, wg *sync.WaitGroup, startupTimer startup.Timer, dic *di.Container) bool {
configuration := container.ConfigurationFrom(dic.Get)
if configuration.MessageQueue.Required {
router := messaging.NewMessagingRouter()
if !handlers.MessagingBootstrapHandler(ctx, wg, startupTimer, dic) {
return false
}
if !handlers.NewExternalMQTT(external.OnConnectHandler(dic)).BootstrapHandler(ctx, wg, startupTimer, dic) {
if !handlers.NewExternalMQTT(messaging.OnConnectHandler(router, dic)).BootstrapHandler(ctx, wg, startupTimer, dic) {
return false
}
if err := messaging.SubscribeCommandResponses(ctx, router, dic); err != nil {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
lc.Errorf("Failed to subscribe commands from message bus, %v", err)
return false
}
}
Expand Down