diff --git a/appsdk/sdk.go b/appsdk/sdk.go index ddc652be4..a831dcda4 100644 --- a/appsdk/sdk.go +++ b/appsdk/sdk.go @@ -56,6 +56,7 @@ import ( "github.com/edgexfoundry/app-functions-sdk-go/internal/trigger" "github.com/edgexfoundry/app-functions-sdk-go/internal/trigger/http" "github.com/edgexfoundry/app-functions-sdk-go/internal/trigger/messagebus" + "github.com/edgexfoundry/app-functions-sdk-go/internal/trigger/mqtt" "github.com/edgexfoundry/app-functions-sdk-go/internal/webserver" "github.com/edgexfoundry/app-functions-sdk-go/pkg/util" ) @@ -150,13 +151,18 @@ func (sdk *AppFunctionsSDK) MakeItRun() error { sdk.runtime.Initialize(sdk.storeClient, sdk.secretProvider) sdk.runtime.SetTransforms(sdk.transforms) + // determine input type and create trigger for it t := sdk.setupTrigger(sdk.config, sdk.runtime) + if t == nil { + return errors.New("Failed to create Trigger") + } // Initialize the trigger (i.e. start a web server, or connect to message bus) deferred, err := t.Initialize(sdk.appWg, sdk.appCtx, sdk.backgroundChannel) if err != nil { sdk.LoggingClient.Error(err.Error()) + return errors.New("Failed to initialize Trigger") } // deferred is a a function that needs to be called when services exits. @@ -437,9 +443,18 @@ func (sdk *AppFunctionsSDK) setupTrigger(configuration *common.ConfigurationStru case "HTTP": sdk.LoggingClient.Info("HTTP trigger selected") t = &http.Trigger{Configuration: configuration, Runtime: runtime, Webserver: sdk.webserver, EdgeXClients: sdk.edgexClients} - case "MESSAGEBUS": - sdk.LoggingClient.Info("MessageBus trigger selected") + + case "MESSAGEBUS", + "EDGEX-MESSAGEBUS": // Allows for more explicit name now that we have plain MQTT option also + sdk.LoggingClient.Info("EdgeX MessageBus trigger selected") t = &messagebus.Trigger{Configuration: configuration, Runtime: runtime, EdgeXClients: sdk.edgexClients} + + case "EXTERNAL-MQTT": + sdk.LoggingClient.Info("External MQTT trigger selected") + t = mqtt.NewTrigger(configuration, runtime, sdk.edgexClients, sdk.secretProvider) + + default: + sdk.LoggingClient.Error(fmt.Sprintf("Invalid Trigger type of '%s' specified", configuration.Binding.Type)) } return t diff --git a/internal/common/config.go b/internal/common/config.go index 1bc7c3ff8..d9b0acaff 100644 --- a/internal/common/config.go +++ b/internal/common/config.go @@ -53,6 +53,8 @@ type ConfigurationStruct struct { Service ServiceInfo // MessageBus MessageBus types.MessageBusConfig + // MqttBroker + MqttBroker MqttBrokerConfig // Binding Binding BindingInfo // ApplicationSettings @@ -88,12 +90,37 @@ type BindingInfo struct { // // example: messagebus // required: true - // enum: messagebus,http + // enum: messagebus (edgex-messagebus), http, external-mqtt Type string SubscribeTopic string PublishTopic string } +// MqttBrokerConfig contains the MQTT broker configuration for MQTT Trigger +type MqttBrokerConfig struct { + // Url contains the fully qualified URL to connect to the MQTT broker + Url string + // ClientId to connect to the broker with. + ClientId string + // ConnectTimeout is a time duration indicating how long to wait timing out on the broker connection + ConnectTimeout string + // AutoReconnect indicated whether or not to retry connection if disconnected + AutoReconnect bool + // KeepAlive is seconds between client ping when no active data flowing to avoid client being disconnected + KeepAlive int64 + // QoS for MQTT Connection + QoS byte + // Retain setting for MQTT Connection + Retain bool + // SkipCertVerify indicates if the certificate verification should be skipped + SkipCertVerify bool + // SecretPath is the name of the path in secret provider to retrieve your secrets + SecretPath string + // AuthMode indicates what to use when connecting to the broker. Options are "none", "cacert" , "usernamepassword", "clientcert". + // If a CA Cert exists in the SecretPath then it will be used for all modes except "none". + AuthMode string +} + type PipelineInfo struct { ExecutionOrder string UseTargetTypeOfByteArray bool diff --git a/internal/trigger/mqtt/mqtt.go b/internal/trigger/mqtt/mqtt.go new file mode 100644 index 000000000..691d0db31 --- /dev/null +++ b/internal/trigger/mqtt/mqtt.go @@ -0,0 +1,183 @@ +// +// Copyright (c) 2020 Intel Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package mqtt + +import ( + "context" + "errors" + "fmt" + "net/url" + "sync" + "time" + + pahoMqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/edgexfoundry/go-mod-bootstrap/bootstrap" + "github.com/edgexfoundry/go-mod-core-contracts/clients" + "github.com/edgexfoundry/go-mod-messaging/pkg/types" + "github.com/google/uuid" + + "github.com/edgexfoundry/app-functions-sdk-go/appcontext" + "github.com/edgexfoundry/app-functions-sdk-go/internal/common" + "github.com/edgexfoundry/app-functions-sdk-go/internal/runtime" + "github.com/edgexfoundry/app-functions-sdk-go/internal/security" + "github.com/edgexfoundry/app-functions-sdk-go/pkg/secure" +) + +// Trigger implements Trigger to support Triggers +type Trigger struct { + configuration *common.ConfigurationStruct + mqttClient pahoMqtt.Client + runtime *runtime.GolangRuntime + edgeXClients common.EdgeXClients + secretProvider security.SecretProvider +} + +func NewTrigger( + configuration *common.ConfigurationStruct, + runtime *runtime.GolangRuntime, + clients common.EdgeXClients, + secretProvider security.SecretProvider) *Trigger { + return &Trigger{ + configuration: configuration, + runtime: runtime, + edgeXClients: clients, + secretProvider: secretProvider, + } +} + +// Initialize initializes the Trigger for an external MQTT broker +func (trigger *Trigger) Initialize(_ *sync.WaitGroup, _ context.Context, background <-chan types.MessageEnvelope) (bootstrap.Deferred, error) { + // Convenience short cuts + logger := trigger.edgeXClients.LoggingClient + brokerConfig := trigger.configuration.MqttBroker + topic := trigger.configuration.Binding.SubscribeTopic + + logger.Info("Initializing MQTT Trigger") + + if background != nil { + return nil, errors.New("background publishing not supported for services using MQTT trigger") + } + + if len(topic) == 0 { + return nil, fmt.Errorf("missing SubscribeTopic for MQTT Trigger. Must be present in [Binding] section.") + } + + brokerUrl, err := url.Parse(brokerConfig.Url) + if err != nil { + return nil, fmt.Errorf("invalid MQTT Broker Url '%s': %s", trigger.configuration.MqttBroker.Url, err.Error()) + } + + opts := pahoMqtt.NewClientOptions() + opts.AutoReconnect = brokerConfig.AutoReconnect + opts.ClientID = brokerConfig.ClientId + if len(brokerConfig.ConnectTimeout) > 0 { + duration, err := time.ParseDuration(brokerConfig.ConnectTimeout) + if err != nil { + return nil, fmt.Errorf("invalid MQTT ConnectTimeout '%s': %s", brokerConfig.ConnectTimeout, err.Error()) + } + opts.ConnectTimeout = duration + } + opts.KeepAlive = brokerConfig.KeepAlive + opts.Servers = []*url.URL{brokerUrl} + + mqttFactory := secure.NewMqttFactory( + logger, + trigger.secretProvider, + brokerConfig.AuthMode, + brokerConfig.SecretPath, + brokerConfig.SkipCertVerify, + ) + + mqttClient, err := mqttFactory.Create(opts) + if err != nil { + return nil, fmt.Errorf("unable to create secure MQTT Client: %s", err.Error()) + } + + logger.Info(fmt.Sprintf("Connecting to mqtt broker for MQTT trigger at: %s", brokerUrl)) + + if token := mqttClient.Connect(); token.Wait() && token.Error() != nil { + return nil, fmt.Errorf("could not connect to broker for MQTT trigger: %s", token.Error().Error()) + } + logger.Info("Connected to mqtt server for MQTT trigger") + + if token := mqttClient.Subscribe(topic, brokerConfig.QoS, trigger.messageHandler); token.Wait() && token.Error() != nil { + mqttClient.Disconnect(0) + return nil, fmt.Errorf("could not subscribe to topic '%s' for MQTT trigger: %s", topic, token.Error().Error()) + } + + logger.Info(fmt.Sprintf("Subscribed to topic '%s' for MQTT trigger", topic)) + + deferred := func() { + logger.Info("Disconnecting from broker for MQTT trigger") + trigger.mqttClient.Disconnect(0) + } + + trigger.mqttClient = mqttClient + + return deferred, nil +} + +func (trigger *Trigger) messageHandler(client pahoMqtt.Client, message pahoMqtt.Message) { + // Convenience short cuts + logger := trigger.edgeXClients.LoggingClient + brokerConfig := trigger.configuration.MqttBroker + topic := trigger.configuration.Binding.PublishTopic + + data := message.Payload() + contentType := clients.ContentTypeJSON + if data[0] != byte('{') { + // If not JSON then assume it is CBOR + contentType = clients.ContentTypeCBOR + } + + correlationID := uuid.New().String() + + edgexContext := &appcontext.Context{ + CorrelationID: correlationID, + Configuration: trigger.configuration, + LoggingClient: trigger.edgeXClients.LoggingClient, + EventClient: trigger.edgeXClients.EventClient, + ValueDescriptorClient: trigger.edgeXClients.ValueDescriptorClient, + CommandClient: trigger.edgeXClients.CommandClient, + NotificationsClient: trigger.edgeXClients.NotificationsClient, + } + + logger.Trace("Received message from MQTT Trigger", clients.CorrelationHeader, correlationID) + logger.Debug(fmt.Sprintf("Received message from MQTT Trigger with %d bytes", len(data)), clients.ContentType, contentType) + + envelope := types.MessageEnvelope{ + CorrelationID: correlationID, + ContentType: contentType, + Payload: data, + } + + messageError := trigger.runtime.ProcessMessage(edgexContext, envelope) + if messageError != nil { + // ProcessMessage logs the error, so no need to log it here. + // ToDo: Do we want to publish the error back to the Broker? + return + } + + if len(edgexContext.OutputData) > 0 && len(topic) > 0 { + if token := client.Publish(topic, brokerConfig.QoS, brokerConfig.Retain, edgexContext.OutputData); token.Wait() && token.Error() != nil { + logger.Error("could not publish to topic '%s' for MQTT trigger: %s", topic, token.Error().Error()) + } else { + logger.Trace("Sent MQTT Trigger response message", clients.CorrelationHeader, correlationID) + logger.Debug(fmt.Sprintf("Sent MQTT Trigger response message on topic '%s' with %d bytes", topic, len(edgexContext.OutputData))) + } + } +} diff --git a/internal/webserver/server_test.go b/internal/webserver/server_test.go index 12737b6f6..998094635 100644 --- a/internal/webserver/server_test.go +++ b/internal/webserver/server_test.go @@ -109,7 +109,7 @@ func TestConfigureAndConfigRoute(t *testing.T) { rr := httptest.NewRecorder() webserver.router.ServeHTTP(rr, req) - expected := `{"Writable":{"LogLevel":"","Pipeline":{"ExecutionOrder":"","UseTargetTypeOfByteArray":false,"Functions":null},"StoreAndForward":{"Enabled":false,"RetryInterval":"","MaxRetryCount":0},"InsecureSecrets":null},"Logging":{"EnableRemote":false,"File":""},"Registry":{"Host":"","Port":0,"Type":""},"Service":{"BootTimeout":"","CheckInterval":"","Host":"","HTTPSCert":"","HTTPSKey":"","ServerBindAddr":"","Port":0,"Protocol":"","StartupMsg":"","ReadMaxLimit":0,"Timeout":""},"MessageBus":{"PublishHost":{"Host":"","Port":0,"Protocol":""},"SubscribeHost":{"Host":"","Port":0,"Protocol":""},"Type":"","Optional":null},"Binding":{"Type":"","SubscribeTopic":"","PublishTopic":""},"ApplicationSettings":null,"Clients":null,"Database":{"Type":"","Host":"","Port":0,"Timeout":"","Username":"","Password":"","MaxIdle":0,"BatchSize":0},"SecretStore":{"Host":"","Port":0,"Path":"","Protocol":"","Namespace":"","RootCaCertPath":"","ServerName":"","Authentication":{"AuthType":"","AuthToken":""},"AdditionalRetryAttempts":0,"RetryWaitPeriod":"","TokenFile":""},"SecretStoreExclusive":{"Host":"","Port":0,"Path":"","Protocol":"","Namespace":"","RootCaCertPath":"","ServerName":"","Authentication":{"AuthType":"","AuthToken":""},"AdditionalRetryAttempts":0,"RetryWaitPeriod":"","TokenFile":""}}` + "\n" + expected := `{"Writable":{"LogLevel":"","Pipeline":{"ExecutionOrder":"","UseTargetTypeOfByteArray":false,"Functions":null},"StoreAndForward":{"Enabled":false,"RetryInterval":"","MaxRetryCount":0},"InsecureSecrets":null},"Logging":{"EnableRemote":false,"File":""},"Registry":{"Host":"","Port":0,"Type":""},"Service":{"BootTimeout":"","CheckInterval":"","Host":"","HTTPSCert":"","HTTPSKey":"","ServerBindAddr":"","Port":0,"Protocol":"","StartupMsg":"","ReadMaxLimit":0,"Timeout":""},"MessageBus":{"PublishHost":{"Host":"","Port":0,"Protocol":""},"SubscribeHost":{"Host":"","Port":0,"Protocol":""},"Type":"","Optional":null},"MqttBroker":{"Url":"","ClientId":"","ConnectTimeout":"","AutoReconnect":false,"KeepAlive":0,"QoS":0,"Retain":false,"SkipCertVerify":false,"SecretPath":"","AuthMode":""},"Binding":{"Type":"","SubscribeTopic":"","PublishTopic":""},"ApplicationSettings":null,"Clients":null,"Database":{"Type":"","Host":"","Port":0,"Timeout":"","Username":"","Password":"","MaxIdle":0,"BatchSize":0},"SecretStore":{"Host":"","Port":0,"Path":"","Protocol":"","Namespace":"","RootCaCertPath":"","ServerName":"","Authentication":{"AuthType":"","AuthToken":""},"AdditionalRetryAttempts":0,"RetryWaitPeriod":"","TokenFile":""},"SecretStoreExclusive":{"Host":"","Port":0,"Path":"","Protocol":"","Namespace":"","RootCaCertPath":"","ServerName":"","Authentication":{"AuthType":"","AuthToken":""},"AdditionalRetryAttempts":0,"RetryWaitPeriod":"","TokenFile":""}}` + "\n" body := rr.Body.String() assert.Equal(t, expected, body) diff --git a/pkg/secure/mqttfactory.go b/pkg/secure/mqttfactory.go new file mode 100644 index 000000000..75d44b3de --- /dev/null +++ b/pkg/secure/mqttfactory.go @@ -0,0 +1,184 @@ +// +// Copyright (c) 2020 Intel Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package secure + +import ( + "crypto/tls" + "crypto/x509" + "errors" + + "github.com/eclipse/paho.mqtt.golang" + "github.com/edgexfoundry/go-mod-core-contracts/clients/logger" + + "github.com/edgexfoundry/app-functions-sdk-go/internal/security" +) + +type mqttSecrets struct { + username string + password string + keyPemBlock []byte + certPemBlock []byte + caPemBlock []byte +} + +const ( + AuthModeNone = "none" + AuthModeUsernamePassword = "usernamepassword" + AuthModeCert = "clientcert" + AuthModeCA = "cacert" + // Name of the keys to look for in secret provider + MQTTSecretUsername = "username" + MQTTSecretPassword = "password" + MQTTSecretClientKey = "clientkey" + MQTTSecretClientCert = AuthModeCert + MQTTSecretCACert = AuthModeCA +) + +type MqttFactory struct { + logger logger.LoggingClient + secretProvider security.SecretProvider + authMode string + secretPath string + opts *mqtt.ClientOptions + skipCertVerify bool +} + +func NewMqttFactory(lc logger.LoggingClient, sp security.SecretProvider, mode string, path string, skipVerify bool) MqttFactory { + return MqttFactory{ + logger: lc, + secretProvider: sp, + authMode: mode, + secretPath: path, + skipCertVerify: skipVerify, + } +} + +func (factory MqttFactory) Create(opts *mqtt.ClientOptions) (mqtt.Client, error) { + if factory.authMode == "" { + factory.authMode = AuthModeNone + factory.logger.Warn("AuthMode not set, defaulting to \"" + AuthModeNone + "\"") + } + + factory.opts = opts + + //get the secrets from the secret provider and populate the struct + secrets, err := factory.getSecrets() + if err != nil { + return nil, err + } + //ensure that the authmode selected has the required secret values + if secrets != nil { + err = factory.validateSecrets(*secrets) + if err != nil { + return nil, err + } + // configure the mqtt client with the retrieved secret values + err = factory.configureMQTTClientForAuth(*secrets) + if err != nil { + return nil, err + } + } + + return mqtt.NewClient(factory.opts), nil +} + +func (factory MqttFactory) getSecrets() (*mqttSecrets, error) { + // No Auth? No Problem!...No secrets required. + if factory.authMode == AuthModeNone { + return nil, nil + } + + secrets, err := factory.secretProvider.GetSecrets(factory.secretPath) + if err != nil { + return nil, err + } + mqttSecrets := &mqttSecrets{ + username: secrets[MQTTSecretUsername], + password: secrets[MQTTSecretPassword], + keyPemBlock: []byte(secrets[MQTTSecretClientKey]), + certPemBlock: []byte(secrets[MQTTSecretClientCert]), + caPemBlock: []byte(secrets[MQTTSecretCACert]), + } + + return mqttSecrets, nil +} + +func (factory MqttFactory) validateSecrets(secrets mqttSecrets) error { + caCertPool := x509.NewCertPool() + if factory.authMode == AuthModeUsernamePassword { + if secrets.username == "" || secrets.password == "" { + return errors.New("AuthModeUsernamePassword selected however Username or Password was not found at secret path") + } + + } else if factory.authMode == AuthModeCert { + // need both to make a successful connection + if len(secrets.keyPemBlock) <= 0 || len(secrets.certPemBlock) <= 0 { + return errors.New("AuthModeCert selected however the key or cert PEM block was not found at secret path") + } + } else if factory.authMode == AuthModeCA { + if len(secrets.caPemBlock) <= 0 { + return errors.New("AuthModeCA selected however no PEM Block was found at secret path") + } + } else if factory.authMode != AuthModeNone { + return errors.New("Invalid AuthMode selected") + } + + if len(secrets.caPemBlock) > 0 { + ok := caCertPool.AppendCertsFromPEM(secrets.caPemBlock) + if !ok { + return errors.New("Error parsing CA Certificate") + } + } + + return nil +} + +func (factory MqttFactory) configureMQTTClientForAuth(secrets mqttSecrets) error { + var cert tls.Certificate + var err error + caCertPool := x509.NewCertPool() + tlsConfig := &tls.Config{ + InsecureSkipVerify: factory.skipCertVerify, + } + switch factory.authMode { + case AuthModeUsernamePassword: + factory.opts.SetUsername(secrets.username) + factory.opts.SetPassword(secrets.password) + case AuthModeCert: + cert, err = tls.X509KeyPair(secrets.certPemBlock, secrets.keyPemBlock) + if err != nil { + return err + } + tlsConfig.Certificates = []tls.Certificate{cert} + case AuthModeCA: + break + case AuthModeNone: + return nil + } + + if len(secrets.caPemBlock) > 0 { + ok := caCertPool.AppendCertsFromPEM(secrets.caPemBlock) + if !ok { + return errors.New("Error parsing CA PEM block") + } + tlsConfig.ClientCAs = caCertPool + } + + factory.opts.SetTLSConfig(tlsConfig) + + return nil +} diff --git a/pkg/secure/mqttfactory_test.go b/pkg/secure/mqttfactory_test.go new file mode 100644 index 000000000..c06d118a5 --- /dev/null +++ b/pkg/secure/mqttfactory_test.go @@ -0,0 +1,332 @@ +// +// Copyright (c) 2020 Intel Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// This test will only be executed if the tag brokerRunning is added when running +// the tests with a command like: +// go test -tags brokerRunning +package secure + +import ( + "errors" + "testing" + + "github.com/eclipse/paho.mqtt.golang" + "github.com/edgexfoundry/go-mod-core-contracts/clients/logger" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/edgexfoundry/app-functions-sdk-go/internal/security" +) + +const testCACert = `-----BEGIN CERTIFICATE----- +MIIDhTCCAm2gAwIBAgIUQl1RUGewZOXaSLnmH1i12zSYOtswDQYJKoZIhvcNAQEL +BQAwUjELMAkGA1UEBhMCVVMxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM +GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDELMAkGA1UEAwwCY2EwHhcNMjAwNDA4 +MDExNDQ2WhcNMjUwNDA4MDExNDQ2WjBSMQswCQYDVQQGEwJVUzETMBEGA1UECAwK +U29tZS1TdGF0ZTEhMB8GA1UECgwYSW50ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMQsw +CQYDVQQDDAJjYTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAOqslFtX +nxr6yBZdLDKp1iTmsnFreEit7Z1BnNy9vQW6xrKRH+nxZWr0n9UIbx7KtmFkSBQ9 +Bb5zC/3ZdjcuQAuKSTgQB7AP1D2dX6geJPo1Ph9NS0aVmuUqQ6dU+/4R5ATfoWag +M7slCixfkBzbHEh0mCqr7FoDWq2h+Cz2n8K85tBZjLyUuzyRaqH7ZkHfJD1cxkGK +FcwudCg4zpKYOSctm+JpTlF6YPjlngN79jaJIQEAmx/twv1lOCAGBw/hZM3FGmQx +5dA1W7qaJ6NHgNRXWRS1AERtHpAAsWNBT1CKuAS/j0PlreRyR3aMgQYQ5camxi9a +qCrMiHybaqj+UCkCAwEAAaNTMFEwHQYDVR0OBBYEFPNCbvrfw2QDoOyYfNjT9sNO +52xOMB8GA1UdIwQYMBaAFPNCbvrfw2QDoOyYfNjT9sNO52xOMA8GA1UdEwEB/wQF +MAMBAf8wDQYJKoZIhvcNAQELBQADggEBAHdFTqe6vi3BzgOMJEMO+81ZmiMohgKZ +Alyo8wH1C5RgwWW5w1OU+2RQfdOZgDfFkuQzmj0Kt2gzqACuAEtKzDt78lJ4f+WZ +MmRKBudJONUHTTm1micK3pqmn++nSygag0KxDvVbL+stSEgZwEBSOEvGDPXrL5qs +5yVOCi4xvsOCa1ymSnW6sX0z5GcgJQj2Znrr5QbEKHFSG86+WYEYnZ2zCNV7ahQo +bwXGZPOCUkpQzOstie/lPsf3Sd13/NIAk23TQ+rtaWIP9syQ85XWGRKRAUFOJEK0 +2/jr0Xot+Y/3raEfNSrq6sHTzX1q4PoWkSwNEEGXifBqDr+9PXK3mOQ= +-----END CERTIFICATE----- +` +const testClientCert = `-----BEGIN CERTIFICATE----- +MIIDLzCCAhcCFG+y+oEr87O2iQH90ayO4hU/GvSqMA0GCSqGSIb3DQEBCwUAMFIx +CzAJBgNVBAYTAlVTMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRl +cm5ldCBXaWRnaXRzIFB0eSBMdGQxCzAJBgNVBAMMAmNhMB4XDTIwMDQwODAxMTY1 +OVoXDTIxMDQwMzAxMTY1OVowVjELMAkGA1UEBhMCVVMxEzARBgNVBAgMClNvbWUt +U3RhdGUxITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDEPMA0GA1UE +AwwGY2xpZW50MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA4TlobJoF +gNoCc5Znb0OzVoMypoay1RSTAhnU0arpHVugUMZMO6oxSt371MN+e4cUxoes4uhN +qeVG7AxUkdMCNJbzjAmJeDQtLKYHcY4YI30HHWCW0c8SxEsrj6DzjizgKZcUdX4H +6HwAltOp/RZYJTBVVexE1WYOheTNJuw5QeNbTGpfpKM7RuHADnytLbrSiK09FZYx +23PIsLhx8b7+k1AtRFGhFqDRMF6Fqbo6xdU8hZ1eAvJP5t87U/PWeQ9ld2lxd3fQ +xiP4IBQs1QI2gTp5O41ifRCpO7scXRaFweyPAgMVOQ42eVZiJUR37AF/nVzXxB5N +iTH9Ij/c/shJvQIDAQABMA0GCSqGSIb3DQEBCwUAA4IBAQDZ1tvo2JbA27qs+DzH +PQudMgCPqHylnqlbX94FtKrIh6kP4YwrMNoOCdcU/MHGG2b3ldoMgx9qrTnkk8g1 +3/gX/r4MDiTw2LocmIPYSukfR0J4k0ijlZtbtr9EtNPvy5iSla8Xi+iSm70wj+Zi +Z0GE0gOi8JfYPlxCtw3uVpsdqaHEevI70D4H1yAG22YYXUZt0QK02zztgBA2c7nE +kX0EMnYch0e7urs9o1M6JWJGlWZQxgVnxekbFDPfRelR1m0zFnbfXG2rnfuRpVEL +6SGxFU8+v1VepAHLvhS2VULYbWBOHZsh1yCteUXdePMYIN7c71qaCyC89N3GBia5 +uXOR +-----END CERTIFICATE----- +` +const testClientKey = `-----BEGIN RSA PRIVATE KEY----- +MIIEpAIBAAKCAQEA4TlobJoFgNoCc5Znb0OzVoMypoay1RSTAhnU0arpHVugUMZM +O6oxSt371MN+e4cUxoes4uhNqeVG7AxUkdMCNJbzjAmJeDQtLKYHcY4YI30HHWCW +0c8SxEsrj6DzjizgKZcUdX4H6HwAltOp/RZYJTBVVexE1WYOheTNJuw5QeNbTGpf +pKM7RuHADnytLbrSiK09FZYx23PIsLhx8b7+k1AtRFGhFqDRMF6Fqbo6xdU8hZ1e +AvJP5t87U/PWeQ9ld2lxd3fQxiP4IBQs1QI2gTp5O41ifRCpO7scXRaFweyPAgMV +OQ42eVZiJUR37AF/nVzXxB5NiTH9Ij/c/shJvQIDAQABAoIBADPL4BgZ0+ouOSIc +FO2hxDzBL4TctYQLl0OEbU1K4RG/YL8y25VdLrjpFGF6FDyUdFK0IS6N/k50TDs9 +GrXusTMnBBvQlazvUvRRuqSC6UpAFsLK0+SsmsRKBVqiyWCJMYRfGnVq5qaw3fHR +++YYnWzwELASBkKNlgl09TleWkysbnZbWIMQ5Qm0k+s/9vvjooA2aMXTeLtyhGfI +49OvyCrrX5v7ILdHl7RGAyPRT+ipyt1i0fAqHk4ouLdTRrAx4S5TvUpszrts1P8f +5ggLd1s6RVTz27uASu3U/gLH630m1PU46d02UI1tWen3TgRm/VqjO2aqkZaZispQ +HwTRZIECgYEA9rL7KoZflVQJ4ndg3V522BhAciN99taYWHr018kG5vNVGFBHSVOt +De0gb7z8FhK0Zs4MifU3b03qr7Ac1+p0zIAwATPT4TOLzc4SKBd33TZk/JCZCGSR +hqQPF0FZ+EKJqh7yif+ssFXp0xKrNybm58Z7jfF8vWMdz0QkJ1pZkn8CgYEA6bcp +YkH6IoHmCZ5hWE3/hYQcvfcM10z0cWTTKstxgSid9dj0HUqxMsFhBF1yzUtsDZQB +E933gZyj/LE5Z/EbqUSX0H/M0P7Uwtj9lS7W/vQdOQMfAciqggNKhyaBnBYsxw9l +5IelOxGF+taEvDkPsVt9cvZm/nbf+irU5JLCzcMCgYEA8o3/jUwY5oV+QoAFaSHb +z5PoqVBkJTHREA20dgVdF+3fmMw1is8Os0aWQcaaREmXvgyRH4NOQc1mFd8ePNx0 +giz3BfejNySrLGqUR37rh0BYAktZa3sV6j+b5s2GXCVvnShYZ35OmAGgqLsORGen +V/M6v9DTSJIPWR4yPc8DipkCgYEAhmtW/PFPaRtm7+9Ms5ogtWz3jvaRRx82lCVW +Io3iGVQADc8bD+HOqo94Oid5CMQxQFn4iLGoUb6Cvqo7hyGwNBmEa2GlripyuiJN +LslC1F4YlJrL8Z21G5PDAJpP/zLtzAt6Igc2LBP3B/7rVspG0U36h+1Z7U73oQ2T +ZmdWbTsCgYALxjB0NvqBk+TNYMZFysqZnI3CxYQXwHfElQQQUqcQnunAOLJ8H+nb +JryGx90ylYY2Mh2U273435uwQcX1g5gu3rBF8McHKj5EYSVDgpeBMx8ej2ENvW7q +CR6KVnoNdMwJZM3ARpBYNlhFTzDyew2WYLitZsN/uV8t+XxJFDyJQA== +-----END RSA PRIVATE KEY----- +` + +func TestValidateSecrets(t *testing.T) { + target := NewMqttFactory(logger.NewMockClient(), nil, "", "", false) + tests := []struct { + Name string + AuthMode string + secrets mqttSecrets + ErrorExpectation bool + ErrorMessage string + }{ + {"Invalid AuthMode", "BadAuthMode", mqttSecrets{}, true, "Invalid AuthMode selected"}, + {"No Auth No error", AuthModeNone, mqttSecrets{}, false, ""}, + {"UsernamePassword No Error", AuthModeUsernamePassword, mqttSecrets{ + username: "user", + password: "Password", + }, false, ""}, + {"UsernamePassword Error no Username", AuthModeUsernamePassword, mqttSecrets{ + password: "Password", + }, true, "AuthModeUsernamePassword selected however Username or Password was not found at secret path"}, + {"UsernamePassword Error no Password", AuthModeUsernamePassword, mqttSecrets{ + username: "user", + }, true, "AuthModeUsernamePassword selected however Username or Password was not found at secret path"}, + {"ClientCert No Error", AuthModeCert, mqttSecrets{ + certPemBlock: []byte("----"), + keyPemBlock: []byte("----"), + }, false, ""}, + {"ClientCert No Key", AuthModeCert, mqttSecrets{ + certPemBlock: []byte("----"), + }, true, "AuthModeCert selected however the key or cert PEM block was not found at secret path"}, + {"ClientCert No Cert", AuthModeCert, mqttSecrets{ + keyPemBlock: []byte("----"), + }, true, "AuthModeCert selected however the key or cert PEM block was not found at secret path"}, + {"CACert no error", AuthModeCA, mqttSecrets{ + caPemBlock: []byte(testCACert), + }, false, ""}, + {"CACert invalid error", AuthModeCA, mqttSecrets{ + caPemBlock: []byte(`------`), + }, true, "Error parsing CA Certificate"}, + {"CACert no ca error", AuthModeCA, mqttSecrets{}, true, "AuthModeCA selected however no PEM Block was found at secret path"}, + } + + for _, test := range tests { + t.Run(test.Name, func(t *testing.T) { + target.authMode = test.AuthMode + result := target.validateSecrets(test.secrets) + if test.ErrorExpectation { + assert.Error(t, result, "Result should be an error") + assert.Equal(t, test.ErrorMessage, result.(error).Error()) + } else { + assert.Nil(t, result, "Should be nil") + } + }) + } +} + +func TestGetSecrets(t *testing.T) { + // setup mock secret client + mockSecretProvider := security.NewSecretProvider(nil, nil) + mockSecretProvider.ExclusiveSecretClient = &mockMQTTSecretClient{} + + target := NewMqttFactory(logger.NewMockClient(), mockSecretProvider, "", "", false) + tests := []struct { + Name string + AuthMode string + SecretPath string + ExpectedSecrets *mqttSecrets + ExpectingError bool + }{ + {"No Auth No error", AuthModeNone, "", nil, false}, + {"Auth No Secrets found", AuthModeCA, "/notfound", nil, true}, + {"Auth With Secrets", AuthModeUsernamePassword, "/mqtt", &mqttSecrets{ + username: "TEST_USER", + password: "TEST_PASS", + keyPemBlock: []uint8{}, + certPemBlock: []uint8{}, + caPemBlock: []uint8{}, + }, false}, + } + + for _, test := range tests { + t.Run(test.Name, func(t *testing.T) { + target.authMode = test.AuthMode + target.secretPath = test.SecretPath + mqttSecrets, err := target.getSecrets() + if test.ExpectingError { + assert.Error(t, err, "Expecting error") + return + } + require.Equal(t, test.ExpectedSecrets, mqttSecrets) + }) + } +} + +func TestConfigureMQTTClientForAuth(t *testing.T) { + target := NewMqttFactory(logger.NewMockClient(), nil, "", "", false) + target.opts = mqtt.NewClientOptions() + tests := []struct { + Name string + AuthMode string + secrets mqttSecrets + ErrorExpectation bool + ErrorMessage string + }{ + {"Username and Password should be set", AuthModeUsernamePassword, mqttSecrets{username: MQTTSecretUsername, password: MQTTSecretPassword}, false, ""}, + {"No AuthMode", AuthModeNone, mqttSecrets{}, false, ""}, + {"Invalid AuthMode", "", mqttSecrets{}, false, ""}, + } + + for _, test := range tests { + t.Run(test.Name, func(t *testing.T) { + target.authMode = test.AuthMode + result := target.configureMQTTClientForAuth(test.secrets) + if test.ErrorExpectation { + assert.Error(t, result, "Result should be an error") + assert.Equal(t, test.ErrorMessage, result.(error).Error()) + } else { + assert.Nil(t, result, "Should be nil") + } + }) + } +} +func TestConfigureMQTTClientForAuthWithUsernamePassword(t *testing.T) { + target := NewMqttFactory(logger.NewMockClient(), nil, "", "", false) + target.opts = mqtt.NewClientOptions() + target.authMode = AuthModeUsernamePassword + err := target.configureMQTTClientForAuth(mqttSecrets{ + username: "Username", + password: "Password", + }) + require.NoError(t, err) + assert.Equal(t, target.opts.Username, "Username") + assert.Equal(t, target.opts.Password, "Password") + assert.Nil(t, target.opts.TLSConfig.ClientCAs) + assert.Nil(t, target.opts.TLSConfig.Certificates) + +} +func TestConfigureMQTTClientForAuthWithUsernamePasswordAndCA(t *testing.T) { + target := NewMqttFactory(logger.NewMockClient(), nil, "", "", false) + target.opts = mqtt.NewClientOptions() + target.authMode = AuthModeUsernamePassword + err := target.configureMQTTClientForAuth(mqttSecrets{ + username: "Username", + password: "Password", + caPemBlock: []byte(testCACert), + }) + require.NoError(t, err) + assert.Equal(t, target.opts.Username, "Username") + assert.Equal(t, target.opts.Password, "Password") + assert.Nil(t, target.opts.TLSConfig.Certificates) + assert.NotNil(t, target.opts.TLSConfig.ClientCAs) +} + +func TestConfigureMQTTClientForAuthWithCACert(t *testing.T) { + target := NewMqttFactory(logger.NewMockClient(), nil, "", "", false) + target.opts = mqtt.NewClientOptions() + target.authMode = AuthModeCA + err := target.configureMQTTClientForAuth(mqttSecrets{ + username: "Username", + password: "Password", + caPemBlock: []byte(testCACert), + }) + + require.NoError(t, err) + assert.NotNil(t, target.opts.TLSConfig.ClientCAs) + assert.Empty(t, target.opts.Username) + assert.Empty(t, target.opts.Password) + assert.Nil(t, target.opts.TLSConfig.Certificates) +} +func TestConfigureMQTTClientForAuthWithClientCert(t *testing.T) { + target := NewMqttFactory(logger.NewMockClient(), nil, "", "", false) + target.opts = mqtt.NewClientOptions() + target.authMode = AuthModeCert + err := target.configureMQTTClientForAuth(mqttSecrets{ + username: "Username", + password: "Password", + certPemBlock: []byte(testClientCert), + keyPemBlock: []byte(testClientKey), + caPemBlock: []byte(testCACert), + }) + require.NoError(t, err) + assert.Empty(t, target.opts.Username) + assert.Empty(t, target.opts.Password) + assert.NotNil(t, target.opts.TLSConfig.Certificates) + assert.NotNil(t, target.opts.TLSConfig.ClientCAs) +} + +func TestConfigureMQTTClientForAuthWithClientCertNoCA(t *testing.T) { + target := NewMqttFactory(logger.NewMockClient(), nil, "", "", false) + target.opts = mqtt.NewClientOptions() + target.authMode = AuthModeCert + err := target.configureMQTTClientForAuth(mqttSecrets{ + username: MQTTSecretUsername, + password: MQTTSecretPassword, + certPemBlock: []byte(testClientCert), + keyPemBlock: []byte(testClientKey), + }) + + require.NoError(t, err) + assert.Empty(t, target.opts.Username) + assert.Empty(t, target.opts.Password) + assert.NotNil(t, target.opts.TLSConfig.Certificates) + assert.Nil(t, target.opts.TLSConfig.ClientCAs) +} +func TestConfigureMQTTClientForAuthWithNone(t *testing.T) { + target := NewMqttFactory(logger.NewMockClient(), nil, "", "", false) + target.opts = mqtt.NewClientOptions() + target.authMode = AuthModeNone + err := target.configureMQTTClientForAuth(mqttSecrets{}) + + require.NoError(t, err) +} + +type mockMQTTSecretClient struct { +} + +// GetSecrets mock implementation of GetSecrets +func (s *mockMQTTSecretClient) GetSecrets(path string, _ ...string) (map[string]string, error) { + if path == "/notfound" { + return nil, errors.New("") + } + fakeDb := map[string]string{MQTTSecretUsername: "TEST_USER", MQTTSecretPassword: "TEST_PASS"} + return fakeDb, nil +} + +// StoreSecrets mock implementation of StoreSecrets +func (s *mockMQTTSecretClient) StoreSecrets(_ string, _ map[string]string) error { + return nil +} diff --git a/pkg/transforms/mqttsecret.go b/pkg/transforms/mqttsecret.go index a7670192c..dcbc1777f 100644 --- a/pkg/transforms/mqttsecret.go +++ b/pkg/transforms/mqttsecret.go @@ -17,8 +17,6 @@ package transforms import ( - "crypto/tls" - "crypto/x509" "errors" "fmt" "strings" @@ -26,9 +24,11 @@ import ( "time" MQTT "github.com/eclipse/paho.mqtt.golang" + "github.com/edgexfoundry/go-mod-core-contracts/clients" + "github.com/edgexfoundry/app-functions-sdk-go/appcontext" + "github.com/edgexfoundry/app-functions-sdk-go/pkg/secure" "github.com/edgexfoundry/app-functions-sdk-go/pkg/util" - "github.com/edgexfoundry/go-mod-core-contracts/clients" ) // MQTTSecretSender ... @@ -63,26 +63,6 @@ type MQTTSecretConfig struct { // If a CA Cert exists in the SecretPath then it will be used for all modes except "none". AuthMode string } -type mqttSecrets struct { - username string - password string - keypemblock []byte - certpemblock []byte - capemblock []byte -} - -const ( - AuthModeNone = "none" - AuthModeUsernamePassword = "usernamepassword" - AuthModeCert = "clientcert" - AuthModeCA = "cacert" - // Name of the keys to look for in secret provider - MQTTSecretUsername = "username" - MQTTSecretPassword = "password" - MQTTSecretClientKey = "clientkey" - MQTTSecretClientCert = AuthModeCert - MQTTSecretCACert = AuthModeCA -) // NewMQTTSecretSender ... func NewMQTTSecretSender(mqttConfig MQTTSecretConfig, persistOnError bool) *MQTTSecretSender { @@ -102,89 +82,7 @@ func NewMQTTSecretSender(mqttConfig MQTTSecretConfig, persistOnError bool) *MQTT return sender } -func (sender *MQTTSecretSender) getSecrets(edgexcontext *appcontext.Context) (*mqttSecrets, error) { - // No Auth? No Problem!...No secrets required. - if sender.mqttConfig.AuthMode == AuthModeNone { - return nil, nil - } - - secrets, err := edgexcontext.GetSecrets(sender.mqttConfig.SecretPath) - if err != nil { - return nil, err - } - mqttSecrets := &mqttSecrets{ - username: secrets[MQTTSecretUsername], - password: secrets[MQTTSecretPassword], - keypemblock: []byte(secrets[MQTTSecretClientKey]), - certpemblock: []byte(secrets[MQTTSecretClientCert]), - capemblock: []byte(secrets[MQTTSecretCACert]), - } - return mqttSecrets, nil -} -func (sender *MQTTSecretSender) validateSecrets(secrets mqttSecrets) error { - caCertPool := x509.NewCertPool() - if sender.mqttConfig.AuthMode == AuthModeUsernamePassword { - if secrets.username == "" || secrets.password == "" { - return errors.New("AuthModeUsernamePassword selected however username or password was not found at secret path") - } - - } else if sender.mqttConfig.AuthMode == AuthModeCert { - // need both to make a successful connection - if len(secrets.keypemblock) <= 0 || len(secrets.certpemblock) <= 0 { - return errors.New("AuthModeCert selected however the key or cert PEM block was not found at secret path") - } - } else if sender.mqttConfig.AuthMode == AuthModeCA { - if len(secrets.capemblock) <= 0 { - return errors.New("AuthModeCA selected however no PEM Block was found at secret path") - } - } else if sender.mqttConfig.AuthMode != AuthModeNone { - return errors.New("Invalid AuthMode selected") - } - - if len(secrets.capemblock) > 0 { - ok := caCertPool.AppendCertsFromPEM([]byte(secrets.capemblock)) - if !ok { - return errors.New("Error parsing CA Certificate") - } - } - return nil -} -func (sender *MQTTSecretSender) configureMQTTClientForAuth(secrets mqttSecrets) error { - var cert tls.Certificate - var err error - caCertPool := x509.NewCertPool() - tlsConfig := &tls.Config{ - InsecureSkipVerify: sender.mqttConfig.SkipCertVerify, - } - switch sender.mqttConfig.AuthMode { - case AuthModeUsernamePassword: - sender.opts.SetUsername(secrets.username) - sender.opts.SetPassword(secrets.password) - case AuthModeCert: - cert, err = tls.X509KeyPair(secrets.certpemblock, secrets.keypemblock) - if err != nil { - return err - } - tlsConfig.Certificates = []tls.Certificate{cert} - case AuthModeCA: - break - case AuthModeNone: - return nil - } - - if len(secrets.capemblock) > 0 { - ok := caCertPool.AppendCertsFromPEM(secrets.capemblock) - if !ok { - return errors.New("Error parsing CA PEM block") - } - tlsConfig.ClientCAs = caCertPool - } - - sender.opts.SetTLSConfig(tlsConfig) - - return nil -} func (sender *MQTTSecretSender) initializeMQTTClient(edgexcontext *appcontext.Context) error { sender.lock.Lock() defer sender.lock.Unlock() @@ -195,31 +93,22 @@ func (sender *MQTTSecretSender) initializeMQTTClient(edgexcontext *appcontext.Co return nil } - if sender.mqttConfig.AuthMode == "" { - sender.mqttConfig.AuthMode = AuthModeNone - edgexcontext.LoggingClient.Warn("AuthMode not set, defaulting to \"" + AuthModeNone + "\"") - } + mqttFactory := secure.NewMqttFactory( + edgexcontext.LoggingClient, + edgexcontext.SecretProvider, + sender.mqttConfig.AuthMode, + sender.mqttConfig.SecretPath, + sender.mqttConfig.SkipCertVerify, + ) - //get the secrets from the secret provider and populate the struct - secrets, err := sender.getSecrets(edgexcontext) + client, err := mqttFactory.Create(sender.opts) if err != nil { return err } - //ensure that the authmode selected has the required secret values - if secrets != nil { - err = sender.validateSecrets(*secrets) - if err != nil { - return err - } - // configure the mqtt client with the retrieved secret values - err = sender.configureMQTTClientForAuth(*secrets) - if err != nil { - return err - } - } + sender.client = client sender.secretsLastRetrieved = time.Now() - sender.client = MQTT.NewClient(sender.opts) + return nil } diff --git a/pkg/transforms/mqttsecret_test.go b/pkg/transforms/mqttsecret_test.go index 9f551a641..b30b047fb 100644 --- a/pkg/transforms/mqttsecret_test.go +++ b/pkg/transforms/mqttsecret_test.go @@ -20,324 +20,12 @@ package transforms import ( - "errors" "testing" - "github.com/edgexfoundry/app-functions-sdk-go/appcontext" - "github.com/edgexfoundry/app-functions-sdk-go/internal/security" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -const testcacert = `-----BEGIN CERTIFICATE----- -MIIDhTCCAm2gAwIBAgIUQl1RUGewZOXaSLnmH1i12zSYOtswDQYJKoZIhvcNAQEL -BQAwUjELMAkGA1UEBhMCVVMxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM -GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDELMAkGA1UEAwwCY2EwHhcNMjAwNDA4 -MDExNDQ2WhcNMjUwNDA4MDExNDQ2WjBSMQswCQYDVQQGEwJVUzETMBEGA1UECAwK -U29tZS1TdGF0ZTEhMB8GA1UECgwYSW50ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMQsw -CQYDVQQDDAJjYTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAOqslFtX -nxr6yBZdLDKp1iTmsnFreEit7Z1BnNy9vQW6xrKRH+nxZWr0n9UIbx7KtmFkSBQ9 -Bb5zC/3ZdjcuQAuKSTgQB7AP1D2dX6geJPo1Ph9NS0aVmuUqQ6dU+/4R5ATfoWag -M7slCixfkBzbHEh0mCqr7FoDWq2h+Cz2n8K85tBZjLyUuzyRaqH7ZkHfJD1cxkGK -FcwudCg4zpKYOSctm+JpTlF6YPjlngN79jaJIQEAmx/twv1lOCAGBw/hZM3FGmQx -5dA1W7qaJ6NHgNRXWRS1AERtHpAAsWNBT1CKuAS/j0PlreRyR3aMgQYQ5camxi9a -qCrMiHybaqj+UCkCAwEAAaNTMFEwHQYDVR0OBBYEFPNCbvrfw2QDoOyYfNjT9sNO -52xOMB8GA1UdIwQYMBaAFPNCbvrfw2QDoOyYfNjT9sNO52xOMA8GA1UdEwEB/wQF -MAMBAf8wDQYJKoZIhvcNAQELBQADggEBAHdFTqe6vi3BzgOMJEMO+81ZmiMohgKZ -Alyo8wH1C5RgwWW5w1OU+2RQfdOZgDfFkuQzmj0Kt2gzqACuAEtKzDt78lJ4f+WZ -MmRKBudJONUHTTm1micK3pqmn++nSygag0KxDvVbL+stSEgZwEBSOEvGDPXrL5qs -5yVOCi4xvsOCa1ymSnW6sX0z5GcgJQj2Znrr5QbEKHFSG86+WYEYnZ2zCNV7ahQo -bwXGZPOCUkpQzOstie/lPsf3Sd13/NIAk23TQ+rtaWIP9syQ85XWGRKRAUFOJEK0 -2/jr0Xot+Y/3raEfNSrq6sHTzX1q4PoWkSwNEEGXifBqDr+9PXK3mOQ= ------END CERTIFICATE----- -` -const testclientcert = `-----BEGIN CERTIFICATE----- -MIIDLzCCAhcCFG+y+oEr87O2iQH90ayO4hU/GvSqMA0GCSqGSIb3DQEBCwUAMFIx -CzAJBgNVBAYTAlVTMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRl -cm5ldCBXaWRnaXRzIFB0eSBMdGQxCzAJBgNVBAMMAmNhMB4XDTIwMDQwODAxMTY1 -OVoXDTIxMDQwMzAxMTY1OVowVjELMAkGA1UEBhMCVVMxEzARBgNVBAgMClNvbWUt -U3RhdGUxITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDEPMA0GA1UE -AwwGY2xpZW50MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA4TlobJoF -gNoCc5Znb0OzVoMypoay1RSTAhnU0arpHVugUMZMO6oxSt371MN+e4cUxoes4uhN -qeVG7AxUkdMCNJbzjAmJeDQtLKYHcY4YI30HHWCW0c8SxEsrj6DzjizgKZcUdX4H -6HwAltOp/RZYJTBVVexE1WYOheTNJuw5QeNbTGpfpKM7RuHADnytLbrSiK09FZYx -23PIsLhx8b7+k1AtRFGhFqDRMF6Fqbo6xdU8hZ1eAvJP5t87U/PWeQ9ld2lxd3fQ -xiP4IBQs1QI2gTp5O41ifRCpO7scXRaFweyPAgMVOQ42eVZiJUR37AF/nVzXxB5N -iTH9Ij/c/shJvQIDAQABMA0GCSqGSIb3DQEBCwUAA4IBAQDZ1tvo2JbA27qs+DzH -PQudMgCPqHylnqlbX94FtKrIh6kP4YwrMNoOCdcU/MHGG2b3ldoMgx9qrTnkk8g1 -3/gX/r4MDiTw2LocmIPYSukfR0J4k0ijlZtbtr9EtNPvy5iSla8Xi+iSm70wj+Zi -Z0GE0gOi8JfYPlxCtw3uVpsdqaHEevI70D4H1yAG22YYXUZt0QK02zztgBA2c7nE -kX0EMnYch0e7urs9o1M6JWJGlWZQxgVnxekbFDPfRelR1m0zFnbfXG2rnfuRpVEL -6SGxFU8+v1VepAHLvhS2VULYbWBOHZsh1yCteUXdePMYIN7c71qaCyC89N3GBia5 -uXOR ------END CERTIFICATE----- -` -const testclientkey = `-----BEGIN RSA PRIVATE KEY----- -MIIEpAIBAAKCAQEA4TlobJoFgNoCc5Znb0OzVoMypoay1RSTAhnU0arpHVugUMZM -O6oxSt371MN+e4cUxoes4uhNqeVG7AxUkdMCNJbzjAmJeDQtLKYHcY4YI30HHWCW -0c8SxEsrj6DzjizgKZcUdX4H6HwAltOp/RZYJTBVVexE1WYOheTNJuw5QeNbTGpf -pKM7RuHADnytLbrSiK09FZYx23PIsLhx8b7+k1AtRFGhFqDRMF6Fqbo6xdU8hZ1e -AvJP5t87U/PWeQ9ld2lxd3fQxiP4IBQs1QI2gTp5O41ifRCpO7scXRaFweyPAgMV -OQ42eVZiJUR37AF/nVzXxB5NiTH9Ij/c/shJvQIDAQABAoIBADPL4BgZ0+ouOSIc -FO2hxDzBL4TctYQLl0OEbU1K4RG/YL8y25VdLrjpFGF6FDyUdFK0IS6N/k50TDs9 -GrXusTMnBBvQlazvUvRRuqSC6UpAFsLK0+SsmsRKBVqiyWCJMYRfGnVq5qaw3fHR -++YYnWzwELASBkKNlgl09TleWkysbnZbWIMQ5Qm0k+s/9vvjooA2aMXTeLtyhGfI -49OvyCrrX5v7ILdHl7RGAyPRT+ipyt1i0fAqHk4ouLdTRrAx4S5TvUpszrts1P8f -5ggLd1s6RVTz27uASu3U/gLH630m1PU46d02UI1tWen3TgRm/VqjO2aqkZaZispQ -HwTRZIECgYEA9rL7KoZflVQJ4ndg3V522BhAciN99taYWHr018kG5vNVGFBHSVOt -De0gb7z8FhK0Zs4MifU3b03qr7Ac1+p0zIAwATPT4TOLzc4SKBd33TZk/JCZCGSR -hqQPF0FZ+EKJqh7yif+ssFXp0xKrNybm58Z7jfF8vWMdz0QkJ1pZkn8CgYEA6bcp -YkH6IoHmCZ5hWE3/hYQcvfcM10z0cWTTKstxgSid9dj0HUqxMsFhBF1yzUtsDZQB -E933gZyj/LE5Z/EbqUSX0H/M0P7Uwtj9lS7W/vQdOQMfAciqggNKhyaBnBYsxw9l -5IelOxGF+taEvDkPsVt9cvZm/nbf+irU5JLCzcMCgYEA8o3/jUwY5oV+QoAFaSHb -z5PoqVBkJTHREA20dgVdF+3fmMw1is8Os0aWQcaaREmXvgyRH4NOQc1mFd8ePNx0 -giz3BfejNySrLGqUR37rh0BYAktZa3sV6j+b5s2GXCVvnShYZ35OmAGgqLsORGen -V/M6v9DTSJIPWR4yPc8DipkCgYEAhmtW/PFPaRtm7+9Ms5ogtWz3jvaRRx82lCVW -Io3iGVQADc8bD+HOqo94Oid5CMQxQFn4iLGoUb6Cvqo7hyGwNBmEa2GlripyuiJN -LslC1F4YlJrL8Z21G5PDAJpP/zLtzAt6Igc2LBP3B/7rVspG0U36h+1Z7U73oQ2T -ZmdWbTsCgYALxjB0NvqBk+TNYMZFysqZnI3CxYQXwHfElQQQUqcQnunAOLJ8H+nb -JryGx90ylYY2Mh2U273435uwQcX1g5gu3rBF8McHKj5EYSVDgpeBMx8ej2ENvW7q -CR6KVnoNdMwJZM3ARpBYNlhFTzDyew2WYLitZsN/uV8t+XxJFDyJQA== ------END RSA PRIVATE KEY----- -` - -func TestMQTTValidateSecrets(t *testing.T) { - sender := NewMQTTSecretSender(MQTTSecretConfig{}, false) - tests := []struct { - Name string - AuthMode string - secrets mqttSecrets - ErrorExpectation bool - ErrorMessage string - }{ - {"Invalid AuthMode", "BadAuthMode", mqttSecrets{}, true, "Invalid AuthMode selected"}, - {"No Auth No error", AuthModeNone, mqttSecrets{}, false, ""}, - {"UsernamePassword No Error", AuthModeUsernamePassword, mqttSecrets{ - username: "user", - password: "password", - }, false, ""}, - {"UsernamePassword Error no username", AuthModeUsernamePassword, mqttSecrets{ - password: "password", - }, true, "AuthModeUsernamePassword selected however username or password was not found at secret path"}, - {"UsernamePassword Error no password", AuthModeUsernamePassword, mqttSecrets{ - username: "user", - }, true, "AuthModeUsernamePassword selected however username or password was not found at secret path"}, - {"ClientCert No Error", AuthModeCert, mqttSecrets{ - certpemblock: []byte("----"), - keypemblock: []byte("----"), - }, false, ""}, - {"ClientCert No Key", AuthModeCert, mqttSecrets{ - certpemblock: []byte("----"), - }, true, "AuthModeCert selected however the key or cert PEM block was not found at secret path"}, - {"ClientCert No Cert", AuthModeCert, mqttSecrets{ - keypemblock: []byte("----"), - }, true, "AuthModeCert selected however the key or cert PEM block was not found at secret path"}, - {"CACert no error", AuthModeCA, mqttSecrets{ - capemblock: []byte(testcacert), - }, false, ""}, - {"CACert invalid error", AuthModeCA, mqttSecrets{ - capemblock: []byte(`------`), - }, true, "Error parsing CA Certificate"}, - {"CACert no ca error", AuthModeCA, mqttSecrets{}, true, "AuthModeCA selected however no PEM Block was found at secret path"}, - } - - for _, test := range tests { - t.Run(test.Name, func(t *testing.T) { - sender.mqttConfig = MQTTSecretConfig{ - AuthMode: test.AuthMode, - } - result := sender.validateSecrets(test.secrets) - if test.ErrorExpectation { - assert.Error(t, result, "Result should be an error") - assert.Equal(t, test.ErrorMessage, result.(error).Error()) - } else { - assert.Nil(t, result, "Should be nil") - } - }) - } -} - -func TestMQTTClientGetSecrets(t *testing.T) { - sender := NewMQTTSecretSender(MQTTSecretConfig{}, false) - tests := []struct { - Name string - AuthMode string - SecretPath string - ExpectedSecrets *mqttSecrets - ExpectingError bool - }{ - {"No Auth No error", AuthModeNone, "", nil, false}, - {"Auth No Secrets found", AuthModeCA, "/notfound", nil, true}, - {"Auth With Secrets", AuthModeUsernamePassword, "/mqtt", &mqttSecrets{ - username: "TEST_USER", - password: "TEST_PASS", - keypemblock: []uint8{}, - certpemblock: []uint8{}, - capemblock: []uint8{}, - }, false}, - } - // setup mock secret client - mockSecretProvider := security.NewSecretProvider(nil, nil) - mockSecretProvider.ExclusiveSecretClient = &mockMQTTSecretClient{} - context := &appcontext.Context{ - SecretProvider: mockSecretProvider, - } - for _, test := range tests { - t.Run(test.Name, func(t *testing.T) { - sender.mqttConfig = MQTTSecretConfig{ - AuthMode: test.AuthMode, - SecretPath: test.SecretPath, - } - mqttSecrets, err := sender.getSecrets(context) - if test.ExpectingError { - assert.Error(t, err, "Expecting error") - return - } - require.Equal(t, test.ExpectedSecrets, mqttSecrets) - }) - } -} - -type mockMQTTSecretClient struct { -} - -// GetSecrets mock implementation of GetSecrets -func (s *mockMQTTSecretClient) GetSecrets(path string, keys ...string) (map[string]string, error) { - if path == "/notfound" { - return nil, errors.New("") - } - fakeDb := map[string]string{"username": "TEST_USER", "password": "TEST_PASS"} - return fakeDb, nil -} - -// StoreSecrets mock implementation of StoreSecrets -func (s *mockMQTTSecretClient) StoreSecrets(path string, secrets map[string]string) error { - return nil -} - -func TestConfigureMQTTClient(t *testing.T) { - sender := NewMQTTSecretSender(MQTTSecretConfig{}, false) - sender.mqttConfig = MQTTSecretConfig{} - tests := []struct { - Name string - AuthMode string - secrets mqttSecrets - ErrorExpectation bool - ErrorMessage string - }{ - {"Username and password should be set", AuthModeUsernamePassword, mqttSecrets{username: "username", password: "password"}, false, ""}, - {"No AuthMode", AuthModeNone, mqttSecrets{}, false, ""}, - {"Invalid AuthMode", "", mqttSecrets{}, false, ""}, - } - - for _, test := range tests { - t.Run(test.Name, func(t *testing.T) { - sender.mqttConfig = MQTTSecretConfig{ - AuthMode: test.AuthMode, - } - result := sender.configureMQTTClientForAuth(test.secrets) - if test.ErrorExpectation { - assert.Error(t, result, "Result should be an error") - assert.Equal(t, test.ErrorMessage, result.(error).Error()) - } else { - assert.Nil(t, result, "Should be nil") - } - }) - } -} -func TestConfigureMQTTClientWithUsernamePassword(t *testing.T) { - sender := NewMQTTSecretSender(MQTTSecretConfig{}, false) - sender.mqttConfig = MQTTSecretConfig{AuthMode: AuthModeUsernamePassword} - err := sender.configureMQTTClientForAuth(mqttSecrets{ - username: "username", - password: "password", - }) - require.NoError(t, err) - assert.Equal(t, sender.opts.Username, "username") - assert.Equal(t, sender.opts.Password, "password") - assert.Nil(t, sender.opts.TLSConfig.ClientCAs) - assert.Nil(t, sender.opts.TLSConfig.Certificates) - -} -func TestConfigureMQTTClientWithUsernamePasswordAndCA(t *testing.T) { - sender := NewMQTTSecretSender(MQTTSecretConfig{}, false) - sender.mqttConfig = MQTTSecretConfig{AuthMode: AuthModeUsernamePassword} - err := sender.configureMQTTClientForAuth(mqttSecrets{ - username: "username", - password: "password", - capemblock: []byte(testcacert), - }) - require.NoError(t, err) - assert.Equal(t, sender.opts.Username, "username") - assert.Equal(t, sender.opts.Password, "password") - assert.Nil(t, sender.opts.TLSConfig.Certificates) - assert.NotNil(t, sender.opts.TLSConfig.ClientCAs) -} - -func TestConfigureMQTTClientWithCACert(t *testing.T) { - sender := NewMQTTSecretSender(MQTTSecretConfig{}, false) - sender.mqttConfig = MQTTSecretConfig{ - AuthMode: AuthModeCA, - } - err := sender.configureMQTTClientForAuth(mqttSecrets{ - username: "username", - password: "password", - capemblock: []byte(testcacert), - }) - - require.NoError(t, err) - assert.NotNil(t, sender.opts.TLSConfig.ClientCAs) - assert.Empty(t, sender.opts.Username) - assert.Empty(t, sender.opts.Password) - assert.Nil(t, sender.opts.TLSConfig.Certificates) -} -func TestConfigureMQTTClientWithClientCert(t *testing.T) { - sender := NewMQTTSecretSender(MQTTSecretConfig{}, false) - sender.mqttConfig = MQTTSecretConfig{ - AuthMode: AuthModeCert, - } - err := sender.configureMQTTClientForAuth(mqttSecrets{ - username: "username", - password: "password", - certpemblock: []byte(testclientcert), - keypemblock: []byte(testclientkey), - capemblock: []byte(testcacert), - }) - require.NoError(t, err) - assert.Empty(t, sender.opts.Username) - assert.Empty(t, sender.opts.Password) - assert.NotNil(t, sender.opts.TLSConfig.Certificates) - assert.NotNil(t, sender.opts.TLSConfig.ClientCAs) -} - -func TestConfigureMQTTClientWithClientCertNoCA(t *testing.T) { - sender := NewMQTTSecretSender(MQTTSecretConfig{}, false) - sender.mqttConfig = MQTTSecretConfig{ - AuthMode: AuthModeCert, - } - err := sender.configureMQTTClientForAuth(mqttSecrets{ - username: "username", - password: "password", - certpemblock: []byte(testclientcert), - keypemblock: []byte(testclientkey), - }) - - require.NoError(t, err) - assert.Empty(t, sender.opts.Username) - assert.Empty(t, sender.opts.Password) - assert.NotNil(t, sender.opts.TLSConfig.Certificates) - assert.Nil(t, sender.opts.TLSConfig.ClientCAs) -} -func TestConfigureMQTTClientWithNone(t *testing.T) { - sender := NewMQTTSecretSender(MQTTSecretConfig{}, false) - sender.mqttConfig = MQTTSecretConfig{ - AuthMode: AuthModeNone, - } - err := sender.configureMQTTClientForAuth(mqttSecrets{}) - - require.NoError(t, err) -} - func TestSetRetryDataPersistFalse(t *testing.T) { context.RetryData = nil sender := NewMQTTSecretSender(MQTTSecretConfig{}, false)