Skip to content

Commit

Permalink
feat(sdk): Custom Trigger Multi-Pipeline Support
Browse files Browse the repository at this point in the history
Change service.defaultTriggerMessageProcessor to use default pipeline if configured or attempt to use topic matching logic.

Signed-off-by: Alex Ullrich <[email protected]>
  • Loading branch information
AlexCuse committed Sep 9, 2021
1 parent 262cc6a commit 38ed098
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 15 deletions.
6 changes: 6 additions & 0 deletions app-service-template/Attribution.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,18 @@ https://github.com/gorilla/mux/blob/master/LICENSE
hashicorp/consul/api (Mozilla Public License 2.0) - https://github.com/hashicorp/consul/api
https://github.com/hashicorp/consul/blob/master/LICENSE

hashicorp/errwrap (Mozilla Public License 2.0) https://github.com/hashicorp/errwrap
https://github.com/hashicorp/errwrap/blob/master/LICENSE

hashicorp/go-cleanhttp (Mozilla Public License 2.0) - https://github.com/hashicorp/go-cleanhttp
https://github.com/hashicorp/go-cleanhttp/blob/master/LICENSE

hashicorp/go-immutable-radix (Mozilla Public License 2.0) https://github.com/hashicorp/go-immutable-radix
https://github.com/hashicorp/go-immutable-radix/blob/master/LICENSE

hashicorp/go-multierror (Mozilla Public License 2.0) https://github.com/hashicorp/go-multierror
https://github.com/hashicorp/go-multierror/blob/master/LICENSE

hashicorp/go-rootcerts (Mozilla Public License 2.0) https://github.com/hashicorp/go-rootcerts
https://github.com/hashicorp/go-rootcerts/blob/master/LICENSE

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ require (
github.com/gomodule/redigo v2.0.0+incompatible
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.0
github.com/hashicorp/go-multierror v1.1.0
github.com/stretchr/testify v1.7.0
)
64 changes: 54 additions & 10 deletions internal/app/triggerfactory.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//
// Copyright (c) 2020 Technotects
// Copyright (c) 2021 Intel Corporation
// Copyright (c) 2021 One Track Consulting

//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -19,9 +20,10 @@
package app

import (
"errors"
"fmt"
"github.com/hashicorp/go-multierror"
"strings"
"sync"

"github.com/edgexfoundry/go-mod-messaging/v2/pkg/types"

Expand Down Expand Up @@ -59,7 +61,6 @@ func (svc *Service) RegisterCustomTriggerFactory(name string,
svc.customTriggerFactories[nu] = func(sdk *Service) (interfaces.Trigger, error) {
return factory(interfaces.TriggerConfig{
Logger: sdk.lc,
ContextBuilder: sdk.defaultTriggerContextBuilder,
MessageProcessor: sdk.defaultTriggerMessageProcessor,
ConfigLoader: sdk.defaultConfigLoader,
})
Expand All @@ -68,24 +69,67 @@ func (svc *Service) RegisterCustomTriggerFactory(name string,
return nil
}

func (svc *Service) defaultTriggerMessageProcessor(appContext interfaces.AppFunctionContext, envelope types.MessageEnvelope) error {
context, ok := appContext.(*appfunction.Context)
func (svc *Service) defaultTriggerMessageProcessor(envelope types.MessageEnvelope) error {
context, ok := svc.defaultTriggerContextBuilder(envelope).(*appfunction.Context)

if !ok {
return errors.New("App Context must be an instance of internal appfunction.Context. Use NewAppContext to create instance.")
return fmt.Errorf("need *appfunction.Context")
}

defaultPipeline := svc.runtime.GetDefaultPipeline()
messageError := svc.runtime.ProcessMessage(context, envelope, defaultPipeline)
if messageError != nil {
// ProcessMessage logs the error, so no need to log it here.
return messageError.Err

if defaultPipeline != nil { // then we aren't configured for topic matching, use default
context.LoggingClient().Debug("trigger using default pipeline")

messageError := svc.runtime.ProcessMessage(context, envelope, defaultPipeline)
if messageError != nil {
// ProcessMessage logs the error, so no need to log it here.
return messageError.Err
}
} else { // route to pipeline(s) via topic match
context.LoggingClient().Debug("trigger attempting to find pipeline(s) for topic %s", envelope.ReceivedTopic)

pipelines := svc.runtime.GetMatchingPipelines(envelope.ReceivedTopic)

context.LoggingClient().Debugf("trigger found %d pipeline(s) that match the incoming topic '%s'", len(pipelines), envelope.ReceivedTopic)

var finalErr error

pipelinesWaitGroup := sync.WaitGroup{}

for _, pipeline := range pipelines {
go func(p *interfaces.FunctionPipeline, e error, wg *sync.WaitGroup) {
wg.Add(1)
ctx := svc.defaultTriggerContextBuilder(envelope).(*appfunction.Context)

if !ok { // this should be OK per check above
e = multierror.Append(e, fmt.Errorf("need *appfunction.Context"))
} else if msgErr := svc.runtime.ProcessMessage(ctx, envelope, p); msgErr != nil {
e = multierror.Append(e, msgErr.Err)
}

wg.Done()
}(pipeline, finalErr, &pipelinesWaitGroup)
}

pipelinesWaitGroup.Wait()

if finalErr != nil {
return finalErr
}
}

return nil
}

func (svc *Service) defaultTriggerContextBuilder(env types.MessageEnvelope) interfaces.AppFunctionContext {
return appfunction.NewContext(env.CorrelationID, svc.dic, env.ContentType)
ctx := appfunction.NewContext(env.CorrelationID, svc.dic, env.ContentType)

if env.ReceivedTopic != "" {
ctx.AddValue(interfaces.RECEIVEDTOPIC, env.ReceivedTopic)
}

return ctx
}

func (svc *Service) defaultConfigLoader(updatableConfig interfaces.UpdatableConfig, name string) error {
Expand Down
6 changes: 1 addition & 5 deletions pkg/interfaces/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
// TriggerConfig provides a container to pass context needed for user defined triggers
type TriggerConfig struct {
Logger logger.LoggingClient
ContextBuilder TriggerContextBuilder
MessageProcessor TriggerMessageProcessor
ConfigLoader TriggerConfigLoader
}
Expand All @@ -40,9 +39,6 @@ type Trigger interface {
}

// TriggerMessageProcessor provides an interface that can be used by custom triggers to invoke the runtime
type TriggerMessageProcessor func(ctx AppFunctionContext, envelope types.MessageEnvelope) error

// TriggerContextBuilder provides an interface to construct an AppFunctionContext for message
type TriggerContextBuilder func(env types.MessageEnvelope) AppFunctionContext
type TriggerMessageProcessor func(envelope types.MessageEnvelope) error

type TriggerConfigLoader func(config UpdatableConfig, sectionName string) error

0 comments on commit 38ed098

Please sign in to comment.