diff --git a/cmd/core-command/res/configuration.toml b/cmd/core-command/res/configuration.toml index 3daadd1a96..312a6ca5d6 100644 --- a/cmd/core-command/res/configuration.toml +++ b/cmd/core-command/res/configuration.toml @@ -54,7 +54,7 @@ Type = "consul" AuthMode = "usernamepassword" # required for redis messagebus (secure or insecure). SecretName = "redisdb" [MessageQueue.Internal.Topics] - RequestTopicPrefix = "edgex/command/request/" # for publishing requests to the device service; /// will be added to this publish topic prefix + RequestTopicPrefix = "edgex/command/request" # for publishing requests to the device service; /// will be added to this publish topic prefix ResponseTopic = "edgex/command/response/#" # for subscribing to device service responses InternalRequestCommandTopic = "/command/request/#" # for subscribing to internal command requests InternalResponseCommandTopicPrefix = "/command/response/" # for publishing responses back to internal service /// will be added to this publish topic prefix @@ -90,7 +90,7 @@ Type = "consul" AuthMode = "none" [MessageQueue.External.Topics] RequestCommandTopic = "edgex/command/request/#" # for subscribing to 3rd party command requests - ResponseCommandTopicPrefix = "edgex/command/response/" # for publishing responses back to 3rd party systems /// will be added to this publish topic prefix + ResponseCommandTopicPrefix = "edgex/command/response" # for publishing responses back to 3rd party systems /// will be added to this publish topic prefix RequestQueryTopic = "edgex/commandquery/request/#" # for subscribing to 3rd party command query request ResponseQueryTopic = "edgex/commandquery/response" # for publishing responses back to 3rd party systems diff --git a/internal/core/command/controller/messaging/external/external.go b/internal/core/command/controller/messaging/external/external.go index ff28328091..90d5cc872b 100644 --- a/internal/core/command/controller/messaging/external/external.go +++ b/internal/core/command/controller/messaging/external/external.go @@ -6,6 +6,7 @@ package external import ( + "context" "encoding/json" "fmt" "strconv" @@ -25,8 +26,12 @@ import ( ) const ( - RequestQueryTopic = "RequestQueryTopic" - ResponseQueryTopic = "ResponseQueryTopic" + RequestQueryTopic = "RequestQueryTopic" + ResponseQueryTopic = "ResponseQueryTopic" + RequestCommandTopic = "RequestCommandTopic" + ResponseCommandTopicPrefix = "ResponseCommandTopicPrefix" + RequestTopicPrefix = "RequestTopicPrefix" + ResponseTopic = "ResponseTopic" ) func OnConnectHandler(dic *di.Container) mqtt.OnConnectHandler { @@ -41,7 +46,12 @@ func OnConnectHandler(dic *di.Container) mqtt.OnConnectHandler { responseQueryTopic := externalTopics[ResponseQueryTopic] if token := client.Subscribe(requestQueryTopic, qos, commandQueryHandler(responseQueryTopic, qos, retain, dic)); token.Wait() && token.Error() != nil { lc.Errorf("could not subscribe to topic '%s': %s", responseQueryTopic, token.Error().Error()) + return + } + requestCommandTopic := externalTopics[RequestCommandTopic] + if token := client.Subscribe(requestCommandTopic, qos, commandRequestHandler(dic)); token.Wait() && token.Error() != nil { + lc.Errorf("could not subscribe to topic '%s': %s", responseQueryTopic, token.Error().Error()) return } } @@ -55,8 +65,8 @@ func commandQueryHandler(responseTopic string, qos byte, retain bool, dic *di.Co requestEnvelope, err := types.NewMessageEnvelopeFromJSON(message.Payload()) if err != nil { - responseEnvelope = types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, err.Error()) - publishMessage(client, responseTopic, qos, retain, responseEnvelope, lc) + lc.Errorf("Failed to decode request MessageEnvelope: %s", err.Error()) + lc.Warn("Not publishing error message back due to insufficient information on response topic") return } @@ -124,6 +134,82 @@ func commandQueryHandler(responseTopic string, qos byte, retain bool, dic *di.Co } } +func commandRequestHandler(dic *di.Container) mqtt.MessageHandler { + return func(client mqtt.Client, message mqtt.Message) { + lc := bootstrapContainer.LoggingClientFrom(dic.Get) + messageBusInfo := container.ConfigurationFrom(dic.Get).MessageQueue + qos := messageBusInfo.External.QoS + retain := messageBusInfo.External.Retain + + requestEnvelope, err := types.NewMessageEnvelopeFromJSON(message.Payload()) + if err != nil { + lc.Errorf("Failed to decode request MessageEnvelope: %s", err.Error()) + lc.Warn("Not publishing error message back due to insufficient information on response topic") + return + } + + // expected command request topic scheme: #/// + topicLevels := strings.Split(message.Topic(), "/") + length := len(topicLevels) + if length <= 3 { + lc.Error("Failed to parse and construct response topic scheme, expected request topic scheme: '#///") + lc.Warn("Not publishing error message back due to insufficient information on response topic") + return + } + deviceName := topicLevels[length-3] + commandName := topicLevels[length-2] + method := topicLevels[length-1] + if !strings.EqualFold(method, "get") && !strings.EqualFold(method, "set") { + lc.Errorf("Unknown request method: %s, only 'get' or 'set' is allowed", method) + lc.Warn("Not publishing error message back due to insufficient information on response topic") + return + } + // expected command response topic scheme: #/// + externalResponseTopic := strings.Join([]string{messageBusInfo.External.Topics[ResponseCommandTopicPrefix], deviceName, commandName, method}, "/") + + // retrieve device information through Metadata DeviceClient + dc := bootstrapContainer.DeviceClientFrom(dic.Get) + if dc == nil { + responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, "nil Device Client") + publishMessage(client, externalResponseTopic, qos, retain, responseEnvelope, lc) + return + } + deviceResponse, err := dc.DeviceByName(context.Background(), deviceName) + if err != nil { + errorMessage := fmt.Sprintf("Failed to get Device by name %s: %v", deviceName, err) + responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, errorMessage) + publishMessage(client, externalResponseTopic, qos, retain, responseEnvelope, lc) + return + } + + // retrieve device service information through Metadata DeviceClient + dsc := bootstrapContainer.DeviceServiceClientFrom(dic.Get) + if dsc == nil { + responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, "nil DeviceService Client") + publishMessage(client, externalResponseTopic, qos, retain, responseEnvelope, lc) + return + } + deviceServiceResponse, err := dsc.DeviceServiceByName(context.Background(), deviceResponse.Device.ServiceName) + if err != nil { + errorMessage := fmt.Sprintf("Failed to get DeviceService by name %s: %v", deviceResponse.Device.ServiceName, err) + responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, errorMessage) + publishMessage(client, externalResponseTopic, qos, retain, responseEnvelope, lc) + return + } + + // expected internal command request topic scheme: #//// + internalRequestTopic := strings.Join([]string{messageBusInfo.Internal.Topics[RequestTopicPrefix], deviceServiceResponse.Service.Name, deviceName, commandName, method}, "/") + internalMessageBus := bootstrapContainer.MessagingClientFrom(dic.Get) + err = internalMessageBus.Publish(requestEnvelope, internalRequestTopic) + if err != nil { + errorMessage := fmt.Sprintf("Failed to send DeviceCommand request with internal MessageBus: %v", err) + responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, errorMessage) + publishMessage(client, externalResponseTopic, qos, retain, responseEnvelope, lc) + return + } + } +} + func publishMessage(client mqtt.Client, responseTopic string, qos byte, retain bool, message types.MessageEnvelope, lc logger.LoggingClient) { if message.ErrorCode == 1 { lc.Error(string(message.Payload)) diff --git a/internal/core/command/controller/messaging/external/external_test.go b/internal/core/command/controller/messaging/external/external_test.go index 189e8f2523..89881c9cd1 100644 --- a/internal/core/command/controller/messaging/external/external_test.go +++ b/internal/core/command/controller/messaging/external/external_test.go @@ -10,6 +10,7 @@ import ( "encoding/json" "errors" "net/http" + "strings" "testing" bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container" @@ -21,27 +22,38 @@ import ( "github.com/edgexfoundry/go-mod-core-contracts/v2/dtos" commonDTO "github.com/edgexfoundry/go-mod-core-contracts/v2/dtos/common" "github.com/edgexfoundry/go-mod-core-contracts/v2/dtos/responses" + edgexErr "github.com/edgexfoundry/go-mod-core-contracts/v2/errors" + "github.com/edgexfoundry/go-mod-messaging/v2/messaging/mocks" "github.com/edgexfoundry/go-mod-messaging/v2/pkg/types" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/edgexfoundry/edgex-go/internal/core/command/config" "github.com/edgexfoundry/edgex-go/internal/core/command/container" - "github.com/edgexfoundry/edgex-go/internal/core/command/controller/messaging/mocks" + mqttMocks "github.com/edgexfoundry/edgex-go/internal/core/command/controller/messaging/mocks" ) const ( mockHost = "127.0.0.1" mockPort = 66666 - testProfileName = "testProfile" - testResourceName = "testResource" - testDeviceName = "testDevice" + testProfileName = "testProfile" + testResourceName = "testResource" + testDeviceName = "testDevice" + testDeviceServiceName = "testService" + testCommandName = "testCommand" + testMethod = "get" - testRequestQueryTopic = "unittest/#" - testRequestQueryAllTopic = "unittest/all" - testRequestQueryByDeviceNameTopic = "unittest/testDevice" - testResponseTopic = "unittest/response" + testQueryRequestTopic = "unittest/#" + testQueryAllExample = "unittest/all" + testQueryByDeviceNameExample = "unittest/testDevice" + testQueryResponseTopic = "unittest/response" + + testExternalCommandRequestTopic = "unittest/external/request/#" + testExternalCommandRequestTopicExample = "unittest/external/request/testDevice/testCommand/get" + testExternalCommandResponseTopicPrefix = "unittest/external/response" + + testInternalCommandRequestTopicPrefix = "unittest/internal/request" ) func TestOnConnectHandler(t *testing.T) { @@ -54,8 +66,10 @@ func TestOnConnectHandler(t *testing.T) { Required: false, External: bootstrapConfig.ExternalMQTTInfo{ Topics: map[string]string{ - RequestQueryTopic: testRequestQueryTopic, - ResponseQueryTopic: testResponseTopic, + RequestQueryTopic: testQueryRequestTopic, + ResponseQueryTopic: testQueryResponseTopic, + RequestCommandTopic: testExternalCommandRequestTopic, + ResponseCommandTopicPrefix: testExternalCommandResponseTopicPrefix, }, QoS: 0, Retain: true, @@ -77,7 +91,7 @@ func TestOnConnectHandler(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - token := &mocks.Token{} + token := &mqttMocks.Token{} token.On("Wait").Return(true) if tt.expectedSucceed { token.On("Error").Return(nil) @@ -85,16 +99,19 @@ func TestOnConnectHandler(t *testing.T) { token.On("Error").Return(errors.New("error")) } - client := &mocks.Client{} - client.On("Subscribe", testRequestQueryTopic, byte(0), mock.Anything).Return(token) + client := &mqttMocks.Client{} + client.On("Subscribe", testQueryRequestTopic, byte(0), mock.Anything).Return(token) + client.On("Subscribe", testExternalCommandRequestTopic, byte(0), mock.Anything).Return(token) fn := OnConnectHandler(dic) fn(client) - client.AssertCalled(t, "Subscribe", testRequestQueryTopic, byte(0), mock.Anything) - if !tt.expectedSucceed { - lc.AssertCalled(t, "Errorf", mock.Anything, mock.Anything, mock.Anything) + if tt.expectedSucceed { + client.AssertNumberOfCalls(t, "Subscribe", 2) + return } + + lc.AssertCalled(t, "Errorf", mock.Anything, mock.Anything, mock.Anything) }) } } @@ -136,7 +153,9 @@ func Test_commandQueryHandler(t *testing.T) { lc := &lcMocks.LoggingClient{} lc.On("Error", mock.Anything).Return(nil) + lc.On("Errorf", mock.Anything, mock.Anything).Return(nil) lc.On("Debugf", mock.Anything, mock.Anything, mock.Anything).Return(nil) + lc.On("Warn", mock.Anything).Return(nil) dc := &clientMocks.DeviceClient{} dc.On("AllDevices", context.Background(), []string(nil), common.DefaultOffset, common.DefaultLimit).Return(allDevicesResponse, nil) dc.On("DeviceByName", context.Background(), testDeviceName).Return(deviceResponse, nil) @@ -163,51 +182,202 @@ func Test_commandQueryHandler(t *testing.T) { }, }) - validPayload := testPayload() - invalidRequestPayload := testPayload() + validPayload := testCommandQueryPayload() + invalidRequestPayload := testCommandQueryPayload() invalidRequestPayload.ApiVersion = "v1" - invalidQueryParamsPayload := testPayload() + invalidQueryParamsPayload := testCommandQueryPayload() invalidQueryParamsPayload.QueryParams[common.Offset] = "invalid" tests := []struct { - name string - requestQueryTopic string - payload types.MessageEnvelope - expectedError bool + name string + requestQueryTopic string + payload types.MessageEnvelope + expectedError bool + expectedPublishError bool }{ - {"valid - query all", testRequestQueryAllTopic, validPayload, false}, - {"valid - query by device name", testRequestQueryByDeviceNameTopic, validPayload, false}, - {"invalid - invalid request json payload", testRequestQueryByDeviceNameTopic, invalidRequestPayload, true}, - {"invalid - invalid query parameters", testRequestQueryAllTopic, invalidQueryParamsPayload, true}, + {"valid - query all", testQueryAllExample, validPayload, false, false}, + {"valid - query by device name", testQueryByDeviceNameExample, validPayload, false, false}, + {"invalid - invalid request json payload", testQueryByDeviceNameExample, invalidRequestPayload, true, false}, + {"invalid - invalid query parameters", testQueryAllExample, invalidQueryParamsPayload, true, true}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { payloadBytes, err := json.Marshal(tt.payload) require.NoError(t, err) - message := &mocks.Message{} + message := &mqttMocks.Message{} message.On("Payload").Return(payloadBytes) message.On("Topic").Return(tt.requestQueryTopic) - token := &mocks.Token{} + token := &mqttMocks.Token{} token.On("Wait").Return(true) token.On("Error").Return(nil) - client := &mocks.Client{} - client.On("Publish", testResponseTopic, byte(0), true, mock.Anything).Return(token) + mqttClient := &mqttMocks.Client{} + mqttClient.On("Publish", testQueryResponseTopic, byte(0), true, mock.Anything).Return(token) - fn := commandQueryHandler(testResponseTopic, 0, true, dic) - fn(client, message) + fn := commandQueryHandler(testQueryResponseTopic, 0, true, dic) + fn(mqttClient, message) + if tt.expectedError { + if tt.expectedPublishError { + lc.AssertCalled(t, "Error", mock.Anything) + mqttClient.AssertCalled(t, "Publish", testQueryResponseTopic, byte(0), true, mock.Anything) + return + } + lc.AssertCalled(t, "Warn", mock.Anything) + return + } + + mqttClient.AssertCalled(t, "Publish", testQueryResponseTopic, byte(0), true, mock.Anything) lc.AssertCalled(t, "Debugf", mock.Anything, mock.Anything, mock.Anything) + }) + } +} + +func Test_commandRequestHandler(t *testing.T) { + unknownDevice := "unknown-device" + unknownServiceDevice := "unknownService-device" + unknownService := "unknown-service" + + deviceResponse := responses.DeviceResponse{ + BaseResponse: commonDTO.NewBaseResponse("", "", http.StatusOK), + Device: dtos.Device{ + Name: testDeviceName, + ProfileName: testProfileName, + ServiceName: testDeviceServiceName, + }, + } + unknownServiceDeviceResponse := responses.DeviceResponse{ + BaseResponse: commonDTO.NewBaseResponse("", "", http.StatusOK), + Device: dtos.Device{ + Name: unknownServiceDevice, + ProfileName: testProfileName, + ServiceName: unknownService, + }, + } + deviceServiceResponse := responses.DeviceServiceResponse{ + BaseResponse: commonDTO.NewBaseResponse("", "", http.StatusOK), + Service: dtos.DeviceService{ + Name: testDeviceServiceName, + }, + } + + lc := &lcMocks.LoggingClient{} + lc.On("Error", mock.Anything).Return(nil) + lc.On("Errorf", mock.Anything, mock.Anything).Return(nil) + lc.On("Debugf", mock.Anything, mock.Anything, mock.Anything).Return(nil) + lc.On("Warn", mock.Anything).Return(nil) + dc := &clientMocks.DeviceClient{} + dc.On("DeviceByName", context.Background(), testDeviceName).Return(deviceResponse, nil) + dc.On("DeviceByName", context.Background(), unknownDevice).Return(responses.DeviceResponse{}, edgexErr.NewCommonEdgeX(edgexErr.KindEntityDoesNotExist, "unknown device", nil)) + dc.On("DeviceByName", context.Background(), unknownServiceDevice).Return(unknownServiceDeviceResponse, nil) + dsc := &clientMocks.DeviceServiceClient{} + dsc.On("DeviceServiceByName", context.Background(), testDeviceServiceName).Return(deviceServiceResponse, nil) + dsc.On("DeviceServiceByName", context.Background(), unknownService).Return(responses.DeviceServiceResponse{}, edgexErr.NewCommonEdgeX(edgexErr.KindEntityDoesNotExist, "unknown device service", nil)) + client := &mocks.MessageClient{} + client.On("Publish", mock.Anything, mock.Anything).Return(nil) + dic := di.NewContainer(di.ServiceConstructorMap{ + container.ConfigurationName: func(get di.Get) interface{} { + return &config.ConfigurationStruct{ + Service: bootstrapConfig.ServiceInfo{ + Host: mockHost, + Port: mockPort, + MaxResultCount: 20, + }, + MessageQueue: config.MessageQueue{ + Required: true, + Internal: bootstrapConfig.MessageBusInfo{ + Topics: map[string]string{ + RequestTopicPrefix: testInternalCommandRequestTopicPrefix, + }, + }, + External: bootstrapConfig.ExternalMQTTInfo{ + QoS: 0, + Retain: true, + Topics: map[string]string{ + ResponseCommandTopicPrefix: testExternalCommandResponseTopicPrefix, + }, + }, + }, + } + }, + bootstrapContainer.LoggingClientInterfaceName: func(get di.Get) interface{} { + return lc + }, + bootstrapContainer.DeviceClientName: func(get di.Get) interface{} { + return dc + }, + bootstrapContainer.DeviceServiceClientName: func(get di.Get) interface{} { + return dsc + }, + bootstrapContainer.MessagingClientName: func(get di.Get) interface{} { + return client + }, + }) + + validPayload := testCommandRequestPayload() + invalidRequestPayload := testCommandRequestPayload() + invalidRequestPayload.ApiVersion = "v1" + + tests := []struct { + name string + commandRequestTopic string + payload types.MessageEnvelope + expectedError bool + expectedPublishError bool + }{ + {"valid", testExternalCommandRequestTopicExample, validPayload, false, false}, + {"invalid - invalid request json payload", testExternalCommandRequestTopicExample, invalidRequestPayload, true, false}, + {"invalid - invalid request topic scheme", "unittest/invalid", validPayload, true, false}, + {"invalid - unrecognized command method", "unittest/request/testDevice/testCommand/invalid", validPayload, true, false}, + {"invalid - device not found", "unittest/request/unknown-device/testCommand/get", validPayload, true, true}, + {"invalid - device service not found", "unittest/request/unknownService-device/testCommand/get", validPayload, true, true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + payloadBytes, err := json.Marshal(tt.payload) + require.NoError(t, err) + + message := &mqttMocks.Message{} + message.On("Payload").Return(payloadBytes) + message.On("Topic").Return(tt.commandRequestTopic) + + token := &mqttMocks.Token{} + token.On("Wait").Return(true) + token.On("Error").Return(nil) + + mqttClient := &mqttMocks.Client{} + mqttClient.On("Publish", mock.Anything, byte(0), true, mock.Anything).Return(token) + + fn := commandRequestHandler(dic) + fn(mqttClient, message) if tt.expectedError { - lc.AssertCalled(t, "Error", mock.Anything) + if tt.expectedPublishError { + lc.AssertCalled(t, "Error", mock.Anything) + mqttClient.AssertCalled(t, "Publish", mock.Anything, byte(0), true, mock.Anything) + return + } + lc.AssertCalled(t, "Warn", mock.Anything) + return } + + expectedInternalRequestTopic := strings.Join([]string{testInternalCommandRequestTopicPrefix, testDeviceServiceName, testDeviceName, testCommandName, testMethod}, "/") + client.AssertCalled(t, "Publish", tt.payload, expectedInternalRequestTopic) }) } } -func testPayload() types.MessageEnvelope { +func testCommandQueryPayload() types.MessageEnvelope { payload := types.NewMessageEnvelopeForRequest(nil, nil) return payload } + +func testCommandRequestPayload() types.MessageEnvelope { + payload := types.NewMessageEnvelopeForRequest(nil, map[string]string{ + "ds-pushevent": "yes", + "ds-returnevent": "yes", + }) + + return payload +}