Skip to content

Commit

Permalink
refactor(triggers): Normalize Orchestration
Browse files Browse the repository at this point in the history
Share functionality between custom and built-in triggers, expanding
testability of those built-in.

Signed-off-by: Alex Ullrich <[email protected]>
  • Loading branch information
AlexCuse committed Oct 12, 2021
1 parent 1d59920 commit fef4bc0
Show file tree
Hide file tree
Showing 29 changed files with 1,740 additions and 1,016 deletions.
65 changes: 0 additions & 65 deletions internal/app/mocks/TriggerServiceBinding.go

This file was deleted.

2 changes: 1 addition & 1 deletion internal/app/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (svc *Service) MakeItRun() error {
svc.ctx.stop = stop

// determine input type and create trigger for it
t := svc.setupTrigger(svc.config, svc.runtime)
t := svc.setupTrigger(svc.config)
if t == nil {
return errors.New("failed to create Trigger")
}
Expand Down
4 changes: 2 additions & 2 deletions internal/app/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func TestSetupHTTPTrigger(t *testing.T) {
testRuntime := runtime.NewGolangRuntime("", nil, dic)
err := testRuntime.SetDefaultFunctionsPipeline(nil)
require.NoError(t, err)
trigger := sdk.setupTrigger(sdk.config, testRuntime)
trigger := sdk.setupTrigger(sdk.config)
result := IsInstanceOf(trigger, (*triggerHttp.Trigger)(nil))
assert.True(t, result, "Expected Instance of HTTP Trigger")
}
Expand All @@ -261,7 +261,7 @@ func TestSetupMessageBusTrigger(t *testing.T) {
testRuntime := runtime.NewGolangRuntime("", nil, dic)
err := testRuntime.SetDefaultFunctionsPipeline(nil)
require.NoError(t, err)
trigger := sdk.setupTrigger(sdk.config, testRuntime)
trigger := sdk.setupTrigger(sdk.config)
result := IsInstanceOf(trigger, (*messagebus.Trigger)(nil))
assert.True(t, result, "Expected Instance of Message Bus Trigger")
}
Expand Down
88 changes: 13 additions & 75 deletions internal/app/customtrigger.go → internal/app/triggerfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,12 @@ package app

import (
"fmt"
"github.com/edgexfoundry/go-mod-bootstrap/v2/di"
"github.com/edgexfoundry/go-mod-core-contracts/v2/clients/logger"
"github.com/hashicorp/go-multierror"
"strings"
"sync"

"github.com/edgexfoundry/go-mod-messaging/v2/pkg/types"

"github.com/edgexfoundry/app-functions-sdk-go/v2/internal/appfunction"
"github.com/edgexfoundry/app-functions-sdk-go/v2/internal/common"
"github.com/edgexfoundry/app-functions-sdk-go/v2/internal/runtime"
"github.com/edgexfoundry/app-functions-sdk-go/v2/internal/trigger/http"
"github.com/edgexfoundry/app-functions-sdk-go/v2/internal/trigger/messagebus"
"github.com/edgexfoundry/app-functions-sdk-go/v2/internal/trigger/mqtt"
"github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/interfaces"
"strings"
)

const (
Expand All @@ -44,79 +35,25 @@ const (
TriggerTypeHTTP = "HTTP"
)

type customTriggerBinding struct {
TriggerServiceBinding
dic *di.Container
log logger.LoggingClient
}

func newCustomTriggerBinding(svc *Service) *customTriggerBinding {
return &customTriggerBinding{
TriggerServiceBinding: simpleTriggerServiceBinding{
svc,
svc.runtime,
},
log: svc.LoggingClient(),
dic: svc.dic,
}
}

func (bnd *customTriggerBinding) buildContext(env types.MessageEnvelope) interfaces.AppFunctionContext {
return appfunction.NewContext(env.CorrelationID, bnd.dic, env.ContentType)
}

// processMessage provides an interface for custom triggers to execute service's configured pipelines through the runtime.
func (bnd *customTriggerBinding) processMessage(ctx interfaces.AppFunctionContext, envelope types.MessageEnvelope) error {
bnd.log.Debugf("custom trigger attempting to find pipeline(s) for topic %s", envelope.ReceivedTopic)

// ensure we have a context established that we can safely cast to *appfunction.Context to pass to runtime
if _, ok := ctx.(*appfunction.Context); ctx == nil || !ok {
ctx = bnd.buildContext(envelope)
}

pipelines := bnd.GetMatchingPipelines(envelope.ReceivedTopic)

bnd.log.Debugf("custom trigger found %d pipeline(s) that match the incoming topic '%s'", len(pipelines), envelope.ReceivedTopic)

var finalErr error

pipelinesWaitGroup := sync.WaitGroup{}

for _, pipeline := range pipelines {
pipelinesWaitGroup.Add(1)
go func(p *interfaces.FunctionPipeline, wg *sync.WaitGroup, errCollector func(error)) {
defer wg.Done()

bnd.log.Tracef("custom trigger sending message to pipeline %s (%s)", p.Id, envelope.CorrelationID)

if msgErr := bnd.ProcessMessage(ctx.Clone().(*appfunction.Context), envelope, p); msgErr != nil {
bnd.log.Tracef("custom trigger message error in pipeline %s (%s) %s", p.Id, envelope.CorrelationID, msgErr.Err.Error())
errCollector(msgErr.Err)
}
}(pipeline, &pipelinesWaitGroup, func(e error) { finalErr = multierror.Append(finalErr, e) })
}

pipelinesWaitGroup.Wait()

return finalErr
}

func (svc *Service) setupTrigger(configuration *common.ConfigurationStruct, runtime *runtime.GolangRuntime) interfaces.Trigger {
func (svc *Service) setupTrigger(configuration *common.ConfigurationStruct) interfaces.Trigger {
var t interfaces.Trigger
// Need to make dynamic, search for the trigger that is input

// this is an extension on the standard trigger binding and can be used for both
bnd := NewTriggerServiceBinding(svc)
mp := &triggerMessageProcessor{bnd}

switch triggerType := strings.ToUpper(configuration.Trigger.Type); triggerType {
case TriggerTypeHTTP:
svc.LoggingClient().Info("HTTP trigger selected")
t = http.NewTrigger(svc.dic, runtime, svc.webserver)
t = http.NewTrigger(bnd, mp, svc.webserver)

case TriggerTypeMessageBus:
svc.LoggingClient().Info("EdgeX MessageBus trigger selected")
t = messagebus.NewTrigger(svc.dic, runtime)
t = messagebus.NewTrigger(bnd, mp)

case TriggerTypeMQTT:
svc.LoggingClient().Info("External MQTT trigger selected")
t = mqtt.NewTrigger(svc.dic, runtime)
t = mqtt.NewTrigger(bnd, mp)

default:
if factory, found := svc.customTriggerFactories[triggerType]; found {
Expand Down Expand Up @@ -150,12 +87,13 @@ func (svc *Service) RegisterCustomTriggerFactory(name string,
}

svc.customTriggerFactories[nu] = func(sdk *Service) (interfaces.Trigger, error) {
binding := newCustomTriggerBinding(sdk)
binding := NewTriggerServiceBinding(sdk)
messageProcessor := &triggerMessageProcessor{binding}

cfg := interfaces.TriggerConfig{
Logger: sdk.lc,
ContextBuilder: binding.buildContext,
MessageProcessor: binding.processMessage,
ContextBuilder: binding.BuildContext,
MessageProcessor: messageProcessor.Process,
ConfigLoader: binding.LoadCustomConfig,
}

Expand Down
Loading

0 comments on commit fef4bc0

Please sign in to comment.