Skip to content

Commit

Permalink
feat: Add Pipeline per Topic capability (#938)
Browse files Browse the repository at this point in the history
* feat: Add Pipeline per Topic capability

closes #575 & #713

Signed-off-by: Leonard Goodell <[email protected]>
  • Loading branch information
Lenny Goodell authored Aug 26, 2021
1 parent 4d69672 commit 262cc6a
Show file tree
Hide file tree
Showing 51 changed files with 1,395 additions and 549 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ GO=CGO_ENABLED=1 GO111MODULE=on go
build:
make -C ./app-service-template build

tidy:
go mod tidy

test-template:
make -C ./app-service-template test

Expand Down
9 changes: 4 additions & 5 deletions app-service-template/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,12 @@ GOFLAGS=-ldflags "-X github.com/edgexfoundry/app-functions-sdk-go/v2/internal.SD
#GIT_SHA=$(shell git rev-parse HEAD)
GIT_SHA=no-sha

# `go mod tidy` is needed for the Depend a Bot PRs which don't update the go.sum in this folder, causing tests to fail
# in the pipeline since the tests call this make build.
# TODO: Remove `go mod tidy` when creating your new app services using this template
build:
go mod tidy
build: tidy
$(GO) build $(GOFLAGS) -o $(MICROSERVICE)

tidy:
go mod tidy

# TODO: Change the registries (edgexfoundry, nexus3.edgexfoundry.org:10004) below as needed.
# Leave them as is if service is to be upstreamed to EdgeX Foundry
# NOTE: This is only used for local development. Jenkins CI does not use this make target
Expand Down
41 changes: 23 additions & 18 deletions app-service-template/functions/sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package functions

import (
"errors"
"fmt"
"strings"

"github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/interfaces"
Expand All @@ -28,49 +28,54 @@ import (

// TODO: Create your custom type and function(s) and remove these samples

// NewSample ...
// TODO: Add parameters that the function(s) will need each time one is executed
func NewSample() Sample {
return Sample{}
}

// Sample ...
type Sample struct {
// TODO: Add properties that the function(s) will need each time one is executed
}

// LogEventDetails is example of processing an Event and passing the original Event to to next function in the pipeline
// LogEventDetails is example of processing an Event and passing the original Event to next function in the pipeline
// For more details on the Context API got here: https://docs.edgexfoundry.org/1.3/microservices/application/ContextAPI/
func (s *Sample) LogEventDetails(ctx interfaces.AppFunctionContext, data interface{}) (bool, interface{}) {
lc := ctx.LoggingClient()
lc.Debug("LogEventDetails called")
lc.Debugf("LogEventDetails called in pipeline '%s'", ctx.PipelineId())

if data == nil {
// Go here for details on Error Handle: https://docs.edgexfoundry.org/1.3/microservices/application/ErrorHandling/
return false, errors.New("no Event Received")
return false, fmt.Errorf("function LogEventDetails in pipeline '%s': No Data Received", ctx.PipelineId())
}

event, ok := data.(dtos.Event)
if !ok {
return false, errors.New("type received is not an Event")
return false, fmt.Errorf("function LogEventDetails in pipeline '%s', type received is not an Event", ctx.PipelineId())
}

lc.Infof("Event received: ID=%s, Device=%s, and ReadingCount=%d",
lc.Infof("Event received in pipeline '%s': ID=%s, Device=%s, and ReadingCount=%d",
ctx.PipelineId(),
event.Id,
event.DeviceName,
len(event.Readings))
for index, reading := range event.Readings {
switch strings.ToLower(reading.ValueType) {
case strings.ToLower(common.ValueTypeBinary):
lc.Infof(
"Reading #%d received with ID=%s, Resource=%s, ValueType=%s, MediaType=%s and BinaryValue of size=`%d`",
"Reading #%d received in pipeline '%s' with ID=%s, Resource=%s, ValueType=%s, MediaType=%s and BinaryValue of size=`%d`",
index+1,
ctx.PipelineId(),
reading.Id,
reading.ResourceName,
reading.ValueType,
reading.MediaType,
len(reading.BinaryValue))
default:
lc.Infof("Reading #%d received with ID=%s, Resource=%s, ValueType=%s, Value=`%s`",
lc.Infof("Reading #%d received in pipeline '%s' with ID=%s, Resource=%s, ValueType=%s, Value=`%s`",
index+1,
ctx.PipelineId(),
reading.Id,
reading.ResourceName,
reading.ValueType,
Expand All @@ -83,29 +88,29 @@ func (s *Sample) LogEventDetails(ctx interfaces.AppFunctionContext, data interfa
return true, event
}

// ConvertEventToXML is example of transforming an Event and passing the transformed data to to next function in the pipeline
// ConvertEventToXML is example of transforming an Event and passing the transformed data to next function in the pipeline
func (s *Sample) ConvertEventToXML(ctx interfaces.AppFunctionContext, data interface{}) (bool, interface{}) {
lc := ctx.LoggingClient()
lc.Debug("ConvertEventToXML called")
lc.Debugf("ConvertEventToXML called in pipeline '%s'", ctx.PipelineId())

if data == nil {
return false, errors.New("no Event Received")
return false, fmt.Errorf("function ConvertEventToXML in pipeline '%s': No Data Received", ctx.PipelineId())
}

event, ok := data.(dtos.Event)
if !ok {
return false, errors.New("type received is not an Event")
return false, fmt.Errorf("function ConvertEventToXML in pipeline '%s': type received is not an Event", ctx.PipelineId())
}

xml, err := event.ToXML()
if err != nil {
return false, errors.New("failed to convert event to XML")
return false, fmt.Errorf("function ConvertEventToXML in pipeline '%s': failed to convert event to XML", ctx.PipelineId())
}

// Example of DEBUG message which by default you don't want to be logged.
// To see debug log messages, Set WRITABLE_LOGLEVEL=DEBUG environment variable or
// change LogLevel in configuration.toml before running app service.
lc.Debug("Event converted to XML: " + xml)
lc.Debugf("Event converted to XML in pipeline '%s': %s", ctx.PipelineId(), xml)

// Returning true indicates that the pipeline execution should continue with the next function
// using the event passed as input in this case.
Expand All @@ -115,18 +120,18 @@ func (s *Sample) ConvertEventToXML(ctx interfaces.AppFunctionContext, data inter
// OutputXML is an example of processing transformed data
func (s *Sample) OutputXML(ctx interfaces.AppFunctionContext, data interface{}) (bool, interface{}) {
lc := ctx.LoggingClient()
lc.Debug("OutputXML called")
lc.Debugf("OutputXML called in pipeline '%s'", ctx.PipelineId())

if data == nil {
return false, errors.New("no XML Received")
return false, fmt.Errorf("function OutputXML in pipeline '%s': No Data Received", ctx.PipelineId())
}

xml, ok := data.(string)
if !ok {
return false, errors.New("type received is not an string")
return false, fmt.Errorf("function ConvertEventToXML in pipeline '%s': type received is not an string", ctx.PipelineId())
}

lc.Debugf("Outputting the following XML: %s", xml)
lc.Debugf("Outputting the following XML in pipeline '%s': %s", ctx.PipelineId(), xml)

// This sends the XML as a response. i.e. publish for MessageBus/MQTT triggers as configured or
// HTTP response to for the HTTP Trigger
Expand Down
2 changes: 1 addition & 1 deletion app-service-template/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ go 1.15
require (
github.com/edgexfoundry/app-functions-sdk-go/v2 v2.0.1
github.com/edgexfoundry/go-mod-core-contracts/v2 v2.0.0
github.com/google/uuid v1.2.0
github.com/google/uuid v1.3.0
github.com/stretchr/testify v1.7.0
)

Expand Down
6 changes: 4 additions & 2 deletions app-service-template/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ github.com/fatih/color v1.9.0 h1:8xPHl4/q1VyqGIPif1F+1V3Y3lSmrq01EabUW3CoW5s=
github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fxamacker/cbor/v2 v2.2.0 h1:6eXqdDDe588rSYAi1HfZKbx6YYQO4mxQ9eC6xYpU/JQ=
github.com/fxamacker/cbor/v2 v2.2.0/go.mod h1:TA1xS00nchWmaBnEIxPSE5oHLuJBAVvqrtAnWBwBCVo=
github.com/fxamacker/cbor/v2 v2.3.0 h1:aM45YGMctNakddNNAezPxDUpv38j44Abh+hifNuqXik=
github.com/fxamacker/cbor/v2 v2.3.0/go.mod h1:TA1xS00nchWmaBnEIxPSE5oHLuJBAVvqrtAnWBwBCVo=
github.com/go-kit/kit v0.9.0 h1:wDJmvq38kDhkVxi50ni9ykkdUr1PKgqKOoi01fa0Mdk=
github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-logfmt/logfmt v0.4.0 h1:MP4Eh7ZCb31lleYCFuwm0oe4/YGak+5l1vA2NOE80nA=
Expand All @@ -57,8 +58,9 @@ github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNu
github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c h1:964Od4U6p2jUkFxvCydnIczKteheJEzHRToSGK3Bnlw=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
Expand Down
38 changes: 35 additions & 3 deletions app-service-template/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,11 @@ func (app *myApp) CreateAndRunAppService(serviceKey string, newServiceFactory fu
return -1
}

// TODO: Replace below functions with built in and/or your custom functions for your use case.
// See https://docs.edgexfoundry.org/2.0/microservices/application/BuiltIn/ for list of built-in functions
sample := functions.NewSample()

// TODO: Replace below functions with built in and/or your custom functions for your use case
// or remove is using Pipeline By Topic below.
// See https://docs.edgexfoundry.org/2.0/microservices/application/BuiltIn/ for list of built-in functions
err = app.service.SetFunctionsPipeline(
transforms.NewFilterFor(deviceNames).FilterByDeviceName,
sample.LogEventDetails,
Expand All @@ -110,6 +112,36 @@ func (app *myApp) CreateAndRunAppService(serviceKey string, newServiceFactory fu
return -1
}

// TODO: Remove adding functions pipelines by topic if default pipeline above is all your Use Case needs.
// Or remove default above if your use case needs multiple pipelines by topic.
// Example of adding functions pipelines by topic.
// These pipelines will only execute if the specified topic match the incoming topic.
// Note: Device services publish to the 'edgex/events/device/<profile-name>/<device-name>/<source-name>' topic
// Core Data publishes to the 'edgex/events/core/<profile-name>/<device-name>/<source-name>' topic
// Note: This example with default above causes Events from Random-Float-Device device to be processed twice
// resulting in the XML to be published back to the MessageBus twice.
// See <Pipeline By Topic documentation url TBD> for more details.
err = app.service.AddFunctionsPipelineForTopic("Floats", "edgex/events/#/#/Random-Float-Device/#",
transforms.NewFilterFor(deviceNames).FilterByDeviceName,
sample.LogEventDetails,
sample.ConvertEventToXML,
sample.OutputXML)
if err != nil {
app.lc.Errorf("AddFunctionsPipelineForTopic returned error: %s", err.Error())
return -1
}
// Note: This example with default above causes Events from Int32 source to be processed twice
// resulting in the XML to be published back to the MessageBus twice.
err = app.service.AddFunctionsPipelineForTopic("Int32s", "edgex/events/#/#/#/Int32",
transforms.NewFilterFor(deviceNames).FilterByDeviceName,
sample.LogEventDetails,
sample.ConvertEventToXML,
sample.OutputXML)
if err != nil {
app.lc.Errorf("AddFunctionsPipelineForTopic returned error: %s", err.Error())
return -1
}

if err := app.service.MakeItRun(); err != nil {
app.lc.Errorf("MakeItRun returned error: %s", err.Error())
return -1
Expand All @@ -120,10 +152,10 @@ func (app *myApp) CreateAndRunAppService(serviceKey string, newServiceFactory fu
return 0
}

// TODO: Update using your Custom configuration 'writeable' type or remove if not using ListenForCustomConfigChanges
// ProcessConfigUpdates processes the updated configuration for the service's writable configuration.
// At a minimum it must copy the updated configuration into the service's current configuration. Then it can
// do any special processing for changes that require more.
// TODO: Update using your Custom configuration 'writeable' type or remove if not using ListenForCustomConfigChanges
func (app *myApp) ProcessConfigUpdates(rawWritableConfig interface{}) {
updated, ok := rawWritableConfig.(*config.AppCustomConfig)
if !ok {
Expand Down
4 changes: 4 additions & 0 deletions app-service-template/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ func TestCreateAndRunService_Success(t *testing.T) {
Return([]string{"Random-Boolean-Device, Random-Integer-Device"}, nil)
mockAppService.On("SetFunctionsPipeline", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(nil)
mockAppService.On("AddFunctionsPipelineForTopic", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(nil)
mockAppService.On("LoadCustomConfig", mock.Anything, mock.Anything, mock.Anything).
Return(nil).Run(func(args mock.Arguments) {
// set the required configuration so validation passes
Expand Down Expand Up @@ -148,6 +150,8 @@ func TestCreateAndRunService_MakeItRun_Failed(t *testing.T) {
Return(nil)
mockAppService.On("SetFunctionsPipeline", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(nil)
mockAppService.On("AddFunctionsPipelineForTopic", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(nil)
mockAppService.On("MakeItRun").Return(fmt.Errorf("Failed")).Run(func(args mock.Arguments) {
makeItRunCalled = true
})
Expand Down
7 changes: 2 additions & 5 deletions internal/app/configupdates.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,6 @@ func (processor *ConfigUpdateProcessor) processConfigChangedStoreForwardEnabled(
return storeClient
},
})

sdk.runtime.Initialize(sdk.dic)
}

sdk.startStoreForward()
Expand All @@ -149,9 +147,8 @@ func (processor *ConfigUpdateProcessor) processConfigChangedPipeline() {
transforms, err := sdk.LoadConfigurablePipeline()
if err != nil {
sdk.LoggingClient().Error("unable to reload Configurable Pipeline from new configuration: " + err.Error())
// Reset the transforms so error occurs when attempting to execute the pipeline.
sdk.transforms = nil
sdk.runtime.SetTransforms(nil)
// Reset the default pipeline transforms to nil so error occurs when attempting to execute the pipeline.
_ = sdk.runtime.SetDefaultFunctionsPipeline(nil)
return
}

Expand Down
Loading

0 comments on commit 262cc6a

Please sign in to comment.