Skip to content

Commit

Permalink
feat(sdk): Custom Trigger Multi-Pipeline Support
Browse files Browse the repository at this point in the history
Change service.defaultTriggerMessageProcessor to use default pipeline if configured or attempt to use topic matching logic.

Signed-off-by: Alex Ullrich <[email protected]>
  • Loading branch information
AlexCuse committed Sep 10, 2021
1 parent 262cc6a commit 29bcd0b
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 2 deletions.
6 changes: 6 additions & 0 deletions app-service-template/Attribution.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,18 @@ https://github.com/gorilla/mux/blob/master/LICENSE
hashicorp/consul/api (Mozilla Public License 2.0) - https://github.com/hashicorp/consul/api
https://github.com/hashicorp/consul/blob/master/LICENSE

hashicorp/errwrap (Mozilla Public License 2.0) https://github.com/hashicorp/errwrap
https://github.com/hashicorp/errwrap/blob/master/LICENSE

hashicorp/go-cleanhttp (Mozilla Public License 2.0) - https://github.com/hashicorp/go-cleanhttp
https://github.com/hashicorp/go-cleanhttp/blob/master/LICENSE

hashicorp/go-immutable-radix (Mozilla Public License 2.0) https://github.com/hashicorp/go-immutable-radix
https://github.com/hashicorp/go-immutable-radix/blob/master/LICENSE

hashicorp/go-multierror (Mozilla Public License 2.0) https://github.com/hashicorp/go-multierror
https://github.com/hashicorp/go-multierror/blob/master/LICENSE

hashicorp/go-rootcerts (Mozilla Public License 2.0) https://github.com/hashicorp/go-rootcerts
https://github.com/hashicorp/go-rootcerts/blob/master/LICENSE

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ require (
github.com/gomodule/redigo v2.0.0+incompatible
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.0
github.com/hashicorp/go-multierror v1.1.0
github.com/stretchr/testify v1.7.0
)
59 changes: 59 additions & 0 deletions internal/app/triggerfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ package app
import (
"errors"
"fmt"
"github.com/hashicorp/go-multierror"
"strings"
"sync"

"github.com/edgexfoundry/go-mod-messaging/v2/pkg/types"

Expand Down Expand Up @@ -61,13 +63,69 @@ func (svc *Service) RegisterCustomTriggerFactory(name string,
Logger: sdk.lc,
ContextBuilder: sdk.defaultTriggerContextBuilder,
MessageProcessor: sdk.defaultTriggerMessageProcessor,
ProcessMessage: sdk.processMessageOnRuntime,
ConfigLoader: sdk.defaultConfigLoader,
})
}

return nil
}

func (svc *Service) buildContextForRuntime(envelope types.MessageEnvelope) *appfunction.Context {
context := appfunction.NewContext(envelope.CorrelationID, svc.dic, envelope.ContentType)

if envelope.ReceivedTopic != "" {
context.AddValue(interfaces.RECEIVEDTOPIC, envelope.ReceivedTopic)
}

return context
}
func (svc *Service) processMessageOnRuntime(envelope types.MessageEnvelope) error {
context := svc.buildContextForRuntime(envelope)
defaultPipeline := svc.runtime.GetDefaultPipeline()

if defaultPipeline != nil { // then we aren't configured for topic matching, use default
context.LoggingClient().Debug("trigger using default pipeline")

messageError := svc.runtime.ProcessMessage(context, envelope, defaultPipeline)
if messageError != nil {
// ProcessMessage logs the error, so no need to log it here.
return messageError.Err
}
} else { // route to pipeline(s) via topic match
context.LoggingClient().Debug("trigger attempting to find pipeline(s) for topic %s", envelope.ReceivedTopic)

pipelines := svc.runtime.GetMatchingPipelines(envelope.ReceivedTopic)

context.LoggingClient().Debugf("trigger found %d pipeline(s) that match the incoming topic '%s'", len(pipelines), envelope.ReceivedTopic)

var finalErr error

pipelinesWaitGroup := sync.WaitGroup{}

for _, pipeline := range pipelines {
go func(p *interfaces.FunctionPipeline, e error, wg *sync.WaitGroup) {
wg.Add(1)
ctx := svc.buildContextForRuntime(envelope)
if msgErr := svc.runtime.ProcessMessage(ctx, envelope, p); msgErr != nil {
e = multierror.Append(e, msgErr.Err)
}

wg.Done()
}(pipeline, finalErr, &pipelinesWaitGroup)
}

pipelinesWaitGroup.Wait()

if finalErr != nil {
return finalErr
}
}

return nil
}

//Deprecated
func (svc *Service) defaultTriggerMessageProcessor(appContext interfaces.AppFunctionContext, envelope types.MessageEnvelope) error {
context, ok := appContext.(*appfunction.Context)
if !ok {
Expand All @@ -84,6 +142,7 @@ func (svc *Service) defaultTriggerMessageProcessor(appContext interfaces.AppFunc
return nil
}

// Deprecated
func (svc *Service) defaultTriggerContextBuilder(env types.MessageEnvelope) interfaces.AppFunctionContext {
return appfunction.NewContext(env.CorrelationID, svc.dic, env.ContentType)
}
Expand Down
12 changes: 10 additions & 2 deletions pkg/interfaces/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@ import (

// TriggerConfig provides a container to pass context needed for user defined triggers
type TriggerConfig struct {
Logger logger.LoggingClient
ContextBuilder TriggerContextBuilder
Logger logger.LoggingClient
// Deprecated
ContextBuilder TriggerContextBuilder
// Deprecated
MessageProcessor TriggerMessageProcessor
ProcessMessage MessageProcessor
ConfigLoader TriggerConfigLoader
}

Expand All @@ -40,9 +43,14 @@ type Trigger interface {
}

// TriggerMessageProcessor provides an interface that can be used by custom triggers to invoke the runtime
// Deprecated: use MessageProcessor instead
type TriggerMessageProcessor func(ctx AppFunctionContext, envelope types.MessageEnvelope) error

// TriggerContextBuilder provides an interface to construct an AppFunctionContext for message
// Deprecated: only used with legacy TriggerMessageProcessor
type TriggerContextBuilder func(env types.MessageEnvelope) AppFunctionContext

// MessageProcessor provides an interface that can be used by custom triggers to invoke the runtime
type MessageProcessor func(envelope types.MessageEnvelope) error

type TriggerConfigLoader func(config UpdatableConfig, sectionName string) error

0 comments on commit 29bcd0b

Please sign in to comment.