From acdbe2b7494d5d154afefe1aeb43ebb3b4ac809f Mon Sep 17 00:00:00 2001 From: intel Date: Wed, 14 Sep 2022 12:08:43 -0700 Subject: [PATCH] fix: add secure mqtt support to eKuiper Closes: #4147 Signed-off-by: intel --- internal/security/secretstore/init.go | 2 +- .../security/secretstore/secure-messagebus.go | 73 ++++++++----- .../secretstore/secure-messagebus_test.go | 103 +++++++++++++++--- 3 files changed, 134 insertions(+), 44 deletions(-) diff --git a/internal/security/secretstore/init.go b/internal/security/secretstore/init.go index 89e09e47dd..fbc16e5d36 100644 --- a/internal/security/secretstore/init.go +++ b/internal/security/secretstore/init.go @@ -499,7 +499,7 @@ func (b *Bootstrap) BootstrapHandler(ctx context.Context, _ *sync.WaitGroup, _ s } } - err = ConfigureSecureMessageBus(configuration.SecureMessageBus, redisCredentials, lc) + err = ConfigureSecureMessageBus(configuration.SecureMessageBus, creds, lc) if err != nil { lc.Errorf("failed to configure for Secure Message Bus: %s", err.Error()) return false diff --git a/internal/security/secretstore/secure-messagebus.go b/internal/security/secretstore/secure-messagebus.go index 92584a718c..fe99add260 100644 --- a/internal/security/secretstore/secure-messagebus.go +++ b/internal/security/secretstore/secure-messagebus.go @@ -37,12 +37,12 @@ default: optional: Username: {{.User}} Password: {{.Password}} - port: 6379 - protocol: redis + port: {{.Port}} + protocol: {{.Protocol}} server: localhost - connectionSelector: edgex.redisMsgBus + connectionSelector: {{.ConnectionSelector}} topic: rules-events - type: redis + type: {{.Type}} mqtt_conf: optional: ClientId: client1 @@ -55,11 +55,11 @@ mqtt_conf: eKuiperConnectionsTemplate = ` edgex: - redisMsgBus: #connection key - protocol: redis + {{.ConnectorName}}: #connection key + protocol: {{.Protocol}} server: localhost - port: 6379 - type: redis + port: {{.Port}} + type: {{.Type}} optional: Username: {{.User}} Password: {{.Password}} @@ -71,26 +71,35 @@ edgex: blankSecureMessageBusType = "" ) -func ConfigureSecureMessageBus(secureMessageBus config.SecureMessageBusInfo, redis5Pair UserPasswordPair, lc logger.LoggingClient) error { +type eKuiperFields struct { + User string + Password string + ConnectionSelector string + ConnectorName string + Protocol string + Type string + Port int +} + +func ConfigureSecureMessageBus(secureMessageBus config.SecureMessageBusInfo, creds UserPasswordPair, lc logger.LoggingClient) error { + fields := eKuiperFields{ + User: creds.User, + Password: creds.Password, + } switch secureMessageBus.Type { - // Currently, only support Secure MessageBus when using the Redis implementation. case redisSecureMessageBusType: - // eKuiper now has two configuration files (EdgeX Sources and Connections) + fields.ConnectionSelector = "edgex.redisMsgBus" + fields.ConnectorName = "redisMsgBus" + fields.Protocol = "redis" + fields.Type = redisSecureMessageBusType + fields.Port = 6379 - err := configureKuiperForSecureMessageBus(redis5Pair, "EdgeX Source", eKuiperEdgeXSourceTemplate, secureMessageBus.KuiperConfigPath, lc) - if err != nil { - return err - } - - err = configureKuiperForSecureMessageBus(redis5Pair, "Connections", eKuiperConnectionsTemplate, secureMessageBus.KuiperConnectionsPath, lc) - if err != nil { - return err - } - - // TODO: Add support for secure MQTT MessageBus case mqttSecureMessageBusType: - lc.Errorf("secure MQTT MessageBus not yet supported for eKuiper") - return nil + fields.ConnectionSelector = "edgex.mqttMsgBus" + fields.ConnectorName = "mqttMsgBus" + fields.Protocol = "tcp" + fields.Type = mqttSecureMessageBusType + fields.Port = 1883 case noneSecureMessageBusType, blankSecureMessageBusType: return nil @@ -99,10 +108,20 @@ func ConfigureSecureMessageBus(secureMessageBus config.SecureMessageBusInfo, red return fmt.Errorf("invalid Secure MessageBus Type of '%s'", secureMessageBus.Type) } + // eKuiper now has two configuration files (EdgeX Sources and Connections) + err := configureKuiperForSecureMessageBus(fields, "EdgeX Source", eKuiperEdgeXSourceTemplate, secureMessageBus.KuiperConfigPath, lc) + if err != nil { + return err + } + + err = configureKuiperForSecureMessageBus(fields, "Connections", eKuiperConnectionsTemplate, secureMessageBus.KuiperConnectionsPath, lc) + if err != nil { + return err + } return nil } -func configureKuiperForSecureMessageBus(credentials UserPasswordPair, fileType string, fileTemplate string, path string, lc logger.LoggingClient) error { +func configureKuiperForSecureMessageBus(fields eKuiperFields, fileType string, fileTemplate string, path string, lc logger.LoggingClient) error { // This capability depends on the eKuiper file existing, which depends on the version of eKuiper installed. // If the file doesn't exist, then the eKuiper version installed doesn't use it, so skip the injection. _, err := os.Stat(path) @@ -125,12 +144,12 @@ func configureKuiperForSecureMessageBus(credentials UserPasswordPair, fileType s _ = file.Close() }() - err = tmpl.Execute(file, credentials) + err = tmpl.Execute(file, fields) if err != nil { return fmt.Errorf("failed to write eKuiper %s file %s: %w", fileType, path, err) } - lc.Infof("Wrote eKuiper %s at %s with Secure MessageBus credentials", fileType, path) + lc.Infof("Wrote eKuiper %s at %s with Secure MessageBus credentials for %s", fileType, path, fields.Type) return nil } diff --git a/internal/security/secretstore/secure-messagebus_test.go b/internal/security/secretstore/secure-messagebus_test.go index eed88b5699..cdb07ae873 100644 --- a/internal/security/secretstore/secure-messagebus_test.go +++ b/internal/security/secretstore/secure-messagebus_test.go @@ -17,7 +17,6 @@ package secretstore import ( "os" - "strings" "testing" "github.com/edgexfoundry/go-mod-core-contracts/v2/clients/logger" @@ -32,8 +31,80 @@ func TestConfigureSecureMessageBus(t *testing.T) { KuiperConfigPath: "./testdata/edgex.yaml", KuiperConnectionsPath: "./testdata/connection.yaml", } - - validExpected := UserPasswordPair{ + expectedMqttConfigContent := ` +application_conf: + port: 5571 + protocol: tcp + server: localhost + topic: application +default: + optional: + Username: testUser + Password: testPassword + port: 1883 + protocol: tcp + server: localhost + connectionSelector: edgex.mqttMsgBus + topic: rules-events + type: mqtt +mqtt_conf: + optional: + ClientId: client1 + port: 1883 + protocol: tcp + server: localhost + topic: events + type: mqtt +` + expectedRedisConfigContent := ` +application_conf: + port: 5571 + protocol: tcp + server: localhost + topic: application +default: + optional: + Username: testUser + Password: testPassword + port: 6379 + protocol: redis + server: localhost + connectionSelector: edgex.redisMsgBus + topic: rules-events + type: redis +mqtt_conf: + optional: + ClientId: client1 + port: 1883 + protocol: tcp + server: localhost + topic: events + type: mqtt +` + + expectedMqttConnectionsContent := ` +edgex: + mqttMsgBus: #connection key + protocol: tcp + server: localhost + port: 1883 + type: mqtt + optional: + Username: testUser + Password: testPassword +` + expectedRedisConnectionsContent := ` +edgex: + redisMsgBus: #connection key + protocol: redis + server: localhost + port: 6379 + type: redis + optional: + Username: testUser + Password: testPassword +` + creds := UserPasswordPair{ User: "testUser", Password: "testPassword", } @@ -43,15 +114,17 @@ func TestConfigureSecureMessageBus(t *testing.T) { Type string ConnectionFileExists bool Credentials UserPasswordPair - Expected *UserPasswordPair + ExpectedConfig *string + ExpectedConnnection *string ExpectError bool }{ - {"valid redis - both files", redisSecureMessageBusType, true, validExpected, &validExpected, false}, - {"valid redis - no connection file", redisSecureMessageBusType, false, validExpected, &validExpected, false}, - {"valid blank", blankSecureMessageBusType, false, validExpected, nil, false}, - {"valid none", noneSecureMessageBusType, false, validExpected, nil, false}, - {"invalid type", "bogus", false, validExpected, nil, true}, - {"invalid mqtt", mqttSecureMessageBusType, false, validExpected, nil, false}, + {"valid redis - both files", redisSecureMessageBusType, true, creds, &expectedRedisConfigContent, &expectedRedisConnectionsContent, false}, + {"valid redis - no connection file", redisSecureMessageBusType, false, creds, &expectedRedisConfigContent, nil, false}, + {"valid mqtt - both files", mqttSecureMessageBusType, false, creds, &expectedMqttConfigContent, &expectedMqttConnectionsContent, false}, + {"valid mqtt - no connection file", mqttSecureMessageBusType, false, creds, &expectedMqttConfigContent, nil, false}, + {"valid blank", blankSecureMessageBusType, false, creds, nil, nil, false}, + {"valid none", noneSecureMessageBusType, false, creds, nil, nil, false}, + {"invalid type", "bogus", false, creds, nil, nil, true}, } for _, test := range tests { t.Run(test.Name, func(t *testing.T) { @@ -63,7 +136,7 @@ func TestConfigureSecureMessageBus(t *testing.T) { _ = os.Remove(secureMessageBus.KuiperConnectionsPath) }() - if test.Expected != nil { + if test.ExpectedConfig != nil { _, err := os.Create(secureMessageBus.KuiperConfigPath) require.NoError(t, err) @@ -82,7 +155,7 @@ func TestConfigureSecureMessageBus(t *testing.T) { require.NoError(t, err) - if test.Expected == nil { + if test.ExpectedConfig == nil { // Source Config file should not have been written _, err = os.Stat(secureMessageBus.KuiperConfigPath) require.True(t, os.IsNotExist(err)) @@ -97,15 +170,13 @@ func TestConfigureSecureMessageBus(t *testing.T) { // Source Config file should have been written contents, err := os.ReadFile(secureMessageBus.KuiperConfigPath) require.NoError(t, err) - assert.True(t, strings.Contains(string(contents), test.Expected.User)) - assert.True(t, strings.Contains(string(contents), test.Expected.Password)) + assert.Equal(t, *test.ExpectedConfig, string(contents)) if test.ConnectionFileExists { // Connections file should have been written contents, err = os.ReadFile(secureMessageBus.KuiperConnectionsPath) require.NoError(t, err) - assert.True(t, strings.Contains(string(contents), test.Expected.User)) - assert.True(t, strings.Contains(string(contents), test.Expected.Password)) + assert.Equal(t, *test.ExpectedConnnection, string(contents)) } else { // Connections file should not have been written _, err = os.Stat(secureMessageBus.KuiperConnectionsPath)