From 2d6c7c3a8aff00bf91a310990aa1f0e500a8f4bf Mon Sep 17 00:00:00 2001 From: Alex Ullrich Date: Sun, 28 Nov 2021 15:38:15 -0500 Subject: [PATCH] refactor(triggers): Normalize Orchestration Share functionality between custom and built-in triggers, expanding testability of those built-in. fixes #960 Signed-off-by: Alex Ullrich --- internal/app/mocks/TriggerServiceBinding.go | 65 -- internal/app/service.go | 2 +- internal/app/service_test.go | 10 +- internal/app/triggerfactory.go | 12 +- internal/app/triggerfactory_test.go | 12 +- internal/app/triggermessageprocessor.go | 4 +- internal/app/triggermessageprocessor_test.go | 2 +- .../http/mocks/TriggerRequestReader.go | 31 + .../http/mocks/TriggerResponseWriter.go | 56 ++ .../trigger/http/mocks/TriggerRouteManager.go | 19 + internal/trigger/http/rest.go | 91 ++- internal/trigger/http/rest_test.go | 202 ++++- internal/trigger/messagebus/messaging.go | 83 +- internal/trigger/messagebus/messaging_test.go | 730 ++++-------------- .../trigger/messagebus/mocks/MessageClient.go | 69 ++ internal/trigger/mqtt/mocks/Client.go | 151 ++++ internal/trigger/mqtt/mocks/Message.go | 101 +++ internal/trigger/mqtt/mocks/Token.go | 72 ++ internal/trigger/mqtt/mqtt.go | 84 +- internal/trigger/mqtt/mqtt_test.go | 172 +++++ pkg/secure/mqttfactory.go | 13 +- pkg/secure/mqttfactory_test.go | 23 +- pkg/transforms/mqttsecret.go | 2 +- 23 files changed, 1198 insertions(+), 808 deletions(-) delete mode 100644 internal/app/mocks/TriggerServiceBinding.go create mode 100644 internal/trigger/http/mocks/TriggerRequestReader.go create mode 100644 internal/trigger/http/mocks/TriggerResponseWriter.go create mode 100644 internal/trigger/http/mocks/TriggerRouteManager.go create mode 100644 internal/trigger/messagebus/mocks/MessageClient.go create mode 100644 internal/trigger/mqtt/mocks/Client.go create mode 100644 internal/trigger/mqtt/mocks/Message.go create mode 100644 internal/trigger/mqtt/mocks/Token.go create mode 100644 internal/trigger/mqtt/mqtt_test.go diff --git a/internal/app/mocks/TriggerServiceBinding.go b/internal/app/mocks/TriggerServiceBinding.go deleted file mode 100644 index 1ac2837ce..000000000 --- a/internal/app/mocks/TriggerServiceBinding.go +++ /dev/null @@ -1,65 +0,0 @@ -// Code generated by mockery v2.9.4. DO NOT EDIT. - -package mocks - -import ( - appfunction "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/appfunction" - interfaces "github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/interfaces" - - mock "github.com/stretchr/testify/mock" - - runtime "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/runtime" - - types "github.com/edgexfoundry/go-mod-messaging/v2/pkg/types" -) - -// TriggerServiceBinding is an autogenerated mock type for the TriggerServiceBinding type -type TriggerServiceBinding struct { - mock.Mock -} - -// GetMatchingPipelines provides a mock function with given fields: incomingTopic -func (_m *TriggerServiceBinding) GetMatchingPipelines(incomingTopic string) []*interfaces.FunctionPipeline { - ret := _m.Called(incomingTopic) - - var r0 []*interfaces.FunctionPipeline - if rf, ok := ret.Get(0).(func(string) []*interfaces.FunctionPipeline); ok { - r0 = rf(incomingTopic) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]*interfaces.FunctionPipeline) - } - } - - return r0 -} - -// LoadCustomConfig provides a mock function with given fields: config, sectionName -func (_m *TriggerServiceBinding) LoadCustomConfig(config interfaces.UpdatableConfig, sectionName string) error { - ret := _m.Called(config, sectionName) - - var r0 error - if rf, ok := ret.Get(0).(func(interfaces.UpdatableConfig, string) error); ok { - r0 = rf(config, sectionName) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// ProcessMessage provides a mock function with given fields: appContext, envelope, pipeline -func (_m *TriggerServiceBinding) ProcessMessage(appContext *appfunction.Context, envelope types.MessageEnvelope, pipeline *interfaces.FunctionPipeline) *runtime.MessageError { - ret := _m.Called(appContext, envelope, pipeline) - - var r0 *runtime.MessageError - if rf, ok := ret.Get(0).(func(*appfunction.Context, types.MessageEnvelope, *interfaces.FunctionPipeline) *runtime.MessageError); ok { - r0 = rf(appContext, envelope, pipeline) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*runtime.MessageError) - } - } - - return r0 -} diff --git a/internal/app/service.go b/internal/app/service.go index 30cee1ad9..60328f7d2 100644 --- a/internal/app/service.go +++ b/internal/app/service.go @@ -160,7 +160,7 @@ func (svc *Service) MakeItRun() error { svc.ctx.stop = stop // determine input type and create trigger for it - t := svc.setupTrigger(svc.config, svc.runtime) + t := svc.setupTrigger(svc.config) if t == nil { return errors.New("failed to create Trigger") } diff --git a/internal/app/service_test.go b/internal/app/service_test.go index 2f3765ea2..cba99d804 100644 --- a/internal/app/service_test.go +++ b/internal/app/service_test.go @@ -243,7 +243,10 @@ func TestSetupHTTPTrigger(t *testing.T) { testRuntime := runtime.NewGolangRuntime("", nil, dic) testRuntime.SetDefaultFunctionsPipeline(nil) - trigger := sdk.setupTrigger(sdk.config, testRuntime) + + sdk.runtime = testRuntime + + trigger := sdk.setupTrigger(sdk.config) result := IsInstanceOf(trigger, (*triggerHttp.Trigger)(nil)) assert.True(t, result, "Expected Instance of HTTP Trigger") } @@ -259,7 +262,10 @@ func TestSetupMessageBusTrigger(t *testing.T) { } testRuntime := runtime.NewGolangRuntime("", nil, dic) testRuntime.SetDefaultFunctionsPipeline(nil) - trigger := sdk.setupTrigger(sdk.config, testRuntime) + + sdk.runtime = testRuntime + + trigger := sdk.setupTrigger(sdk.config) result := IsInstanceOf(trigger, (*messagebus.Trigger)(nil)) assert.True(t, result, "Expected Instance of Message Bus Trigger") } diff --git a/internal/app/triggerfactory.go b/internal/app/triggerfactory.go index c373bf4ed..2b6cc2ef7 100644 --- a/internal/app/triggerfactory.go +++ b/internal/app/triggerfactory.go @@ -21,7 +21,6 @@ package app import ( "fmt" "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/common" - "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/runtime" "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/trigger/http" "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/trigger/messagebus" "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/trigger/mqtt" @@ -36,21 +35,24 @@ const ( TriggerTypeHTTP = "HTTP" ) -func (svc *Service) setupTrigger(configuration *common.ConfigurationStruct, runtime *runtime.GolangRuntime) interfaces.Trigger { +func (svc *Service) setupTrigger(configuration *common.ConfigurationStruct) interfaces.Trigger { var t interfaces.Trigger + bnd := NewTriggerServiceBinding(svc) + mp := &triggerMessageProcessor{bnd} + switch triggerType := strings.ToUpper(configuration.Trigger.Type); triggerType { case TriggerTypeHTTP: svc.LoggingClient().Info("HTTP trigger selected") - t = http.NewTrigger(svc.dic, svc.runtime, svc.webserver) + t = http.NewTrigger(bnd, mp, svc.webserver) case TriggerTypeMessageBus: svc.LoggingClient().Info("EdgeX MessageBus trigger selected") - t = messagebus.NewTrigger(svc.dic, svc.runtime) + t = messagebus.NewTrigger(bnd, mp) case TriggerTypeMQTT: svc.LoggingClient().Info("External MQTT trigger selected") - t = mqtt.NewTrigger(svc.dic, svc.runtime) + t = mqtt.NewTrigger(bnd, mp) default: if factory, found := svc.customTriggerFactories[triggerType]; found { diff --git a/internal/app/triggerfactory_test.go b/internal/app/triggerfactory_test.go index 2df646ac8..3ffd73ea3 100644 --- a/internal/app/triggerfactory_test.go +++ b/internal/app/triggerfactory_test.go @@ -102,7 +102,7 @@ func TestSetupTrigger_HTTP(t *testing.T) { lc: logger.MockLogger{}, } - trigger := sdk.setupTrigger(sdk.config, nil) + trigger := sdk.setupTrigger(sdk.config) require.NotNil(t, trigger, "should be defined") require.IsType(t, &http.Trigger{}, trigger, "should be an http trigger") @@ -118,7 +118,7 @@ func TestSetupTrigger_EdgeXMessageBus(t *testing.T) { lc: logger.MockLogger{}, } - trigger := sdk.setupTrigger(sdk.config, nil) + trigger := sdk.setupTrigger(sdk.config) require.NotNil(t, trigger, "should be defined") require.IsType(t, &messagebus.Trigger{}, trigger, "should be an edgex-messagebus trigger") @@ -143,7 +143,7 @@ func TestSetupTrigger_MQTT(t *testing.T) { lc: lc, } - trigger := sdk.setupTrigger(sdk.config, nil) + trigger := sdk.setupTrigger(sdk.config) require.NotNil(t, trigger, "should be defined") require.IsType(t, &mqtt.Trigger{}, trigger, "should be an external-MQTT trigger") @@ -173,7 +173,7 @@ func Test_Service_setupTrigger_CustomType(t *testing.T) { }) require.NoError(t, err) - trigger := sdk.setupTrigger(sdk.config, nil) + trigger := sdk.setupTrigger(sdk.config) require.NotNil(t, trigger, "should be defined") require.IsType(t, &mockCustomTrigger{}, trigger, "should be a custom trigger") @@ -196,7 +196,7 @@ func Test_Service_SetupTrigger_CustomTypeError(t *testing.T) { }) require.NoError(t, err) - trigger := sdk.setupTrigger(sdk.config, nil) + trigger := sdk.setupTrigger(sdk.config) require.Nil(t, trigger, "should be nil") } @@ -213,7 +213,7 @@ func Test_Service_SetupTrigger_CustomTypeNotFound(t *testing.T) { lc: logger.MockLogger{}, } - trigger := sdk.setupTrigger(sdk.config, nil) + trigger := sdk.setupTrigger(sdk.config) require.Nil(t, trigger, "should be nil") } diff --git a/internal/app/triggermessageprocessor.go b/internal/app/triggermessageprocessor.go index b29a9c6ed..184f77f68 100644 --- a/internal/app/triggermessageprocessor.go +++ b/internal/app/triggermessageprocessor.go @@ -59,7 +59,7 @@ func (b *simpleTriggerServiceBinding) Config() *common.ConfigurationStruct { return b.Service.config } -// customTriggerBinding wraps the CustomTriggerServiceBinding interface so that we can attach methods +// triggerMessageProcessor wraps the ServiceBinding interface so that we can attach methods type triggerMessageProcessor struct { bnd trigger.ServiceBinding } @@ -91,7 +91,7 @@ func (mp *triggerMessageProcessor) Process(ctx interfaces.AppFunctionContext, en func (mp *triggerMessageProcessor) MessageReceived(ctx interfaces.AppFunctionContext, envelope types.MessageEnvelope, responseHandler interfaces.PipelineResponseHandler) error { lc := mp.bnd.LoggingClient() - lc.Debugf("custom trigger attempting to find pipeline(s) for topic %s", envelope.ReceivedTopic) + lc.Debugf("trigger attempting to find pipeline(s) for topic %s", envelope.ReceivedTopic) // ensure we have a context established that we can safely cast to *appfunction.Context to pass to runtime if _, ok := ctx.(*appfunction.Context); ctx == nil || !ok { diff --git a/internal/app/triggermessageprocessor_test.go b/internal/app/triggermessageprocessor_test.go index 8c9cf599d..6ed08979a 100644 --- a/internal/app/triggermessageprocessor_test.go +++ b/internal/app/triggermessageprocessor_test.go @@ -128,7 +128,7 @@ func Test_triggerMessageProcessor_MessageReceived(t *testing.T) { t.Run(tt.name, func(t *testing.T) { tsb := triggerMocks.ServiceBinding{} - tsb.On("ProcessMessage", mock.Anything, mock.Anything, mock.Anything).Return(tt.setup.runtimeProcessor) + tsb.On("ProcessMessage", mock.AnythingOfType("*appfunction.Context"), mock.AnythingOfType("types.MessageEnvelope"), mock.AnythingOfType("*interfaces.FunctionPipeline")).Return(tt.setup.runtimeProcessor) tsb.On("GetMatchingPipelines", tt.args.envelope.ReceivedTopic).Return(tt.setup.pipelineMatcher) tsb.On("LoggingClient").Return(lc) diff --git a/internal/trigger/http/mocks/TriggerRequestReader.go b/internal/trigger/http/mocks/TriggerRequestReader.go new file mode 100644 index 000000000..99f1e9f57 --- /dev/null +++ b/internal/trigger/http/mocks/TriggerRequestReader.go @@ -0,0 +1,31 @@ +// Code generated by mockery v2.9.4. DO NOT EDIT. + +package mocks + +import mock "github.com/stretchr/testify/mock" + +// TriggerRequestReader is an autogenerated mock type for the TriggerRequestReader type +type TriggerRequestReader struct { + mock.Mock +} + +// Read provides a mock function with given fields: p +func (_m *TriggerRequestReader) Read(p []byte) (int, error) { + ret := _m.Called(p) + + var r0 int + if rf, ok := ret.Get(0).(func([]byte) int); ok { + r0 = rf(p) + } else { + r0 = ret.Get(0).(int) + } + + var r1 error + if rf, ok := ret.Get(1).(func([]byte) error); ok { + r1 = rf(p) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} diff --git a/internal/trigger/http/mocks/TriggerResponseWriter.go b/internal/trigger/http/mocks/TriggerResponseWriter.go new file mode 100644 index 000000000..97a1e0eba --- /dev/null +++ b/internal/trigger/http/mocks/TriggerResponseWriter.go @@ -0,0 +1,56 @@ +// Code generated by mockery v2.9.4. DO NOT EDIT. + +package mocks + +import ( + http "net/http" + + mock "github.com/stretchr/testify/mock" +) + +// TriggerResponseWriter is an autogenerated mock type for the TriggerResponseWriter type +type TriggerResponseWriter struct { + mock.Mock +} + +// Header provides a mock function with given fields: +func (_m *TriggerResponseWriter) Header() http.Header { + ret := _m.Called() + + var r0 http.Header + if rf, ok := ret.Get(0).(func() http.Header); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(http.Header) + } + } + + return r0 +} + +// Write provides a mock function with given fields: _a0 +func (_m *TriggerResponseWriter) Write(_a0 []byte) (int, error) { + ret := _m.Called(_a0) + + var r0 int + if rf, ok := ret.Get(0).(func([]byte) int); ok { + r0 = rf(_a0) + } else { + r0 = ret.Get(0).(int) + } + + var r1 error + if rf, ok := ret.Get(1).(func([]byte) error); ok { + r1 = rf(_a0) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// WriteHeader provides a mock function with given fields: statusCode +func (_m *TriggerResponseWriter) WriteHeader(statusCode int) { + _m.Called(statusCode) +} diff --git a/internal/trigger/http/mocks/TriggerRouteManager.go b/internal/trigger/http/mocks/TriggerRouteManager.go new file mode 100644 index 000000000..1e867516d --- /dev/null +++ b/internal/trigger/http/mocks/TriggerRouteManager.go @@ -0,0 +1,19 @@ +// Code generated by mockery v2.9.4. DO NOT EDIT. + +package mocks + +import ( + http "net/http" + + mock "github.com/stretchr/testify/mock" +) + +// TriggerRouteManager is an autogenerated mock type for the TriggerRouteManager type +type TriggerRouteManager struct { + mock.Mock +} + +// SetupTriggerRoute provides a mock function with given fields: path, handlerForTrigger +func (_m *TriggerRouteManager) SetupTriggerRoute(path string, handlerForTrigger func(http.ResponseWriter, *http.Request)) { + _m.Called(path, handlerForTrigger) +} diff --git a/internal/trigger/http/rest.go b/internal/trigger/http/rest.go index 0fbf017f6..c5088104a 100644 --- a/internal/trigger/http/rest.go +++ b/internal/trigger/http/rest.go @@ -1,5 +1,6 @@ // // Copyright (c) 2021 Intel Corporation +// Copyright (c) 2021 One Track Consulting // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -20,6 +21,8 @@ import ( "context" "errors" "fmt" + "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/trigger" + "github.com/edgexfoundry/go-mod-core-contracts/v2/clients/logger" "io" "net/http" "sync" @@ -27,51 +30,61 @@ import ( "github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/interfaces" "github.com/edgexfoundry/app-functions-sdk-go/v2/internal" - "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/appfunction" - "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/runtime" - "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/webserver" "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap" - bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container" - "github.com/edgexfoundry/go-mod-bootstrap/v2/di" "github.com/edgexfoundry/go-mod-core-contracts/v2/common" "github.com/edgexfoundry/go-mod-messaging/v2/pkg/types" ) +type TriggerRouteManager interface { + SetupTriggerRoute(path string, handlerForTrigger func(http.ResponseWriter, *http.Request)) +} + +type TriggerResponseWriter interface { + http.ResponseWriter +} + +type TriggerRequestReader interface { + io.ReadCloser +} + // Trigger implements Trigger to support Triggers type Trigger struct { - dic *di.Container - Runtime *runtime.GolangRuntime - Webserver *webserver.WebServer - outputData []byte + serviceBinding trigger.ServiceBinding + messageProcessor trigger.MessageProcessor + RouteManager TriggerRouteManager } -func NewTrigger(dic *di.Container, runtime *runtime.GolangRuntime, webserver *webserver.WebServer) *Trigger { +func NewTrigger(bnd trigger.ServiceBinding, mp trigger.MessageProcessor, trm TriggerRouteManager) *Trigger { return &Trigger{ - dic: dic, - Runtime: runtime, - Webserver: webserver, + serviceBinding: bnd, + messageProcessor: mp, + RouteManager: trm, } } // Initialize initializes the Trigger for logging and REST route func (trigger *Trigger) Initialize(_ *sync.WaitGroup, _ context.Context, background <-chan interfaces.BackgroundMessage) (bootstrap.Deferred, error) { - lc := bootstrapContainer.LoggingClientFrom(trigger.dic.Get) + lc := trigger.serviceBinding.LoggingClient() if background != nil { return nil, errors.New("background publishing not supported for services using HTTP trigger") } lc.Info("Initializing HTTP Trigger") - trigger.Webserver.SetupTriggerRoute(internal.ApiTriggerRoute, trigger.requestHandler) + trigger.RouteManager.SetupTriggerRoute(internal.ApiTriggerRoute, trigger.requestHandler) lc.Info("HTTP Trigger Initialized") return nil, nil } func (trigger *Trigger) requestHandler(writer http.ResponseWriter, r *http.Request) { - lc := bootstrapContainer.LoggingClientFrom(trigger.dic.Get) - defer func() { _ = r.Body.Close() }() + lc := trigger.serviceBinding.LoggingClient() + defer func() { + if r.Body != nil { + _ = r.Body.Close() + } + }() contentType := r.Header.Get(common.ContentType) @@ -87,8 +100,6 @@ func (trigger *Trigger) requestHandler(writer http.ResponseWriter, r *http.Reque correlationID := r.Header.Get(common.CorrelationHeader) - appContext := appfunction.NewContext(correlationID, trigger.dic, contentType) - lc.Trace("Received message from http", common.CorrelationHeader, correlationID) lc.Debug("Received message from http", common.ContentType, contentType) @@ -98,27 +109,33 @@ func (trigger *Trigger) requestHandler(writer http.ResponseWriter, r *http.Reque Payload: data, } - messageError := trigger.Runtime.ProcessMessage(appContext, envelope, trigger.Runtime.GetDefaultPipeline()) - if messageError != nil { - // ProcessMessage logs the error, so no need to log it here. - writer.WriteHeader(messageError.ErrorCode) - _, _ = writer.Write([]byte(messageError.Err.Error())) - return - } + appContext := trigger.serviceBinding.BuildContext(envelope) - if len(appContext.ResponseContentType()) > 0 { - writer.Header().Set(common.ContentType, appContext.ResponseContentType()) - } + // TODO: should only get the default pipeline since topic is empty, can we ensure this and use the passed context directly insatead of cloning, deal with error out here? + messageError := trigger.messageProcessor.MessageReceived(appContext, envelope, getResponseHandler(writer, lc)) - _, err = writer.Write(appContext.ResponseData()) - if err != nil { - lc.Errorf("unable to write ResponseData as HTTP response: %s", err.Error()) - return + if messageError != nil { + // Process logs the error, so no need to log it here. + writer.WriteHeader(http.StatusInternalServerError) + _, _ = writer.Write([]byte(messageError.Error())) } +} - if appContext.ResponseData() != nil { - lc.Trace("Sent http response message", common.CorrelationHeader, correlationID) +func getResponseHandler(writer http.ResponseWriter, lc logger.LoggingClient) interfaces.PipelineResponseHandler { + return func(ctx interfaces.AppFunctionContext, pipeline *interfaces.FunctionPipeline) error { + if len(ctx.ResponseContentType()) > 0 { + writer.Header().Set(common.ContentType, ctx.ResponseContentType()) + } + + _, err := writer.Write(ctx.ResponseData()) + if err != nil { + lc.Errorf("unable to write ResponseData as HTTP response: %s", err.Error()) + return err + } + + if ctx.ResponseData() != nil { + lc.Trace("Sent http response message", common.CorrelationHeader, ctx.CorrelationID()) + } + return nil } - - trigger.outputData = nil } diff --git a/internal/trigger/http/rest_test.go b/internal/trigger/http/rest_test.go index 171150462..9edff9a8f 100644 --- a/internal/trigger/http/rest_test.go +++ b/internal/trigger/http/rest_test.go @@ -1,6 +1,7 @@ // // Copyright (c) 2020 Technotects // Copyright (c) 2021 Intel Corporation +// Copyright (c) 2021 One Track Consulting // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -18,28 +19,209 @@ package http import ( + "bytes" + "fmt" + "github.com/edgexfoundry/app-functions-sdk-go/v2/internal" + "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/trigger/http/mocks" + triggerMocks "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/trigger/mocks" + interfaceMocks "github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/interfaces/mocks" + "github.com/edgexfoundry/go-mod-core-contracts/v2/common" + "github.com/edgexfoundry/go-mod-messaging/v2/pkg/types" + "github.com/google/uuid" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "net/http" "testing" "github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/interfaces" - bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container" - "github.com/edgexfoundry/go-mod-bootstrap/v2/di" "github.com/edgexfoundry/go-mod-core-contracts/v2/clients/logger" "github.com/stretchr/testify/assert" ) -func TestTriggerInitializeWitBackgroundChannel(t *testing.T) { +func TestTriggerInitializeWithBackgroundChannel(t *testing.T) { background := make(chan interfaces.BackgroundMessage) - dic := di.NewContainer(di.ServiceConstructorMap{ - bootstrapContainer.LoggingClientInterfaceName: func(get di.Get) interface{} { - return logger.NewMockClient() - }, - }) - trigger := NewTrigger(dic, nil, nil) + + bnd := &triggerMocks.ServiceBinding{} + bnd.On("LoggingClient").Return(logger.NewMockClient()) + + trigger := NewTrigger(bnd, nil, nil) deferred, err := trigger.Initialize(nil, nil, background) assert.Nil(t, deferred) - assert.NotNil(t, err) + assert.Error(t, err) assert.Equal(t, "background publishing not supported for services using HTTP trigger", err.Error()) } + +func TestTriggerInitialize(t *testing.T) { + bnd := &triggerMocks.ServiceBinding{} + bnd.On("LoggingClient").Return(logger.NewMockClient()) + + trm := &mocks.TriggerRouteManager{} + trm.On("SetupTriggerRoute", internal.ApiTriggerRoute, mock.AnythingOfType("func(http.ResponseWriter, *http.Request)")) + defer trm.AssertExpectations(t) + + trigger := NewTrigger(bnd, nil, trm) + + deferred, err := trigger.Initialize(nil, nil, nil) + + assert.Nil(t, deferred) + assert.NoError(t, err) +} + +func TestTriggerRequestHandler_BodyReadError(t *testing.T) { + bnd := &triggerMocks.ServiceBinding{} + bnd.On("LoggingClient").Return(logger.NewMockClient()) + + trigger := Trigger{ + serviceBinding: bnd, + } + + errorMsg := "fake error" + + writer := &mocks.TriggerResponseWriter{} + writer.On("WriteHeader", http.StatusBadRequest) + writer.On("Write", mock.Anything).Return(0, nil) + defer writer.AssertExpectations(t) + + reqReader := &mocks.TriggerRequestReader{} + reqReader.On("Read", mock.Anything).Return(0, fmt.Errorf(errorMsg)) + + req, err := http.NewRequest("", "", reqReader) + req.Header = http.Header{} + + require.NoError(t, err) + + trigger.requestHandler(writer, req) + + writer.AssertExpectations(t) +} + +func TestTriggerRequestHandler_ProcessError(t *testing.T) { + data := []byte("some data") + contentType := "arbitrary string" + correlationId := uuid.NewString() + afc := &interfaceMocks.AppFunctionContext{} + + bnd := &triggerMocks.ServiceBinding{} + bnd.On("LoggingClient").Return(logger.NewMockClient()) + bnd.On("BuildContext", mock.AnythingOfType("types.MessageEnvelope")).Return(afc) + + mp := &triggerMocks.MessageProcessor{} + mp.On("MessageReceived", afc, mock.AnythingOfType("types.MessageEnvelope"), mock.AnythingOfType("interfaces.PipelineResponseHandler")).Return(func(ctx interfaces.AppFunctionContext, env types.MessageEnvelope, f interfaces.PipelineResponseHandler) error { + assert.Equal(t, correlationId, env.CorrelationID) + assert.Equal(t, afc, ctx) + assert.Equal(t, data, env.Payload) + assert.Equal(t, contentType, env.ContentType) + return fmt.Errorf("error") + }) + + trigger := Trigger{ + serviceBinding: bnd, + messageProcessor: mp, + } + + writer := &mocks.TriggerResponseWriter{} + writer.On("WriteHeader", http.StatusInternalServerError) + writer.On("Write", []byte("error")).Return(0, nil) + + req, err := http.NewRequest("", "", bytes.NewBuffer(data)) + req.Header = http.Header{} + req.Header.Add(common.ContentType, contentType) + req.Header.Add(common.CorrelationHeader, correlationId) + + require.NoError(t, err) + + trigger.requestHandler(writer, req) + + writer.AssertExpectations(t) +} + +func TestTriggerRequestHandler(t *testing.T) { + data := []byte("some data") + contentType := "arbitrary string" + correlationId := uuid.NewString() + afc := &interfaceMocks.AppFunctionContext{} + + bnd := &triggerMocks.ServiceBinding{} + bnd.On("LoggingClient").Return(logger.NewMockClient()) + bnd.On("BuildContext", mock.AnythingOfType("types.MessageEnvelope")).Return(afc) + + mp := &triggerMocks.MessageProcessor{} + mp.On("MessageReceived", mock.Anything, mock.Anything, mock.Anything).Return(func(ctx interfaces.AppFunctionContext, env types.MessageEnvelope, f interfaces.PipelineResponseHandler) error { + assert.Equal(t, correlationId, env.CorrelationID) + assert.Equal(t, afc, ctx) + assert.Equal(t, data, env.Payload) + assert.Equal(t, contentType, env.ContentType) + return nil + }) + + trigger := Trigger{ + serviceBinding: bnd, + messageProcessor: mp, + } + + writer := &mocks.TriggerResponseWriter{} + + req, err := http.NewRequest("", "", bytes.NewBuffer(data)) + req.Header = http.Header{} + req.Header.Add(common.ContentType, contentType) + req.Header.Add(common.CorrelationHeader, correlationId) + + require.NoError(t, err) + + trigger.requestHandler(writer, req) +} + +func Test_getResponseHandler(t *testing.T) { + data := []byte("some data in response") + correlationId := uuid.NewString() + + type inputs struct { + correlationId string + contentType string + data []byte + pipeline *interfaces.FunctionPipeline + writerHeader http.Header + } + tests := []struct { + name string + inputs inputs + setup func(writer *mocks.TriggerResponseWriter, ctx *interfaceMocks.AppFunctionContext, i inputs) + wantErr bool + }{ + {name: "write error", inputs: inputs{pipeline: &interfaces.FunctionPipeline{}, correlationId: uuid.NewString(), data: []byte("some data in response")}, setup: func(writer *mocks.TriggerResponseWriter, ctx *interfaceMocks.AppFunctionContext, ip inputs) { + ctx.On("ResponseContentType").Return(ip.contentType) + ctx.On("ResponseData").Return(data) + writer.On("Write", data).Return(0, fmt.Errorf("write error")) + }, wantErr: true}, + {name: "happy no content type", inputs: inputs{pipeline: &interfaces.FunctionPipeline{}, correlationId: uuid.NewString(), data: []byte("some data in response")}, setup: func(writer *mocks.TriggerResponseWriter, ctx *interfaceMocks.AppFunctionContext, ip inputs) { + ctx.On("CorrelationID").Return(correlationId) + ctx.On("ResponseContentType").Return(ip.contentType) + ctx.On("ResponseData").Return(data) + writer.On("Write", data).Return(5, nil) + }, wantErr: false}, + {name: "happy", inputs: inputs{pipeline: &interfaces.FunctionPipeline{}, correlationId: uuid.NewString(), data: []byte("some data in response")}, setup: func(writer *mocks.TriggerResponseWriter, ctx *interfaceMocks.AppFunctionContext, ip inputs) { + ctx.On("CorrelationID").Return(correlationId) + ctx.On("ResponseContentType").Return(ip.contentType) + ctx.On("ResponseData").Return(data) + writer.On("Write", data).Return(5, nil) + writer.On("Header").Return(ip.writerHeader) + }, wantErr: false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + writer := &mocks.TriggerResponseWriter{} + ctx := &interfaceMocks.AppFunctionContext{} + + tt.setup(writer, ctx, tt.inputs) + + err := getResponseHandler(writer, logger.NewMockClient())(ctx, tt.inputs.pipeline) + + assert.Equal(t, tt.wantErr, err != nil) + + assert.Equal(t, tt.inputs.contentType, tt.inputs.writerHeader.Get(common.ContentType)) + }) + } +} diff --git a/internal/trigger/messagebus/messaging.go b/internal/trigger/messagebus/messaging.go index 830a5396a..5cc32daad 100644 --- a/internal/trigger/messagebus/messaging.go +++ b/internal/trigger/messagebus/messaging.go @@ -1,5 +1,6 @@ // // Copyright (c) 2021 Intel Corporation +// Copyright (c) 2021 One Track Consulting // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -20,21 +21,17 @@ import ( "context" "errors" "fmt" + "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/trigger" "strings" "sync" "github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/interfaces" - "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/appfunction" - "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/bootstrap/container" sdkCommon "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/common" - "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/runtime" "github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/util" "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap" - bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container" bootstrapMessaging "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/messaging" - "github.com/edgexfoundry/go-mod-bootstrap/v2/di" "github.com/edgexfoundry/go-mod-core-contracts/v2/clients/logger" "github.com/edgexfoundry/go-mod-core-contracts/v2/common" "github.com/edgexfoundry/go-mod-messaging/v2/messaging" @@ -43,24 +40,25 @@ import ( // Trigger implements Trigger to support MessageBusData type Trigger struct { - dic *di.Container - runtime *runtime.GolangRuntime - topics []types.TopicChannel - client messaging.MessageClient + messageProcessor trigger.MessageProcessor + serviceBinding trigger.ServiceBinding + topics []types.TopicChannel + client messaging.MessageClient } -func NewTrigger(dic *di.Container, runtime *runtime.GolangRuntime) *Trigger { +func NewTrigger(bnd trigger.ServiceBinding, mp trigger.MessageProcessor) *Trigger { return &Trigger{ - dic: dic, - runtime: runtime, + messageProcessor: mp, + serviceBinding: bnd, } } // Initialize ... func (trigger *Trigger) Initialize(appWg *sync.WaitGroup, appCtx context.Context, background <-chan interfaces.BackgroundMessage) (bootstrap.Deferred, error) { var err error - lc := bootstrapContainer.LoggingClientFrom(trigger.dic.Get) - config := container.ConfigurationFrom(trigger.dic.Get) + + lc := trigger.serviceBinding.LoggingClient() + config := trigger.serviceBinding.Config() lc.Infof("Initializing Message Bus Trigger for '%s'", config.Trigger.EdgexMessageBus.Type) @@ -181,23 +179,31 @@ func (trigger *Trigger) messageHandler(logger logger.LoggingClient, _ types.Topi message.ContentType) logger.Tracef("MessageBus Trigger: Received message with %s=%s", common.CorrelationHeader, message.CorrelationID) - pipelines := trigger.runtime.GetMatchingPipelines(message.ReceivedTopic) - logger.Debugf("MessageBus Trigger found %d pipeline(s) that match the incoming topic '%s'", len(pipelines), message.ReceivedTopic) - for _, pipeline := range pipelines { - go trigger.processMessageWithPipeline(logger, message, pipeline) - } + appContext := trigger.serviceBinding.BuildContext(message) + + go func() { + processErr := trigger.messageProcessor.MessageReceived(appContext, message, trigger.responseHandler) + if processErr != nil { + trigger.serviceBinding.LoggingClient().Errorf("MQTT Trigger: Failed to process message on pipeline(s): %s", processErr.Error()) + } + }() } -func (trigger *Trigger) processMessageWithPipeline(logger logger.LoggingClient, message types.MessageEnvelope, pipeline *interfaces.FunctionPipeline) { - appContext := appfunction.NewContext(message.CorrelationID, trigger.dic, message.ContentType) +func (trigger *Trigger) responseHandler(appContext interfaces.AppFunctionContext, pipeline *interfaces.FunctionPipeline) error { + if appContext.ResponseData() != nil { + lc := trigger.serviceBinding.LoggingClient() + config := trigger.serviceBinding.Config() - messageError := trigger.runtime.ProcessMessage(appContext, message, pipeline) - if messageError != nil { - // ProcessMessage logs the error, so no need to log it here. - return - } + publishTopic, err := appContext.ApplyValues(config.Trigger.EdgexMessageBus.PublishHost.PublishTopic) + + if err != nil { + lc.Errorf("MessageBus Trigger: Unable to format output topic '%s' for pipeline '%s': %s", + config.Trigger.EdgexMessageBus.PublishHost.PublishTopic, + pipeline.Id, + err.Error()) + return err + } - if appContext.ResponseData() != nil { var contentType string if appContext.ResponseContentType() != "" { @@ -215,32 +221,23 @@ func (trigger *Trigger) processMessageWithPipeline(logger logger.LoggingClient, ContentType: contentType, } - config := container.ConfigurationFrom(trigger.dic.Get) - publishTopic, err := appContext.ApplyValues(config.Trigger.EdgexMessageBus.PublishHost.PublishTopic) - - if err != nil { - logger.Errorf("MessageBus Trigger: Unable to format output topic '%s' for pipeline '%s': %s", - config.Trigger.EdgexMessageBus.PublishHost.PublishTopic, - pipeline.Id, - err.Error()) - return - } - err = trigger.client.Publish(outputEnvelope, publishTopic) + if err != nil { - logger.Errorf("MessageBus trigger: Could not publish to topic '%s' for pipeline '%s': %s", + lc.Errorf("MessageBus trigger: Could not publish to topic '%s' for pipeline '%s': %s", publishTopic, pipeline.Id, err.Error()) - return + return err } - logger.Debugf("MessageBus Trigger: Published response message for pipeline '%s' on topic '%s' with %d bytes", + lc.Debugf("MessageBus Trigger: Published response message for pipeline '%s' on topic '%s' with %d bytes", pipeline.Id, publishTopic, len(appContext.ResponseData())) - logger.Tracef("MessageBus Trigger published message: %s=%s", common.CorrelationHeader, message.CorrelationID) + lc.Tracef("MessageBus Trigger published message: %s=%s", common.CorrelationHeader, appContext.CorrelationID()) } + return nil } func (_ *Trigger) createMessagingClientConfig(localConfig sdkCommon.MessageBusConfig) types.MessageBusConfig { @@ -272,7 +269,7 @@ func (trigger *Trigger) setOptionalAuthData(messageBusConfig *types.MessageBusCo lc.Infof("Setting options for secure MessageBus with AuthMode='%s' and SecretName='%s", authMode, secretName) - secretProvider := bootstrapContainer.SecretProviderFrom(trigger.dic.Get) + secretProvider := trigger.serviceBinding.SecretProvider() if secretProvider == nil { return errors.New("secret provider is missing. Make sure it is specified to be used in bootstrap.Run()") } diff --git a/internal/trigger/messagebus/messaging_test.go b/internal/trigger/messagebus/messaging_test.go index 36c616edd..b4fc41057 100644 --- a/internal/trigger/messagebus/messaging_test.go +++ b/internal/trigger/messagebus/messaging_test.go @@ -1,5 +1,6 @@ // // Copyright (c) 2021 Intel Corporation +// Copyright (c) 2021 One Track Consulting // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -19,20 +20,21 @@ package messagebus import ( "context" "encoding/json" - "os" + "fmt" + "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/appfunction" + "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/trigger/messagebus/mocks" + interfaceMocks "github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/interfaces/mocks" + bootstrapMessaging "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/messaging" + "github.com/stretchr/testify/mock" "sync" "testing" "time" - "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/bootstrap/container" sdkCommon "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/common" - "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/runtime" + triggerMocks "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/trigger/mocks" "github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/interfaces" - bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container" - "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/interfaces/mocks" - bootstrapMessaging "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/messaging" - "github.com/edgexfoundry/go-mod-bootstrap/v2/di" + bootstrapMocks "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/interfaces/mocks" "github.com/edgexfoundry/go-mod-core-contracts/v2/dtos" "github.com/edgexfoundry/go-mod-core-contracts/v2/clients/logger" @@ -50,7 +52,6 @@ import ( const TriggerTypeMessageBus = "EDGEX-MESSAGEBUS" var addEventRequest = createTestEventRequest() -var expectedEvent = addEventRequest.Event func createTestEventRequest() requests.AddEventRequest { event := dtos.NewEvent("thermostat", "LivingRoomThermostat", "temperature") @@ -59,17 +60,6 @@ func createTestEventRequest() requests.AddEventRequest { return request } -var dic *di.Container - -func TestMain(m *testing.M) { - dic = di.NewContainer(di.ServiceConstructorMap{ - bootstrapContainer.LoggingClientInterfaceName: func(get di.Get) interface{} { - return logger.NewMockClient() - }, - }) - os.Exit(m.Run()) -} - func TestInitializeNotSecure(t *testing.T) { config := sdkCommon.ConfigurationStruct{ @@ -94,15 +84,11 @@ func TestInitializeNotSecure(t *testing.T) { }, } - dic.Update(di.ServiceConstructorMap{ - container.ConfigurationName: func(get di.Get) interface{} { - return &config - }, - }) - - goRuntime := &runtime.GolangRuntime{} + serviceBinding := &triggerMocks.ServiceBinding{} + serviceBinding.On("Config").Return(&config) + serviceBinding.On("LoggingClient").Return(logger.NewMockClient()) - trigger := NewTrigger(dic, goRuntime) + trigger := NewTrigger(serviceBinding, nil) _, err := trigger.Initialize(&sync.WaitGroup{}, context.Background(), nil) require.NoError(t, err) @@ -141,24 +127,18 @@ func TestInitializeSecure(t *testing.T) { }, } - mock := mocks.SecretProvider{} + mock := bootstrapMocks.SecretProvider{} mock.On("GetSecret", secretName).Return(map[string]string{ bootstrapMessaging.SecretUsernameKey: "user", bootstrapMessaging.SecretPasswordKey: "password", }, nil) - dic.Update(di.ServiceConstructorMap{ - container.ConfigurationName: func(get di.Get) interface{} { - return &config - }, - bootstrapContainer.SecretProviderName: func(get di.Get) interface{} { - return &mock - }, - }) + serviceBinding := &triggerMocks.ServiceBinding{} + serviceBinding.On("Config").Return(&config) + serviceBinding.On("LoggingClient").Return(logger.NewMockClient()) + serviceBinding.On("SecretProvider").Return(&mock) - goRuntime := &runtime.GolangRuntime{} - - trigger := NewTrigger(dic, goRuntime) + trigger := NewTrigger(serviceBinding, nil) _, err := trigger.Initialize(&sync.WaitGroup{}, context.Background(), nil) require.NoError(t, err) @@ -192,124 +172,17 @@ func TestInitializeBadConfiguration(t *testing.T) { }, } - dic.Update(di.ServiceConstructorMap{ - container.ConfigurationName: func(get di.Get) interface{} { - return &config - }, - }) + serviceBinding := &triggerMocks.ServiceBinding{} + serviceBinding.On("Config").Return(&config) + serviceBinding.On("LoggingClient").Return(logger.NewMockClient()) - goRuntime := &runtime.GolangRuntime{} + trigger := NewTrigger(serviceBinding, nil) - trigger := NewTrigger(dic, goRuntime) _, err := trigger.Initialize(&sync.WaitGroup{}, context.Background(), nil) assert.Error(t, err) } -func TestPipelinePerTopic(t *testing.T) { - testClientConfig := types.MessageBusConfig{ - PublishHost: types.HostInfo{ - Host: "*", - Port: 6664, - Protocol: "tcp", - }, - Type: "zero", - } - - testClient, err := messaging.NewMessageClient(testClientConfig) - require.NoError(t, err, "Unable to create to publisher") - - config := sdkCommon.ConfigurationStruct{ - Trigger: sdkCommon.TriggerInfo{ - Type: TriggerTypeMessageBus, - EdgexMessageBus: sdkCommon.MessageBusConfig{ - Type: "zero", - PublishHost: sdkCommon.PublishHostInfo{ - Host: "*", - Port: 6666, - Protocol: "tcp", - PublishTopic: "", - }, - SubscribeHost: sdkCommon.SubscribeHostInfo{ - Host: "localhost", - Port: 6664, - Protocol: "tcp", - SubscribeTopics: "edgex/events/device", - }, - }, - }, - } - - dic.Update(di.ServiceConstructorMap{ - container.ConfigurationName: func(get di.Get) interface{} { - return &config - }, - }) - - expectedCorrelationID := "123" - - transform1WasCalled := make(chan bool, 1) - transform2WasCalled := make(chan bool, 1) - - transform1 := func(appContext interfaces.AppFunctionContext, data interface{}) (bool, interface{}) { - assert.Equal(t, expectedEvent, data) - transform1WasCalled <- true - return false, nil - } - - transform2 := func(appContext interfaces.AppFunctionContext, data interface{}) (bool, interface{}) { - assert.Equal(t, expectedEvent, data) - transform2WasCalled <- true - return false, nil - } - - goRuntime := runtime.NewGolangRuntime("", nil, dic) - - err = goRuntime.AddFunctionsPipeline("P1", []string{"edgex/events/device/P1/#"}, []interfaces.AppFunction{transform1}) - require.NoError(t, err) - err = goRuntime.AddFunctionsPipeline("P2", []string{"edgex/events/device/P2/#"}, []interfaces.AppFunction{transform2}) - require.NoError(t, err) - - trigger := NewTrigger(dic, goRuntime) - _, err = trigger.Initialize(&sync.WaitGroup{}, context.Background(), nil) - require.NoError(t, err) - - payload, err := json.Marshal(addEventRequest) - require.NoError(t, err) - - message := types.MessageEnvelope{ - CorrelationID: expectedCorrelationID, - Payload: payload, - ContentType: common.ContentTypeJSON, - } - - //transform1 in P1 pipeline should be called after this executes - err = testClient.Publish(message, "edgex/events/device/P1/LivingRoomThermostat/temperature") - require.NoError(t, err, "Failed to publish message") - - select { - case <-transform1WasCalled: - // do nothing, just need to fall out. - case <-transform2WasCalled: - t.Fail() // should not have happened - case <-time.After(3 * time.Second): - require.Fail(t, "Transform never called") - } - - //transform2 in P2 pipeline should be called after this executes - err = testClient.Publish(message, "edgex/events/device/P2/LivingRoomThermostat/temperature") - require.NoError(t, err, "Failed to publish message") - - select { - case <-transform1WasCalled: - t.Fail() // should not have happened - case <-transform2WasCalled: - // do nothing, just need to fall out. - case <-time.After(3 * time.Second): - require.Fail(t, "Transform never called") - } -} - -func TestInitializeAndProcessEventWithNoOutput(t *testing.T) { +func TestInitializeAndProcessEvent(t *testing.T) { config := sdkCommon.ConfigurationStruct{ Trigger: sdkCommon.TriggerInfo{ @@ -332,26 +205,25 @@ func TestInitializeAndProcessEventWithNoOutput(t *testing.T) { }, } - dic.Update(di.ServiceConstructorMap{ - container.ConfigurationName: func(get di.Get) interface{} { - return &config - }, - }) - expectedCorrelationID := "123" - transformWasCalled := make(chan bool, 1) + messageProcessed := make(chan bool, 1) - transform1 := func(appContext interfaces.AppFunctionContext, data interface{}) (bool, interface{}) { - assert.Equal(t, expectedEvent, data) - transformWasCalled <- true - return false, nil - } + expectedContext := appfunction.NewContext(uuid.NewString(), nil, "") + + serviceBinding := &triggerMocks.ServiceBinding{} + serviceBinding.On("Config").Return(&config) + serviceBinding.On("LoggingClient").Return(logger.NewMockClient()) + serviceBinding.On("BuildContext", mock.Anything).Return(expectedContext) + + messageProcessor := &triggerMocks.MessageProcessor{} + messageProcessor.On("MessageReceived", expectedContext, mock.Anything, mock.AnythingOfType("interfaces.PipelineResponseHandler")).Return(func(interfaces.AppFunctionContext, types.MessageEnvelope, interfaces.PipelineResponseHandler) error { + messageProcessed <- true + return nil + }) - goRuntime := runtime.NewGolangRuntime("", nil, dic) - goRuntime.SetDefaultFunctionsPipeline([]interfaces.AppFunction{transform1}) + trigger := NewTrigger(serviceBinding, messageProcessor) - trigger := NewTrigger(dic, goRuntime) _, err := trigger.Initialize(&sync.WaitGroup{}, context.Background(), nil) require.NoError(t, err) @@ -380,321 +252,10 @@ func TestInitializeAndProcessEventWithNoOutput(t *testing.T) { require.NoError(t, err, "Failed to publish message") select { - case <-transformWasCalled: + case <-messageProcessed: // do nothing, just need to fall out. - case <-time.After(3 * time.Second): - require.Fail(t, "Transform never called") - } -} - -func TestInitializeAndProcessEventWithOutput(t *testing.T) { - - config := sdkCommon.ConfigurationStruct{ - Trigger: sdkCommon.TriggerInfo{ - Type: TriggerTypeMessageBus, - EdgexMessageBus: sdkCommon.MessageBusConfig{ - Type: "zero", - PublishHost: sdkCommon.PublishHostInfo{ - Host: "*", - Port: 5586, - Protocol: "tcp", - PublishTopic: "PublishTopic", - }, - SubscribeHost: sdkCommon.SubscribeHostInfo{ - Host: "localhost", - Port: 5584, - Protocol: "tcp", - SubscribeTopics: "SubscribeTopic", - }, - }, - }, - } - - dic.Update(di.ServiceConstructorMap{ - container.ConfigurationName: func(get di.Get) interface{} { - return &config - }, - }) - - responseContentType := uuid.New().String() - - expectedCorrelationID := "123" - - transformWasCalled := make(chan bool, 1) - - transform1 := func(appContext interfaces.AppFunctionContext, data interface{}) (bool, interface{}) { - assert.Equal(t, expectedEvent, data) - appContext.SetResponseContentType(responseContentType) - appContext.SetResponseData([]byte("Transformed")) //transformed message published to message bus - transformWasCalled <- true - return false, nil - - } - - goRuntime := runtime.NewGolangRuntime("", nil, dic) - goRuntime.SetDefaultFunctionsPipeline([]interfaces.AppFunction{transform1}) - - trigger := NewTrigger(dic, goRuntime) - - testClientConfig := types.MessageBusConfig{ - SubscribeHost: types.HostInfo{ - Host: "localhost", - Port: 5586, - Protocol: "tcp", - }, - PublishHost: types.HostInfo{ - Host: "*", - Port: 5584, - Protocol: "tcp", - }, - Type: "zero", - } - testClient, err := messaging.NewMessageClient(testClientConfig) //new client to publish & subscribe - require.NoError(t, err, "Failed to create test client") - - testTopics := []types.TopicChannel{{Topic: config.Trigger.EdgexMessageBus.PublishHost.PublishTopic, Messages: make(chan types.MessageEnvelope)}} - testMessageErrors := make(chan error) - - err = testClient.Subscribe(testTopics, testMessageErrors) //subscribe in order to receive transformed output to the bus - require.NoError(t, err) - _, err = trigger.Initialize(&sync.WaitGroup{}, context.Background(), nil) - require.NoError(t, err) - - payload, err := json.Marshal(addEventRequest) - require.NoError(t, err) - - message := types.MessageEnvelope{ - CorrelationID: expectedCorrelationID, - Payload: payload, - ContentType: common.ContentTypeJSON, - } - - err = testClient.Publish(message, "SubscribeTopic") - require.NoError(t, err, "Failed to publish message") - - select { - case <-transformWasCalled: - // do nothing, just need to fall out. - case <-time.After(3 * time.Second): - require.Fail(t, "Transform never called") - } - receiveMessage := true - - for receiveMessage { - select { - case msgErr := <-testMessageErrors: - receiveMessage = false - assert.Error(t, msgErr) - case msgs := <-testTopics[0].Messages: - receiveMessage = false - assert.Equal(t, "Transformed", string(msgs.Payload)) - assert.Equal(t, responseContentType, msgs.ContentType) - } - } -} - -func TestInitializeAndProcessEventWithOutput_InferJSON(t *testing.T) { - - config := sdkCommon.ConfigurationStruct{ - Trigger: sdkCommon.TriggerInfo{ - Type: TriggerTypeMessageBus, - EdgexMessageBus: sdkCommon.MessageBusConfig{ - Type: "zero", - PublishHost: sdkCommon.PublishHostInfo{ - Host: "*", - Port: 5701, - Protocol: "tcp", - PublishTopic: "PublishTopic", - }, - SubscribeHost: sdkCommon.SubscribeHostInfo{ - Host: "localhost", - Port: 5702, - Protocol: "tcp", - SubscribeTopics: "SubscribeTopic", - }, - }, - }, - } - - dic.Update(di.ServiceConstructorMap{ - container.ConfigurationName: func(get di.Get) interface{} { - return &config - }, - }) - - expectedCorrelationID := "123" - - transformWasCalled := make(chan bool, 1) - - transform1 := func(appContext interfaces.AppFunctionContext, data interface{}) (bool, interface{}) { - assert.Equal(t, expectedEvent, data) - appContext.SetResponseData([]byte("{;)Transformed")) //transformed message published to message bus - transformWasCalled <- true - return false, nil - - } - - goRuntime := runtime.NewGolangRuntime("", nil, dic) - goRuntime.SetDefaultFunctionsPipeline([]interfaces.AppFunction{transform1}) - - trigger := NewTrigger(dic, goRuntime) - - testClientConfig := types.MessageBusConfig{ - SubscribeHost: types.HostInfo{ - Host: "localhost", - Port: 5701, - Protocol: "tcp", - }, - PublishHost: types.HostInfo{ - Host: "*", - Port: 5702, - Protocol: "tcp", - }, - Type: "zero", - } - testClient, err := messaging.NewMessageClient(testClientConfig) //new client to publish & subscribe - require.NoError(t, err, "Failed to create test client") - - testTopics := []types.TopicChannel{{Topic: config.Trigger.EdgexMessageBus.PublishHost.PublishTopic, Messages: make(chan types.MessageEnvelope)}} - testMessageErrors := make(chan error) - - err = testClient.Subscribe(testTopics, testMessageErrors) //subscribe in order to receive transformed output to the bus - require.NoError(t, err) - _, err = trigger.Initialize(&sync.WaitGroup{}, context.Background(), nil) - require.NoError(t, err) - - payload, err := json.Marshal(addEventRequest) - require.NoError(t, err) - - message := types.MessageEnvelope{ - CorrelationID: expectedCorrelationID, - Payload: payload, - ContentType: common.ContentTypeJSON, - } - - err = testClient.Publish(message, "SubscribeTopic") - require.NoError(t, err, "Failed to publish message") - - select { - case <-transformWasCalled: - // do nothing, just need to fall out. - case <-time.After(3 * time.Second): - require.Fail(t, "Transform never called") - } - - receiveMessage := true - - for receiveMessage { - select { - case msgErr := <-testMessageErrors: - receiveMessage = false - assert.Error(t, msgErr) - case msgs := <-testTopics[0].Messages: - receiveMessage = false - assert.Equal(t, "{;)Transformed", string(msgs.Payload)) - assert.Equal(t, common.ContentTypeJSON, msgs.ContentType) - } - } -} - -func TestInitializeAndProcessEventWithOutput_AssumeCBOR(t *testing.T) { - - config := sdkCommon.ConfigurationStruct{ - Trigger: sdkCommon.TriggerInfo{ - Type: TriggerTypeMessageBus, - EdgexMessageBus: sdkCommon.MessageBusConfig{ - Type: "zero", - PublishHost: sdkCommon.PublishHostInfo{ - Host: "*", - Port: 5703, - Protocol: "tcp", - PublishTopic: "PublishTopic", - }, - SubscribeHost: sdkCommon.SubscribeHostInfo{ - Host: "localhost", - Port: 5704, - Protocol: "tcp", - SubscribeTopics: "SubscribeTopic", - }, - }, - }, - } - - dic.Update(di.ServiceConstructorMap{ - container.ConfigurationName: func(get di.Get) interface{} { - return &config - }, - }) - - expectedCorrelationID := "123" - - transformWasCalled := make(chan bool, 1) - - transform1 := func(appContext interfaces.AppFunctionContext, data interface{}) (bool, interface{}) { - assert.Equal(t, expectedEvent, data) - appContext.SetResponseData([]byte("Transformed")) //transformed message published to message bus - transformWasCalled <- true - return false, nil - } - - goRuntime := runtime.NewGolangRuntime("", nil, dic) - goRuntime.SetDefaultFunctionsPipeline([]interfaces.AppFunction{transform1}) - - trigger := NewTrigger(dic, goRuntime) - testClientConfig := types.MessageBusConfig{ - SubscribeHost: types.HostInfo{ - Host: "localhost", - Port: 5703, - Protocol: "tcp", - }, - PublishHost: types.HostInfo{ - Host: "*", - Port: 5704, - Protocol: "tcp", - }, - Type: "zero", - } - testClient, err := messaging.NewMessageClient(testClientConfig) //new client to publish & subscribe - require.NoError(t, err, "Failed to create test client") - - testTopics := []types.TopicChannel{{Topic: config.Trigger.EdgexMessageBus.PublishHost.PublishTopic, Messages: make(chan types.MessageEnvelope)}} - testMessageErrors := make(chan error) - - err = testClient.Subscribe(testTopics, testMessageErrors) //subscribe in order to receive transformed output to the bus - require.NoError(t, err) - _, err = trigger.Initialize(&sync.WaitGroup{}, context.Background(), nil) - require.NoError(t, err) - - payload, _ := json.Marshal(addEventRequest) - - message := types.MessageEnvelope{ - CorrelationID: expectedCorrelationID, - Payload: payload, - ContentType: common.ContentTypeJSON, - } - - err = testClient.Publish(message, "SubscribeTopic") - require.NoError(t, err, "Failed to publish message") - - select { - case <-transformWasCalled: - // do nothing, just need to fall out. - case <-time.After(3 * time.Second): - require.Fail(t, "Transform never called") - } - - receiveMessage := true - - for receiveMessage { - select { - case msgErr := <-testMessageErrors: - receiveMessage = false - assert.Error(t, msgErr) - case msgs := <-testTopics[0].Messages: - receiveMessage = false - assert.Equal(t, "Transformed", string(msgs.Payload)) - assert.Equal(t, common.ContentTypeCBOR, msgs.ContentType) - } + case <-time.After(5 * time.Second): + require.Fail(t, "Message never processed") } } @@ -721,19 +282,15 @@ func TestInitializeAndProcessBackgroundMessage(t *testing.T) { }, } - dic.Update(di.ServiceConstructorMap{ - container.ConfigurationName: func(get di.Get) interface{} { - return &config - }, - }) - expectedCorrelationID := "123" expectedPayload := []byte(`{"id":"5888dea1bd36573f4681d6f9","origin":1471806386919,"pushed":0,"device":"livingroomthermostat","readings":[{"id":"5888dea0bd36573f4681d6f8","created":1485364896983,"modified":1485364896983,"origin":1471806386919,"pushed":0,"name":"temperature","value":"38","device":"livingroomthermostat"}]}`) - goRuntime := runtime.NewGolangRuntime("", nil, dic) + serviceBinding := &triggerMocks.ServiceBinding{} + serviceBinding.On("Config").Return(&config) + serviceBinding.On("LoggingClient").Return(logger.NewMockClient()) - trigger := NewTrigger(dic, goRuntime) + trigger := NewTrigger(serviceBinding, nil) testClientConfig := types.MessageBusConfig{ SubscribeHost: types.HostInfo{ @@ -787,91 +344,6 @@ func TestInitializeAndProcessBackgroundMessage(t *testing.T) { } } -func TestInitializeAndProcessEventMultipleTopics(t *testing.T) { - config := sdkCommon.ConfigurationStruct{ - Trigger: sdkCommon.TriggerInfo{ - Type: TriggerTypeMessageBus, - EdgexMessageBus: sdkCommon.MessageBusConfig{ - Type: "zero", - PublishHost: sdkCommon.PublishHostInfo{ - Host: "*", - Port: 5592, - Protocol: "tcp", - PublishTopic: "", - }, - SubscribeHost: sdkCommon.SubscribeHostInfo{ - Host: "localhost", - Port: 5594, - Protocol: "tcp", - SubscribeTopics: "t1,t2", - }, - }, - }, - } - - dic.Update(di.ServiceConstructorMap{ - container.ConfigurationName: func(get di.Get) interface{} { - return &config - }, - }) - - expectedCorrelationID := "123" - - transformWasCalled := make(chan bool, 1) - transform1 := func(appContext interfaces.AppFunctionContext, data interface{}) (bool, interface{}) { - require.Equal(t, expectedEvent, data) - transformWasCalled <- true - return false, nil - } - - goRuntime := runtime.NewGolangRuntime("", nil, dic) - goRuntime.SetDefaultFunctionsPipeline([]interfaces.AppFunction{transform1}) - - trigger := NewTrigger(dic, goRuntime) - _, err := trigger.Initialize(&sync.WaitGroup{}, context.Background(), nil) - require.NoError(t, err) - - payload, _ := json.Marshal(addEventRequest) - - message := types.MessageEnvelope{ - CorrelationID: expectedCorrelationID, - Payload: payload, - ContentType: common.ContentTypeJSON, - } - - testClientConfig := types.MessageBusConfig{ - PublishHost: types.HostInfo{ - Host: "*", - Port: 5594, - Protocol: "tcp", - }, - Type: "zero", - } - - testClient, err := messaging.NewMessageClient(testClientConfig) - require.NoError(t, err, "Unable to create to publisher") - - err = testClient.Publish(message, "t1") //transform1 should be called after this executes - require.NoError(t, err, "Failed to publish message") - - select { - case <-transformWasCalled: - // do nothing, just need to fall out. - case <-time.After(3 * time.Second): - require.Fail(t, "Transform never called for t1") - } - - err = testClient.Publish(message, "t2") //transform1 should be called after this executes - require.NoError(t, err, "Failed to publish message") - - select { - case <-transformWasCalled: - // do nothing, just need to fall out. - case <-time.After(3 * time.Second): - require.Fail(t, "Transform never called t2") - } -} - type mockBackgroundMessage struct { DeliverToTopic string Payload types.MessageEnvelope @@ -884,3 +356,115 @@ func (bg mockBackgroundMessage) Topic() string { func (bg mockBackgroundMessage) Message() types.MessageEnvelope { return bg.Payload } + +func TestTrigger_responseHandler(t *testing.T) { + const topicWithPlaceholder = "/topic/with/{ph}/placeholder" + const formattedTopic = "topic/with/ph-value/placeholder" + const setContentType = "content-type" + const correlationId = "corrid-1233523" + var setContentTypePayload = []byte("not-empty") + var inferJsonPayload = []byte("{not-empty") + var inferJsonArrayPayload = []byte("[not-empty") + + type fields struct { + publishTopic string + } + type args struct { + pipeline *interfaces.FunctionPipeline + } + tests := []struct { + name string + fields fields + args args + wantErr bool + setup func(*triggerMocks.ServiceBinding, *interfaceMocks.AppFunctionContext, *mocks.MessageClient) + }{ + {name: "no response data", wantErr: false, setup: func(processor *triggerMocks.ServiceBinding, functionContext *interfaceMocks.AppFunctionContext, _ *mocks.MessageClient) { + functionContext.On("ResponseData").Return(nil) + }}, + {name: "topic format failed", fields: fields{publishTopic: topicWithPlaceholder}, args: args{pipeline: &interfaces.FunctionPipeline{}}, wantErr: true, setup: func(processor *triggerMocks.ServiceBinding, functionContext *interfaceMocks.AppFunctionContext, _ *mocks.MessageClient) { + functionContext.On("ResponseData").Return(setContentTypePayload) + functionContext.On("ApplyValues", topicWithPlaceholder).Return("", fmt.Errorf("apply values failed")) + }}, + {name: "publish failed", fields: fields{publishTopic: topicWithPlaceholder}, args: args{pipeline: &interfaces.FunctionPipeline{}}, wantErr: true, setup: func(processor *triggerMocks.ServiceBinding, functionContext *interfaceMocks.AppFunctionContext, client *mocks.MessageClient) { + functionContext.On("ResponseData").Return(setContentTypePayload) + functionContext.On("ResponseContentType").Return(setContentType) + functionContext.On("CorrelationID").Return(correlationId) + functionContext.On("ApplyValues", topicWithPlaceholder).Return(formattedTopic, nil) + client.On("Publish", mock.Anything, mock.Anything).Return(func(envelope types.MessageEnvelope, s string) error { + return fmt.Errorf("publish failed") + }) + }}, + {name: "happy", fields: fields{publishTopic: topicWithPlaceholder}, args: args{pipeline: &interfaces.FunctionPipeline{}}, wantErr: false, setup: func(processor *triggerMocks.ServiceBinding, functionContext *interfaceMocks.AppFunctionContext, client *mocks.MessageClient) { + functionContext.On("ResponseData").Return(setContentTypePayload) + functionContext.On("ResponseContentType").Return(setContentType) + functionContext.On("CorrelationID").Return(correlationId) + functionContext.On("ApplyValues", topicWithPlaceholder).Return(formattedTopic, nil) + client.On("Publish", mock.Anything, mock.Anything).Return(func(envelope types.MessageEnvelope, s string) error { + assert.Equal(t, correlationId, envelope.CorrelationID) + assert.Equal(t, setContentType, envelope.ContentType) + assert.Equal(t, setContentTypePayload, envelope.Payload) + return nil + }) + }}, + {name: "happy assume CBOR", fields: fields{publishTopic: topicWithPlaceholder}, args: args{pipeline: &interfaces.FunctionPipeline{}}, wantErr: false, setup: func(processor *triggerMocks.ServiceBinding, functionContext *interfaceMocks.AppFunctionContext, client *mocks.MessageClient) { + functionContext.On("ResponseData").Return(setContentTypePayload) + functionContext.On("ResponseContentType").Return("") + functionContext.On("CorrelationID").Return(correlationId) + functionContext.On("ApplyValues", topicWithPlaceholder).Return(formattedTopic, nil) + client.On("Publish", mock.Anything, mock.Anything).Return(func(envelope types.MessageEnvelope, s string) error { + assert.Equal(t, correlationId, envelope.CorrelationID) + assert.Equal(t, common.ContentTypeCBOR, envelope.ContentType) + assert.Equal(t, setContentTypePayload, envelope.Payload) + return nil + }) + }}, + {name: "happy infer JSON", fields: fields{publishTopic: topicWithPlaceholder}, args: args{pipeline: &interfaces.FunctionPipeline{}}, wantErr: false, setup: func(processor *triggerMocks.ServiceBinding, functionContext *interfaceMocks.AppFunctionContext, client *mocks.MessageClient) { + functionContext.On("ResponseData").Return(inferJsonPayload) + functionContext.On("ResponseContentType").Return("") + functionContext.On("CorrelationID").Return(correlationId) + functionContext.On("ApplyValues", topicWithPlaceholder).Return(formattedTopic, nil) + client.On("Publish", mock.Anything, mock.Anything).Return(func(envelope types.MessageEnvelope, s string) error { + assert.Equal(t, correlationId, envelope.CorrelationID) + assert.Equal(t, common.ContentTypeJSON, envelope.ContentType) + assert.Equal(t, inferJsonPayload, envelope.Payload) + return nil + }) + }}, + {name: "happy infer JSON array", fields: fields{publishTopic: topicWithPlaceholder}, args: args{pipeline: &interfaces.FunctionPipeline{}}, wantErr: false, setup: func(processor *triggerMocks.ServiceBinding, functionContext *interfaceMocks.AppFunctionContext, client *mocks.MessageClient) { + functionContext.On("ResponseData").Return(inferJsonArrayPayload) + functionContext.On("ResponseContentType").Return("") + functionContext.On("CorrelationID").Return(correlationId) + functionContext.On("ApplyValues", topicWithPlaceholder).Return(formattedTopic, nil) + client.On("Publish", mock.Anything, mock.Anything).Return(func(envelope types.MessageEnvelope, s string) error { + assert.Equal(t, correlationId, envelope.CorrelationID) + assert.Equal(t, common.ContentTypeJSON, envelope.ContentType) + assert.Equal(t, inferJsonArrayPayload, envelope.Payload) + return nil + }) + }}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + serviceBinding := &triggerMocks.ServiceBinding{} + + serviceBinding.On("Config").Return(&sdkCommon.ConfigurationStruct{Trigger: sdkCommon.TriggerInfo{EdgexMessageBus: sdkCommon.MessageBusConfig{PublishHost: sdkCommon.PublishHostInfo{PublishTopic: tt.fields.publishTopic}}}}) + serviceBinding.On("LoggingClient").Return(logger.NewMockClient()) + + ctx := &interfaceMocks.AppFunctionContext{} + client := &mocks.MessageClient{} + + if tt.setup != nil { + tt.setup(serviceBinding, ctx, client) + } + + trigger := &Trigger{ + serviceBinding: serviceBinding, + client: client, + } + if err := trigger.responseHandler(ctx, tt.args.pipeline); (err != nil) != tt.wantErr { + t.Errorf("responseHandler() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} diff --git a/internal/trigger/messagebus/mocks/MessageClient.go b/internal/trigger/messagebus/mocks/MessageClient.go new file mode 100644 index 000000000..c5bdaa7f2 --- /dev/null +++ b/internal/trigger/messagebus/mocks/MessageClient.go @@ -0,0 +1,69 @@ +// Code generated by mockery v2.9.4. DO NOT EDIT. + +package mocks + +import ( + types "github.com/edgexfoundry/go-mod-messaging/v2/pkg/types" + mock "github.com/stretchr/testify/mock" +) + +// MessageClient is an autogenerated mock type for the MessageClient type +type MessageClient struct { + mock.Mock +} + +// Connect provides a mock function with given fields: +func (_m *MessageClient) Connect() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Disconnect provides a mock function with given fields: +func (_m *MessageClient) Disconnect() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Publish provides a mock function with given fields: message, topic +func (_m *MessageClient) Publish(message types.MessageEnvelope, topic string) error { + ret := _m.Called(message, topic) + + var r0 error + if rf, ok := ret.Get(0).(func(types.MessageEnvelope, string) error); ok { + r0 = rf(message, topic) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Subscribe provides a mock function with given fields: topics, messageErrors +func (_m *MessageClient) Subscribe(topics []types.TopicChannel, messageErrors chan error) error { + ret := _m.Called(topics, messageErrors) + + var r0 error + if rf, ok := ret.Get(0).(func([]types.TopicChannel, chan error) error); ok { + r0 = rf(topics, messageErrors) + } else { + r0 = ret.Error(0) + } + + return r0 +} diff --git a/internal/trigger/mqtt/mocks/Client.go b/internal/trigger/mqtt/mocks/Client.go new file mode 100644 index 000000000..aace22363 --- /dev/null +++ b/internal/trigger/mqtt/mocks/Client.go @@ -0,0 +1,151 @@ +// Code generated by mockery v2.9.4. DO NOT EDIT. + +package mocks + +import ( + mqtt "github.com/eclipse/paho.mqtt.golang" + mock "github.com/stretchr/testify/mock" +) + +// Client is an autogenerated mock type for the Client type +type Client struct { + mock.Mock +} + +// AddRoute provides a mock function with given fields: topic, callback +func (_m *Client) AddRoute(topic string, callback mqtt.MessageHandler) { + _m.Called(topic, callback) +} + +// Connect provides a mock function with given fields: +func (_m *Client) Connect() mqtt.Token { + ret := _m.Called() + + var r0 mqtt.Token + if rf, ok := ret.Get(0).(func() mqtt.Token); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(mqtt.Token) + } + } + + return r0 +} + +// Disconnect provides a mock function with given fields: quiesce +func (_m *Client) Disconnect(quiesce uint) { + _m.Called(quiesce) +} + +// IsConnected provides a mock function with given fields: +func (_m *Client) IsConnected() bool { + ret := _m.Called() + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// IsConnectionOpen provides a mock function with given fields: +func (_m *Client) IsConnectionOpen() bool { + ret := _m.Called() + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// OptionsReader provides a mock function with given fields: +func (_m *Client) OptionsReader() mqtt.ClientOptionsReader { + ret := _m.Called() + + var r0 mqtt.ClientOptionsReader + if rf, ok := ret.Get(0).(func() mqtt.ClientOptionsReader); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(mqtt.ClientOptionsReader) + } + + return r0 +} + +// Publish provides a mock function with given fields: topic, qos, retained, payload +func (_m *Client) Publish(topic string, qos byte, retained bool, payload interface{}) mqtt.Token { + ret := _m.Called(topic, qos, retained, payload) + + var r0 mqtt.Token + if rf, ok := ret.Get(0).(func(string, byte, bool, interface{}) mqtt.Token); ok { + r0 = rf(topic, qos, retained, payload) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(mqtt.Token) + } + } + + return r0 +} + +// Subscribe provides a mock function with given fields: topic, qos, callback +func (_m *Client) Subscribe(topic string, qos byte, callback mqtt.MessageHandler) mqtt.Token { + ret := _m.Called(topic, qos, callback) + + var r0 mqtt.Token + if rf, ok := ret.Get(0).(func(string, byte, mqtt.MessageHandler) mqtt.Token); ok { + r0 = rf(topic, qos, callback) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(mqtt.Token) + } + } + + return r0 +} + +// SubscribeMultiple provides a mock function with given fields: filters, callback +func (_m *Client) SubscribeMultiple(filters map[string]byte, callback mqtt.MessageHandler) mqtt.Token { + ret := _m.Called(filters, callback) + + var r0 mqtt.Token + if rf, ok := ret.Get(0).(func(map[string]byte, mqtt.MessageHandler) mqtt.Token); ok { + r0 = rf(filters, callback) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(mqtt.Token) + } + } + + return r0 +} + +// Unsubscribe provides a mock function with given fields: topics +func (_m *Client) Unsubscribe(topics ...string) mqtt.Token { + _va := make([]interface{}, len(topics)) + for _i := range topics { + _va[_i] = topics[_i] + } + var _ca []interface{} + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 mqtt.Token + if rf, ok := ret.Get(0).(func(...string) mqtt.Token); ok { + r0 = rf(topics...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(mqtt.Token) + } + } + + return r0 +} diff --git a/internal/trigger/mqtt/mocks/Message.go b/internal/trigger/mqtt/mocks/Message.go new file mode 100644 index 000000000..ea7cee9b3 --- /dev/null +++ b/internal/trigger/mqtt/mocks/Message.go @@ -0,0 +1,101 @@ +// Code generated by mockery v2.9.4. DO NOT EDIT. + +package mocks + +import mock "github.com/stretchr/testify/mock" + +// Message is an autogenerated mock type for the Message type +type Message struct { + mock.Mock +} + +// Ack provides a mock function with given fields: +func (_m *Message) Ack() { + _m.Called() +} + +// Duplicate provides a mock function with given fields: +func (_m *Message) Duplicate() bool { + ret := _m.Called() + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// MessageID provides a mock function with given fields: +func (_m *Message) MessageID() uint16 { + ret := _m.Called() + + var r0 uint16 + if rf, ok := ret.Get(0).(func() uint16); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(uint16) + } + + return r0 +} + +// Payload provides a mock function with given fields: +func (_m *Message) Payload() []byte { + ret := _m.Called() + + var r0 []byte + if rf, ok := ret.Get(0).(func() []byte); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]byte) + } + } + + return r0 +} + +// Qos provides a mock function with given fields: +func (_m *Message) Qos() byte { + ret := _m.Called() + + var r0 byte + if rf, ok := ret.Get(0).(func() byte); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(byte) + } + + return r0 +} + +// Retained provides a mock function with given fields: +func (_m *Message) Retained() bool { + ret := _m.Called() + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// Topic provides a mock function with given fields: +func (_m *Message) Topic() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} diff --git a/internal/trigger/mqtt/mocks/Token.go b/internal/trigger/mqtt/mocks/Token.go new file mode 100644 index 000000000..196aca12d --- /dev/null +++ b/internal/trigger/mqtt/mocks/Token.go @@ -0,0 +1,72 @@ +// Code generated by mockery v2.9.4. DO NOT EDIT. + +package mocks + +import ( + mock "github.com/stretchr/testify/mock" + + time "time" +) + +// Token is an autogenerated mock type for the Token type +type Token struct { + mock.Mock +} + +// Done provides a mock function with given fields: +func (_m *Token) Done() <-chan struct{} { + ret := _m.Called() + + var r0 <-chan struct{} + if rf, ok := ret.Get(0).(func() <-chan struct{}); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan struct{}) + } + } + + return r0 +} + +// Error provides a mock function with given fields: +func (_m *Token) Error() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Wait provides a mock function with given fields: +func (_m *Token) Wait() bool { + ret := _m.Called() + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// WaitTimeout provides a mock function with given fields: _a0 +func (_m *Token) WaitTimeout(_a0 time.Duration) bool { + ret := _m.Called(_a0) + + var r0 bool + if rf, ok := ret.Get(0).(func(time.Duration) bool); ok { + r0 = rf(_a0) + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} diff --git a/internal/trigger/mqtt/mqtt.go b/internal/trigger/mqtt/mqtt.go index 88d10cc93..466fa9f8a 100644 --- a/internal/trigger/mqtt/mqtt.go +++ b/internal/trigger/mqtt/mqtt.go @@ -1,5 +1,6 @@ // // Copyright (c) 2021 Intel Corporation +// Copyright (c) 2021 One Track Consulting // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -20,24 +21,20 @@ import ( "context" "errors" "fmt" + "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/common" + "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/trigger" "net/url" "strings" "sync" "time" "github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/interfaces" - - "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/appfunction" - "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/bootstrap/container" - "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/runtime" "github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/secure" "github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/util" "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap" - bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container" - "github.com/edgexfoundry/go-mod-bootstrap/v2/di" "github.com/edgexfoundry/go-mod-core-contracts/v2/clients/logger" - "github.com/edgexfoundry/go-mod-core-contracts/v2/common" + commonContracts "github.com/edgexfoundry/go-mod-core-contracts/v2/common" "github.com/edgexfoundry/go-mod-messaging/v2/pkg/types" pahoMqtt "github.com/eclipse/paho.mqtt.golang" @@ -46,28 +43,31 @@ import ( // Trigger implements Trigger to support Triggers type Trigger struct { - dic *di.Container - lc logger.LoggingClient - mqttClient pahoMqtt.Client - runtime *runtime.GolangRuntime - qos byte - retain bool - publishTopic string + messageProcessor trigger.MessageProcessor + serviceBinding trigger.ServiceBinding + lc logger.LoggingClient + mqttClient pahoMqtt.Client + qos byte + retain bool + publishTopic string + config *common.ConfigurationStruct } -func NewTrigger(dic *di.Container, runtime *runtime.GolangRuntime) *Trigger { - return &Trigger{ - dic: dic, - runtime: runtime, - lc: bootstrapContainer.LoggingClientFrom(dic.Get), +func NewTrigger(bnd trigger.ServiceBinding, mp trigger.MessageProcessor) *Trigger { + t := &Trigger{ + messageProcessor: mp, + serviceBinding: bnd, } + + return t } // Initialize initializes the Trigger for an external MQTT broker func (trigger *Trigger) Initialize(_ *sync.WaitGroup, _ context.Context, background <-chan interfaces.BackgroundMessage) (bootstrap.Deferred, error) { // Convenience short cuts lc := trigger.lc - config := container.ConfigurationFrom(trigger.dic.Get) + config := trigger.config + brokerConfig := config.Trigger.ExternalMqtt topics := config.Trigger.ExternalMqtt.SubscribeTopics @@ -104,10 +104,9 @@ func (trigger *Trigger) Initialize(_ *sync.WaitGroup, _ context.Context, backgro opts.KeepAlive = brokerConfig.KeepAlive opts.Servers = []*url.URL{brokerUrl} - // Since this factory is shared between the MQTT pipeline function and this trigger we must provide - // a dummy AppFunctionContext which will provide access to GetSecret mqttFactory := secure.NewMqttFactory( - appfunction.NewContext("", trigger.dic, ""), + trigger.serviceBinding.SecretProvider(), + trigger.serviceBinding.LoggingClient(), brokerConfig.AuthMode, brokerConfig.SecretPath, brokerConfig.SkipCertVerify, @@ -139,7 +138,7 @@ func (trigger *Trigger) Initialize(_ *sync.WaitGroup, _ context.Context, backgro func (trigger *Trigger) onConnectHandler(mqttClient pahoMqtt.Client) { // Convenience short cuts lc := trigger.lc - config := container.ConfigurationFrom(trigger.dic.Get) + config := trigger.config topics := util.DeleteEmptyAndTrim(strings.FieldsFunc(config.Trigger.ExternalMqtt.SubscribeTopics, util.SplitComma)) qos := config.Trigger.ExternalMqtt.QoS @@ -160,10 +159,10 @@ func (trigger *Trigger) messageHandler(_ pahoMqtt.Client, mqttMessage pahoMqtt.M lc := trigger.lc data := mqttMessage.Payload() - contentType := common.ContentTypeJSON + contentType := commonContracts.ContentTypeJSON if data[0] != byte('{') && data[0] != byte('[') { // If not JSON then assume it is CBOR - contentType = common.ContentTypeCBOR + contentType = commonContracts.ContentTypeCBOR } correlationID := uuid.New().String() @@ -179,25 +178,19 @@ func (trigger *Trigger) messageHandler(_ pahoMqtt.Client, mqttMessage pahoMqtt.M len(message.Payload), message.ReceivedTopic, message.ContentType) - lc.Tracef("%s=%s", common.CorrelationHeader, correlationID) + lc.Tracef("%s=%s", commonContracts.CorrelationHeader, correlationID) - pipelines := trigger.runtime.GetMatchingPipelines(message.ReceivedTopic) - lc.Debugf("MQTT Trigger found %d pipeline(s) that match the incoming topic '%s'", len(pipelines), message.ReceivedTopic) - for _, pipeline := range pipelines { - go trigger.processMessageWithPipeline(message, pipeline) - } -} + ctx := trigger.serviceBinding.BuildContext(message) -func (trigger *Trigger) processMessageWithPipeline(envelope types.MessageEnvelope, pipeline *interfaces.FunctionPipeline) { - appContext := appfunction.NewContext(envelope.CorrelationID, trigger.dic, envelope.ContentType) - - messageError := trigger.runtime.ProcessMessage(appContext, envelope, pipeline) - if messageError != nil { - // ProcessMessage logs the error, so no need to log it here. - // ToDo: Do we want to publish the error back to the Broker? - return - } + go func() { + processErr := trigger.messageProcessor.MessageReceived(ctx, message, trigger.responseHandler) + if processErr != nil { + lc.Errorf("MQTT Trigger: Failed to process message on pipeline(s): %s", processErr.Error()) + } + }() +} +func (trigger *Trigger) responseHandler(appContext interfaces.AppFunctionContext, pipeline *interfaces.FunctionPipeline) error { if len(appContext.ResponseData()) > 0 && len(trigger.publishTopic) > 0 { formattedTopic, err := appContext.ApplyValues(trigger.publishTopic) @@ -206,19 +199,22 @@ func (trigger *Trigger) processMessageWithPipeline(envelope types.MessageEnvelop trigger.publishTopic, pipeline.Id, err.Error()) + return err } if token := trigger.mqttClient.Publish(formattedTopic, trigger.qos, trigger.retain, appContext.ResponseData()); token.Wait() && token.Error() != nil { trigger.lc.Errorf("MQTT trigger: Could not publish to topic '%s' for pipeline '%s': %s", formattedTopic, pipeline.Id, - token.Error().Error()) + token.Error()) + return token.Error() } else { trigger.lc.Debugf("MQTT Trigger: Published response message for pipeline '%s' on topic '%s' with %d bytes", pipeline.Id, formattedTopic, len(appContext.ResponseData())) - trigger.lc.Tracef("MQTT Trigger published message: %s=%s", common.CorrelationHeader, envelope.CorrelationID) + trigger.lc.Tracef("MQTT Trigger published message: %s=%s", commonContracts.CorrelationHeader, appContext.CorrelationID()) } } + return nil } diff --git a/internal/trigger/mqtt/mqtt_test.go b/internal/trigger/mqtt/mqtt_test.go new file mode 100644 index 000000000..17d87d682 --- /dev/null +++ b/internal/trigger/mqtt/mqtt_test.go @@ -0,0 +1,172 @@ +// +// Copyright (c) 2021 One Track Consulting +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package mqtt + +import ( + "fmt" + "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/appfunction" + sdkCommon "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/common" + triggerMocks "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/trigger/mocks" + "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/trigger/mqtt/mocks" + "github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/interfaces" + interfaceMocks "github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/interfaces/mocks" + "github.com/edgexfoundry/go-mod-core-contracts/v2/clients/logger" + "github.com/edgexfoundry/go-mod-core-contracts/v2/common" + "github.com/edgexfoundry/go-mod-messaging/v2/pkg/types" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "testing" +) + +func TestNewTrigger(t *testing.T) { + bnd := &triggerMocks.ServiceBinding{} + mp := &triggerMocks.MessageProcessor{} + + got := NewTrigger(bnd, mp) + + require.NotNil(t, got) + + assert.Equal(t, bnd, got.serviceBinding) + assert.Equal(t, mp, got.messageProcessor) +} + +func TestTrigger_responseHandler(t *testing.T) { + const topicWithPlaceholder = "/topic/with/{ph}/placeholder" + const formattedTopic = "topic/with/ph-value/placeholder" + const setContentType = "content-type" + const correlationId = "corrid-1233523" + var payload = []byte("not-empty") + const retain = true + var qos = byte(8) + + type fields struct { + publishTopic string + qos byte + retain bool + } + type args struct { + pipeline *interfaces.FunctionPipeline + } + tests := []struct { + name string + fields fields + args args + wantErr bool + setup func(*triggerMocks.ServiceBinding, *interfaceMocks.AppFunctionContext, *mocks.Client, *mocks.Token) + }{ + {name: "no response data", wantErr: false, setup: func(processor *triggerMocks.ServiceBinding, functionContext *interfaceMocks.AppFunctionContext, _ *mocks.Client, _ *mocks.Token) { + functionContext.On("ResponseData").Return(nil) + }}, + {name: "topic format failed", fields: fields{qos: qos, retain: retain, publishTopic: topicWithPlaceholder}, args: args{pipeline: &interfaces.FunctionPipeline{}}, wantErr: true, setup: func(processor *triggerMocks.ServiceBinding, functionContext *interfaceMocks.AppFunctionContext, _ *mocks.Client, _ *mocks.Token) { + functionContext.On("ResponseData").Return(payload) + functionContext.On("ApplyValues", topicWithPlaceholder).Return("", fmt.Errorf("apply values failed")) + }}, + {name: "publish failed", fields: fields{qos: qos, retain: retain, publishTopic: topicWithPlaceholder}, args: args{pipeline: &interfaces.FunctionPipeline{}}, wantErr: true, setup: func(processor *triggerMocks.ServiceBinding, functionContext *interfaceMocks.AppFunctionContext, client *mocks.Client, token *mocks.Token) { + functionContext.On("ResponseData").Return(payload) + functionContext.On("ResponseContentType").Return(setContentType) + functionContext.On("CorrelationID").Return(correlationId) + functionContext.On("ApplyValues", topicWithPlaceholder).Return(formattedTopic, nil) + client.On("Publish", mock.Anything, qos, retain, mock.Anything).Return(token) + token.On("Wait").Return(true) + token.On("Error").Return(fmt.Errorf("publish error")) + }}, + {name: "happy", fields: fields{qos: qos, retain: retain, publishTopic: topicWithPlaceholder}, args: args{pipeline: &interfaces.FunctionPipeline{}}, wantErr: false, setup: func(processor *triggerMocks.ServiceBinding, functionContext *interfaceMocks.AppFunctionContext, client *mocks.Client, token *mocks.Token) { + functionContext.On("ResponseData").Return(payload) + functionContext.On("ResponseContentType").Return(setContentType) + functionContext.On("CorrelationID").Return(correlationId) + functionContext.On("ApplyValues", topicWithPlaceholder).Return(formattedTopic, nil) + client.On("Publish", mock.Anything, qos, retain, mock.Anything).Return(token) + token.On("Wait").Return(true) + token.On("Error").Return(nil) + }}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + serviceBinding := &triggerMocks.ServiceBinding{} + + serviceBinding.On("Config").Return(&sdkCommon.ConfigurationStruct{Trigger: sdkCommon.TriggerInfo{EdgexMessageBus: sdkCommon.MessageBusConfig{PublishHost: sdkCommon.PublishHostInfo{PublishTopic: tt.fields.publishTopic}}}}) + + ctx := &interfaceMocks.AppFunctionContext{} + client := &mocks.Client{} + token := &mocks.Token{} + + if tt.setup != nil { + tt.setup(serviceBinding, ctx, client, token) + } + + trigger := &Trigger{ + serviceBinding: serviceBinding, + qos: qos, + retain: retain, + publishTopic: tt.fields.publishTopic, + lc: logger.NewMockClient(), + mqttClient: client, + } + if err := trigger.responseHandler(ctx, tt.args.pipeline); (err != nil) != tt.wantErr { + t.Errorf("responseHandler() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func TestTrigger_messageHandler(t *testing.T) { + tests := []struct { + name string + data []byte + expectedContentType string + }{ + {name: "assume CBOR", data: []byte("not json"), expectedContentType: common.ContentTypeCBOR}, + {name: "json array", data: []byte("[ json array"), expectedContentType: common.ContentTypeJSON}, + {name: "json object", data: []byte("{ json object"), expectedContentType: common.ContentTypeJSON}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + topic := uuid.NewString() + ctx := &appfunction.Context{} + message := &mocks.Message{} + message.On("Payload").Return(tt.data) + message.On("Topic").Return(topic) + + serviceBinding := &triggerMocks.ServiceBinding{} + serviceBinding.On("BuildContext", mock.Anything).Return(func(envelope types.MessageEnvelope) interfaces.AppFunctionContext { + assert.Equal(t, tt.expectedContentType, envelope.ContentType) + assert.Equal(t, tt.data, envelope.Payload) + assert.Equal(t, topic, envelope.ReceivedTopic) + assert.NotEmpty(t, envelope.CorrelationID) + + return ctx + }) + + messageProcessor := &triggerMocks.MessageProcessor{} + messageProcessor.On("MessageReceived", ctx, mock.Anything, mock.Anything).Return(func(inctx interfaces.AppFunctionContext, _ types.MessageEnvelope, _ interfaces.PipelineResponseHandler) error { + assert.Equal(t, ctx, inctx) + return nil + }) + + trigger := &Trigger{ + lc: logger.NewMockClient(), + serviceBinding: serviceBinding, + messageProcessor: messageProcessor, + } + + trigger.messageHandler(nil, message) + }) + } +} diff --git a/pkg/secure/mqttfactory.go b/pkg/secure/mqttfactory.go index 7fc266a46..8e21b0ab2 100644 --- a/pkg/secure/mqttfactory.go +++ b/pkg/secure/mqttfactory.go @@ -20,16 +20,13 @@ import ( "crypto/tls" "crypto/x509" "errors" - "github.com/eclipse/paho.mqtt.golang" "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/messaging" "github.com/edgexfoundry/go-mod-core-contracts/v2/clients/logger" - - "github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/interfaces" ) type MqttFactory struct { - appContext interfaces.AppFunctionContext + sp messaging.SecretDataProvider logger logger.LoggingClient authMode string secretPath string @@ -37,10 +34,10 @@ type MqttFactory struct { skipCertVerify bool } -func NewMqttFactory(appContext interfaces.AppFunctionContext, mode string, path string, skipVerify bool) MqttFactory { +func NewMqttFactory(sp messaging.SecretDataProvider, log logger.LoggingClient, mode string, path string, skipVerify bool) MqttFactory { return MqttFactory{ - appContext: appContext, - logger: appContext.LoggingClient(), + sp: sp, + logger: log, authMode: mode, secretPath: path, skipCertVerify: skipVerify, @@ -56,7 +53,7 @@ func (factory MqttFactory) Create(opts *mqtt.ClientOptions) (mqtt.Client, error) factory.opts = opts //get the secrets from the secret provider and populate the struct - secretData, err := messaging.GetSecretData(factory.authMode, factory.secretPath, factory.appContext) + secretData, err := messaging.GetSecretData(factory.authMode, factory.secretPath, factory.sp) if err != nil { return nil, err } diff --git a/pkg/secure/mqttfactory_test.go b/pkg/secure/mqttfactory_test.go index f47da6b36..d113c24a3 100644 --- a/pkg/secure/mqttfactory_test.go +++ b/pkg/secure/mqttfactory_test.go @@ -36,7 +36,7 @@ import ( var lc logger.LoggingClient var dic *di.Container -var context *appfunction.Context +var secretDataProvider messaging.SecretDataProvider func TestMain(m *testing.M) { lc = logger.NewMockClient() @@ -46,7 +46,10 @@ func TestMain(m *testing.M) { }, }) - context = appfunction.NewContext("123", dic, "") + ctx := appfunction.NewContext("123", dic, "") + + secretDataProvider = ctx + lc = ctx.LoggingClient() os.Exit(m.Run()) } @@ -55,7 +58,7 @@ func TestNewMqttFactory(t *testing.T) { expectedMode := "none" expectedPath := "myPath" expectedSkipVerify := true - target := NewMqttFactory(context, expectedMode, expectedPath, expectedSkipVerify) + target := NewMqttFactory(secretDataProvider, lc, expectedMode, expectedPath, expectedSkipVerify) assert.NotNil(t, target.logger) assert.Equal(t, expectedMode, target.authMode) @@ -66,7 +69,7 @@ func TestNewMqttFactory(t *testing.T) { } func TestConfigureMQTTClientForAuth(t *testing.T) { - target := NewMqttFactory(context, "", "", false) + target := NewMqttFactory(secretDataProvider, lc, "", "", false) target.opts = mqtt.NewClientOptions() tests := []struct { Name string @@ -98,7 +101,7 @@ func TestConfigureMQTTClientForAuth(t *testing.T) { } } func TestConfigureMQTTClientForAuthWithUsernamePassword(t *testing.T) { - target := NewMqttFactory(context, "", "", false) + target := NewMqttFactory(secretDataProvider, lc, "", "", false) target.opts = mqtt.NewClientOptions() target.authMode = messaging.AuthModeUsernamePassword err := target.configureMQTTClientForAuth(&messaging.SecretData{ @@ -113,7 +116,7 @@ func TestConfigureMQTTClientForAuthWithUsernamePassword(t *testing.T) { } func TestConfigureMQTTClientForAuthWithUsernamePasswordAndCA(t *testing.T) { - target := NewMqttFactory(context, "", "", false) + target := NewMqttFactory(secretDataProvider, lc, "", "", false) target.opts = mqtt.NewClientOptions() target.authMode = messaging.AuthModeUsernamePassword err := target.configureMQTTClientForAuth(&messaging.SecretData{ @@ -129,7 +132,7 @@ func TestConfigureMQTTClientForAuthWithUsernamePasswordAndCA(t *testing.T) { } func TestConfigureMQTTClientForAuthWithCACert(t *testing.T) { - target := NewMqttFactory(context, "", "", false) + target := NewMqttFactory(secretDataProvider, lc, "", "", false) target.opts = mqtt.NewClientOptions() target.authMode = messaging.AuthModeCA err := target.configureMQTTClientForAuth(&messaging.SecretData{ @@ -145,7 +148,7 @@ func TestConfigureMQTTClientForAuthWithCACert(t *testing.T) { assert.Nil(t, target.opts.TLSConfig.Certificates) } func TestConfigureMQTTClientForAuthWithClientCert(t *testing.T) { - target := NewMqttFactory(context, "", "", false) + target := NewMqttFactory(secretDataProvider, lc, "", "", false) target.opts = mqtt.NewClientOptions() target.authMode = messaging.AuthModeCert err := target.configureMQTTClientForAuth(&messaging.SecretData{ @@ -163,7 +166,7 @@ func TestConfigureMQTTClientForAuthWithClientCert(t *testing.T) { } func TestConfigureMQTTClientForAuthWithClientCertNoCA(t *testing.T) { - target := NewMqttFactory(context, "", "", false) + target := NewMqttFactory(secretDataProvider, lc, "", "", false) target.opts = mqtt.NewClientOptions() target.authMode = messaging.AuthModeCert err := target.configureMQTTClientForAuth(&messaging.SecretData{ @@ -180,7 +183,7 @@ func TestConfigureMQTTClientForAuthWithClientCertNoCA(t *testing.T) { assert.Nil(t, target.opts.TLSConfig.ClientCAs) } func TestConfigureMQTTClientForAuthWithNone(t *testing.T) { - target := NewMqttFactory(context, "", "", false) + target := NewMqttFactory(secretDataProvider, lc, "", "", false) target.opts = mqtt.NewClientOptions() target.authMode = messaging.AuthModeNone err := target.configureMQTTClientForAuth(&messaging.SecretData{}) diff --git a/pkg/transforms/mqttsecret.go b/pkg/transforms/mqttsecret.go index ef01394b1..56814ceb6 100644 --- a/pkg/transforms/mqttsecret.go +++ b/pkg/transforms/mqttsecret.go @@ -107,7 +107,7 @@ func (sender *MQTTSecretSender) initializeMQTTClient(ctx interfaces.AppFunctionC } config := sender.mqttConfig - mqttFactory := secure.NewMqttFactory(ctx, config.AuthMode, config.SecretPath, config.SkipCertVerify) + mqttFactory := secure.NewMqttFactory(ctx, ctx.LoggingClient(), config.AuthMode, config.SecretPath, config.SkipCertVerify) if len(sender.mqttConfig.KeepAlive) > 0 { keepAlive, err := time.ParseDuration(sender.mqttConfig.KeepAlive)