From e34d6539cf7a57151e8d3c3da56fb49d5bfdf27c Mon Sep 17 00:00:00 2001 From: Felix Ting Date: Fri, 11 Mar 2022 19:15:52 +0800 Subject: [PATCH 1/3] revert: Improve service initialization process (#1047) This reverts commit 6bcd8b5e6ecde811a9d4a8440825dacb4301a14c. Signed-off-by: Felix Ting --- internal/app/service.go | 28 ++----- internal/app/service_test.go | 2 +- internal/constant.go | 4 - internal/controller/rest/controller.go | 28 +++---- internal/controller/rest/controller_test.go | 13 ++-- internal/trigger/mqtt/mqtt.go | 15 +--- internal/webserver/server.go | 4 +- internal/webserver/server_test.go | 4 +- pkg/secure/mqttfactory.go | 85 ++++++--------------- pkg/secure/mqttfactory_test.go | 75 ++---------------- pkg/transforms/mqttsecret.go | 3 +- 11 files changed, 63 insertions(+), 198 deletions(-) diff --git a/internal/app/service.go b/internal/app/service.go index 9471cc7d6..588ad2450 100644 --- a/internal/app/service.go +++ b/internal/app/service.go @@ -52,7 +52,6 @@ import ( bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container" "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/flags" bootstrapInterfaces "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/interfaces" - "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/secret" "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/startup" "github.com/edgexfoundry/go-mod-bootstrap/v2/di" @@ -174,11 +173,6 @@ func (svc *Service) MakeItRun() error { svc.ctx.stop = stop - httpErrors := make(chan error) - defer close(httpErrors) - - svc.webserver.StartWebServer(httpErrors) - // determine input type and create trigger for it t := svc.setupTrigger(svc.config) if t == nil { @@ -195,15 +189,6 @@ func (svc *Service) MakeItRun() error { // deferred is a function that needs to be called when services exits. svc.addDeferred(deferred) - if secret.IsSecurityEnabled() { - // add a deferred function to close the SecretAddedSignal channel created during service initialization. - svc.addDeferred(func() { - if secretAddedSignal, ok := svc.ctx.appCtx.Value(internal.ContextKeySecretAddedSignal).(chan struct{}); ok { - close(secretAddedSignal) - } - }) - } - if svc.config.Writable.StoreAndForward.Enabled { svc.startStoreForward() } else { @@ -215,6 +200,11 @@ func (svc *Service) MakeItRun() error { signals := make(chan os.Signal, 1) signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) + httpErrors := make(chan error) + defer close(httpErrors) + + svc.webserver.StartWebServer(httpErrors) + select { case httpError := <-httpErrors: svc.lc.Info("Http error received: ", httpError.Error()) @@ -505,12 +495,6 @@ func (svc *Service) Initialize() error { svc.ctx.appCtx, svc.ctx.appCancelCtx = context.WithCancel(context.Background()) svc.ctx.appWg = &sync.WaitGroup{} - var secretAddedSignal chan struct{} - if secret.IsSecurityEnabled() { - secretAddedSignal = make(chan struct{}, 1) - svc.ctx.appCtx = context.WithValue(svc.ctx.appCtx, internal.ContextKeySecretAddedSignal, secretAddedSignal) - } - var deferred bootstrap.Deferred var successful bool var configUpdated config.UpdatedStream = make(chan struct{}) @@ -557,7 +541,7 @@ func (svc *Service) Initialize() error { // to wait to be signaled when the configuration has been updated and then process the changes NewConfigUpdateProcessor(svc).WaitForConfigUpdates(configUpdated) - svc.webserver = webserver.NewWebServer(svc.dic, mux.NewRouter(), svc.serviceKey, secretAddedSignal) + svc.webserver = webserver.NewWebServer(svc.dic, mux.NewRouter(), svc.serviceKey) svc.webserver.ConfigureStandardRoutes() svc.lc.Info("Service started in: " + startupTimer.SinceAsString()) diff --git a/internal/app/service_test.go b/internal/app/service_test.go index 7709882cc..78ca66dcd 100644 --- a/internal/app/service_test.go +++ b/internal/app/service_test.go @@ -78,7 +78,7 @@ func IsInstanceOf(objectPtr, typePtr interface{}) bool { func TestAddRoute(t *testing.T) { router := mux.NewRouter() - ws := webserver.NewWebServer(dic, router, uuid.NewString(), nil) + ws := webserver.NewWebServer(dic, router, uuid.NewString()) sdk := Service{ webserver: ws, diff --git a/internal/constant.go b/internal/constant.go index 08745a74f..f173480d1 100644 --- a/internal/constant.go +++ b/internal/constant.go @@ -20,15 +20,11 @@ import ( "github.com/edgexfoundry/go-mod-core-contracts/v2/common" ) -type contextKey int - const ( ConfigRegistryStem = "edgex/appservices/" ApiTriggerRoute = common.ApiBase + "/trigger" ApiAddSecretRoute = common.ApiBase + "/secret" - - ContextKeySecretAddedSignal contextKey = iota ) // SDKVersion indicates the version of the SDK - will be overwritten by build diff --git a/internal/controller/rest/controller.go b/internal/controller/rest/controller.go index 333878661..952cee375 100644 --- a/internal/controller/rest/controller.go +++ b/internal/controller/rest/controller.go @@ -40,23 +40,21 @@ import ( // Controller controller for V2 REST APIs type Controller struct { - router *mux.Router - secretProvider interfaces.SecretProvider - lc logger.LoggingClient - config *sdkCommon.ConfigurationStruct - serviceName string - secretAddedSignal chan struct{} + router *mux.Router + secretProvider interfaces.SecretProvider + lc logger.LoggingClient + config *sdkCommon.ConfigurationStruct + serviceName string } // NewController creates and initializes an Controller -func NewController(router *mux.Router, dic *di.Container, serviceName string, secretAddedSignal chan struct{}) *Controller { +func NewController(router *mux.Router, dic *di.Container, serviceName string) *Controller { return &Controller{ - router: router, - secretProvider: bootstrapContainer.SecretProviderFrom(dic.Get), - lc: bootstrapContainer.LoggingClientFrom(dic.Get), - config: container.ConfigurationFrom(dic.Get), - serviceName: serviceName, - secretAddedSignal: secretAddedSignal, + router: router, + secretProvider: bootstrapContainer.SecretProviderFrom(dic.Get), + lc: bootstrapContainer.LoggingClientFrom(dic.Get), + config: container.ConfigurationFrom(dic.Get), + serviceName: serviceName, } } @@ -122,10 +120,6 @@ func (c *Controller) AddSecret(writer http.ResponseWriter, request *http.Request response := commonDtos.NewBaseResponse(secretRequest.RequestId, "", http.StatusCreated) c.sendResponse(writer, request, internal.ApiAddSecretRoute, response, http.StatusCreated) - - if c.secretAddedSignal != nil { - c.secretAddedSignal <- struct{}{} - } } func (c *Controller) sendError( diff --git a/internal/controller/rest/controller_test.go b/internal/controller/rest/controller_test.go index c6c470dee..6d4f18e68 100644 --- a/internal/controller/rest/controller_test.go +++ b/internal/controller/rest/controller_test.go @@ -59,7 +59,7 @@ func TestMain(m *testing.M) { func TestPingRequest(t *testing.T) { serviceName := uuid.NewString() - target := NewController(nil, dic, serviceName, nil) + target := NewController(nil, dic, serviceName) recorder := doRequest(t, http.MethodGet, common.ApiPingRoute, target.Ping, nil) @@ -83,7 +83,7 @@ func TestVersionRequest(t *testing.T) { internal.ApplicationVersion = expectedAppVersion internal.SDKVersion = expectedSdkVersion - target := NewController(nil, dic, serviceName, nil) + target := NewController(nil, dic, serviceName) recorder := doRequest(t, http.MethodGet, common.ApiVersion, target.Version, nil) @@ -100,7 +100,7 @@ func TestVersionRequest(t *testing.T) { func TestMetricsRequest(t *testing.T) { serviceName := uuid.NewString() - target := NewController(nil, dic, serviceName, nil) + target := NewController(nil, dic, serviceName) recorder := doRequest(t, http.MethodGet, common.ApiMetricsRoute, target.Metrics, nil) @@ -140,7 +140,7 @@ func TestConfigRequest(t *testing.T) { }, }) - target := NewController(nil, dic, serviceName, nil) + target := NewController(nil, dic, serviceName) recorder := doRequest(t, http.MethodGet, common.ApiConfigRoute, target.Config, nil) @@ -176,10 +176,7 @@ func TestAddSecretRequest(t *testing.T) { mockProvider.On("StoreSecrets", "/mqtt", map[string]string{"password": "password", "username": "username"}).Return(nil) mockProvider.On("StoreSecrets", "/no", map[string]string{"password": "password", "username": "username"}).Return(errors.New("Invalid w/o Vault")) - ch := make(chan struct{}, 1) - defer close(ch) - - target := NewController(nil, dic, uuid.NewString(), ch) + target := NewController(nil, dic, uuid.NewString()) assert.NotNil(t, target) validRequest := commonDtos.SecretRequest{ diff --git a/internal/trigger/mqtt/mqtt.go b/internal/trigger/mqtt/mqtt.go index d0c87cb29..26eefdc59 100644 --- a/internal/trigger/mqtt/mqtt.go +++ b/internal/trigger/mqtt/mqtt.go @@ -21,19 +21,17 @@ import ( "context" "errors" "fmt" + "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/trigger" "net/url" "strings" "sync" "time" - "github.com/edgexfoundry/app-functions-sdk-go/v2/internal" - "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/trigger" "github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/interfaces" "github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/secure" "github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/util" "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap" - "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/secret" commonContracts "github.com/edgexfoundry/go-mod-core-contracts/v2/common" "github.com/edgexfoundry/go-mod-messaging/v2/pkg/types" @@ -61,7 +59,7 @@ func NewTrigger(bnd trigger.ServiceBinding, mp trigger.MessageProcessor) *Trigge } // Initialize initializes the Trigger for an external MQTT broker -func (trigger *Trigger) Initialize(_ *sync.WaitGroup, ctx context.Context, background <-chan interfaces.BackgroundMessage) (bootstrap.Deferred, error) { +func (trigger *Trigger) Initialize(_ *sync.WaitGroup, _ context.Context, background <-chan interfaces.BackgroundMessage) (bootstrap.Deferred, error) { // Convenience short cuts lc := trigger.serviceBinding.LoggingClient() config := trigger.serviceBinding.Config() @@ -102,21 +100,12 @@ func (trigger *Trigger) Initialize(_ *sync.WaitGroup, ctx context.Context, backg opts.KeepAlive = brokerConfig.KeepAlive opts.Servers = []*url.URL{brokerUrl} - var secretAddedSignal chan struct{} - var ok bool - if secret.IsSecurityEnabled() { - if secretAddedSignal, ok = ctx.Value(internal.ContextKeySecretAddedSignal).(chan struct{}); !ok { - return nil, errors.New("the SecretAddedSignal channel cannot be found in the context") - } - } - mqttFactory := secure.NewMqttFactory( trigger.serviceBinding.SecretProvider(), trigger.serviceBinding.LoggingClient(), brokerConfig.AuthMode, brokerConfig.SecretPath, brokerConfig.SkipCertVerify, - secretAddedSignal, ) mqttClient, err := mqttFactory.Create(opts) diff --git a/internal/webserver/server.go b/internal/webserver/server.go index 00705e64d..e714ce474 100644 --- a/internal/webserver/server.go +++ b/internal/webserver/server.go @@ -52,12 +52,12 @@ type Version struct { } // NewWebServer returns a new instance of *WebServer -func NewWebServer(dic *di.Container, router *mux.Router, serviceName string, secretAddedSignal chan struct{}) *WebServer { +func NewWebServer(dic *di.Container, router *mux.Router, serviceName string) *WebServer { ws := &WebServer{ lc: bootstrapContainer.LoggingClientFrom(dic.Get), config: container.ConfigurationFrom(dic.Get), router: router, - controller: rest.NewController(router, dic, serviceName, secretAddedSignal), + controller: rest.NewController(router, dic, serviceName), } return ws diff --git a/internal/webserver/server_test.go b/internal/webserver/server_test.go index 7fa227147..b2c2dcf4b 100644 --- a/internal/webserver/server_test.go +++ b/internal/webserver/server_test.go @@ -58,7 +58,7 @@ func TestAddRoute(t *testing.T) { routePath := "/testRoute" testHandler := func(_ http.ResponseWriter, _ *http.Request) {} - webserver := NewWebServer(dic, mux.NewRouter(), uuid.NewString(), nil) + webserver := NewWebServer(dic, mux.NewRouter(), uuid.NewString()) err := webserver.AddRoute(routePath, testHandler) assert.NoError(t, err, "Not expecting an error") @@ -69,7 +69,7 @@ func TestAddRoute(t *testing.T) { } func TestSetupTriggerRoute(t *testing.T) { - webserver := NewWebServer(dic, mux.NewRouter(), uuid.NewString(), nil) + webserver := NewWebServer(dic, mux.NewRouter(), uuid.NewString()) handlerFunctionNotCalled := true handler := func(w http.ResponseWriter, r *http.Request) { diff --git a/pkg/secure/mqttfactory.go b/pkg/secure/mqttfactory.go index 38d719e9b..8e21b0ab2 100644 --- a/pkg/secure/mqttfactory.go +++ b/pkg/secure/mqttfactory.go @@ -20,34 +20,27 @@ import ( "crypto/tls" "crypto/x509" "errors" - "fmt" - + "github.com/eclipse/paho.mqtt.golang" "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/messaging" - "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/secret" "github.com/edgexfoundry/go-mod-core-contracts/v2/clients/logger" - - "github.com/eclipse/paho.mqtt.golang" ) type MqttFactory struct { - sp messaging.SecretDataProvider - logger logger.LoggingClient - authMode string - secretPath string - opts *mqtt.ClientOptions - skipCertVerify bool - secretAddedSignal chan struct{} + sp messaging.SecretDataProvider + logger logger.LoggingClient + authMode string + secretPath string + opts *mqtt.ClientOptions + skipCertVerify bool } -func NewMqttFactory(sp messaging.SecretDataProvider, log logger.LoggingClient, mode string, path string, skipVerify bool, - secretAddedSignal chan struct{}) MqttFactory { +func NewMqttFactory(sp messaging.SecretDataProvider, log logger.LoggingClient, mode string, path string, skipVerify bool) MqttFactory { return MqttFactory{ - sp: sp, - logger: log, - authMode: mode, - secretPath: path, - skipCertVerify: skipVerify, - secretAddedSignal: secretAddedSignal, + sp: sp, + logger: log, + authMode: mode, + secretPath: path, + skipCertVerify: skipVerify, } } @@ -59,34 +52,24 @@ func (factory MqttFactory) Create(opts *mqtt.ClientOptions) (mqtt.Client, error) factory.opts = opts - secretData, err := factory.getValidSecretData() - switch secret.IsSecurityEnabled() { - case true: - if err == nil { - break - } - factory.logger.Error(err.Error()) - for { - factory.logger.Info("Waiting for the secret creation API call to seed the proper credentials...") - <-factory.secretAddedSignal - secretData, err = factory.getValidSecretData() - if err != nil { - factory.logger.Error(err.Error()) - } else { - break - } + //get the secrets from the secret provider and populate the struct + secretData, err := messaging.GetSecretData(factory.authMode, factory.secretPath, factory.sp) + if err != nil { + return nil, err + } + //ensure that the authmode selected has the required secret values + if secretData != nil { + err = messaging.ValidateSecretData(factory.authMode, factory.secretPath, secretData) + if err != nil { + return nil, err } - case false: + // configure the mqtt client with the retrieved secret values + err = factory.configureMQTTClientForAuth(secretData) if err != nil { return nil, err } } - err = factory.configureMQTTClientForAuth(secretData) - if err != nil { - return nil, err - } - return mqtt.NewClient(factory.opts), nil } @@ -127,21 +110,3 @@ func (factory MqttFactory) configureMQTTClientForAuth(secretData *messaging.Secr return nil } - -func (factory MqttFactory) getValidSecretData() (*messaging.SecretData, error) { - //get the secrets from the secret provider and populate the struct - secretData, err := messaging.GetSecretData(factory.authMode, factory.secretPath, factory.sp) - if err != nil { - return nil, fmt.Errorf("failed to get secret data from the secret provider, error: %s", err) - } - if secretData == nil { - return nil, nil - } - //ensure that the authmode selected has the required secret values - err = messaging.ValidateSecretData(factory.authMode, factory.secretPath, secretData) - if err != nil { - return nil, fmt.Errorf("invalid secret data, error: %s", err) - } else { - return secretData, nil - } -} diff --git a/pkg/secure/mqttfactory_test.go b/pkg/secure/mqttfactory_test.go index c9bd26df5..d113c24a3 100644 --- a/pkg/secure/mqttfactory_test.go +++ b/pkg/secure/mqttfactory_test.go @@ -20,13 +20,11 @@ package secure import ( - "errors" "os" "testing" "github.com/eclipse/paho.mqtt.golang" bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container" - "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/interfaces/mocks" "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/messaging" "github.com/edgexfoundry/go-mod-bootstrap/v2/di" "github.com/edgexfoundry/go-mod-core-contracts/v2/clients/logger" @@ -60,20 +58,18 @@ func TestNewMqttFactory(t *testing.T) { expectedMode := "none" expectedPath := "myPath" expectedSkipVerify := true - expectedChannel := make(chan struct{}) - target := NewMqttFactory(secretDataProvider, lc, expectedMode, expectedPath, expectedSkipVerify, expectedChannel) + target := NewMqttFactory(secretDataProvider, lc, expectedMode, expectedPath, expectedSkipVerify) assert.NotNil(t, target.logger) assert.Equal(t, expectedMode, target.authMode) assert.Equal(t, expectedPath, target.secretPath) assert.Equal(t, expectedSkipVerify, target.skipCertVerify) - assert.Equal(t, expectedChannel, target.secretAddedSignal) assert.Nil(t, target.opts) } func TestConfigureMQTTClientForAuth(t *testing.T) { - target := NewMqttFactory(secretDataProvider, lc, "", "", false, nil) + target := NewMqttFactory(secretDataProvider, lc, "", "", false) target.opts = mqtt.NewClientOptions() tests := []struct { Name string @@ -105,7 +101,7 @@ func TestConfigureMQTTClientForAuth(t *testing.T) { } } func TestConfigureMQTTClientForAuthWithUsernamePassword(t *testing.T) { - target := NewMqttFactory(secretDataProvider, lc, "", "", false, nil) + target := NewMqttFactory(secretDataProvider, lc, "", "", false) target.opts = mqtt.NewClientOptions() target.authMode = messaging.AuthModeUsernamePassword err := target.configureMQTTClientForAuth(&messaging.SecretData{ @@ -120,7 +116,7 @@ func TestConfigureMQTTClientForAuthWithUsernamePassword(t *testing.T) { } func TestConfigureMQTTClientForAuthWithUsernamePasswordAndCA(t *testing.T) { - target := NewMqttFactory(secretDataProvider, lc, "", "", false, nil) + target := NewMqttFactory(secretDataProvider, lc, "", "", false) target.opts = mqtt.NewClientOptions() target.authMode = messaging.AuthModeUsernamePassword err := target.configureMQTTClientForAuth(&messaging.SecretData{ @@ -136,7 +132,7 @@ func TestConfigureMQTTClientForAuthWithUsernamePasswordAndCA(t *testing.T) { } func TestConfigureMQTTClientForAuthWithCACert(t *testing.T) { - target := NewMqttFactory(secretDataProvider, lc, "", "", false, nil) + target := NewMqttFactory(secretDataProvider, lc, "", "", false) target.opts = mqtt.NewClientOptions() target.authMode = messaging.AuthModeCA err := target.configureMQTTClientForAuth(&messaging.SecretData{ @@ -152,7 +148,7 @@ func TestConfigureMQTTClientForAuthWithCACert(t *testing.T) { assert.Nil(t, target.opts.TLSConfig.Certificates) } func TestConfigureMQTTClientForAuthWithClientCert(t *testing.T) { - target := NewMqttFactory(secretDataProvider, lc, "", "", false, nil) + target := NewMqttFactory(secretDataProvider, lc, "", "", false) target.opts = mqtt.NewClientOptions() target.authMode = messaging.AuthModeCert err := target.configureMQTTClientForAuth(&messaging.SecretData{ @@ -170,7 +166,7 @@ func TestConfigureMQTTClientForAuthWithClientCert(t *testing.T) { } func TestConfigureMQTTClientForAuthWithClientCertNoCA(t *testing.T) { - target := NewMqttFactory(secretDataProvider, lc, "", "", false, nil) + target := NewMqttFactory(secretDataProvider, lc, "", "", false) target.opts = mqtt.NewClientOptions() target.authMode = messaging.AuthModeCert err := target.configureMQTTClientForAuth(&messaging.SecretData{ @@ -187,65 +183,10 @@ func TestConfigureMQTTClientForAuthWithClientCertNoCA(t *testing.T) { assert.Nil(t, target.opts.TLSConfig.ClientCAs) } func TestConfigureMQTTClientForAuthWithNone(t *testing.T) { - target := NewMqttFactory(secretDataProvider, lc, "", "", false, nil) + target := NewMqttFactory(secretDataProvider, lc, "", "", false) target.opts = mqtt.NewClientOptions() target.authMode = messaging.AuthModeNone err := target.configureMQTTClientForAuth(&messaging.SecretData{}) require.NoError(t, err) } - -func TestGetValidSecretData(t *testing.T) { - username := "edgexuser" - password := "123" - expectedSecretData := map[string]string{ - "username": username, - "password": password, - } - invalidSecretData := map[string]string{ - "key1": "value1", - "key2": "value2", - } - mockSecretProvider := &mocks.SecretProvider{} - mockSecretProvider.On("GetSecret", "").Return(nil) - mockSecretProvider.On("GetSecret", "notfound").Return(nil, errors.New("not Found")) - mockSecretProvider.On("GetSecret", "invalid").Return(invalidSecretData, nil) - mockSecretProvider.On("GetSecret", "mqtt").Return(expectedSecretData, nil) - dic.Update(di.ServiceConstructorMap{ - bootstrapContainer.SecretProviderName: func(get di.Get) interface{} { - return mockSecretProvider - }, - }) - - tests := []struct { - Name string - AuthMode string - SecretPath string - ExpectedSecrets *messaging.SecretData - ExpectingError bool - }{ - {"No auth", messaging.AuthModeNone, "", nil, false}, - {"SecretData not found", messaging.AuthModeUsernamePassword, "notfound", nil, true}, - {"Auth with invalid SecretData", messaging.AuthModeUsernamePassword, "invalid", nil, true}, - {"Auth with valid SecretData", messaging.AuthModeUsernamePassword, "mqtt", &messaging.SecretData{ - Username: username, - Password: password, - KeyPemBlock: []uint8{}, - CertPemBlock: []uint8{}, - CaPemBlock: []uint8{}, - }, false}, - } - - for _, test := range tests { - t.Run(test.Name, func(t *testing.T) { - f := NewMqttFactory(secretDataProvider, lc, test.AuthMode, test.SecretPath, false, nil) - - secretData, err := f.getValidSecretData() - if test.ExpectingError { - assert.Error(t, err, "Expecting error") - return - } - require.Equal(t, test.ExpectedSecrets, secretData) - }) - } -} diff --git a/pkg/transforms/mqttsecret.go b/pkg/transforms/mqttsecret.go index 119614a07..56814ceb6 100644 --- a/pkg/transforms/mqttsecret.go +++ b/pkg/transforms/mqttsecret.go @@ -107,8 +107,7 @@ func (sender *MQTTSecretSender) initializeMQTTClient(ctx interfaces.AppFunctionC } config := sender.mqttConfig - // Do not use secertAddedSignal in MQTTSecretSender. Otherwise, when secrets are not ready and messages keep coming will cause the memory usage to keep rising. - mqttFactory := secure.NewMqttFactory(ctx, ctx.LoggingClient(), config.AuthMode, config.SecretPath, config.SkipCertVerify, nil) + mqttFactory := secure.NewMqttFactory(ctx, ctx.LoggingClient(), config.AuthMode, config.SecretPath, config.SkipCertVerify) if len(sender.mqttConfig.KeepAlive) > 0 { keepAlive, err := time.ParseDuration(sender.mqttConfig.KeepAlive) From 1e2cbf3a5badc5e55b65310aca366435d1ef5e67 Mon Sep 17 00:00:00 2001 From: Felix Ting Date: Mon, 14 Mar 2022 12:20:54 +0800 Subject: [PATCH 2/3] fix: Improve service initialization process - Start webserver before trigger initialization - Add retry mechanism for External MQTT Trigger initialization Signed-off-by: Felix Ting --- internal/app/service.go | 10 ++--- internal/common/config.go | 4 ++ internal/trigger/mqtt/mqtt.go | 74 +++++++++++++++++++++++++++-------- 3 files changed, 66 insertions(+), 22 deletions(-) diff --git a/internal/app/service.go b/internal/app/service.go index 588ad2450..1ce0f3fc7 100644 --- a/internal/app/service.go +++ b/internal/app/service.go @@ -173,6 +173,11 @@ func (svc *Service) MakeItRun() error { svc.ctx.stop = stop + httpErrors := make(chan error) + defer close(httpErrors) + + svc.webserver.StartWebServer(httpErrors) + // determine input type and create trigger for it t := svc.setupTrigger(svc.config) if t == nil { @@ -200,11 +205,6 @@ func (svc *Service) MakeItRun() error { signals := make(chan os.Signal, 1) signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) - httpErrors := make(chan error) - defer close(httpErrors) - - svc.webserver.StartWebServer(httpErrors) - select { case httpError := <-httpErrors: svc.lc.Info("Http error received: ", httpError.Error()) diff --git a/internal/common/config.go b/internal/common/config.go index 42ade55da..a448aec68 100644 --- a/internal/common/config.go +++ b/internal/common/config.go @@ -146,6 +146,10 @@ type ExternalMqttConfig struct { // AuthMode indicates what to use when connecting to the broker. Options are "none", "cacert" , "usernamepassword", "clientcert". // If a CA Cert exists in the SecretPath then it will be used for all modes except "none". AuthMode string + // RetryDuration indicates how long (in seconds) to wait timing out on the MQTT client creation + RetryDuration int + // RetryInterval indicates the time (in seconds) that will be waited between attempts to create MQTT client + RetryInterval int } // PipelineInfo defines the top level data for configurable pipelines diff --git a/internal/trigger/mqtt/mqtt.go b/internal/trigger/mqtt/mqtt.go index 26eefdc59..a0d1e95a3 100644 --- a/internal/trigger/mqtt/mqtt.go +++ b/internal/trigger/mqtt/mqtt.go @@ -21,17 +21,21 @@ import ( "context" "errors" "fmt" - "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/trigger" "net/url" "strings" "sync" "time" + "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/common" + "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/trigger" "github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/interfaces" "github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/secure" "github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/util" "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap" + "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/messaging" + "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/startup" + "github.com/edgexfoundry/go-mod-core-contracts/v2/clients/logger" commonContracts "github.com/edgexfoundry/go-mod-core-contracts/v2/common" "github.com/edgexfoundry/go-mod-messaging/v2/pkg/types" @@ -39,6 +43,11 @@ import ( "github.com/google/uuid" ) +const ( + defaultRetryDuration = 600 + defaultRetryInterval = 5 +) + // Trigger implements Trigger to support Triggers type Trigger struct { messageProcessor trigger.MessageProcessor @@ -100,26 +109,28 @@ func (trigger *Trigger) Initialize(_ *sync.WaitGroup, _ context.Context, backgro opts.KeepAlive = brokerConfig.KeepAlive opts.Servers = []*url.URL{brokerUrl} - mqttFactory := secure.NewMqttFactory( - trigger.serviceBinding.SecretProvider(), - trigger.serviceBinding.LoggingClient(), - brokerConfig.AuthMode, - brokerConfig.SecretPath, - brokerConfig.SkipCertVerify, - ) - - mqttClient, err := mqttFactory.Create(opts) - if err != nil { - return nil, fmt.Errorf("unable to create secure MQTT Client: %s", err.Error()) + if brokerConfig.RetryDuration <= 0 { + brokerConfig.RetryDuration = defaultRetryDuration + } + if brokerConfig.RetryInterval <= 0 { + brokerConfig.RetryInterval = defaultRetryInterval } - lc.Infof("Connecting to mqtt broker for MQTT trigger at: %s", brokerUrl) - - if token := mqttClient.Connect(); token.Wait() && token.Error() != nil { - return nil, fmt.Errorf("could not connect to broker for MQTT trigger: %s", token.Error().Error()) + sp := trigger.serviceBinding.SecretProvider() + var mqttClient pahoMqtt.Client + timer := startup.NewTimer(brokerConfig.RetryDuration, brokerConfig.RetryInterval) + for timer.HasNotElapsed() { + if mqttClient, err = createMqttClient(sp, lc, brokerConfig, opts); err != nil { + lc.Errorf("%s. Attempt to create MQTT client again after %d seconds...", err.Error(), brokerConfig.RetryInterval) + timer.SleepForInterval() + continue + } + break } - lc.Info("Connected to mqtt server for MQTT trigger") + if err != nil { + return nil, fmt.Errorf("unable to create MQTT Client: %s", err.Error()) + } deferred := func() { lc.Info("Disconnecting from broker for MQTT trigger") @@ -216,3 +227,32 @@ func (trigger *Trigger) responseHandler(appContext interfaces.AppFunctionContext } return nil } + +func createMqttClient(sp messaging.SecretDataProvider, lc logger.LoggingClient, config common.ExternalMqttConfig, + opts *pahoMqtt.ClientOptions) (pahoMqtt.Client, error) { + mqttFactory := secure.NewMqttFactory( + sp, + lc, + config.AuthMode, + config.SecretPath, + config.SkipCertVerify, + ) + mqttClient, err := mqttFactory.Create(opts) + if err != nil { + return nil, fmt.Errorf("unable to create secure MQTT Client: %s", err.Error()) + } + + brokerUrl, err := url.Parse(config.Url) + if err != nil { + return nil, fmt.Errorf("invalid MQTT Broker Url '%s': %s", config.Url, err.Error()) + } + + lc.Infof("Connecting to mqtt broker for MQTT trigger at: %s", brokerUrl) + + if token := mqttClient.Connect(); token.Wait() && token.Error() != nil { + return nil, fmt.Errorf("could not connect to broker for MQTT trigger: %s", token.Error().Error()) + } + + lc.Info("Connected to mqtt server for MQTT trigger") + return mqttClient, nil +} From 6d2e93ad25989b05315e58cba316399efaf54735 Mon Sep 17 00:00:00 2001 From: Felix Ting Date: Tue, 15 Mar 2022 09:20:33 +0800 Subject: [PATCH 3/3] fix: Update code per review comments Signed-off-by: Felix Ting --- internal/trigger/mqtt/mqtt.go | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/internal/trigger/mqtt/mqtt.go b/internal/trigger/mqtt/mqtt.go index a0d1e95a3..3b1e1a649 100644 --- a/internal/trigger/mqtt/mqtt.go +++ b/internal/trigger/mqtt/mqtt.go @@ -121,7 +121,7 @@ func (trigger *Trigger) Initialize(_ *sync.WaitGroup, _ context.Context, backgro timer := startup.NewTimer(brokerConfig.RetryDuration, brokerConfig.RetryInterval) for timer.HasNotElapsed() { if mqttClient, err = createMqttClient(sp, lc, brokerConfig, opts); err != nil { - lc.Errorf("%s. Attempt to create MQTT client again after %d seconds...", err.Error(), brokerConfig.RetryInterval) + lc.Warnf("%s. Attempt to create MQTT client again after %d seconds...", err.Error(), brokerConfig.RetryInterval) timer.SleepForInterval() continue } @@ -242,12 +242,7 @@ func createMqttClient(sp messaging.SecretDataProvider, lc logger.LoggingClient, return nil, fmt.Errorf("unable to create secure MQTT Client: %s", err.Error()) } - brokerUrl, err := url.Parse(config.Url) - if err != nil { - return nil, fmt.Errorf("invalid MQTT Broker Url '%s': %s", config.Url, err.Error()) - } - - lc.Infof("Connecting to mqtt broker for MQTT trigger at: %s", brokerUrl) + lc.Infof("Connecting to mqtt broker for MQTT trigger at: %s", config.Url) if token := mqttClient.Connect(); token.Wait() && token.Error() != nil { return nil, fmt.Errorf("could not connect to broker for MQTT trigger: %s", token.Error().Error())