Skip to content

Commit

Permalink
feat(triggers): Support Multi-Pipeline Responses from Custom Triggers
Browse files Browse the repository at this point in the history
Enable passing a handler to process pipeline responses when invoking
runtime from a custom trigger.

fixes edgexfoundry#958

Signed-off-by: Alex Ullrich <[email protected]>
  • Loading branch information
AlexCuse committed Oct 29, 2021
1 parent 9cffd47 commit 2ee7795
Show file tree
Hide file tree
Showing 11 changed files with 551 additions and 225 deletions.
4 changes: 2 additions & 2 deletions app-service-template/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ go 1.15

require (
github.com/edgexfoundry/app-functions-sdk-go/v2 v2.0.1
github.com/edgexfoundry/go-mod-core-contracts/v2 v2.0.1-dev.20
github.com/edgexfoundry/go-mod-core-contracts/v2 v2.0.1-dev.26
github.com/google/uuid v1.3.0
github.com/stretchr/testify v1.7.0
)

replace github.com/edgexfoundry/app-functions-sdk-go/v2 => ../
replace github.com/edgexfoundry/app-functions-sdk-go/v2 => ../
19 changes: 10 additions & 9 deletions app-service-template/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,19 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/eclipse/paho.mqtt.golang v1.3.5 h1:sWtmgNxYM9P2sP+xEItMozsR3w0cqZFlqnNN1bdl41Y=
github.com/eclipse/paho.mqtt.golang v1.3.5/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc=
github.com/edgexfoundry/go-mod-bootstrap/v2 v2.0.1-dev.11 h1:nmcrzJvGaIZRQ1HQYXddbrX+g8Rk8jV0+GxsSBM7v74=
github.com/edgexfoundry/go-mod-bootstrap/v2 v2.0.1-dev.11/go.mod h1:tmM3GI64HIt3rIG3JMu52oq8otaC7GO+TF4QhzzFhfU=
github.com/edgexfoundry/go-mod-bootstrap/v2 v2.0.1-dev.15 h1:Gtk7eRe70LCdxww8tIkhGEvgxPBECb4lU/8xdRmPUI0=
github.com/edgexfoundry/go-mod-bootstrap/v2 v2.0.1-dev.15/go.mod h1:84xs+nDgmAu8X2qTWsj9GTkgcIQDzS+O/O6RjvFE38M=
github.com/edgexfoundry/go-mod-configuration/v2 v2.0.1-dev.5 h1:icE1aVlX7I3SJ0qPqZJchCr2JLe2TMRZlUMIM2qoivo=
github.com/edgexfoundry/go-mod-configuration/v2 v2.0.1-dev.5/go.mod h1:MvHit0MxBXN4bC8LL0NZRsw72ByRE1XwtVLQP9C+2vg=
github.com/edgexfoundry/go-mod-core-contracts/v2 v2.0.0/go.mod h1:pfXURRetgIto0GR0sCjDrfa71hqJ1wxmQWi/mOzWfWU=
github.com/edgexfoundry/go-mod-core-contracts/v2 v2.0.1-dev.20 h1:qNqnWSTeTs4dfnMzL/QKeORWwAujzdZc+rPF3GHo0m8=
github.com/edgexfoundry/go-mod-core-contracts/v2 v2.0.1-dev.20/go.mod h1:I6UhBPCREubcU0ouIGBdZlNG5Xx4NijUVN5rvEtD03k=
github.com/edgexfoundry/go-mod-messaging/v2 v2.0.2-dev.6 h1:gu0+92a3ytXKDxl1IXrI7EzHJUTCgjLmAZXjHzFbDgA=
github.com/edgexfoundry/go-mod-messaging/v2 v2.0.2-dev.6/go.mod h1:bLKWB9yeOHLZoQtHLZlGwz8MjsMJIvHDFce7CcUb4fE=
github.com/edgexfoundry/go-mod-registry/v2 v2.0.1-dev.7 h1:ch3KwzwhAX78Aiwyc87jJE2y6QJboLS6ikth8JKhw90=
github.com/edgexfoundry/go-mod-registry/v2 v2.0.1-dev.7/go.mod h1:+MNlQm8Ks9ZmxqrMK4O3KdBA6E7nRK9M+qBtv/1lPcA=
github.com/edgexfoundry/go-mod-core-contracts/v2 v2.0.1-dev.23/go.mod h1:I6UhBPCREubcU0ouIGBdZlNG5Xx4NijUVN5rvEtD03k=
github.com/edgexfoundry/go-mod-core-contracts/v2 v2.0.1-dev.26 h1:8S8+GZR8fOSg3yvVQiIUZ+EpGtG6dDcwhZiOkvBWV8M=
github.com/edgexfoundry/go-mod-core-contracts/v2 v2.0.1-dev.26/go.mod h1:I6UhBPCREubcU0ouIGBdZlNG5Xx4NijUVN5rvEtD03k=
github.com/edgexfoundry/go-mod-messaging/v2 v2.0.2-dev.7 h1:EQEcmdNcntGDrNRQgrRyHe9VLrKfQYlzlaBoJxrjsec=
github.com/edgexfoundry/go-mod-messaging/v2 v2.0.2-dev.7/go.mod h1:bLKWB9yeOHLZoQtHLZlGwz8MjsMJIvHDFce7CcUb4fE=
github.com/edgexfoundry/go-mod-registry/v2 v2.0.1-dev.8/go.mod h1:+MNlQm8Ks9ZmxqrMK4O3KdBA6E7nRK9M+qBtv/1lPcA=
github.com/edgexfoundry/go-mod-registry/v2 v2.0.1-dev.9 h1:l99AxpYUjXAILBo/WRjZ8LbvLZoW2fBZFGi0x/1M/po=
github.com/edgexfoundry/go-mod-registry/v2 v2.0.1-dev.9/go.mod h1:+MNlQm8Ks9ZmxqrMK4O3KdBA6E7nRK9M+qBtv/1lPcA=
github.com/edgexfoundry/go-mod-secrets/v2 v2.0.1-dev.7 h1:KBmLI8XkAX6mdJIEdlmuD7JiTd+KAnFNwxy8kQ8g+KQ=
github.com/edgexfoundry/go-mod-secrets/v2 v2.0.1-dev.7/go.mod h1:SbTMX0C+uGK7XCQYuD4oHmGSK5cYV/0mZLpLBu9HAmM=
github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
Expand Down Expand Up @@ -151,7 +153,6 @@ github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
Expand Down
82 changes: 9 additions & 73 deletions internal/app/customtrigger.go → internal/app/triggerfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,13 @@ package app

import (
"fmt"
"github.com/edgexfoundry/go-mod-bootstrap/v2/di"
"github.com/edgexfoundry/go-mod-core-contracts/v2/clients/logger"
"github.com/hashicorp/go-multierror"
"strings"
"sync"

"github.com/edgexfoundry/go-mod-messaging/v2/pkg/types"

"github.com/edgexfoundry/app-functions-sdk-go/v2/internal/appfunction"
"github.com/edgexfoundry/app-functions-sdk-go/v2/internal/common"
"github.com/edgexfoundry/app-functions-sdk-go/v2/internal/runtime"
"github.com/edgexfoundry/app-functions-sdk-go/v2/internal/trigger/http"
"github.com/edgexfoundry/app-functions-sdk-go/v2/internal/trigger/messagebus"
"github.com/edgexfoundry/app-functions-sdk-go/v2/internal/trigger/mqtt"
"github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/interfaces"
"strings"
)

const (
Expand All @@ -44,79 +36,21 @@ const (
TriggerTypeHTTP = "HTTP"
)

type customTriggerBinding struct {
TriggerServiceBinding
dic *di.Container
log logger.LoggingClient
}

func newCustomTriggerBinding(svc *Service) *customTriggerBinding {
return &customTriggerBinding{
TriggerServiceBinding: simpleTriggerServiceBinding{
svc,
svc.runtime,
},
log: svc.LoggingClient(),
dic: svc.dic,
}
}

func (bnd *customTriggerBinding) buildContext(env types.MessageEnvelope) interfaces.AppFunctionContext {
return appfunction.NewContext(env.CorrelationID, bnd.dic, env.ContentType)
}

// processMessage provides an interface for custom triggers to execute service's configured pipelines through the runtime.
func (bnd *customTriggerBinding) processMessage(ctx interfaces.AppFunctionContext, envelope types.MessageEnvelope) error {
bnd.log.Debugf("custom trigger attempting to find pipeline(s) for topic %s", envelope.ReceivedTopic)

// ensure we have a context established that we can safely cast to *appfunction.Context to pass to runtime
if _, ok := ctx.(*appfunction.Context); ctx == nil || !ok {
ctx = bnd.buildContext(envelope)
}

pipelines := bnd.GetMatchingPipelines(envelope.ReceivedTopic)

bnd.log.Debugf("custom trigger found %d pipeline(s) that match the incoming topic '%s'", len(pipelines), envelope.ReceivedTopic)

var finalErr error

pipelinesWaitGroup := sync.WaitGroup{}

for _, pipeline := range pipelines {
pipelinesWaitGroup.Add(1)
go func(p *interfaces.FunctionPipeline, wg *sync.WaitGroup, errCollector func(error)) {
defer wg.Done()

bnd.log.Tracef("custom trigger sending message to pipeline %s (%s)", p.Id, envelope.CorrelationID)

if msgErr := bnd.ProcessMessage(ctx.Clone().(*appfunction.Context), envelope, p); msgErr != nil {
bnd.log.Tracef("custom trigger message error in pipeline %s (%s) %s", p.Id, envelope.CorrelationID, msgErr.Err.Error())
errCollector(msgErr.Err)
}
}(pipeline, &pipelinesWaitGroup, func(e error) { finalErr = multierror.Append(finalErr, e) })
}

pipelinesWaitGroup.Wait()

return finalErr
}

func (svc *Service) setupTrigger(configuration *common.ConfigurationStruct, runtime *runtime.GolangRuntime) interfaces.Trigger {
var t interfaces.Trigger
// Need to make dynamic, search for the trigger that is input

switch triggerType := strings.ToUpper(configuration.Trigger.Type); triggerType {
case TriggerTypeHTTP:
svc.LoggingClient().Info("HTTP trigger selected")
t = http.NewTrigger(svc.dic, runtime, svc.webserver)
t = http.NewTrigger(svc.dic, svc.runtime, svc.webserver)

case TriggerTypeMessageBus:
svc.LoggingClient().Info("EdgeX MessageBus trigger selected")
t = messagebus.NewTrigger(svc.dic, runtime)
t = messagebus.NewTrigger(svc.dic, svc.runtime)

case TriggerTypeMQTT:
svc.LoggingClient().Info("External MQTT trigger selected")
t = mqtt.NewTrigger(svc.dic, runtime)
t = mqtt.NewTrigger(svc.dic, svc.runtime)

default:
if factory, found := svc.customTriggerFactories[triggerType]; found {
Expand Down Expand Up @@ -150,12 +84,14 @@ func (svc *Service) RegisterCustomTriggerFactory(name string,
}

svc.customTriggerFactories[nu] = func(sdk *Service) (interfaces.Trigger, error) {
binding := newCustomTriggerBinding(sdk)
binding := NewTriggerServiceBinding(sdk)
messageProcessor := &triggerMessageProcessor{binding}

cfg := interfaces.TriggerConfig{
Logger: sdk.lc,
ContextBuilder: binding.buildContext,
MessageProcessor: binding.processMessage,
ContextBuilder: binding.BuildContext,
MessageProcessor: messageProcessor.Process,
MessageReceived: messageProcessor.MessageReceived,
ConfigLoader: binding.LoadCustomConfig,
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,6 @@ package app
import (
"context"
"errors"
"fmt"
"github.com/edgexfoundry/app-functions-sdk-go/v2/internal/app/mocks"
"github.com/edgexfoundry/app-functions-sdk-go/v2/internal/appfunction"
"github.com/edgexfoundry/app-functions-sdk-go/v2/internal/runtime"
"github.com/edgexfoundry/go-mod-messaging/v2/pkg/types"
"github.com/hashicorp/go-multierror"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -110,7 +102,7 @@ func TestSetupTrigger_HTTP(t *testing.T) {
lc: logger.MockLogger{},
}

trigger := sdk.setupTrigger(sdk.config, sdk.runtime)
trigger := sdk.setupTrigger(sdk.config, nil)

require.NotNil(t, trigger, "should be defined")
require.IsType(t, &http.Trigger{}, trigger, "should be an http trigger")
Expand All @@ -126,7 +118,7 @@ func TestSetupTrigger_EdgeXMessageBus(t *testing.T) {
lc: logger.MockLogger{},
}

trigger := sdk.setupTrigger(sdk.config, sdk.runtime)
trigger := sdk.setupTrigger(sdk.config, nil)

require.NotNil(t, trigger, "should be defined")
require.IsType(t, &messagebus.Trigger{}, trigger, "should be an edgex-messagebus trigger")
Expand All @@ -151,7 +143,7 @@ func TestSetupTrigger_MQTT(t *testing.T) {
lc: lc,
}

trigger := sdk.setupTrigger(sdk.config, sdk.runtime)
trigger := sdk.setupTrigger(sdk.config, nil)

require.NotNil(t, trigger, "should be defined")
require.IsType(t, &mqtt.Trigger{}, trigger, "should be an external-MQTT trigger")
Expand Down Expand Up @@ -181,7 +173,7 @@ func Test_Service_setupTrigger_CustomType(t *testing.T) {
})
require.NoError(t, err)

trigger := sdk.setupTrigger(sdk.config, sdk.runtime)
trigger := sdk.setupTrigger(sdk.config, nil)

require.NotNil(t, trigger, "should be defined")
require.IsType(t, &mockCustomTrigger{}, trigger, "should be a custom trigger")
Expand All @@ -204,7 +196,7 @@ func Test_Service_SetupTrigger_CustomTypeError(t *testing.T) {
})
require.NoError(t, err)

trigger := sdk.setupTrigger(sdk.config, sdk.runtime)
trigger := sdk.setupTrigger(sdk.config, nil)

require.Nil(t, trigger, "should be nil")
}
Expand All @@ -221,126 +213,7 @@ func Test_Service_SetupTrigger_CustomTypeNotFound(t *testing.T) {
lc: logger.MockLogger{},
}

trigger := sdk.setupTrigger(sdk.config, sdk.runtime)
trigger := sdk.setupTrigger(sdk.config, nil)

require.Nil(t, trigger, "should be nil")
}

func Test_customTriggerBinding_buildContext(t *testing.T) {
container := &di.Container{}
correlationId := uuid.NewString()
contentType := uuid.NewString()

bnd := &customTriggerBinding{
dic: container,
}

got := bnd.buildContext(types.MessageEnvelope{CorrelationID: correlationId, ContentType: contentType})

require.NotNil(t, got)

assert.Equal(t, correlationId, got.CorrelationID())
assert.Equal(t, contentType, got.InputContentType())

ctx, ok := got.(*appfunction.Context)
require.True(t, ok)
assert.Equal(t, container, ctx.Dic)
}

func Test_customTriggerBinding_processMessage(t *testing.T) {
type returns struct {
runtimeProcessor interface{}
pipelineMatcher interface{}
}
type args struct {
ctx interfaces.AppFunctionContext
envelope types.MessageEnvelope
}
tests := []struct {
name string
setup returns
args args
wantErr int
}{
{
name: "no pipelines",
setup: returns{},
args: args{envelope: types.MessageEnvelope{CorrelationID: uuid.NewString(), ContentType: uuid.NewString(), ReceivedTopic: uuid.NewString()}, ctx: &appfunction.Context{}},
wantErr: 0,
},
{
name: "single pipeline",
setup: returns{
pipelineMatcher: []*interfaces.FunctionPipeline{{}},
},
args: args{envelope: types.MessageEnvelope{CorrelationID: uuid.NewString(), ContentType: uuid.NewString(), ReceivedTopic: uuid.NewString()}, ctx: &appfunction.Context{}},
wantErr: 0,
},
{
name: "single pipeline error",
setup: returns{
pipelineMatcher: []*interfaces.FunctionPipeline{{}},
runtimeProcessor: &runtime.MessageError{Err: fmt.Errorf("some error")},
},
args: args{envelope: types.MessageEnvelope{CorrelationID: uuid.NewString(), ContentType: uuid.NewString(), ReceivedTopic: uuid.NewString()}, ctx: &appfunction.Context{}},
wantErr: 1,
},
{
name: "multi pipeline",
setup: returns{
pipelineMatcher: []*interfaces.FunctionPipeline{{}, {}, {}},
},
args: args{envelope: types.MessageEnvelope{CorrelationID: uuid.NewString(), ContentType: uuid.NewString(), ReceivedTopic: uuid.NewString()}, ctx: &appfunction.Context{}},
wantErr: 0,
},
{
name: "multi pipeline single err",
setup: returns{
pipelineMatcher: []*interfaces.FunctionPipeline{{}, {Id: "errorid"}, {}},
runtimeProcessor: func(appContext *appfunction.Context, envelope types.MessageEnvelope, pipeline *interfaces.FunctionPipeline) *runtime.MessageError {
if pipeline.Id == "errorid" {
return &runtime.MessageError{Err: fmt.Errorf("new error")}
}
return nil
},
},
args: args{envelope: types.MessageEnvelope{CorrelationID: uuid.NewString(), ContentType: uuid.NewString(), ReceivedTopic: uuid.NewString()}, ctx: &appfunction.Context{}},
wantErr: 1,
},
{
name: "multi pipeline multi err",
setup: returns{
pipelineMatcher: []*interfaces.FunctionPipeline{{}, {}, {}},
runtimeProcessor: &runtime.MessageError{Err: fmt.Errorf("new error")},
},
args: args{envelope: types.MessageEnvelope{CorrelationID: uuid.NewString(), ContentType: uuid.NewString(), ReceivedTopic: uuid.NewString()}, ctx: &appfunction.Context{}},
wantErr: 3,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tsb := mocks.TriggerServiceBinding{}

tsb.On("ProcessMessage", mock.Anything, mock.Anything, mock.Anything).Return(tt.setup.runtimeProcessor)
tsb.On("GetMatchingPipelines", tt.args.envelope.ReceivedTopic).Return(tt.setup.pipelineMatcher)

bnd := &customTriggerBinding{
TriggerServiceBinding: &tsb,
log: logger.NewMockClient(),
}

err := bnd.processMessage(tt.args.ctx, tt.args.envelope)

require.Equal(t, err == nil, tt.wantErr == 0)

if err != nil {
merr, ok := err.(*multierror.Error)

require.True(t, ok)

assert.Equal(t, tt.wantErr, merr.Len())
}
})
}
}
Loading

0 comments on commit 2ee7795

Please sign in to comment.