Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sdk): Add background publisher to MessageBus #466

Merged
merged 1 commit into from
Sep 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions appsdk/backgroundpublisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
//
// Copyright (c) 2020 Technotects
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package appsdk
lenny-goodell marked this conversation as resolved.
Show resolved Hide resolved

import "github.com/edgexfoundry/go-mod-messaging/pkg/types"

// BackgroundPublisher provides an interface to send messages from background processes
// through the service's configured MessageBus output
type BackgroundPublisher interface {
lenny-goodell marked this conversation as resolved.
Show resolved Hide resolved
// Publish provided message through the configured MessageBus output
Publish(payload []byte, correlationID string, contentType string)
}

type backgroundPublisher struct {
output chan<- types.MessageEnvelope
}

// Publish provided message through the configured MessageBus output
func (pub *backgroundPublisher) Publish(payload []byte, correlationID string, contentType string) {
outputEnvelope := types.MessageEnvelope{
CorrelationID: correlationID,
Payload: payload,
ContentType: contentType,
}

pub.output <- outputEnvelope
}

func newBackgroundPublisher(capacity int) (<-chan types.MessageEnvelope, BackgroundPublisher) {
backgroundChannel := make(chan types.MessageEnvelope, capacity)
return backgroundChannel, &backgroundPublisher{output: backgroundChannel}
}
48 changes: 48 additions & 0 deletions appsdk/backgroundpublisher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
//
// Copyright (c) 2020 Technotects
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package appsdk

import (
"github.com/stretchr/testify/assert"
"testing"
"time"
)

func TestNewBackgroundPublisherAndPublish(t *testing.T) {
background, pub := newBackgroundPublisher(1)

payload := []byte("something")
correlationId := "id"
contentType := "type"

pub.Publish(payload, correlationId, contentType)

waiting := true

for waiting {
select {
case msgs := <-background:
assert.Equal(t, correlationId, msgs.CorrelationID)
assert.Equal(t, contentType, msgs.ContentType)
assert.Equal(t, payload, msgs.Payload)
waiting = false
case <-time.After(1 * time.Second):
assert.Fail(t, "message timed out, background channel likely not configured correctly")
waiting = false
}
}
}
12 changes: 11 additions & 1 deletion appsdk/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"github.com/edgexfoundry/go-mod-messaging/pkg/types"
nethttp "net/http"
"net/url"
"os"
Expand Down Expand Up @@ -111,6 +112,7 @@ type AppFunctionsSDK struct {
appCancelCtx context.CancelFunc
deferredFunctions []bootstrap.Deferred
serviceKeyOverride string
backgroundChannel <-chan types.MessageEnvelope
}

// AddRoute allows you to leverage the existing webserver to add routes.
Expand All @@ -125,6 +127,14 @@ func (sdk *AppFunctionsSDK) AddRoute(route string, handler func(nethttp.Response
return sdk.webserver.AddRoute(route, sdk.addContext(handler), methods...)
}

// AddBackgroundPublisher will create a channel of provided capacity to be
// consumed by the MessageBus output and return a publisher that writes to it
func (sdk *AppFunctionsSDK) AddBackgroundPublisher(capacity int) BackgroundPublisher {
bgchan, pub := newBackgroundPublisher(capacity)
sdk.backgroundChannel = bgchan
return pub
}

// MakeItRun will initialize and start the trigger as specifed in the
// configuration. It will also configure the webserver and start listening on
// the specified port.
Expand All @@ -143,7 +153,7 @@ func (sdk *AppFunctionsSDK) MakeItRun() error {
t := sdk.setupTrigger(sdk.config, sdk.runtime)

// Initialize the trigger (i.e. start a web server, or connect to message bus)
deferred, err := t.Initialize(sdk.appWg, sdk.appCtx)
deferred, err := t.Initialize(sdk.appWg, sdk.appCtx, sdk.backgroundChannel)
if err != nil {
sdk.LoggingClient.Error(err.Error())
}
Expand Down
24 changes: 20 additions & 4 deletions appsdk/sdk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@
package appsdk

import (
"fmt"
"github.com/gorilla/mux"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"net/http"
"os"
"reflect"
"testing"

"github.com/gorilla/mux"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/edgexfoundry/go-mod-core-contracts/clients/logger"
"github.com/edgexfoundry/go-mod-core-contracts/models"

Expand Down Expand Up @@ -68,6 +68,22 @@ func TestAddRoute(t *testing.T) {

}

func TestAddBackgroundPublisher(t *testing.T) {
sdk := AppFunctionsSDK{}
pub, ok := sdk.AddBackgroundPublisher(1).(*backgroundPublisher)

if !ok {
assert.Fail(t, fmt.Sprintf("Unexpected BackgroundPublisher implementation encountered: %T", pub))
}

require.NotNil(t, pub.output, "publisher should have an output channel set")
require.NotNil(t, sdk.backgroundChannel, "sdk should have a background channel set for passing to trigger intitialization")

// compare addresses since types will not match
assert.Equal(t, fmt.Sprintf("%p", sdk.backgroundChannel), fmt.Sprintf("%p", pub.output),
lenny-goodell marked this conversation as resolved.
Show resolved Hide resolved
"same channel should be referenced by the BackgroundPublisher and the SDK.")
}

func TestSetupHTTPTrigger(t *testing.T) {
sdk := AppFunctionsSDK{
LoggingClient: lc,
Expand Down
7 changes: 6 additions & 1 deletion internal/trigger/http/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package http

import (
"context"
"errors"
"fmt"
"io/ioutil"
"net/http"
Expand All @@ -43,9 +44,13 @@ type Trigger struct {
}

// Initialize initializes the Trigger for logging and REST route
func (trigger *Trigger) Initialize(appWg *sync.WaitGroup, appCtx context.Context) (bootstrap.Deferred, error) {
func (trigger *Trigger) Initialize(appWg *sync.WaitGroup, appCtx context.Context, background <-chan types.MessageEnvelope) (bootstrap.Deferred, error) {
logger := trigger.EdgeXClients.LoggingClient

if background != nil {
return nil, errors.New("background publishing not supported for services using HTTP trigger")
}

logger.Info("Initializing HTTP Trigger")
trigger.Webserver.SetupTriggerRoute(internal.ApiTriggerRoute, trigger.requestHandler)
// Note: Trigger endpoint doesn't change for V2 API, so just using same handler.
Expand Down
34 changes: 34 additions & 0 deletions internal/trigger/http/rest_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
//
// Copyright (c) 2020 Technotects
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package http

import (
"github.com/edgexfoundry/go-mod-messaging/pkg/types"
"github.com/stretchr/testify/assert"
"testing"
)

func TestTriggerInitializeWitBackgroundChannel(t *testing.T) {
background := make(chan types.MessageEnvelope)
trigger := Trigger{}

deferred, err := trigger.Initialize(nil, nil, background)

assert.Nil(t, deferred)
assert.NotNil(t, err)
assert.Equal(t, "background publishing not supported for services using HTTP trigger", err.Error())
}
13 changes: 12 additions & 1 deletion internal/trigger/messagebus/messaging.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type Trigger struct {
}

// Initialize ...
func (trigger *Trigger) Initialize(appWg *sync.WaitGroup, appCtx context.Context) (bootstrap.Deferred, error) {
func (trigger *Trigger) Initialize(appWg *sync.WaitGroup, appCtx context.Context, background <-chan types.MessageEnvelope) (bootstrap.Deferred, error) {
var err error
logger := trigger.EdgeXClients.LoggingClient

Expand Down Expand Up @@ -117,11 +117,22 @@ func (trigger *Trigger) Initialize(appWg *sync.WaitGroup, appCtx context.Context
err := trigger.client.Publish(outputEnvelope, trigger.Configuration.Binding.PublishTopic)
if err != nil {
logger.Error(fmt.Sprintf("Failed to publish Message to bus, %v", err))
return
}

logger.Trace("Published message to bus", "topic", trigger.Configuration.Binding.PublishTopic, clients.CorrelationHeader, msgs.CorrelationID)
}
}()
case bg := <-background:
go func() {
err := trigger.client.Publish(bg, trigger.Configuration.Binding.PublishTopic)
if err != nil {
logger.Error(fmt.Sprintf("Failed to publish background Message to bus, %v", err))
lenny-goodell marked this conversation as resolved.
Show resolved Hide resolved
return
}

logger.Trace("Published background message to bus", "topic", trigger.Configuration.Binding.PublishTopic, clients.CorrelationHeader, bg.CorrelationID)
}()
}
}
}()
Expand Down
86 changes: 82 additions & 4 deletions internal/trigger/messagebus/messaging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestInitialize(t *testing.T) {
runtime := &runtime.GolangRuntime{}

trigger := Trigger{Configuration: &config, Runtime: runtime, EdgeXClients: common.EdgeXClients{LoggingClient: logClient}}
trigger.Initialize(&sync.WaitGroup{}, context.Background())
trigger.Initialize(&sync.WaitGroup{}, context.Background(), nil)
assert.NotNil(t, trigger.client, "Expected client to be set")
assert.Equal(t, 1, len(trigger.topics))
assert.Equal(t, "events", trigger.topics[0].Topic)
Expand Down Expand Up @@ -101,7 +101,7 @@ func TestInitializeBadConfiguration(t *testing.T) {
runtime := &runtime.GolangRuntime{}

trigger := Trigger{Configuration: &config, Runtime: runtime, EdgeXClients: common.EdgeXClients{LoggingClient: logClient}}
_, err := trigger.Initialize(&sync.WaitGroup{}, context.Background())
_, err := trigger.Initialize(&sync.WaitGroup{}, context.Background(), nil)
assert.Error(t, err)
}

Expand Down Expand Up @@ -146,7 +146,7 @@ func TestInitializeAndProcessEventWithNoOutput(t *testing.T) {
runtime.Initialize(nil, nil)
runtime.SetTransforms([]appcontext.AppFunction{transform1})
trigger := Trigger{Configuration: &config, Runtime: runtime, EdgeXClients: common.EdgeXClients{LoggingClient: logClient}}
trigger.Initialize(&sync.WaitGroup{}, context.Background())
trigger.Initialize(&sync.WaitGroup{}, context.Background(), nil)

message := types.MessageEnvelope{
CorrelationID: expectedCorrelationID,
Expand Down Expand Up @@ -239,7 +239,7 @@ func TestInitializeAndProcessEventWithOutput(t *testing.T) {

testClient.Subscribe(testTopics, testMessageErrors) //subscribe in order to receive transformed output to the bus

trigger.Initialize(&sync.WaitGroup{}, context.Background())
trigger.Initialize(&sync.WaitGroup{}, context.Background(), nil)

message := types.MessageEnvelope{
CorrelationID: expectedCorrelationID,
Expand Down Expand Up @@ -268,3 +268,81 @@ func TestInitializeAndProcessEventWithOutput(t *testing.T) {
}
}
}

func TestInitializeAndProcessBackgroundMessage(t *testing.T) {

config := common.ConfigurationStruct{
Binding: common.BindingInfo{
Type: "meSsaGebus",
PublishTopic: "PublishTopic",
SubscribeTopic: "SubscribeTopic",
},
MessageBus: types.MessageBusConfig{
Type: "zero",
PublishHost: types.HostInfo{
Host: "*",
Port: 5588,
Protocol: "tcp",
},
SubscribeHost: types.HostInfo{
Host: "localhost",
Port: 5590,
Protocol: "tcp",
},
},
}

expectedCorrelationID := "123"

expectedPayload := []byte(`{"id":"5888dea1bd36573f4681d6f9","created":1485364897029,"modified":1485364897029,"origin":1471806386919,"pushed":0,"device":"livingroomthermostat","readings":[{"id":"5888dea0bd36573f4681d6f8","created":1485364896983,"modified":1485364896983,"origin":1471806386919,"pushed":0,"name":"temperature","value":"38","device":"livingroomthermostat"}]}`)

runtime := &runtime.GolangRuntime{}
runtime.Initialize(nil, nil)
trigger := Trigger{Configuration: &config, Runtime: runtime, EdgeXClients: common.EdgeXClients{LoggingClient: logClient}}

testClientConfig := types.MessageBusConfig{
SubscribeHost: types.HostInfo{
Host: "localhost",
Port: 5588,
Protocol: "tcp",
},
PublishHost: types.HostInfo{
Host: "*",
Port: 5590,
Protocol: "tcp",
},
Type: "zero",
}
testClient, err := messaging.NewMessageClient(testClientConfig) //new client to publish & subscribe
require.NoError(t, err, "Failed to create test client")

testTopics := []types.TopicChannel{{Topic: trigger.Configuration.Binding.PublishTopic, Messages: make(chan types.MessageEnvelope)}}
testMessageErrors := make(chan error)

testClient.Subscribe(testTopics, testMessageErrors) //subscribe in order to receive transformed output to the bus

background := make(chan types.MessageEnvelope)

trigger.Initialize(&sync.WaitGroup{}, context.Background(), background)

message := types.MessageEnvelope{
CorrelationID: expectedCorrelationID,
Payload: expectedPayload,
ContentType: clients.ContentTypeJSON,
}

background <- message

receiveMessage := true

for receiveMessage {
select {
case msgErr := <-testMessageErrors:
receiveMessage = false
assert.Error(t, msgErr)
case msgs := <-testTopics[0].Messages:
receiveMessage = false
assert.Equal(t, expectedPayload, msgs.Payload)
}
}
}
Loading