From d953ae32bf6ff894520b813daf09cde263dc719e Mon Sep 17 00:00:00 2001 From: Alex Ullrich Date: Fri, 4 Sep 2020 08:53:09 -0400 Subject: [PATCH] feat(sdk): Add background publisher to MessageBus Creates a channel of provided capacity and attaches to the sdk instance. This channel is then passed to the trigger so that MessageEnvelopes can be pulled off of it and dropped on the queue. Publisher handles formatting the passed message so there is no need to couple using types from go-mod-messaging. Closes: #462 Signed-off-by: Alex Ullrich --- appsdk/backgroundpublisher.go | 46 ++++++++++ appsdk/backgroundpublisher_test.go | 48 +++++++++++ appsdk/sdk.go | 12 ++- appsdk/sdk_test.go | 24 +++++- internal/trigger/http/rest.go | 7 +- internal/trigger/http/rest_test.go | 34 ++++++++ internal/trigger/messagebus/messaging.go | 13 ++- internal/trigger/messagebus/messaging_test.go | 86 ++++++++++++++++++- internal/trigger/trigger.go | 3 +- 9 files changed, 261 insertions(+), 12 deletions(-) create mode 100644 appsdk/backgroundpublisher.go create mode 100644 appsdk/backgroundpublisher_test.go create mode 100644 internal/trigger/http/rest_test.go diff --git a/appsdk/backgroundpublisher.go b/appsdk/backgroundpublisher.go new file mode 100644 index 000000000..bb26fc9a1 --- /dev/null +++ b/appsdk/backgroundpublisher.go @@ -0,0 +1,46 @@ +// +// Copyright (c) 2020 Technotects +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package appsdk + +import "github.com/edgexfoundry/go-mod-messaging/pkg/types" + +// BackgroundPublisher provides an interface to send messages from background processes +// through the service's configured MessageBus output +type BackgroundPublisher interface { + // Publish provided message through the configured MessageBus output + Publish(payload []byte, correlationID string, contentType string) +} + +type backgroundPublisher struct { + output chan<- types.MessageEnvelope +} + +// Publish provided message through the configured MessageBus output +func (pub *backgroundPublisher) Publish(payload []byte, correlationID string, contentType string) { + outputEnvelope := types.MessageEnvelope{ + CorrelationID: correlationID, + Payload: payload, + ContentType: contentType, + } + + pub.output <- outputEnvelope +} + +func newBackgroundPublisher(capacity int) (<-chan types.MessageEnvelope, BackgroundPublisher) { + backgroundChannel := make(chan types.MessageEnvelope, capacity) + return backgroundChannel, &backgroundPublisher{output: backgroundChannel} +} diff --git a/appsdk/backgroundpublisher_test.go b/appsdk/backgroundpublisher_test.go new file mode 100644 index 000000000..9cc55ce71 --- /dev/null +++ b/appsdk/backgroundpublisher_test.go @@ -0,0 +1,48 @@ +// +// Copyright (c) 2020 Technotects +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package appsdk + +import ( + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestNewBackgroundPublisherAndPublish(t *testing.T) { + background, pub := newBackgroundPublisher(1) + + payload := []byte("something") + correlationId := "id" + contentType := "type" + + pub.Publish(payload, correlationId, contentType) + + waiting := true + + for waiting { + select { + case msgs := <-background: + assert.Equal(t, correlationId, msgs.CorrelationID) + assert.Equal(t, contentType, msgs.ContentType) + assert.Equal(t, payload, msgs.Payload) + waiting = false + case <-time.After(1 * time.Second): + assert.Fail(t, "message timed out, background channel likely not configured correctly") + waiting = false + } + } +} diff --git a/appsdk/sdk.go b/appsdk/sdk.go index 7a6a34848..3c566be81 100644 --- a/appsdk/sdk.go +++ b/appsdk/sdk.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "github.com/edgexfoundry/go-mod-messaging/pkg/types" nethttp "net/http" "net/url" "os" @@ -111,6 +112,7 @@ type AppFunctionsSDK struct { appCancelCtx context.CancelFunc deferredFunctions []bootstrap.Deferred serviceKeyOverride string + backgroundChannel <-chan types.MessageEnvelope } // AddRoute allows you to leverage the existing webserver to add routes. @@ -125,6 +127,14 @@ func (sdk *AppFunctionsSDK) AddRoute(route string, handler func(nethttp.Response return sdk.webserver.AddRoute(route, sdk.addContext(handler), methods...) } +// AddBackgroundPublisher will create a channel of provided capacity to be +// consumed by the MessageBus output and return a publisher that writes to it +func (sdk *AppFunctionsSDK) AddBackgroundPublisher(capacity int) BackgroundPublisher { + bgchan, pub := newBackgroundPublisher(capacity) + sdk.backgroundChannel = bgchan + return pub +} + // MakeItRun will initialize and start the trigger as specifed in the // configuration. It will also configure the webserver and start listening on // the specified port. @@ -143,7 +153,7 @@ func (sdk *AppFunctionsSDK) MakeItRun() error { t := sdk.setupTrigger(sdk.config, sdk.runtime) // Initialize the trigger (i.e. start a web server, or connect to message bus) - deferred, err := t.Initialize(sdk.appWg, sdk.appCtx) + deferred, err := t.Initialize(sdk.appWg, sdk.appCtx, sdk.backgroundChannel) if err != nil { sdk.LoggingClient.Error(err.Error()) } diff --git a/appsdk/sdk_test.go b/appsdk/sdk_test.go index 7b4ac87aa..4ae4db4a6 100644 --- a/appsdk/sdk_test.go +++ b/appsdk/sdk_test.go @@ -17,15 +17,15 @@ package appsdk import ( + "fmt" + "github.com/gorilla/mux" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "net/http" "os" "reflect" "testing" - "github.com/gorilla/mux" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/edgexfoundry/go-mod-core-contracts/clients/logger" "github.com/edgexfoundry/go-mod-core-contracts/models" @@ -68,6 +68,22 @@ func TestAddRoute(t *testing.T) { } +func TestAddBackgroundPublisher(t *testing.T) { + sdk := AppFunctionsSDK{} + pub, ok := sdk.AddBackgroundPublisher(1).(*backgroundPublisher) + + if !ok { + assert.Fail(t, fmt.Sprintf("Unexpected BackgroundPublisher implementation encountered: %T", pub)) + } + + require.NotNil(t, pub.output, "publisher should have an output channel set") + require.NotNil(t, sdk.backgroundChannel, "sdk should have a background channel set for passing to trigger intitialization") + + // compare addresses since types will not match + assert.Equal(t, fmt.Sprintf("%p", sdk.backgroundChannel), fmt.Sprintf("%p", pub.output), + "same channel should be referenced by the BackgroundPublisher and the SDK.") +} + func TestSetupHTTPTrigger(t *testing.T) { sdk := AppFunctionsSDK{ LoggingClient: lc, diff --git a/internal/trigger/http/rest.go b/internal/trigger/http/rest.go index 5766ac203..b39cbb387 100644 --- a/internal/trigger/http/rest.go +++ b/internal/trigger/http/rest.go @@ -18,6 +18,7 @@ package http import ( "context" + "errors" "fmt" "io/ioutil" "net/http" @@ -43,9 +44,13 @@ type Trigger struct { } // Initialize initializes the Trigger for logging and REST route -func (trigger *Trigger) Initialize(appWg *sync.WaitGroup, appCtx context.Context) (bootstrap.Deferred, error) { +func (trigger *Trigger) Initialize(appWg *sync.WaitGroup, appCtx context.Context, background <-chan types.MessageEnvelope) (bootstrap.Deferred, error) { logger := trigger.EdgeXClients.LoggingClient + if background != nil { + return nil, errors.New("background publishing not supported for services using HTTP trigger") + } + logger.Info("Initializing HTTP Trigger") trigger.Webserver.SetupTriggerRoute(internal.ApiTriggerRoute, trigger.requestHandler) // Note: Trigger endpoint doesn't change for V2 API, so just using same handler. diff --git a/internal/trigger/http/rest_test.go b/internal/trigger/http/rest_test.go new file mode 100644 index 000000000..fc1d946e7 --- /dev/null +++ b/internal/trigger/http/rest_test.go @@ -0,0 +1,34 @@ +// +// Copyright (c) 2020 Technotects +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package http + +import ( + "github.com/edgexfoundry/go-mod-messaging/pkg/types" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestTriggerInitializeWitBackgroundChannel(t *testing.T) { + background := make(chan types.MessageEnvelope) + trigger := Trigger{} + + deferred, err := trigger.Initialize(nil, nil, background) + + assert.Nil(t, deferred) + assert.NotNil(t, err) + assert.Equal(t, "background publishing not supported for services using HTTP trigger", err.Error()) +} diff --git a/internal/trigger/messagebus/messaging.go b/internal/trigger/messagebus/messaging.go index 841119407..d40f36c07 100644 --- a/internal/trigger/messagebus/messaging.go +++ b/internal/trigger/messagebus/messaging.go @@ -40,7 +40,7 @@ type Trigger struct { } // Initialize ... -func (trigger *Trigger) Initialize(appWg *sync.WaitGroup, appCtx context.Context) (bootstrap.Deferred, error) { +func (trigger *Trigger) Initialize(appWg *sync.WaitGroup, appCtx context.Context, background <-chan types.MessageEnvelope) (bootstrap.Deferred, error) { var err error logger := trigger.EdgeXClients.LoggingClient @@ -117,11 +117,22 @@ func (trigger *Trigger) Initialize(appWg *sync.WaitGroup, appCtx context.Context err := trigger.client.Publish(outputEnvelope, trigger.Configuration.Binding.PublishTopic) if err != nil { logger.Error(fmt.Sprintf("Failed to publish Message to bus, %v", err)) + return } logger.Trace("Published message to bus", "topic", trigger.Configuration.Binding.PublishTopic, clients.CorrelationHeader, msgs.CorrelationID) } }() + case bg := <-background: + go func() { + err := trigger.client.Publish(bg, trigger.Configuration.Binding.PublishTopic) + if err != nil { + logger.Error(fmt.Sprintf("Failed to publish background Message to bus, %v", err)) + return + } + + logger.Trace("Published background message to bus", "topic", trigger.Configuration.Binding.PublishTopic, clients.CorrelationHeader, bg.CorrelationID) + }() } } }() diff --git a/internal/trigger/messagebus/messaging_test.go b/internal/trigger/messagebus/messaging_test.go index 271048a2d..489bf74f1 100644 --- a/internal/trigger/messagebus/messaging_test.go +++ b/internal/trigger/messagebus/messaging_test.go @@ -68,7 +68,7 @@ func TestInitialize(t *testing.T) { runtime := &runtime.GolangRuntime{} trigger := Trigger{Configuration: &config, Runtime: runtime, EdgeXClients: common.EdgeXClients{LoggingClient: logClient}} - trigger.Initialize(&sync.WaitGroup{}, context.Background()) + trigger.Initialize(&sync.WaitGroup{}, context.Background(), nil) assert.NotNil(t, trigger.client, "Expected client to be set") assert.Equal(t, 1, len(trigger.topics)) assert.Equal(t, "events", trigger.topics[0].Topic) @@ -101,7 +101,7 @@ func TestInitializeBadConfiguration(t *testing.T) { runtime := &runtime.GolangRuntime{} trigger := Trigger{Configuration: &config, Runtime: runtime, EdgeXClients: common.EdgeXClients{LoggingClient: logClient}} - _, err := trigger.Initialize(&sync.WaitGroup{}, context.Background()) + _, err := trigger.Initialize(&sync.WaitGroup{}, context.Background(), nil) assert.Error(t, err) } @@ -146,7 +146,7 @@ func TestInitializeAndProcessEventWithNoOutput(t *testing.T) { runtime.Initialize(nil, nil) runtime.SetTransforms([]appcontext.AppFunction{transform1}) trigger := Trigger{Configuration: &config, Runtime: runtime, EdgeXClients: common.EdgeXClients{LoggingClient: logClient}} - trigger.Initialize(&sync.WaitGroup{}, context.Background()) + trigger.Initialize(&sync.WaitGroup{}, context.Background(), nil) message := types.MessageEnvelope{ CorrelationID: expectedCorrelationID, @@ -239,7 +239,7 @@ func TestInitializeAndProcessEventWithOutput(t *testing.T) { testClient.Subscribe(testTopics, testMessageErrors) //subscribe in order to receive transformed output to the bus - trigger.Initialize(&sync.WaitGroup{}, context.Background()) + trigger.Initialize(&sync.WaitGroup{}, context.Background(), nil) message := types.MessageEnvelope{ CorrelationID: expectedCorrelationID, @@ -268,3 +268,81 @@ func TestInitializeAndProcessEventWithOutput(t *testing.T) { } } } + +func TestInitializeAndProcessBackgroundMessage(t *testing.T) { + + config := common.ConfigurationStruct{ + Binding: common.BindingInfo{ + Type: "meSsaGebus", + PublishTopic: "PublishTopic", + SubscribeTopic: "SubscribeTopic", + }, + MessageBus: types.MessageBusConfig{ + Type: "zero", + PublishHost: types.HostInfo{ + Host: "*", + Port: 5588, + Protocol: "tcp", + }, + SubscribeHost: types.HostInfo{ + Host: "localhost", + Port: 5590, + Protocol: "tcp", + }, + }, + } + + expectedCorrelationID := "123" + + expectedPayload := []byte(`{"id":"5888dea1bd36573f4681d6f9","created":1485364897029,"modified":1485364897029,"origin":1471806386919,"pushed":0,"device":"livingroomthermostat","readings":[{"id":"5888dea0bd36573f4681d6f8","created":1485364896983,"modified":1485364896983,"origin":1471806386919,"pushed":0,"name":"temperature","value":"38","device":"livingroomthermostat"}]}`) + + runtime := &runtime.GolangRuntime{} + runtime.Initialize(nil, nil) + trigger := Trigger{Configuration: &config, Runtime: runtime, EdgeXClients: common.EdgeXClients{LoggingClient: logClient}} + + testClientConfig := types.MessageBusConfig{ + SubscribeHost: types.HostInfo{ + Host: "localhost", + Port: 5588, + Protocol: "tcp", + }, + PublishHost: types.HostInfo{ + Host: "*", + Port: 5590, + Protocol: "tcp", + }, + Type: "zero", + } + testClient, err := messaging.NewMessageClient(testClientConfig) //new client to publish & subscribe + require.NoError(t, err, "Failed to create test client") + + testTopics := []types.TopicChannel{{Topic: trigger.Configuration.Binding.PublishTopic, Messages: make(chan types.MessageEnvelope)}} + testMessageErrors := make(chan error) + + testClient.Subscribe(testTopics, testMessageErrors) //subscribe in order to receive transformed output to the bus + + background := make(chan types.MessageEnvelope) + + trigger.Initialize(&sync.WaitGroup{}, context.Background(), background) + + message := types.MessageEnvelope{ + CorrelationID: expectedCorrelationID, + Payload: expectedPayload, + ContentType: clients.ContentTypeJSON, + } + + background <- message + + receiveMessage := true + + for receiveMessage { + select { + case msgErr := <-testMessageErrors: + receiveMessage = false + assert.Error(t, msgErr) + case msgs := <-testTopics[0].Messages: + receiveMessage = false + assert.Equal(t, expectedPayload, msgs.Payload) + } + } +} diff --git a/internal/trigger/trigger.go b/internal/trigger/trigger.go index aa022d068..2609741e3 100644 --- a/internal/trigger/trigger.go +++ b/internal/trigger/trigger.go @@ -18,6 +18,7 @@ package trigger import ( "context" + "github.com/edgexfoundry/go-mod-messaging/pkg/types" "sync" "github.com/edgexfoundry/go-mod-bootstrap/bootstrap" @@ -26,5 +27,5 @@ import ( // Trigger interface is used to hold event data and allow function to type Trigger interface { // Initialize performs post creation initializations - Initialize(wg *sync.WaitGroup, ctx context.Context) (bootstrap.Deferred, error) + Initialize(wg *sync.WaitGroup, ctx context.Context, background <-chan types.MessageEnvelope) (bootstrap.Deferred, error) }