Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(security): add secure mqtt support to eKuiper #4155

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/security/secretstore/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ func (b *Bootstrap) BootstrapHandler(ctx context.Context, _ *sync.WaitGroup, _ s
}
}

err = ConfigureSecureMessageBus(configuration.SecureMessageBus, redisCredentials, lc)
err = ConfigureSecureMessageBus(configuration.SecureMessageBus, creds, lc)
if err != nil {
lc.Errorf("failed to configure for Secure Message Bus: %s", err.Error())
return false
Expand Down
73 changes: 46 additions & 27 deletions internal/security/secretstore/secure-messagebus.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ default:
optional:
Username: {{.User}}
Password: {{.Password}}
port: 6379
protocol: redis
port: {{.Port}}
protocol: {{.Protocol}}
server: localhost
connectionSelector: edgex.redisMsgBus
connectionSelector: {{.ConnectionSelector}}
topic: rules-events
type: redis
type: {{.Type}}
mqtt_conf:
optional:
ClientId: client1
Expand All @@ -55,11 +55,11 @@ mqtt_conf:

eKuiperConnectionsTemplate = `
edgex:
redisMsgBus: #connection key
protocol: redis
{{.ConnectorName}}: #connection key
protocol: {{.Protocol}}
server: localhost
port: 6379
type: redis
port: {{.Port}}
type: {{.Type}}
optional:
Username: {{.User}}
Password: {{.Password}}
Expand All @@ -71,26 +71,35 @@ edgex:
blankSecureMessageBusType = ""
)

func ConfigureSecureMessageBus(secureMessageBus config.SecureMessageBusInfo, redis5Pair UserPasswordPair, lc logger.LoggingClient) error {
type eKuiperFields struct {
User string
Password string
ConnectionSelector string
ConnectorName string
Protocol string
Type string
Port int
}

func ConfigureSecureMessageBus(secureMessageBus config.SecureMessageBusInfo, creds UserPasswordPair, lc logger.LoggingClient) error {
fields := eKuiperFields{
User: creds.User,
Password: creds.Password,
}
switch secureMessageBus.Type {
// Currently, only support Secure MessageBus when using the Redis implementation.
case redisSecureMessageBusType:
// eKuiper now has two configuration files (EdgeX Sources and Connections)
fields.ConnectionSelector = "edgex.redisMsgBus"
fields.ConnectorName = "redisMsgBus"
fields.Protocol = "redis"
fields.Type = redisSecureMessageBusType
fields.Port = 6379

err := configureKuiperForSecureMessageBus(redis5Pair, "EdgeX Source", eKuiperEdgeXSourceTemplate, secureMessageBus.KuiperConfigPath, lc)
if err != nil {
return err
}

err = configureKuiperForSecureMessageBus(redis5Pair, "Connections", eKuiperConnectionsTemplate, secureMessageBus.KuiperConnectionsPath, lc)
if err != nil {
return err
}

// TODO: Add support for secure MQTT MessageBus
case mqttSecureMessageBusType:
lc.Errorf("secure MQTT MessageBus not yet supported for eKuiper")
return nil
fields.ConnectionSelector = "edgex.mqttMsgBus"
fields.ConnectorName = "mqttMsgBus"
fields.Protocol = "tcp"
fields.Type = mqttSecureMessageBusType
fields.Port = 1883

case noneSecureMessageBusType, blankSecureMessageBusType:
return nil
Expand All @@ -99,10 +108,20 @@ func ConfigureSecureMessageBus(secureMessageBus config.SecureMessageBusInfo, red
return fmt.Errorf("invalid Secure MessageBus Type of '%s'", secureMessageBus.Type)
}

// eKuiper now has two configuration files (EdgeX Sources and Connections)
err := configureKuiperForSecureMessageBus(fields, "EdgeX Source", eKuiperEdgeXSourceTemplate, secureMessageBus.KuiperConfigPath, lc)
if err != nil {
return err
}

err = configureKuiperForSecureMessageBus(fields, "Connections", eKuiperConnectionsTemplate, secureMessageBus.KuiperConnectionsPath, lc)
if err != nil {
return err
}
return nil
}

func configureKuiperForSecureMessageBus(credentials UserPasswordPair, fileType string, fileTemplate string, path string, lc logger.LoggingClient) error {
func configureKuiperForSecureMessageBus(fields eKuiperFields, fileType string, fileTemplate string, path string, lc logger.LoggingClient) error {
// This capability depends on the eKuiper file existing, which depends on the version of eKuiper installed.
// If the file doesn't exist, then the eKuiper version installed doesn't use it, so skip the injection.
_, err := os.Stat(path)
Expand All @@ -125,12 +144,12 @@ func configureKuiperForSecureMessageBus(credentials UserPasswordPair, fileType s
_ = file.Close()
}()

err = tmpl.Execute(file, credentials)
err = tmpl.Execute(file, fields)
if err != nil {
return fmt.Errorf("failed to write eKuiper %s file %s: %w", fileType, path, err)
}

lc.Infof("Wrote eKuiper %s at %s with Secure MessageBus credentials", fileType, path)
lc.Infof("Wrote eKuiper %s at %s with Secure MessageBus credentials for %s", fileType, path, fields.Type)

return nil
}
103 changes: 87 additions & 16 deletions internal/security/secretstore/secure-messagebus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package secretstore

import (
"os"
"strings"
"testing"

"github.com/edgexfoundry/go-mod-core-contracts/v2/clients/logger"
Expand All @@ -32,8 +31,80 @@ func TestConfigureSecureMessageBus(t *testing.T) {
KuiperConfigPath: "./testdata/edgex.yaml",
KuiperConnectionsPath: "./testdata/connection.yaml",
}

validExpected := UserPasswordPair{
expectedMqttConfigContent := `
application_conf:
port: 5571
protocol: tcp
server: localhost
topic: application
default:
optional:
Username: testUser
Password: testPassword
port: 1883
protocol: tcp
server: localhost
connectionSelector: edgex.mqttMsgBus
topic: rules-events
type: mqtt
mqtt_conf:
optional:
ClientId: client1
port: 1883
protocol: tcp
server: localhost
topic: events
type: mqtt
`
expectedRedisConfigContent := `
application_conf:
port: 5571
protocol: tcp
server: localhost
topic: application
default:
optional:
Username: testUser
Password: testPassword
port: 6379
protocol: redis
server: localhost
connectionSelector: edgex.redisMsgBus
topic: rules-events
type: redis
mqtt_conf:
optional:
ClientId: client1
port: 1883
protocol: tcp
server: localhost
topic: events
type: mqtt
`

expectedMqttConnectionsContent := `
edgex:
mqttMsgBus: #connection key
protocol: tcp
server: localhost
port: 1883
type: mqtt
optional:
Username: testUser
Password: testPassword
`
expectedRedisConnectionsContent := `
edgex:
redisMsgBus: #connection key
protocol: redis
server: localhost
port: 6379
type: redis
optional:
Username: testUser
Password: testPassword
`
creds := UserPasswordPair{
User: "testUser",
Password: "testPassword",
}
Expand All @@ -43,15 +114,17 @@ func TestConfigureSecureMessageBus(t *testing.T) {
Type string
ConnectionFileExists bool
Credentials UserPasswordPair
Expected *UserPasswordPair
ExpectedConfig *string
ExpectedConnnection *string
ExpectError bool
}{
{"valid redis - both files", redisSecureMessageBusType, true, validExpected, &validExpected, false},
{"valid redis - no connection file", redisSecureMessageBusType, false, validExpected, &validExpected, false},
{"valid blank", blankSecureMessageBusType, false, validExpected, nil, false},
{"valid none", noneSecureMessageBusType, false, validExpected, nil, false},
{"invalid type", "bogus", false, validExpected, nil, true},
{"invalid mqtt", mqttSecureMessageBusType, false, validExpected, nil, false},
{"valid redis - both files", redisSecureMessageBusType, true, creds, &expectedRedisConfigContent, &expectedRedisConnectionsContent, false},
{"valid redis - no connection file", redisSecureMessageBusType, false, creds, &expectedRedisConfigContent, nil, false},
{"valid mqtt - both files", mqttSecureMessageBusType, false, creds, &expectedMqttConfigContent, &expectedMqttConnectionsContent, false},
{"valid mqtt - no connection file", mqttSecureMessageBusType, false, creds, &expectedMqttConfigContent, nil, false},
{"valid blank", blankSecureMessageBusType, false, creds, nil, nil, false},
{"valid none", noneSecureMessageBusType, false, creds, nil, nil, false},
{"invalid type", "bogus", false, creds, nil, nil, true},
}
for _, test := range tests {
t.Run(test.Name, func(t *testing.T) {
Expand All @@ -63,7 +136,7 @@ func TestConfigureSecureMessageBus(t *testing.T) {
_ = os.Remove(secureMessageBus.KuiperConnectionsPath)
}()

if test.Expected != nil {
if test.ExpectedConfig != nil {
_, err := os.Create(secureMessageBus.KuiperConfigPath)
require.NoError(t, err)

Expand All @@ -82,7 +155,7 @@ func TestConfigureSecureMessageBus(t *testing.T) {

require.NoError(t, err)

if test.Expected == nil {
if test.ExpectedConfig == nil {
// Source Config file should not have been written
_, err = os.Stat(secureMessageBus.KuiperConfigPath)
require.True(t, os.IsNotExist(err))
Expand All @@ -97,15 +170,13 @@ func TestConfigureSecureMessageBus(t *testing.T) {
// Source Config file should have been written
contents, err := os.ReadFile(secureMessageBus.KuiperConfigPath)
require.NoError(t, err)
assert.True(t, strings.Contains(string(contents), test.Expected.User))
assert.True(t, strings.Contains(string(contents), test.Expected.Password))
assert.Equal(t, *test.ExpectedConfig, string(contents))

if test.ConnectionFileExists {
// Connections file should have been written
contents, err = os.ReadFile(secureMessageBus.KuiperConnectionsPath)
require.NoError(t, err)
assert.True(t, strings.Contains(string(contents), test.Expected.User))
assert.True(t, strings.Contains(string(contents), test.Expected.Password))
assert.Equal(t, *test.ExpectedConnnection, string(contents))
} else {
// Connections file should not have been written
_, err = os.Stat(secureMessageBus.KuiperConnectionsPath)
Expand Down