Skip to content

Commit

Permalink
fix: add secure mqtt support to eKuiper (#4155)
Browse files Browse the repository at this point in the history
Closes: #4147

Signed-off-by: intel <[email protected]>
  • Loading branch information
joshua-silverio authored Sep 15, 2022
1 parent a0e04e5 commit b7de7b7
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 44 deletions.
2 changes: 1 addition & 1 deletion internal/security/secretstore/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ func (b *Bootstrap) BootstrapHandler(ctx context.Context, _ *sync.WaitGroup, _ s
}
}

err = ConfigureSecureMessageBus(configuration.SecureMessageBus, redisCredentials, lc)
err = ConfigureSecureMessageBus(configuration.SecureMessageBus, creds, lc)
if err != nil {
lc.Errorf("failed to configure for Secure Message Bus: %s", err.Error())
return false
Expand Down
73 changes: 46 additions & 27 deletions internal/security/secretstore/secure-messagebus.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ default:
optional:
Username: {{.User}}
Password: {{.Password}}
port: 6379
protocol: redis
port: {{.Port}}
protocol: {{.Protocol}}
server: localhost
connectionSelector: edgex.redisMsgBus
connectionSelector: {{.ConnectionSelector}}
topic: rules-events
type: redis
type: {{.Type}}
mqtt_conf:
optional:
ClientId: client1
Expand All @@ -55,11 +55,11 @@ mqtt_conf:

eKuiperConnectionsTemplate = `
edgex:
redisMsgBus: #connection key
protocol: redis
{{.ConnectorName}}: #connection key
protocol: {{.Protocol}}
server: localhost
port: 6379
type: redis
port: {{.Port}}
type: {{.Type}}
optional:
Username: {{.User}}
Password: {{.Password}}
Expand All @@ -71,26 +71,35 @@ edgex:
blankSecureMessageBusType = ""
)

func ConfigureSecureMessageBus(secureMessageBus config.SecureMessageBusInfo, redis5Pair UserPasswordPair, lc logger.LoggingClient) error {
type eKuiperFields struct {
User string
Password string
ConnectionSelector string
ConnectorName string
Protocol string
Type string
Port int
}

func ConfigureSecureMessageBus(secureMessageBus config.SecureMessageBusInfo, creds UserPasswordPair, lc logger.LoggingClient) error {
fields := eKuiperFields{
User: creds.User,
Password: creds.Password,
}
switch secureMessageBus.Type {
// Currently, only support Secure MessageBus when using the Redis implementation.
case redisSecureMessageBusType:
// eKuiper now has two configuration files (EdgeX Sources and Connections)
fields.ConnectionSelector = "edgex.redisMsgBus"
fields.ConnectorName = "redisMsgBus"
fields.Protocol = "redis"
fields.Type = redisSecureMessageBusType
fields.Port = 6379

err := configureKuiperForSecureMessageBus(redis5Pair, "EdgeX Source", eKuiperEdgeXSourceTemplate, secureMessageBus.KuiperConfigPath, lc)
if err != nil {
return err
}

err = configureKuiperForSecureMessageBus(redis5Pair, "Connections", eKuiperConnectionsTemplate, secureMessageBus.KuiperConnectionsPath, lc)
if err != nil {
return err
}

// TODO: Add support for secure MQTT MessageBus
case mqttSecureMessageBusType:
lc.Errorf("secure MQTT MessageBus not yet supported for eKuiper")
return nil
fields.ConnectionSelector = "edgex.mqttMsgBus"
fields.ConnectorName = "mqttMsgBus"
fields.Protocol = "tcp"
fields.Type = mqttSecureMessageBusType
fields.Port = 1883

case noneSecureMessageBusType, blankSecureMessageBusType:
return nil
Expand All @@ -99,10 +108,20 @@ func ConfigureSecureMessageBus(secureMessageBus config.SecureMessageBusInfo, red
return fmt.Errorf("invalid Secure MessageBus Type of '%s'", secureMessageBus.Type)
}

// eKuiper now has two configuration files (EdgeX Sources and Connections)
err := configureKuiperForSecureMessageBus(fields, "EdgeX Source", eKuiperEdgeXSourceTemplate, secureMessageBus.KuiperConfigPath, lc)
if err != nil {
return err
}

err = configureKuiperForSecureMessageBus(fields, "Connections", eKuiperConnectionsTemplate, secureMessageBus.KuiperConnectionsPath, lc)
if err != nil {
return err
}
return nil
}

func configureKuiperForSecureMessageBus(credentials UserPasswordPair, fileType string, fileTemplate string, path string, lc logger.LoggingClient) error {
func configureKuiperForSecureMessageBus(fields eKuiperFields, fileType string, fileTemplate string, path string, lc logger.LoggingClient) error {
// This capability depends on the eKuiper file existing, which depends on the version of eKuiper installed.
// If the file doesn't exist, then the eKuiper version installed doesn't use it, so skip the injection.
_, err := os.Stat(path)
Expand All @@ -125,12 +144,12 @@ func configureKuiperForSecureMessageBus(credentials UserPasswordPair, fileType s
_ = file.Close()
}()

err = tmpl.Execute(file, credentials)
err = tmpl.Execute(file, fields)
if err != nil {
return fmt.Errorf("failed to write eKuiper %s file %s: %w", fileType, path, err)
}

lc.Infof("Wrote eKuiper %s at %s with Secure MessageBus credentials", fileType, path)
lc.Infof("Wrote eKuiper %s at %s with Secure MessageBus credentials for %s", fileType, path, fields.Type)

return nil
}
103 changes: 87 additions & 16 deletions internal/security/secretstore/secure-messagebus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package secretstore

import (
"os"
"strings"
"testing"

"github.com/edgexfoundry/go-mod-core-contracts/v2/clients/logger"
Expand All @@ -32,8 +31,80 @@ func TestConfigureSecureMessageBus(t *testing.T) {
KuiperConfigPath: "./testdata/edgex.yaml",
KuiperConnectionsPath: "./testdata/connection.yaml",
}

validExpected := UserPasswordPair{
expectedMqttConfigContent := `
application_conf:
port: 5571
protocol: tcp
server: localhost
topic: application
default:
optional:
Username: testUser
Password: testPassword
port: 1883
protocol: tcp
server: localhost
connectionSelector: edgex.mqttMsgBus
topic: rules-events
type: mqtt
mqtt_conf:
optional:
ClientId: client1
port: 1883
protocol: tcp
server: localhost
topic: events
type: mqtt
`
expectedRedisConfigContent := `
application_conf:
port: 5571
protocol: tcp
server: localhost
topic: application
default:
optional:
Username: testUser
Password: testPassword
port: 6379
protocol: redis
server: localhost
connectionSelector: edgex.redisMsgBus
topic: rules-events
type: redis
mqtt_conf:
optional:
ClientId: client1
port: 1883
protocol: tcp
server: localhost
topic: events
type: mqtt
`

expectedMqttConnectionsContent := `
edgex:
mqttMsgBus: #connection key
protocol: tcp
server: localhost
port: 1883
type: mqtt
optional:
Username: testUser
Password: testPassword
`
expectedRedisConnectionsContent := `
edgex:
redisMsgBus: #connection key
protocol: redis
server: localhost
port: 6379
type: redis
optional:
Username: testUser
Password: testPassword
`
creds := UserPasswordPair{
User: "testUser",
Password: "testPassword",
}
Expand All @@ -43,15 +114,17 @@ func TestConfigureSecureMessageBus(t *testing.T) {
Type string
ConnectionFileExists bool
Credentials UserPasswordPair
Expected *UserPasswordPair
ExpectedConfig *string
ExpectedConnnection *string
ExpectError bool
}{
{"valid redis - both files", redisSecureMessageBusType, true, validExpected, &validExpected, false},
{"valid redis - no connection file", redisSecureMessageBusType, false, validExpected, &validExpected, false},
{"valid blank", blankSecureMessageBusType, false, validExpected, nil, false},
{"valid none", noneSecureMessageBusType, false, validExpected, nil, false},
{"invalid type", "bogus", false, validExpected, nil, true},
{"invalid mqtt", mqttSecureMessageBusType, false, validExpected, nil, false},
{"valid redis - both files", redisSecureMessageBusType, true, creds, &expectedRedisConfigContent, &expectedRedisConnectionsContent, false},
{"valid redis - no connection file", redisSecureMessageBusType, false, creds, &expectedRedisConfigContent, nil, false},
{"valid mqtt - both files", mqttSecureMessageBusType, false, creds, &expectedMqttConfigContent, &expectedMqttConnectionsContent, false},
{"valid mqtt - no connection file", mqttSecureMessageBusType, false, creds, &expectedMqttConfigContent, nil, false},
{"valid blank", blankSecureMessageBusType, false, creds, nil, nil, false},
{"valid none", noneSecureMessageBusType, false, creds, nil, nil, false},
{"invalid type", "bogus", false, creds, nil, nil, true},
}
for _, test := range tests {
t.Run(test.Name, func(t *testing.T) {
Expand All @@ -63,7 +136,7 @@ func TestConfigureSecureMessageBus(t *testing.T) {
_ = os.Remove(secureMessageBus.KuiperConnectionsPath)
}()

if test.Expected != nil {
if test.ExpectedConfig != nil {
_, err := os.Create(secureMessageBus.KuiperConfigPath)
require.NoError(t, err)

Expand All @@ -82,7 +155,7 @@ func TestConfigureSecureMessageBus(t *testing.T) {

require.NoError(t, err)

if test.Expected == nil {
if test.ExpectedConfig == nil {
// Source Config file should not have been written
_, err = os.Stat(secureMessageBus.KuiperConfigPath)
require.True(t, os.IsNotExist(err))
Expand All @@ -97,15 +170,13 @@ func TestConfigureSecureMessageBus(t *testing.T) {
// Source Config file should have been written
contents, err := os.ReadFile(secureMessageBus.KuiperConfigPath)
require.NoError(t, err)
assert.True(t, strings.Contains(string(contents), test.Expected.User))
assert.True(t, strings.Contains(string(contents), test.Expected.Password))
assert.Equal(t, *test.ExpectedConfig, string(contents))

if test.ConnectionFileExists {
// Connections file should have been written
contents, err = os.ReadFile(secureMessageBus.KuiperConnectionsPath)
require.NoError(t, err)
assert.True(t, strings.Contains(string(contents), test.Expected.User))
assert.True(t, strings.Contains(string(contents), test.Expected.Password))
assert.Equal(t, *test.ExpectedConnnection, string(contents))
} else {
// Connections file should not have been written
_, err = os.Stat(secureMessageBus.KuiperConnectionsPath)
Expand Down

0 comments on commit b7de7b7

Please sign in to comment.