Skip to content

Commit

Permalink
Merge pull request #4185 from hahattan/refine-configuration
Browse files Browse the repository at this point in the history
feat(command): refine north-south messaging configuration
  • Loading branch information
cloudxxx8 authored Oct 11, 2022
2 parents bd62954 + e842b92 commit 867487c
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 24 deletions.
19 changes: 12 additions & 7 deletions cmd/core-command/res/configuration.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
# This is required for backwards compatibility so new version of sevice using older 2.x configuration will not fail bootstrapping
# This will default to false if not provided in old config. Messagebus is now needed by North-South Messaging
# TODO: Remove this setting EdgeX 3.0
RequireMessageBus = true

[Writable]
LogLevel = "INFO"
[Writable.InsecureSecrets]
Expand Down Expand Up @@ -45,7 +50,6 @@ Type = "consul"
Port = 59881

[MessageQueue]
Required = false
[MessageQueue.Internal]
Type = "redis"
Protocol = "redis"
Expand All @@ -54,12 +58,12 @@ Type = "consul"
AuthMode = "usernamepassword" # required for redis messagebus (secure or insecure).
SecretName = "redisdb"
[MessageQueue.Internal.Topics]
DeviceRequestTopicPrefix = "edgex/command/request" # for publishing requests to the device service; <device-service>/<device-name>/<command-name>/<method> will be added to this publish topic prefix
DeviceResponseTopic = "edgex/command/response/#" # for subscribing to device service responses
CommandRequestTopic = "/command/request/#" # for subscribing to internal command requests
CommandResponseTopicPrefix = "/command/response" # for publishing responses back to internal service /<device-name>/<command-name>/<method> will be added to this publish topic prefix
QueryRequestTopic = "/commandquery/request/#" # for subscribing to internal command query requests
QueryResponseTopic = "/commandquery/response" # for publishing reponsses back to internal service
DeviceRequestTopicPrefix = "edgex/device/command/request" # for publishing requests to the device service; <device-service>/<device-name>/<command-name>/<method> will be added to this publish topic prefix
DeviceResponseTopic = "edgex/device/command/response/#" # for subscribing to device service responses
CommandRequestTopic = "edgex/core/command/request/#" # for subscribing to internal command requests
CommandResponseTopicPrefix = "edgex/core/command/response" # for publishing responses back to internal service /<device-name>/<command-name>/<method> will be added to this publish topic prefix
QueryRequestTopic = "edgex/core/commandquery/request/#" # for subscribing to internal command query requests
QueryResponseTopic = "edgex/core/commandquery/response" # for publishing reponsses back to internal service
[MessageQueue.Internal.Optional]
# Default MQTT Specific options that need to be here to enable evnironment variable overrides of them
ClientId ="core-command"
Expand All @@ -79,6 +83,7 @@ Type = "consul"
DefaultPubRetryAttempts = "2"
Subject = "edgex/#" # Required for NATS Jetstram only for stream autoprovsioning
[MessageQueue.External]
Enabled = false
Url = "tcp://localhost:1883"
ClientId = "core-command"
ConnectTimeout = "5s"
Expand Down
19 changes: 9 additions & 10 deletions internal/core/command/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ import (

// ConfigurationStruct contains the configuration properties for the core-command service.
type ConfigurationStruct struct {
Writable WritableInfo
Clients map[string]bootstrapConfig.ClientInfo
Databases map[string]bootstrapConfig.Database
Registry bootstrapConfig.RegistryInfo
Service bootstrapConfig.ServiceInfo
MessageQueue MessageQueue
SecretStore bootstrapConfig.SecretStoreInfo
//TODO: Remove in EdgeX 3.0 - Is needed now for backward compatability in 2.0
RequireMessageBus bool
Writable WritableInfo
Clients map[string]bootstrapConfig.ClientInfo
Databases map[string]bootstrapConfig.Database
Registry bootstrapConfig.RegistryInfo
Service bootstrapConfig.ServiceInfo
MessageQueue MessageQueue
SecretStore bootstrapConfig.SecretStoreInfo
}

// WritableInfo contains configuration properties that can be updated and applied without restarting the service.
Expand All @@ -37,9 +39,6 @@ type WritableInfo struct {
}

type MessageQueue struct {
// This is required for backwards compatability with older versions of 2.x configuration
// TODO: remove 'Required' in EdgeX 3.0
Required bool
Internal bootstrapConfig.MessageBusInfo
External bootstrapConfig.ExternalMQTTInfo
}
Expand Down
2 changes: 0 additions & 2 deletions internal/core/command/controller/messaging/external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ func TestOnConnectHandler(t *testing.T) {
container.ConfigurationName: func(get di.Get) interface{} {
return &config.ConfigurationStruct{
MessageQueue: config.MessageQueue{
Required: false,
External: bootstrapConfig.ExternalMQTTInfo{
Topics: map[string]string{
QueryRequestTopic: testQueryRequestTopic,
Expand Down Expand Up @@ -288,7 +287,6 @@ func Test_commandRequestHandler(t *testing.T) {
MaxResultCount: 20,
},
MessageQueue: config.MessageQueue{
Required: true,
Internal: bootstrapConfig.MessageBusInfo{
Topics: map[string]string{
DeviceRequestTopicPrefix: testInternalCommandRequestTopicPrefix,
Expand Down
13 changes: 8 additions & 5 deletions internal/core/command/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,12 @@ func Main(ctx context.Context, cancel context.CancelFunc, router *mux.Router) {
func MessageBusBootstrapHandler(ctx context.Context, wg *sync.WaitGroup, startupTimer startup.Timer, dic *di.Container) bool {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
configuration := container.ConfigurationFrom(dic.Get)
if configuration.MessageQueue.Required {
router := messaging.NewMessagingRouter()
router := messaging.NewMessagingRouter()

if configuration.RequireMessageBus {
if !handlers.MessagingBootstrapHandler(ctx, wg, startupTimer, dic) {
return false
}
if !handlers.NewExternalMQTT(messaging.OnConnectHandler(router, dic)).BootstrapHandler(ctx, wg, startupTimer, dic) {
return false
}
if err := messaging.SubscribeCommandRequests(ctx, router, dic); err != nil {
lc.Errorf("Failed to subscribe commands request from internal message bus, %v", err)
return false
Expand All @@ -110,6 +108,11 @@ func MessageBusBootstrapHandler(ctx context.Context, wg *sync.WaitGroup, startup
lc.Errorf("Failed to subscribe command query request from internal message bus, %v", err)
return false
}
if configuration.MessageQueue.External.Enabled {
if !handlers.NewExternalMQTT(messaging.OnConnectHandler(router, dic)).BootstrapHandler(ctx, wg, startupTimer, dic) {
return false
}
}
}

// Not required so do nothing
Expand Down

0 comments on commit 867487c

Please sign in to comment.