Skip to content

Commit

Permalink
feat(triggers): Support Multi-Pipeline Responses from Custom Triggers
Browse files Browse the repository at this point in the history
Enable passing a handler to process pipeline responses when invoking
runtime from a custom trigger.

fixes edgexfoundry#958

Signed-off-by: Alex Ullrich <[email protected]>
  • Loading branch information
AlexCuse committed Nov 2, 2021
1 parent 8fa13c6 commit 1d92705
Show file tree
Hide file tree
Showing 9 changed files with 539 additions and 214 deletions.
82 changes: 9 additions & 73 deletions internal/app/customtrigger.go → internal/app/triggerfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,13 @@ package app

import (
"fmt"
"github.com/edgexfoundry/go-mod-bootstrap/v2/di"
"github.com/edgexfoundry/go-mod-core-contracts/v2/clients/logger"
"github.com/hashicorp/go-multierror"
"strings"
"sync"

"github.com/edgexfoundry/go-mod-messaging/v2/pkg/types"

"github.com/edgexfoundry/app-functions-sdk-go/v2/internal/appfunction"
"github.com/edgexfoundry/app-functions-sdk-go/v2/internal/common"
"github.com/edgexfoundry/app-functions-sdk-go/v2/internal/runtime"
"github.com/edgexfoundry/app-functions-sdk-go/v2/internal/trigger/http"
"github.com/edgexfoundry/app-functions-sdk-go/v2/internal/trigger/messagebus"
"github.com/edgexfoundry/app-functions-sdk-go/v2/internal/trigger/mqtt"
"github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/interfaces"
"strings"
)

const (
Expand All @@ -44,79 +36,21 @@ const (
TriggerTypeHTTP = "HTTP"
)

type customTriggerBinding struct {
TriggerServiceBinding
dic *di.Container
log logger.LoggingClient
}

func newCustomTriggerBinding(svc *Service) *customTriggerBinding {
return &customTriggerBinding{
TriggerServiceBinding: simpleTriggerServiceBinding{
svc,
svc.runtime,
},
log: svc.LoggingClient(),
dic: svc.dic,
}
}

func (bnd *customTriggerBinding) buildContext(env types.MessageEnvelope) interfaces.AppFunctionContext {
return appfunction.NewContext(env.CorrelationID, bnd.dic, env.ContentType)
}

// processMessage provides an interface for custom triggers to execute service's configured pipelines through the runtime.
func (bnd *customTriggerBinding) processMessage(ctx interfaces.AppFunctionContext, envelope types.MessageEnvelope) error {
bnd.log.Debugf("custom trigger attempting to find pipeline(s) for topic %s", envelope.ReceivedTopic)

// ensure we have a context established that we can safely cast to *appfunction.Context to pass to runtime
if _, ok := ctx.(*appfunction.Context); ctx == nil || !ok {
ctx = bnd.buildContext(envelope)
}

pipelines := bnd.GetMatchingPipelines(envelope.ReceivedTopic)

bnd.log.Debugf("custom trigger found %d pipeline(s) that match the incoming topic '%s'", len(pipelines), envelope.ReceivedTopic)

var finalErr error

pipelinesWaitGroup := sync.WaitGroup{}

for _, pipeline := range pipelines {
pipelinesWaitGroup.Add(1)
go func(p *interfaces.FunctionPipeline, wg *sync.WaitGroup, errCollector func(error)) {
defer wg.Done()

bnd.log.Tracef("custom trigger sending message to pipeline %s (%s)", p.Id, envelope.CorrelationID)

if msgErr := bnd.ProcessMessage(ctx.Clone().(*appfunction.Context), envelope, p); msgErr != nil {
bnd.log.Tracef("custom trigger message error in pipeline %s (%s) %s", p.Id, envelope.CorrelationID, msgErr.Err.Error())
errCollector(msgErr.Err)
}
}(pipeline, &pipelinesWaitGroup, func(e error) { finalErr = multierror.Append(finalErr, e) })
}

pipelinesWaitGroup.Wait()

return finalErr
}

func (svc *Service) setupTrigger(configuration *common.ConfigurationStruct, runtime *runtime.GolangRuntime) interfaces.Trigger {
var t interfaces.Trigger
// Need to make dynamic, search for the trigger that is input

switch triggerType := strings.ToUpper(configuration.Trigger.Type); triggerType {
case TriggerTypeHTTP:
svc.LoggingClient().Info("HTTP trigger selected")
t = http.NewTrigger(svc.dic, runtime, svc.webserver)
t = http.NewTrigger(svc.dic, svc.runtime, svc.webserver)

case TriggerTypeMessageBus:
svc.LoggingClient().Info("EdgeX MessageBus trigger selected")
t = messagebus.NewTrigger(svc.dic, runtime)
t = messagebus.NewTrigger(svc.dic, svc.runtime)

case TriggerTypeMQTT:
svc.LoggingClient().Info("External MQTT trigger selected")
t = mqtt.NewTrigger(svc.dic, runtime)
t = mqtt.NewTrigger(svc.dic, svc.runtime)

default:
if factory, found := svc.customTriggerFactories[triggerType]; found {
Expand Down Expand Up @@ -150,12 +84,14 @@ func (svc *Service) RegisterCustomTriggerFactory(name string,
}

svc.customTriggerFactories[nu] = func(sdk *Service) (interfaces.Trigger, error) {
binding := newCustomTriggerBinding(sdk)
binding := NewTriggerServiceBinding(sdk)
messageProcessor := &triggerMessageProcessor{binding}

cfg := interfaces.TriggerConfig{
Logger: sdk.lc,
ContextBuilder: binding.buildContext,
MessageProcessor: binding.processMessage,
ContextBuilder: binding.BuildContext,
MessageProcessor: messageProcessor.Process,
MessageReceived: messageProcessor.MessageReceived,
ConfigLoader: binding.LoadCustomConfig,
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,6 @@ package app
import (
"context"
"errors"
"fmt"
"github.com/edgexfoundry/app-functions-sdk-go/v2/internal/app/mocks"
"github.com/edgexfoundry/app-functions-sdk-go/v2/internal/appfunction"
"github.com/edgexfoundry/app-functions-sdk-go/v2/internal/runtime"
"github.com/edgexfoundry/go-mod-messaging/v2/pkg/types"
"github.com/hashicorp/go-multierror"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -110,7 +102,7 @@ func TestSetupTrigger_HTTP(t *testing.T) {
lc: logger.MockLogger{},
}

trigger := sdk.setupTrigger(sdk.config, sdk.runtime)
trigger := sdk.setupTrigger(sdk.config, nil)

require.NotNil(t, trigger, "should be defined")
require.IsType(t, &http.Trigger{}, trigger, "should be an http trigger")
Expand All @@ -126,7 +118,7 @@ func TestSetupTrigger_EdgeXMessageBus(t *testing.T) {
lc: logger.MockLogger{},
}

trigger := sdk.setupTrigger(sdk.config, sdk.runtime)
trigger := sdk.setupTrigger(sdk.config, nil)

require.NotNil(t, trigger, "should be defined")
require.IsType(t, &messagebus.Trigger{}, trigger, "should be an edgex-messagebus trigger")
Expand All @@ -151,7 +143,7 @@ func TestSetupTrigger_MQTT(t *testing.T) {
lc: lc,
}

trigger := sdk.setupTrigger(sdk.config, sdk.runtime)
trigger := sdk.setupTrigger(sdk.config, nil)

require.NotNil(t, trigger, "should be defined")
require.IsType(t, &mqtt.Trigger{}, trigger, "should be an external-MQTT trigger")
Expand Down Expand Up @@ -181,7 +173,7 @@ func Test_Service_setupTrigger_CustomType(t *testing.T) {
})
require.NoError(t, err)

trigger := sdk.setupTrigger(sdk.config, sdk.runtime)
trigger := sdk.setupTrigger(sdk.config, nil)

require.NotNil(t, trigger, "should be defined")
require.IsType(t, &mockCustomTrigger{}, trigger, "should be a custom trigger")
Expand All @@ -204,7 +196,7 @@ func Test_Service_SetupTrigger_CustomTypeError(t *testing.T) {
})
require.NoError(t, err)

trigger := sdk.setupTrigger(sdk.config, sdk.runtime)
trigger := sdk.setupTrigger(sdk.config, nil)

require.Nil(t, trigger, "should be nil")
}
Expand All @@ -221,126 +213,7 @@ func Test_Service_SetupTrigger_CustomTypeNotFound(t *testing.T) {
lc: logger.MockLogger{},
}

trigger := sdk.setupTrigger(sdk.config, sdk.runtime)
trigger := sdk.setupTrigger(sdk.config, nil)

require.Nil(t, trigger, "should be nil")
}

func Test_customTriggerBinding_buildContext(t *testing.T) {
container := &di.Container{}
correlationId := uuid.NewString()
contentType := uuid.NewString()

bnd := &customTriggerBinding{
dic: container,
}

got := bnd.buildContext(types.MessageEnvelope{CorrelationID: correlationId, ContentType: contentType})

require.NotNil(t, got)

assert.Equal(t, correlationId, got.CorrelationID())
assert.Equal(t, contentType, got.InputContentType())

ctx, ok := got.(*appfunction.Context)
require.True(t, ok)
assert.Equal(t, container, ctx.Dic)
}

func Test_customTriggerBinding_processMessage(t *testing.T) {
type returns struct {
runtimeProcessor interface{}
pipelineMatcher interface{}
}
type args struct {
ctx interfaces.AppFunctionContext
envelope types.MessageEnvelope
}
tests := []struct {
name string
setup returns
args args
wantErr int
}{
{
name: "no pipelines",
setup: returns{},
args: args{envelope: types.MessageEnvelope{CorrelationID: uuid.NewString(), ContentType: uuid.NewString(), ReceivedTopic: uuid.NewString()}, ctx: &appfunction.Context{}},
wantErr: 0,
},
{
name: "single pipeline",
setup: returns{
pipelineMatcher: []*interfaces.FunctionPipeline{{}},
},
args: args{envelope: types.MessageEnvelope{CorrelationID: uuid.NewString(), ContentType: uuid.NewString(), ReceivedTopic: uuid.NewString()}, ctx: &appfunction.Context{}},
wantErr: 0,
},
{
name: "single pipeline error",
setup: returns{
pipelineMatcher: []*interfaces.FunctionPipeline{{}},
runtimeProcessor: &runtime.MessageError{Err: fmt.Errorf("some error")},
},
args: args{envelope: types.MessageEnvelope{CorrelationID: uuid.NewString(), ContentType: uuid.NewString(), ReceivedTopic: uuid.NewString()}, ctx: &appfunction.Context{}},
wantErr: 1,
},
{
name: "multi pipeline",
setup: returns{
pipelineMatcher: []*interfaces.FunctionPipeline{{}, {}, {}},
},
args: args{envelope: types.MessageEnvelope{CorrelationID: uuid.NewString(), ContentType: uuid.NewString(), ReceivedTopic: uuid.NewString()}, ctx: &appfunction.Context{}},
wantErr: 0,
},
{
name: "multi pipeline single err",
setup: returns{
pipelineMatcher: []*interfaces.FunctionPipeline{{}, {Id: "errorid"}, {}},
runtimeProcessor: func(appContext *appfunction.Context, envelope types.MessageEnvelope, pipeline *interfaces.FunctionPipeline) *runtime.MessageError {
if pipeline.Id == "errorid" {
return &runtime.MessageError{Err: fmt.Errorf("new error")}
}
return nil
},
},
args: args{envelope: types.MessageEnvelope{CorrelationID: uuid.NewString(), ContentType: uuid.NewString(), ReceivedTopic: uuid.NewString()}, ctx: &appfunction.Context{}},
wantErr: 1,
},
{
name: "multi pipeline multi err",
setup: returns{
pipelineMatcher: []*interfaces.FunctionPipeline{{}, {}, {}},
runtimeProcessor: &runtime.MessageError{Err: fmt.Errorf("new error")},
},
args: args{envelope: types.MessageEnvelope{CorrelationID: uuid.NewString(), ContentType: uuid.NewString(), ReceivedTopic: uuid.NewString()}, ctx: &appfunction.Context{}},
wantErr: 3,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tsb := mocks.TriggerServiceBinding{}

tsb.On("ProcessMessage", mock.Anything, mock.Anything, mock.Anything).Return(tt.setup.runtimeProcessor)
tsb.On("GetMatchingPipelines", tt.args.envelope.ReceivedTopic).Return(tt.setup.pipelineMatcher)

bnd := &customTriggerBinding{
TriggerServiceBinding: &tsb,
log: logger.NewMockClient(),
}

err := bnd.processMessage(tt.args.ctx, tt.args.envelope)

require.Equal(t, err == nil, tt.wantErr == 0)

if err != nil {
merr, ok := err.(*multierror.Error)

require.True(t, ok)

assert.Equal(t, tt.wantErr, merr.Len())
}
})
}
}
Loading

0 comments on commit 1d92705

Please sign in to comment.