Skip to content

Commit

Permalink
feat(sdk): Add background publisher to MessageBus
Browse files Browse the repository at this point in the history
Creates a channel of provided capacity and attaches to the sdk instance.
This channel is then passed to the trigger so that MessageEnvelopes can
be pulled off of it and dropped on the queue.  Publisher handles
formatting the passed message so there is no need to couple using types
from go-mod-messaging.

Closes: #462
Signed-off-by: Alex Ullrich <[email protected]>
  • Loading branch information
Alex Ullrich committed Sep 4, 2020
1 parent 337bfa7 commit aa9f94a
Show file tree
Hide file tree
Showing 9 changed files with 261 additions and 12 deletions.
46 changes: 46 additions & 0 deletions appsdk/backgroundpublisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
//
// Copyright (c) 2020 Technotects
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package appsdk

import "github.com/edgexfoundry/go-mod-messaging/pkg/types"

// BackgroundPublisher provides an interface to send messages from background processes
// through the service's configured MessageBus output
type BackgroundPublisher interface {
// Publish provided message through the configured MessageBus output
Publish(payload []byte, correlationID string, contentType string)
}

type backgroundPublisher struct {
output chan<- types.MessageEnvelope
}

// Publish provided message through the configured MessageBus output
func (pub *backgroundPublisher) Publish(payload []byte, correlationID string, contentType string) {
outputEnvelope := types.MessageEnvelope{
CorrelationID: correlationID,
Payload: payload,
ContentType: contentType,
}

pub.output <- outputEnvelope
}

func newBackgroundPublisher(capacity int) (<-chan types.MessageEnvelope, BackgroundPublisher) {
backgroundChannel := make(chan types.MessageEnvelope, capacity)
return backgroundChannel, &backgroundPublisher{output: backgroundChannel}
}
48 changes: 48 additions & 0 deletions appsdk/backgroundpublisher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
//
// Copyright (c) 2020 Technotects
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package appsdk

import (
"github.com/stretchr/testify/assert"
"testing"
"time"
)

func TestNewBackgroundPublisherAndPublish(t *testing.T) {
background, pub := newBackgroundPublisher(1)

payload := []byte("something")
correlationId := "id"
contentType := "type"

pub.Publish(payload, correlationId, contentType)

waiting := true

for waiting {
select {
case msgs := <-background:
assert.Equal(t, correlationId, msgs.CorrelationID)
assert.Equal(t, contentType, msgs.ContentType)
assert.Equal(t, payload, msgs.Payload)
waiting = false
case <-time.After(1 * time.Second):
assert.Fail(t, "message timed out, background channel likely not configured correctly")
waiting = false
}
}
}
12 changes: 11 additions & 1 deletion appsdk/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"github.com/edgexfoundry/go-mod-messaging/pkg/types"
nethttp "net/http"
"net/url"
"os"
Expand Down Expand Up @@ -111,6 +112,7 @@ type AppFunctionsSDK struct {
appCancelCtx context.CancelFunc
deferredFunctions []bootstrap.Deferred
serviceKeyOverride string
backgroundChannel <-chan types.MessageEnvelope
}

// AddRoute allows you to leverage the existing webserver to add routes.
Expand All @@ -125,6 +127,14 @@ func (sdk *AppFunctionsSDK) AddRoute(route string, handler func(nethttp.Response
return sdk.webserver.AddRoute(route, sdk.addContext(handler), methods...)
}

// AddBackgroundPublisher will create a channel of provided capacity to be
// consumed by the MessageBus output and return a publisher that writes to it
func (sdk *AppFunctionsSDK) AddBackgroundPublisher(capacity int) BackgroundPublisher {
bgchan, pub := newBackgroundPublisher(capacity)
sdk.backgroundChannel = bgchan
return pub
}

// MakeItRun will initialize and start the trigger as specifed in the
// configuration. It will also configure the webserver and start listening on
// the specified port.
Expand All @@ -143,7 +153,7 @@ func (sdk *AppFunctionsSDK) MakeItRun() error {
t := sdk.setupTrigger(sdk.config, sdk.runtime)

// Initialize the trigger (i.e. start a web server, or connect to message bus)
deferred, err := t.Initialize(sdk.appWg, sdk.appCtx)
deferred, err := t.Initialize(sdk.appWg, sdk.appCtx, sdk.backgroundChannel)
if err != nil {
sdk.LoggingClient.Error(err.Error())
}
Expand Down
24 changes: 20 additions & 4 deletions appsdk/sdk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@
package appsdk

import (
"fmt"
"github.com/gorilla/mux"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"net/http"
"os"
"reflect"
"testing"

"github.com/gorilla/mux"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/edgexfoundry/go-mod-core-contracts/clients/logger"
"github.com/edgexfoundry/go-mod-core-contracts/models"

Expand Down Expand Up @@ -68,6 +68,22 @@ func TestAddRoute(t *testing.T) {

}

func TestAddBackgroundPublisher(t *testing.T) {
sdk := AppFunctionsSDK{}
pub, ok := sdk.AddBackgroundPublisher(1).(*backgroundPublisher)

if !ok {
assert.Fail(t, fmt.Sprintf("Unexpected BackgroundPublisher implementation encountered: %T", pub))
}

require.NotNil(t, pub.output, "publisher should have an output channel set")
require.NotNil(t, sdk.backgroundChannel, "sdk should have a background channel set for passing to trigger intitialization")

// compare addresses since types will not match
assert.Equal(t, fmt.Sprintf("%p", sdk.backgroundChannel), fmt.Sprintf("%p", pub.output),
"same channel should be referenced by the BackgroundPublisher and the SDK.")
}

func TestSetupHTTPTrigger(t *testing.T) {
sdk := AppFunctionsSDK{
LoggingClient: lc,
Expand Down
7 changes: 6 additions & 1 deletion internal/trigger/http/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package http

import (
"context"
"errors"
"fmt"
"io/ioutil"
"net/http"
Expand All @@ -43,9 +44,13 @@ type Trigger struct {
}

// Initialize initializes the Trigger for logging and REST route
func (trigger *Trigger) Initialize(appWg *sync.WaitGroup, appCtx context.Context) (bootstrap.Deferred, error) {
func (trigger *Trigger) Initialize(appWg *sync.WaitGroup, appCtx context.Context, background <-chan types.MessageEnvelope) (bootstrap.Deferred, error) {
logger := trigger.EdgeXClients.LoggingClient

if background != nil {
return nil, errors.New("background publishing not supported for services using HTTP trigger")
}

logger.Info("Initializing HTTP Trigger")
trigger.Webserver.SetupTriggerRoute(internal.ApiTriggerRoute, trigger.requestHandler)
// Note: Trigger endpoint doesn't change for V2 API, so just using same handler.
Expand Down
34 changes: 34 additions & 0 deletions internal/trigger/http/rest_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
//
// Copyright (c) 2020 Technotects
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package http

import (
"github.com/edgexfoundry/go-mod-messaging/pkg/types"
"github.com/stretchr/testify/assert"
"testing"
)

func TestTriggerInitializeWitBackgroundChannel(t *testing.T) {
background := make(chan types.MessageEnvelope)
trigger := Trigger{}

deferred, err := trigger.Initialize(nil, nil, background)

assert.Nil(t, deferred)
assert.NotNil(t, err)
assert.Equal(t, "background publishing not supported for services using HTTP trigger", err.Error())
}
13 changes: 12 additions & 1 deletion internal/trigger/messagebus/messaging.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type Trigger struct {
}

// Initialize ...
func (trigger *Trigger) Initialize(appWg *sync.WaitGroup, appCtx context.Context) (bootstrap.Deferred, error) {
func (trigger *Trigger) Initialize(appWg *sync.WaitGroup, appCtx context.Context, background <-chan types.MessageEnvelope) (bootstrap.Deferred, error) {
var err error
logger := trigger.EdgeXClients.LoggingClient

Expand Down Expand Up @@ -117,11 +117,22 @@ func (trigger *Trigger) Initialize(appWg *sync.WaitGroup, appCtx context.Context
err := trigger.client.Publish(outputEnvelope, trigger.Configuration.Binding.PublishTopic)
if err != nil {
logger.Error(fmt.Sprintf("Failed to publish Message to bus, %v", err))
return
}

logger.Trace("Published message to bus", "topic", trigger.Configuration.Binding.PublishTopic, clients.CorrelationHeader, msgs.CorrelationID)
}
}()
case bg := <-background:
go func() {
err := trigger.client.Publish(bg, trigger.Configuration.Binding.PublishTopic)
if err != nil {
logger.Error(fmt.Sprintf("Failed to publish background Message to bus, %v", err))
return
}

logger.Trace("Published background message to bus", "topic", trigger.Configuration.Binding.PublishTopic, clients.CorrelationHeader, bg.CorrelationID)
}()
}
}
}()
Expand Down
86 changes: 82 additions & 4 deletions internal/trigger/messagebus/messaging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestInitialize(t *testing.T) {
runtime := &runtime.GolangRuntime{}

trigger := Trigger{Configuration: &config, Runtime: runtime, EdgeXClients: common.EdgeXClients{LoggingClient: logClient}}
trigger.Initialize(&sync.WaitGroup{}, context.Background())
trigger.Initialize(&sync.WaitGroup{}, context.Background(), nil)
assert.NotNil(t, trigger.client, "Expected client to be set")
assert.Equal(t, 1, len(trigger.topics))
assert.Equal(t, "events", trigger.topics[0].Topic)
Expand Down Expand Up @@ -101,7 +101,7 @@ func TestInitializeBadConfiguration(t *testing.T) {
runtime := &runtime.GolangRuntime{}

trigger := Trigger{Configuration: &config, Runtime: runtime, EdgeXClients: common.EdgeXClients{LoggingClient: logClient}}
_, err := trigger.Initialize(&sync.WaitGroup{}, context.Background())
_, err := trigger.Initialize(&sync.WaitGroup{}, context.Background(), nil)
assert.Error(t, err)
}

Expand Down Expand Up @@ -146,7 +146,7 @@ func TestInitializeAndProcessEventWithNoOutput(t *testing.T) {
runtime.Initialize(nil, nil)
runtime.SetTransforms([]appcontext.AppFunction{transform1})
trigger := Trigger{Configuration: &config, Runtime: runtime, EdgeXClients: common.EdgeXClients{LoggingClient: logClient}}
trigger.Initialize(&sync.WaitGroup{}, context.Background())
trigger.Initialize(&sync.WaitGroup{}, context.Background(), nil)

message := types.MessageEnvelope{
CorrelationID: expectedCorrelationID,
Expand Down Expand Up @@ -239,7 +239,7 @@ func TestInitializeAndProcessEventWithOutput(t *testing.T) {

testClient.Subscribe(testTopics, testMessageErrors) //subscribe in order to receive transformed output to the bus

trigger.Initialize(&sync.WaitGroup{}, context.Background())
trigger.Initialize(&sync.WaitGroup{}, context.Background(), nil)

message := types.MessageEnvelope{
CorrelationID: expectedCorrelationID,
Expand Down Expand Up @@ -268,3 +268,81 @@ func TestInitializeAndProcessEventWithOutput(t *testing.T) {
}
}
}

func TestInitializeAndProcessBackgroundMessage(t *testing.T) {

config := common.ConfigurationStruct{
Binding: common.BindingInfo{
Type: "meSsaGebus",
PublishTopic: "PublishTopic",
SubscribeTopic: "SubscribeTopic",
},
MessageBus: types.MessageBusConfig{
Type: "zero",
PublishHost: types.HostInfo{
Host: "*",
Port: 5588,
Protocol: "tcp",
},
SubscribeHost: types.HostInfo{
Host: "localhost",
Port: 5590,
Protocol: "tcp",
},
},
}

expectedCorrelationID := "123"

expectedPayload := []byte(`{"id":"5888dea1bd36573f4681d6f9","created":1485364897029,"modified":1485364897029,"origin":1471806386919,"pushed":0,"device":"livingroomthermostat","readings":[{"id":"5888dea0bd36573f4681d6f8","created":1485364896983,"modified":1485364896983,"origin":1471806386919,"pushed":0,"name":"temperature","value":"38","device":"livingroomthermostat"}]}`)

runtime := &runtime.GolangRuntime{}
runtime.Initialize(nil, nil)
trigger := Trigger{Configuration: &config, Runtime: runtime, EdgeXClients: common.EdgeXClients{LoggingClient: logClient}}

testClientConfig := types.MessageBusConfig{
SubscribeHost: types.HostInfo{
Host: "localhost",
Port: 5588,
Protocol: "tcp",
},
PublishHost: types.HostInfo{
Host: "*",
Port: 5590,
Protocol: "tcp",
},
Type: "zero",
}
testClient, err := messaging.NewMessageClient(testClientConfig) //new client to publish & subscribe
require.NoError(t, err, "Failed to create test client")

testTopics := []types.TopicChannel{{Topic: trigger.Configuration.Binding.PublishTopic, Messages: make(chan types.MessageEnvelope)}}
testMessageErrors := make(chan error)

testClient.Subscribe(testTopics, testMessageErrors) //subscribe in order to receive transformed output to the bus

background := make(chan types.MessageEnvelope)

trigger.Initialize(&sync.WaitGroup{}, context.Background(), background)

message := types.MessageEnvelope{
CorrelationID: expectedCorrelationID,
Payload: expectedPayload,
ContentType: clients.ContentTypeJSON,
}

background <- message

receiveMessage := true

for receiveMessage {
select {
case msgErr := <-testMessageErrors:
receiveMessage = false
assert.Error(t, msgErr)
case msgs := <-testTopics[0].Messages:
receiveMessage = false
assert.Equal(t, expectedPayload, msgs.Payload)
}
}
}
Loading

0 comments on commit aa9f94a

Please sign in to comment.