Skip to content

Commit

Permalink
feat: Allow name field escape configurable
Browse files Browse the repository at this point in the history
Add config and modify topic build logic to allow name field escape configurable.

Close edgexfoundry#4673

Signed-off-by: bruce <[email protected]>
  • Loading branch information
weichou1229 committed Sep 14, 2023
1 parent 9317e2a commit 6380a38
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 41 deletions.
1 change: 1 addition & 0 deletions cmd/core-common-config-bootstrapper/res/configuration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ all-services:
MaxResultCount: 1024
MaxRequestSize: 0 # Not currently used. Defines the maximum size of http request body in bytes
RequestTimeout: "5s"
EnableNameFieldEscape: false # The name field escape could allow the system to use special or Chinese characters in the different name fields, including device, profile, and so on. If the EnableNameFieldEscape is false, some special characters might cause system error. TODO: This is set to false by default to avoid breaking change and will be removed in EdgeX 4.0
CORSConfiguration:
EnableCORS: false
CORSAllowCredentials: false
Expand Down
22 changes: 11 additions & 11 deletions internal/core/command/controller/messaging/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func commandQueryHandler(dic *di.Container) mqtt.MessageHandler {
func commandRequestHandler(requestTimeout time.Duration, dic *di.Container) mqtt.MessageHandler {
return func(client mqtt.Client, message mqtt.Message) {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
config := container.ConfigurationFrom(dic.Get)
lc.Debugf("Received command request from external message broker on topic '%s' with %d bytes", message.Topic(), len(message.Payload()))

externalMQTTInfo := container.ConfigurationFrom(dic.Get).ExternalMQTT
Expand All @@ -117,17 +118,15 @@ func commandRequestHandler(requestTimeout time.Duration, dic *di.Container) mqtt
}

// expected external command request/response topic scheme: #/<device-name>/<command-name>/<method>
deviceName := topicLevels[length-3]
unescapedDeviceName, err := url.PathUnescape(deviceName)
deviceName, err := url.PathUnescape(topicLevels[length-3])
if err != nil {
lc.Errorf("Failed to unescape device name '%s': %s", deviceName, err.Error())
lc.Errorf("Failed to unescape device name from '%s': %s", topicLevels[length-3], err.Error())
lc.Warn("Not publishing error message back due to insufficient information on response topic")
return
}
commandName := topicLevels[length-2]
unescapedCommandName, err := url.PathUnescape(commandName)
commandName, err := url.PathUnescape(topicLevels[length-2])
if err != nil {
lc.Errorf("Failed to unescape command name '%s': %s", commandName, err.Error())
lc.Errorf("Failed to unescape command name from '%s': %s", topicLevels[length-2], err.Error())
lc.Warn("Not publishing error message back due to insufficient information on response topic")
return
}
Expand All @@ -140,10 +139,10 @@ func commandRequestHandler(requestTimeout time.Duration, dic *di.Container) mqtt

externalResponseTopic := common.BuildTopic(externalMQTTInfo.Topics[common.ExternalCommandResponseTopicPrefixKey], deviceName, commandName, method)

internalBaseTopic := container.ConfigurationFrom(dic.Get).MessageBus.GetBaseTopicPrefix()
internalBaseTopic := config.MessageBus.GetBaseTopicPrefix()
topicPrefix := common.BuildTopic(internalBaseTopic, common.CoreCommandDeviceRequestPublishTopic)

deviceServiceName, err := retrieveServiceNameByDevice(unescapedDeviceName, dic)
deviceServiceName, err := retrieveServiceNameByDevice(deviceName, dic)
if err != nil {
responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, err.Error())
publishMessage(client, externalResponseTopic, qos, retain, responseEnvelope, lc)
Expand All @@ -157,9 +156,10 @@ func commandRequestHandler(requestTimeout time.Duration, dic *di.Container) mqtt
return
}

// escape again to ensure that the topic is valid in the internal message bus
deviceRequestTopic := common.BuildTopic(topicPrefix, common.URLEncode(deviceServiceName), common.URLEncode(unescapedDeviceName), common.URLEncode(unescapedCommandName), method)
deviceResponseTopicPrefix := common.BuildTopic(internalBaseTopic, common.ResponseTopic, common.URLEncode(deviceServiceName))
deviceRequestTopic := common.NewPathBuilder().EnableNameFieldEscape(config.Service.EnableNameFieldEscape).
SetPath(topicPrefix).SetNameFieldPath(deviceServiceName).SetNameFieldPath(deviceName).SetNameFieldPath(commandName).SetPath(method).BuildPath()
deviceResponseTopicPrefix := common.NewPathBuilder().EnableNameFieldEscape(config.Service.EnableNameFieldEscape).
SetPath(internalBaseTopic).SetPath(common.ResponseTopic).SetNameFieldPath(deviceServiceName).BuildPath()

lc.Debugf("Sending Command request to internal MessageBus. Topic: %s, Request-id: %s Correlation-id: %s", deviceRequestTopic, requestEnvelope.RequestID, requestEnvelope.CorrelationID)
lc.Debugf("Expecting response on topic: %s/%s", deviceResponseTopicPrefix, requestEnvelope.RequestID)
Expand Down
19 changes: 10 additions & 9 deletions internal/core/command/controller/messaging/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func processDeviceCommandRequest(
lc logger.LoggingClient,
dic *di.Container) {
var err error
config := container.ConfigurationFrom(dic.Get)

lc.Debugf("Command device request received on internal MessageBus. Topic: %s, Request-id: %s, Correlation-id: %s", requestEnvelope.ReceivedTopic, requestEnvelope.RequestID, requestEnvelope.CorrelationID)

Expand All @@ -98,17 +99,15 @@ func processDeviceCommandRequest(
}

// expected internal command request/response topic scheme: #/<device>/<command-name>/<method>
deviceName := topicLevels[length-3]
unescapedDeviceName, err := url.PathUnescape(deviceName)
deviceName, err := url.PathUnescape(topicLevels[length-3])
if err != nil {
lc.Errorf("Failed to unescape device name '%s': %s", deviceName, err.Error())
lc.Errorf("Failed to unescape device name from '%s': %s", topicLevels[length-3], err.Error())
lc.Warn("Not publishing error message back due to insufficient information on response topic")
return
}
commandName := topicLevels[length-2]
unescapedCommandName, err := url.PathUnescape(commandName)
commandName, err := url.PathUnescape(topicLevels[length-2])
if err != nil {
err = fmt.Errorf("failed to unescape command name '%s': %s", commandName, err.Error())
err = fmt.Errorf("failed to unescape command name from '%s': %s", topicLevels[length-2], err.Error())
lc.Error(err.Error())
responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, err.Error())
err = messageBus.Publish(responseEnvelope, internalResponseTopic)
Expand All @@ -131,7 +130,7 @@ func processDeviceCommandRequest(

topicPrefix := common.BuildTopic(baseTopic, common.CoreCommandDeviceRequestPublishTopic)
// internal command request topic scheme: <DeviceRequestTopicPrefix>/<device-service>/<device>/<command-name>/<method>
deviceServiceName, err := retrieveServiceNameByDevice(unescapedDeviceName, dic)
deviceServiceName, err := retrieveServiceNameByDevice(deviceName, dic)
if err != nil {
err = fmt.Errorf("invalid request topic: %s", err.Error())
lc.Error(err.Error())
Expand All @@ -154,8 +153,10 @@ func processDeviceCommandRequest(
return
}

deviceRequestTopic := common.BuildTopic(topicPrefix, common.URLEncode(deviceServiceName), common.URLEncode(unescapedDeviceName), common.URLEncode(unescapedCommandName), method)
deviceResponseTopicPrefix := common.BuildTopic(baseTopic, common.ResponseTopic, common.URLEncode(deviceServiceName))
deviceRequestTopic := common.NewPathBuilder().EnableNameFieldEscape(config.Service.EnableNameFieldEscape).
SetPath(topicPrefix).SetNameFieldPath(deviceServiceName).SetNameFieldPath(deviceName).SetNameFieldPath(commandName).SetPath(method).BuildPath()
deviceResponseTopicPrefix := common.NewPathBuilder().EnableNameFieldEscape(config.Service.EnableNameFieldEscape).
SetPath(baseTopic).SetPath(common.ResponseTopic).SetNameFieldPath(deviceServiceName).BuildPath()

lc.Debugf("Sending Command Device Request to internal MessageBus. Topic: %s, Correlation-id: %s", deviceRequestTopic, requestEnvelope.CorrelationID)
lc.Debugf("Expecting response on topic: %s/%s", deviceResponseTopicPrefix, requestEnvelope.RequestID)
Expand Down
9 changes: 7 additions & 2 deletions internal/core/command/controller/messaging/internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ func TestSubscribeCommandRequests(t *testing.T) {
expectedDevice := "device1"
expectedResource := "resource"
expectedMethod := "get"
expectedDeviceResponseTopicPrefix := strings.Join([]string{expectedResponseTopicPrefix, common.URLEncode(expectedServiceName)}, "/")
expectedDeviceResponseTopicPrefix := strings.Join([]string{expectedResponseTopicPrefix, expectedServiceName}, "/")
expectedCommandResponseTopic := strings.Join([]string{expectedResponseTopicPrefix, common.CoreCommandServiceKey, expectedRequestId}, "/")
expectedCommandRequestSubscribeTopic := common.BuildTopic(baseTopic, common.CoreCommandRequestSubscribeTopic)
expectedCommandRequestReceivedTopic := common.BuildTopic(strings.Replace(expectedCommandRequestSubscribeTopic, "/#", "", 1),
expectedServiceName, expectedDevice, expectedResource, expectedMethod)
expectedDeviceCommandRequestRequestTopic := common.BuildTopic(baseTopic, common.CoreCommandDeviceRequestPublishTopic, common.URLEncode(expectedServiceName), common.URLEncode(expectedDevice), common.URLEncode(expectedResource), expectedMethod)
expectedDeviceCommandRequestRequestTopic := common.BuildTopic(baseTopic, common.CoreCommandDeviceRequestPublishTopic, expectedServiceName, expectedDevice, expectedResource, expectedMethod)
mockLogger := &lcMocks.LoggingClient{}
mockDeviceClient := &mocks2.DeviceClient{}
mockDeviceProfileClient := &mocks2.DeviceProfileClient{}
Expand Down Expand Up @@ -128,6 +128,11 @@ func TestSubscribeCommandRequests(t *testing.T) {
bootstrapContainer.DeviceServiceClientName: func(get di.Get) interface{} {
return mockDeviceServiceClient
},
bootstrapContainer.ConfigurationInterfaceName: func(get di.Get) interface{} {
return &config.ConfigurationStruct{
Service: config2.ServiceInfo{EnableNameFieldEscape: false},
}
},
})

err := SubscribeCommandRequests(context.Background(), time.Second*5, dic)
Expand Down
7 changes: 4 additions & 3 deletions internal/core/command/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"context"
"sync"

"github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/container"
"github.com/edgexfoundry/edgex-go/internal/core/command/container"
bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/container"
"github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/secret"
"github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/startup"
Expand All @@ -47,12 +47,13 @@ func NewBootstrap(router *echo.Echo, serviceName string) *Bootstrap {
// BootstrapHandler fulfills the BootstrapHandler contract and performs initialization needed by the command service.
func (b *Bootstrap) BootstrapHandler(ctx context.Context, wg *sync.WaitGroup, _ startup.Timer, dic *di.Container) bool {
LoadRestRoutes(b.router, dic, b.serviceName)
config := container.ConfigurationFrom(dic.Get)

// DeviceServiceCommandClient is not part of the common clients handled by the NewClientsBootstrap handler
dic.Update(di.ServiceConstructorMap{
bootstrapContainer.DeviceServiceCommandClientName: func(get di.Get) interface{} { // add API DeviceServiceCommandClient
jwtSecretProvider := secret.NewJWTSecretProvider(container.SecretProviderExtFrom(get))
return clients.NewDeviceServiceCommandClient(jwtSecretProvider)
jwtSecretProvider := secret.NewJWTSecretProvider(bootstrapContainer.SecretProviderExtFrom(get))
return clients.NewDeviceServiceCommandClient(jwtSecretProvider, config.Service.EnableNameFieldEscape)
},
})

Expand Down
3 changes: 2 additions & 1 deletion internal/core/data/application/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ func (a *CoreDataApp) PublishEvent(data []byte, serviceName string, profileName
correlationId := correlation.FromContext(ctx)

basePrefix := configuration.MessageBus.GetBaseTopicPrefix()
publishTopic := common.BuildTopic(basePrefix, common.EventsPublishTopic, CoreDataEventTopicPrefix, common.URLEncode(serviceName), common.URLEncode(profileName), common.URLEncode(deviceName), common.URLEncode(sourceName))
publishTopic := common.NewPathBuilder().EnableNameFieldEscape(configuration.Service.EnableNameFieldEscape).
SetPath(basePrefix).SetPath(common.EventsPublishTopic).SetPath(CoreDataEventTopicPrefix).SetNameFieldPath(serviceName).SetNameFieldPath(profileName).SetNameFieldPath(deviceName).SetNameFieldPath(sourceName).BuildPath()
lc.Debugf("Publishing AddEventRequest to MessageBus. Topic: %s; %s: %s", publishTopic, common.CorrelationHeader, correlationId)

msgEnvelope := msgTypes.NewMessageEnvelope(data, ctx)
Expand Down
23 changes: 9 additions & 14 deletions internal/core/metadata/application/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ func validateDeviceCallback(device dtos.Device, dic *di.Container) errors.EdgeX
}

baseTopic := configuration.MessageBus.GetBaseTopicPrefix()
requestTopic := common.BuildTopic(baseTopic, common.URLEncode(device.ServiceName), common.ValidateDeviceSubscribeTopic)
responseTopicPrefix := common.BuildTopic(baseTopic, common.ResponseTopic, common.URLEncode(device.ServiceName))
requestTopic := common.NewPathBuilder().EnableNameFieldEscape(configuration.Service.EnableNameFieldEscape).
SetPath(baseTopic).SetNameFieldPath(device.ServiceName).SetPath(common.ValidateDeviceSubscribeTopic).BuildPath()
responseTopicPrefix := common.NewPathBuilder().EnableNameFieldEscape(configuration.Service.EnableNameFieldEscape).
SetPath(baseTopic).SetPath(common.ResponseTopic).SetNameFieldPath(device.ServiceName).BuildPath()
requestEnvelope := types.NewMessageEnvelopeForRequest(requestBytes, nil)

lc.Debugf("Sending Device Validation request for device=%s, CorrelationId=%s to topic: %s", device.Name, requestEnvelope.CorrelationID, requestTopic)
Expand Down Expand Up @@ -86,6 +88,7 @@ func publishUpdateDeviceProfileSystemEvent(profileDTO dtos.DeviceProfile, ctx co

func publishSystemEvent(eventType, action, owner string, dto any, ctx context.Context, dic *di.Container) {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
config := container.ConfigurationFrom(dic.Get)
systemEvent := dtos.NewSystemEvent(eventType, action, common.CoreMetaDataServiceKey, owner, nil, dto)
messagingClient := bootstrapContainer.MessagingClientFrom(dic.Get)
if messagingClient == nil {
Expand Down Expand Up @@ -130,20 +133,12 @@ func publishSystemEvent(eventType, action, owner string, dto any, ctx context.Co
return
}

publishTopic := common.BuildTopic(
container.ConfigurationFrom(dic.Get).MessageBus.GetBaseTopicPrefix(),
common.SystemEventPublishTopic,
systemEvent.Source,
systemEvent.Type,
systemEvent.Action,
common.URLEncode(systemEvent.Owner),
)
topicPathBuilder := common.NewPathBuilder().EnableNameFieldEscape(config.Service.EnableNameFieldEscape)
publishTopic := topicPathBuilder.SetPath(config.MessageBus.GetBaseTopicPrefix()).SetPath(common.SystemEventPublishTopic).
SetPath(systemEvent.Source).SetPath(systemEvent.Type).SetPath(systemEvent.Action).SetNameFieldPath(systemEvent.Owner).BuildPath()

if profileName != "" {
publishTopic = common.BuildTopic(
publishTopic,
common.URLEncode(profileName),
)
publishTopic = topicPathBuilder.SetNameFieldPath(profileName).BuildPath()
}

payload, _ := json.Marshal(systemEvent)
Expand Down
3 changes: 2 additions & 1 deletion internal/core/metadata/application/notify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/edgexfoundry/edgex-go/internal/core/metadata/config"
"github.com/edgexfoundry/edgex-go/internal/core/metadata/container"
bootstrapConfig "github.com/edgexfoundry/go-mod-bootstrap/v3/config"
mocks2 "github.com/edgexfoundry/go-mod-core-contracts/v3/clients/logger/mocks"
"github.com/edgexfoundry/go-mod-core-contracts/v3/common"
"github.com/edgexfoundry/go-mod-core-contracts/v3/dtos"
Expand Down Expand Up @@ -75,7 +76,7 @@ func TestPublishSystemEvent(t *testing.T) {

dic := di.NewContainer(di.ServiceConstructorMap{
container.ConfigurationName: func(get di.Get) interface{} {
return &config.ConfigurationStruct{}
return &config.ConfigurationStruct{Service: bootstrapConfig.ServiceInfo{EnableNameFieldEscape: true}}
},
bootstrapContainer.LoggingClientInterfaceName: func(get di.Get) interface{} {
return mockLogger
Expand Down

0 comments on commit 6380a38

Please sign in to comment.