Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(triggers): Add MQTT Trigger with secure connection options #498

Merged
merged 3 commits into from
Sep 28, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions appsdk/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import (
"github.com/edgexfoundry/app-functions-sdk-go/internal/trigger"
"github.com/edgexfoundry/app-functions-sdk-go/internal/trigger/http"
"github.com/edgexfoundry/app-functions-sdk-go/internal/trigger/messagebus"
"github.com/edgexfoundry/app-functions-sdk-go/internal/trigger/mqtt"
"github.com/edgexfoundry/app-functions-sdk-go/internal/webserver"
"github.com/edgexfoundry/app-functions-sdk-go/pkg/util"
)
Expand Down Expand Up @@ -149,13 +150,18 @@ func (sdk *AppFunctionsSDK) MakeItRun() error {

sdk.runtime.Initialize(sdk.storeClient, sdk.secretProvider)
sdk.runtime.SetTransforms(sdk.transforms)

// determine input type and create trigger for it
t := sdk.setupTrigger(sdk.config, sdk.runtime)
if t == nil {
return errors.New("Failed to create Trigger")
lenny-goodell marked this conversation as resolved.
Show resolved Hide resolved
}

// Initialize the trigger (i.e. start a web server, or connect to message bus)
deferred, err := t.Initialize(sdk.appWg, sdk.appCtx, sdk.backgroundChannel)
if err != nil {
sdk.LoggingClient.Error(err.Error())
return errors.New("Failed to initialize Trigger")
}

// deferred is a a function that needs to be called when services exits.
Expand Down Expand Up @@ -435,9 +441,18 @@ func (sdk *AppFunctionsSDK) setupTrigger(configuration *common.ConfigurationStru
case "HTTP":
sdk.LoggingClient.Info("HTTP trigger selected")
t = &http.Trigger{Configuration: configuration, Runtime: runtime, Webserver: sdk.webserver, EdgeXClients: sdk.edgexClients}
case "MESSAGEBUS":
sdk.LoggingClient.Info("MessageBus trigger selected")

case "MESSAGEBUS",
"EDGEX-MESSAGEBUS": // Allows for more explicit name now that we have plain MQTT option also
sdk.LoggingClient.Info("EdgeX MessageBus trigger selected")
t = &messagebus.Trigger{Configuration: configuration, Runtime: runtime, EdgeXClients: sdk.edgexClients}

case "EXTERNAL-MQTT":
sdk.LoggingClient.Info("External MQTT trigger selected")
t = mqtt.NewTrigger(configuration, runtime, sdk.edgexClients, sdk.secretProvider)

default:
sdk.LoggingClient.Error(fmt.Sprintf("Invalid Trigger type of '%s' specified", configuration.Binding.Type))
}

return t
Expand Down
29 changes: 28 additions & 1 deletion internal/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ type ConfigurationStruct struct {
Service ServiceInfo
// MessageBus
MessageBus types.MessageBusConfig
// MqttBroker
MqttBroker MqttBrokerConfig
// Binding
Binding BindingInfo
// ApplicationSettings
Expand Down Expand Up @@ -88,12 +90,37 @@ type BindingInfo struct {
//
// example: messagebus
// required: true
// enum: messagebus,http
// enum: messagebus (edgex-messagebus), http, external-mqtt
Type string
SubscribeTopic string
PublishTopic string
}

// MqttBrokerConfig contains the MQTT broker configuration for MQTT Trigger
type MqttBrokerConfig struct {
// Url contains the fully qualified URL to connect to the MQTT broker
Url string
// ClientId to connect to the broker with.
ClientId string
// ConnectTimeout is a time duration indicating how long to wait timing out on the broker connection
ConnectTimeout string
// AutoReconnect indicated whether or not to retry connection if disconnected
AutoReconnect bool
// KeepAlive is seconds between client ping when no active data flowing to avoid client being disconnected
KeepAlive int64
// QoS for MQTT Connection
QoS byte
// Retain setting for MQTT Connection
Retain bool
// SkipCertVerify indicates if the certificate verification should be skipped
SkipCertVerify bool
// SecretPath is the name of the path in secret provider to retrieve your secrets
SecretPath string
// AuthMode indicates what to use when connecting to the broker. Options are "none", "cacert" , "usernamepassword", "clientcert".
// If a CA Cert exists in the SecretPath then it will be used for all modes except "none".
AuthMode string
}

type PipelineInfo struct {
ExecutionOrder string
UseTargetTypeOfByteArray bool
Expand Down
183 changes: 183 additions & 0 deletions internal/trigger/mqtt/mqtt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
//
// Copyright (c) 2020 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package mqtt

import (
"context"
"errors"
"fmt"
"net/url"
"sync"
"time"

pahoMqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/edgexfoundry/go-mod-bootstrap/bootstrap"
"github.com/edgexfoundry/go-mod-core-contracts/clients"
"github.com/edgexfoundry/go-mod-messaging/pkg/types"
"github.com/google/uuid"

"github.com/edgexfoundry/app-functions-sdk-go/appcontext"
"github.com/edgexfoundry/app-functions-sdk-go/internal/common"
"github.com/edgexfoundry/app-functions-sdk-go/internal/runtime"
"github.com/edgexfoundry/app-functions-sdk-go/internal/security"
"github.com/edgexfoundry/app-functions-sdk-go/pkg/secure"
)

// Trigger implements Trigger to support Triggers
type Trigger struct {
configuration *common.ConfigurationStruct
mqttClient pahoMqtt.Client
runtime *runtime.GolangRuntime
edgeXClients common.EdgeXClients
secretProvider security.SecretProvider
}

func NewTrigger(
configuration *common.ConfigurationStruct,
runtime *runtime.GolangRuntime,
clients common.EdgeXClients,
secretProvider security.SecretProvider) *Trigger {
return &Trigger{
configuration: configuration,
runtime: runtime,
edgeXClients: clients,
secretProvider: secretProvider,
}
}

// Initialize initializes the Trigger for an external MQTT broker
func (trigger *Trigger) Initialize(_ *sync.WaitGroup, _ context.Context, background <-chan types.MessageEnvelope) (bootstrap.Deferred, error) {
// Convenience short cuts
logger := trigger.edgeXClients.LoggingClient
brokerConfig := trigger.configuration.MqttBroker
topic := trigger.configuration.Binding.SubscribeTopic

logger.Info("Initializing MQTT Trigger")

if background != nil {
return nil, errors.New("background publishing not supported for services using MQTT trigger")
}

if len(topic) == 0 {
return nil, fmt.Errorf("missing SubscribeTopic for MQTT Trigger. Must be present in [Binding] section.")
}

brokerUrl, err := url.Parse(brokerConfig.Url)
if err != nil {
return nil, fmt.Errorf("invalid MQTT Broker Url '%s': %s", trigger.configuration.MqttBroker.Url, err.Error())
}

opts := pahoMqtt.NewClientOptions()
opts.AutoReconnect = brokerConfig.AutoReconnect
opts.ClientID = brokerConfig.ClientId
if len(brokerConfig.ConnectTimeout) > 0 {
duration, err := time.ParseDuration(brokerConfig.ConnectTimeout)
if err != nil {
return nil, fmt.Errorf("invalid MQTT ConnectTimeout '%s': %s", brokerConfig.ConnectTimeout, err.Error())
}
opts.ConnectTimeout = duration
}
opts.KeepAlive = brokerConfig.KeepAlive
opts.Servers = []*url.URL{brokerUrl}

mqttFactory := secure.NewMqttFactory(
logger,
trigger.secretProvider,
brokerConfig.AuthMode,
brokerConfig.SecretPath,
brokerConfig.SkipCertVerify,
)

mqttClient, err := mqttFactory.Create(opts)
if err != nil {
return nil, fmt.Errorf("unable to create secure MQTT Client: %s", err.Error())
}

logger.Info(fmt.Sprintf("Connecting to mqtt broker for MQTT trigger at: %s", brokerUrl))

if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
return nil, fmt.Errorf("could not connect to broker for MQTT trigger: %s", token.Error().Error())
}
logger.Info("Connected to mqtt server for MQTT trigger")

if token := mqttClient.Subscribe(topic, brokerConfig.QoS, trigger.messageHandler); token.Wait() && token.Error() != nil {
mqttClient.Disconnect(0)
return nil, fmt.Errorf("could not subscribe to topic '%s' for MQTT trigger: %s", topic, token.Error().Error())
}

logger.Info(fmt.Sprintf("Subscribed to topic '%s' for MQTT trigger", topic))

deferred := func() {
logger.Info("Disconnecting from broker for MQTT trigger")
trigger.mqttClient.Disconnect(0)
}

trigger.mqttClient = mqttClient

return deferred, nil
}

func (trigger *Trigger) messageHandler(client pahoMqtt.Client, message pahoMqtt.Message) {
// Convenience short cuts
logger := trigger.edgeXClients.LoggingClient
brokerConfig := trigger.configuration.MqttBroker
topic := trigger.configuration.Binding.PublishTopic

data := message.Payload()
contentType := clients.ContentTypeJSON
if data[0] != byte('{') {
// If not JSON then assume it is CBOR
contentType = clients.ContentTypeCBOR
}

correlationID := uuid.New().String()

edgexContext := &appcontext.Context{
CorrelationID: correlationID,
Configuration: trigger.configuration,
LoggingClient: trigger.edgeXClients.LoggingClient,
EventClient: trigger.edgeXClients.EventClient,
ValueDescriptorClient: trigger.edgeXClients.ValueDescriptorClient,
CommandClient: trigger.edgeXClients.CommandClient,
NotificationsClient: trigger.edgeXClients.NotificationsClient,
}

logger.Trace("Received message from MQTT Trigger", clients.CorrelationHeader, correlationID)
logger.Debug(fmt.Sprintf("Received message from MQTT Trigger with %d bytes", len(data)), clients.ContentType, contentType)

envelope := types.MessageEnvelope{
CorrelationID: correlationID,
ContentType: contentType,
Payload: data,
}

messageError := trigger.runtime.ProcessMessage(edgexContext, envelope)
if messageError != nil {
// ProcessMessage logs the error, so no need to log it here.
// ToDo: Do we want to publish the error back to the Broker?
return
}

if len(edgexContext.OutputData) > 0 && len(topic) > 0 {
if token := client.Publish(topic, brokerConfig.QoS, brokerConfig.Retain, edgexContext.OutputData); token.Wait() && token.Error() != nil {
logger.Error("could not publish to topic '%s' for MQTT trigger: %s", topic, token.Error().Error())
} else {
logger.Trace("Sent MQTT Trigger response message", clients.CorrelationHeader, correlationID)
logger.Debug(fmt.Sprintf("Sent MQTT Trigger response message on topic '%s' with %d bytes", topic, len(edgexContext.OutputData)))
}
}
}
2 changes: 1 addition & 1 deletion internal/webserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestConfigureAndConfigRoute(t *testing.T) {
rr := httptest.NewRecorder()
webserver.router.ServeHTTP(rr, req)

expected := `{"Writable":{"LogLevel":"","Pipeline":{"ExecutionOrder":"","UseTargetTypeOfByteArray":false,"Functions":null},"StoreAndForward":{"Enabled":false,"RetryInterval":"","MaxRetryCount":0},"InsecureSecrets":null},"Logging":{"EnableRemote":false,"File":""},"Registry":{"Host":"","Port":0,"Type":""},"Service":{"BootTimeout":"","CheckInterval":"","Host":"","HTTPSCert":"","HTTPSKey":"","ServerBindAddr":"","Port":0,"Protocol":"","StartupMsg":"","ReadMaxLimit":0,"Timeout":""},"MessageBus":{"PublishHost":{"Host":"","Port":0,"Protocol":""},"SubscribeHost":{"Host":"","Port":0,"Protocol":""},"Type":"","Optional":null},"Binding":{"Type":"","SubscribeTopic":"","PublishTopic":""},"ApplicationSettings":null,"Clients":null,"Database":{"Type":"","Host":"","Port":0,"Timeout":"","Username":"","Password":"","MaxIdle":0,"BatchSize":0},"SecretStore":{"Host":"","Port":0,"Path":"","Protocol":"","Namespace":"","RootCaCertPath":"","ServerName":"","Authentication":{"AuthType":"","AuthToken":""},"AdditionalRetryAttempts":0,"RetryWaitPeriod":"","TokenFile":""},"SecretStoreExclusive":{"Host":"","Port":0,"Path":"","Protocol":"","Namespace":"","RootCaCertPath":"","ServerName":"","Authentication":{"AuthType":"","AuthToken":""},"AdditionalRetryAttempts":0,"RetryWaitPeriod":"","TokenFile":""}}` + "\n"
expected := `{"Writable":{"LogLevel":"","Pipeline":{"ExecutionOrder":"","UseTargetTypeOfByteArray":false,"Functions":null},"StoreAndForward":{"Enabled":false,"RetryInterval":"","MaxRetryCount":0},"InsecureSecrets":null},"Logging":{"EnableRemote":false,"File":""},"Registry":{"Host":"","Port":0,"Type":""},"Service":{"BootTimeout":"","CheckInterval":"","Host":"","HTTPSCert":"","HTTPSKey":"","ServerBindAddr":"","Port":0,"Protocol":"","StartupMsg":"","ReadMaxLimit":0,"Timeout":""},"MessageBus":{"PublishHost":{"Host":"","Port":0,"Protocol":""},"SubscribeHost":{"Host":"","Port":0,"Protocol":""},"Type":"","Optional":null},"MqttBroker":{"Url":"","ClientId":"","ConnectTimeout":"","AutoReconnect":false,"KeepAlive":0,"QoS":0,"Retain":false,"SkipCertVerify":false,"SecretPath":"","AuthMode":""},"Binding":{"Type":"","SubscribeTopic":"","PublishTopic":""},"ApplicationSettings":null,"Clients":null,"Database":{"Type":"","Host":"","Port":0,"Timeout":"","Username":"","Password":"","MaxIdle":0,"BatchSize":0},"SecretStore":{"Host":"","Port":0,"Path":"","Protocol":"","Namespace":"","RootCaCertPath":"","ServerName":"","Authentication":{"AuthType":"","AuthToken":""},"AdditionalRetryAttempts":0,"RetryWaitPeriod":"","TokenFile":""},"SecretStoreExclusive":{"Host":"","Port":0,"Path":"","Protocol":"","Namespace":"","RootCaCertPath":"","ServerName":"","Authentication":{"AuthType":"","AuthToken":""},"AdditionalRetryAttempts":0,"RetryWaitPeriod":"","TokenFile":""}}` + "\n"

body := rr.Body.String()
assert.Equal(t, expected, body)
Expand Down
Loading