Skip to content

Commit

Permalink
feat(sdk): Add background publisher to MessageBus
Browse files Browse the repository at this point in the history
Creates a channel of provided capacity and attaches to the sdk instance.
This channel is then passed to the trigger so that MessageEnvelopes can
be pulled off of it and dropped on the queue.  Publisher handles
formatting the passed message so there is no need to couple using types
from go-mod-messaging.

Closes: #462
  • Loading branch information
Alex Ullrich committed Sep 4, 2020
1 parent 6b45ea7 commit fbe2c0a
Show file tree
Hide file tree
Showing 8 changed files with 185 additions and 12 deletions.
26 changes: 26 additions & 0 deletions appsdk/backgroundpublisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package appsdk

import "github.com/edgexfoundry/go-mod-messaging/pkg/types"

type BackgroundPublisher interface {
Publish(payload []byte, correlationID string, contentType string)
}

type backgroundPublisher struct {
output chan<- types.MessageEnvelope
}

func (pub *backgroundPublisher) Publish(payload []byte, correlationID string, contentType string) {
outputEnvelope := types.MessageEnvelope{
CorrelationID: correlationID,
Payload: payload,
ContentType: contentType,
}

pub.output <- outputEnvelope
}

func newBackgroundPublisher(capacity int) (<-chan types.MessageEnvelope, BackgroundPublisher) {
bgch := make(chan types.MessageEnvelope, capacity)
return bgch, &backgroundPublisher{output: bgch}
}
32 changes: 32 additions & 0 deletions appsdk/backgroundpublisher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package appsdk

import (
"github.com/stretchr/testify/assert"
"testing"
"time"
)

func TestNewBackgroundPublisherAndPublish(t *testing.T) {
background, pub := newBackgroundPublisher(1)

payload := []byte("something")
correlationId := "id"
contentType := "type"

pub.Publish(payload, correlationId, contentType)

waiting := true

for waiting {
select {
case msgs := <-background:
assert.Equal(t, correlationId, msgs.CorrelationID)
assert.Equal(t, contentType, msgs.ContentType)
assert.Equal(t, payload, msgs.Payload)
waiting = false
case <-time.After(1 * time.Second):
assert.Fail(t, "Message timed out, background channel likely not configured correctly")
waiting = false
}
}
}
11 changes: 10 additions & 1 deletion appsdk/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"github.com/edgexfoundry/go-mod-messaging/pkg/types"
nethttp "net/http"
"net/url"
"os"
Expand Down Expand Up @@ -111,6 +112,7 @@ type AppFunctionsSDK struct {
appCancelCtx context.CancelFunc
deferredFunctions []bootstrap.Deferred
serviceKeyOverride string
backgroundChannel <-chan types.MessageEnvelope
}

// AddRoute allows you to leverage the existing webserver to add routes.
Expand All @@ -125,6 +127,13 @@ func (sdk *AppFunctionsSDK) AddRoute(route string, handler func(nethttp.Response
return sdk.webserver.AddRoute(route, sdk.addContext(handler), methods...)
}

// AddBackgroundPublisher will create a back channel of provided capacity to the trigger output and return a publisher that writes to it
func (sdk *AppFunctionsSDK) AddBackgroundPublisher(capacity int) BackgroundPublisher {
bgchan, pub := newBackgroundPublisher(capacity)
sdk.backgroundChannel = bgchan
return pub
}

// MakeItRun will initialize and start the trigger as specifed in the
// configuration. It will also configure the webserver and start listening on
// the specified port.
Expand All @@ -143,7 +152,7 @@ func (sdk *AppFunctionsSDK) MakeItRun() error {
t := sdk.setupTrigger(sdk.config, sdk.runtime)

// Initialize the trigger (i.e. start a web server, or connect to message bus)
deferred, err := t.Initialize(sdk.appWg, sdk.appCtx)
deferred, err := t.Initialize(sdk.appWg, sdk.appCtx, sdk.backgroundChannel)
if err != nil {
sdk.LoggingClient.Error(err.Error())
}
Expand Down
21 changes: 17 additions & 4 deletions appsdk/sdk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@
package appsdk

import (
"fmt"
"github.com/gorilla/mux"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"net/http"
"os"
"reflect"
"testing"

"github.com/gorilla/mux"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/edgexfoundry/go-mod-core-contracts/clients/logger"
"github.com/edgexfoundry/go-mod-core-contracts/models"

Expand Down Expand Up @@ -68,6 +68,19 @@ func TestAddRoute(t *testing.T) {

}

func TestAddBackgroundPublisher(t *testing.T) {
sdk := AppFunctionsSDK{}
pub, ok := sdk.AddBackgroundPublisher(1).(*backgroundPublisher)

if !ok {
assert.Fail(t, fmt.Sprintf("Unexpected BackgroundPublisher implementation encountered: %T", pub))
}

//compare addresses since types do not match
assert.Equal(t, fmt.Sprintf("%p", sdk.backgroundChannel), fmt.Sprintf("%p", pub.output),
"Types will not but addresses must match so SDK can pass correct channel to trigger when running.")
}

func TestSetupHTTPTrigger(t *testing.T) {
sdk := AppFunctionsSDK{
LoggingClient: lc,
Expand Down
7 changes: 6 additions & 1 deletion internal/trigger/http/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,14 @@ type Trigger struct {
}

// Initialize initializes the Trigger for logging and REST route
func (trigger *Trigger) Initialize(appWg *sync.WaitGroup, appCtx context.Context) (bootstrap.Deferred, error) {
func (trigger *Trigger) Initialize(appWg *sync.WaitGroup, appCtx context.Context, background <-chan types.MessageEnvelope) (bootstrap.Deferred, error) {
logger := trigger.EdgeXClients.LoggingClient

if background != nil {
// Is error sufficient here or should we block startup?
logger.Error("Background publishing not valid for services using HTTP trigger")
}

logger.Info("Initializing HTTP Trigger")
trigger.Webserver.SetupTriggerRoute(internal.ApiTriggerRoute, trigger.requestHandler)
// Note: Trigger endpoint doesn't change for V2 API, so just using same handler.
Expand Down
11 changes: 10 additions & 1 deletion internal/trigger/messagebus/messaging.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type Trigger struct {
}

// Initialize ...
func (trigger *Trigger) Initialize(appWg *sync.WaitGroup, appCtx context.Context) (bootstrap.Deferred, error) {
func (trigger *Trigger) Initialize(appWg *sync.WaitGroup, appCtx context.Context, background <-chan types.MessageEnvelope) (bootstrap.Deferred, error) {
var err error
logger := trigger.EdgeXClients.LoggingClient

Expand Down Expand Up @@ -122,6 +122,15 @@ func (trigger *Trigger) Initialize(appWg *sync.WaitGroup, appCtx context.Context
logger.Trace("Published message to bus", "topic", trigger.Configuration.Binding.PublishTopic, clients.CorrelationHeader, msgs.CorrelationID)
}
}()
case bg := <-background:
go func() {
err := trigger.client.Publish(bg, trigger.Configuration.Binding.PublishTopic)
if err != nil {
logger.Error(fmt.Sprintf("Failed to publish background Message to bus, %v", err))
}

logger.Trace("Published background message to bus", "topic", trigger.Configuration.Binding.PublishTopic, clients.CorrelationHeader, bg.CorrelationID)
}()
}
}
}()
Expand Down
86 changes: 82 additions & 4 deletions internal/trigger/messagebus/messaging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestInitialize(t *testing.T) {
runtime := &runtime.GolangRuntime{}

trigger := Trigger{Configuration: &config, Runtime: runtime, EdgeXClients: common.EdgeXClients{LoggingClient: logClient}}
trigger.Initialize(&sync.WaitGroup{}, context.Background())
trigger.Initialize(&sync.WaitGroup{}, context.Background(), nil)
assert.NotNil(t, trigger.client, "Expected client to be set")
assert.Equal(t, 1, len(trigger.topics))
assert.Equal(t, "events", trigger.topics[0].Topic)
Expand Down Expand Up @@ -101,7 +101,7 @@ func TestInitializeBadConfiguration(t *testing.T) {
runtime := &runtime.GolangRuntime{}

trigger := Trigger{Configuration: &config, Runtime: runtime, EdgeXClients: common.EdgeXClients{LoggingClient: logClient}}
_, err := trigger.Initialize(&sync.WaitGroup{}, context.Background())
_, err := trigger.Initialize(&sync.WaitGroup{}, context.Background(), nil)
assert.Error(t, err)
}

Expand Down Expand Up @@ -146,7 +146,7 @@ func TestInitializeAndProcessEventWithNoOutput(t *testing.T) {
runtime.Initialize(nil, nil)
runtime.SetTransforms([]appcontext.AppFunction{transform1})
trigger := Trigger{Configuration: &config, Runtime: runtime, EdgeXClients: common.EdgeXClients{LoggingClient: logClient}}
trigger.Initialize(&sync.WaitGroup{}, context.Background())
trigger.Initialize(&sync.WaitGroup{}, context.Background(), nil)

message := types.MessageEnvelope{
CorrelationID: expectedCorrelationID,
Expand Down Expand Up @@ -239,7 +239,7 @@ func TestInitializeAndProcessEventWithOutput(t *testing.T) {

testClient.Subscribe(testTopics, testMessageErrors) //subscribe in order to receive transformed output to the bus

trigger.Initialize(&sync.WaitGroup{}, context.Background())
trigger.Initialize(&sync.WaitGroup{}, context.Background(), nil)

message := types.MessageEnvelope{
CorrelationID: expectedCorrelationID,
Expand Down Expand Up @@ -268,3 +268,81 @@ func TestInitializeAndProcessEventWithOutput(t *testing.T) {
}
}
}

func TestInitializeAndProcessBackgroundMessage(t *testing.T) {

config := common.ConfigurationStruct{
Binding: common.BindingInfo{
Type: "meSsaGebus",
PublishTopic: "PublishTopic",
SubscribeTopic: "SubscribeTopic",
},
MessageBus: types.MessageBusConfig{
Type: "zero",
PublishHost: types.HostInfo{
Host: "*",
Port: 5588,
Protocol: "tcp",
},
SubscribeHost: types.HostInfo{
Host: "localhost",
Port: 5590,
Protocol: "tcp",
},
},
}

expectedCorrelationID := "123"

expectedPayload := []byte(`{"id":"5888dea1bd36573f4681d6f9","created":1485364897029,"modified":1485364897029,"origin":1471806386919,"pushed":0,"device":"livingroomthermostat","readings":[{"id":"5888dea0bd36573f4681d6f8","created":1485364896983,"modified":1485364896983,"origin":1471806386919,"pushed":0,"name":"temperature","value":"38","device":"livingroomthermostat"}]}`)

runtime := &runtime.GolangRuntime{}
runtime.Initialize(nil, nil)
trigger := Trigger{Configuration: &config, Runtime: runtime, EdgeXClients: common.EdgeXClients{LoggingClient: logClient}}

testClientConfig := types.MessageBusConfig{
SubscribeHost: types.HostInfo{
Host: "localhost",
Port: 5588,
Protocol: "tcp",
},
PublishHost: types.HostInfo{
Host: "*",
Port: 5590,
Protocol: "tcp",
},
Type: "zero",
}
testClient, err := messaging.NewMessageClient(testClientConfig) //new client to publish & subscribe
require.NoError(t, err, "Failed to create test client")

testTopics := []types.TopicChannel{{Topic: trigger.Configuration.Binding.PublishTopic, Messages: make(chan types.MessageEnvelope)}}
testMessageErrors := make(chan error)

testClient.Subscribe(testTopics, testMessageErrors) //subscribe in order to receive transformed output to the bus

background := make(chan types.MessageEnvelope)

trigger.Initialize(&sync.WaitGroup{}, context.Background(), background)

message := types.MessageEnvelope{
CorrelationID: expectedCorrelationID,
Payload: expectedPayload,
ContentType: clients.ContentTypeJSON,
}

background <- message

receiveMessage := true

for receiveMessage {
select {
case msgErr := <-testMessageErrors:
receiveMessage = false
assert.Error(t, msgErr)
case msgs := <-testTopics[0].Messages:
receiveMessage = false
assert.Equal(t, expectedPayload, msgs.Payload)
}
}
}
3 changes: 2 additions & 1 deletion internal/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package trigger

import (
"context"
"github.com/edgexfoundry/go-mod-messaging/pkg/types"
"sync"

"github.com/edgexfoundry/go-mod-bootstrap/bootstrap"
Expand All @@ -26,5 +27,5 @@ import (
// Trigger interface is used to hold event data and allow function to
type Trigger interface {
// Initialize performs post creation initializations
Initialize(wg *sync.WaitGroup, ctx context.Context) (bootstrap.Deferred, error)
Initialize(wg *sync.WaitGroup, ctx context.Context, background <-chan types.MessageEnvelope) (bootstrap.Deferred, error)
}

0 comments on commit fbe2c0a

Please sign in to comment.