Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Pipeline per Topic capability #938

Merged
merged 3 commits into from
Aug 26, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ GO=CGO_ENABLED=1 GO111MODULE=on go
build:
make -C ./app-service-template build

tidy:
go mod tidy

test-template:
make -C ./app-service-template test

Expand Down
9 changes: 4 additions & 5 deletions app-service-template/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,12 @@ GOFLAGS=-ldflags "-X github.com/edgexfoundry/app-functions-sdk-go/v2/internal.SD
#GIT_SHA=$(shell git rev-parse HEAD)
GIT_SHA=no-sha

# `go mod tidy` is needed for the Depend a Bot PRs which don't update the go.sum in this folder, causing tests to fail
# in the pipeline since the tests call this make build.
# TODO: Remove `go mod tidy` when creating your new app services using this template
build:
go mod tidy
build: tidy
$(GO) build $(GOFLAGS) -o $(MICROSERVICE)

tidy:
go mod tidy

# TODO: Change the registries (edgexfoundry, nexus3.edgexfoundry.org:10004) below as needed.
# Leave them as is if service is to be upstreamed to EdgeX Foundry
# NOTE: This is only used for local development. Jenkins CI does not use this make target
Expand Down
41 changes: 23 additions & 18 deletions app-service-template/functions/sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package functions

import (
"errors"
"fmt"
"strings"

"github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/interfaces"
Expand All @@ -28,49 +28,54 @@ import (

// TODO: Create your custom type and function(s) and remove these samples

// NewSample ...
// TODO: Add parameters that the function(s) will need each time one is executed
func NewSample() Sample {
return Sample{}
}

// Sample ...
type Sample struct {
// TODO: Add properties that the function(s) will need each time one is executed
}

// LogEventDetails is example of processing an Event and passing the original Event to to next function in the pipeline
// LogEventDetails is example of processing an Event and passing the original Event to next function in the pipeline
// For more details on the Context API got here: https://docs.edgexfoundry.org/1.3/microservices/application/ContextAPI/
func (s *Sample) LogEventDetails(ctx interfaces.AppFunctionContext, data interface{}) (bool, interface{}) {
lc := ctx.LoggingClient()
lc.Debug("LogEventDetails called")
lc.Debugf("LogEventDetails called in pipeline '%s'", ctx.PipelineId())

if data == nil {
// Go here for details on Error Handle: https://docs.edgexfoundry.org/1.3/microservices/application/ErrorHandling/
return false, errors.New("no Event Received")
return false, fmt.Errorf("function LogEventDetails in pipeline '%s': No Data Received", ctx.PipelineId())
}

event, ok := data.(dtos.Event)
if !ok {
return false, errors.New("type received is not an Event")
return false, fmt.Errorf("function LogEventDetails in pipeline '%s', type received is not an Event", ctx.PipelineId())
}

lc.Infof("Event received: ID=%s, Device=%s, and ReadingCount=%d",
lc.Infof("Event received in pipeline '%s': ID=%s, Device=%s, and ReadingCount=%d",
ctx.PipelineId(),
event.Id,
event.DeviceName,
len(event.Readings))
for index, reading := range event.Readings {
switch strings.ToLower(reading.ValueType) {
case strings.ToLower(common.ValueTypeBinary):
lc.Infof(
"Reading #%d received with ID=%s, Resource=%s, ValueType=%s, MediaType=%s and BinaryValue of size=`%d`",
"Reading #%d received in pipeline '%s' with ID=%s, Resource=%s, ValueType=%s, MediaType=%s and BinaryValue of size=`%d`",
index+1,
ctx.PipelineId(),
reading.Id,
reading.ResourceName,
reading.ValueType,
reading.MediaType,
len(reading.BinaryValue))
default:
lc.Infof("Reading #%d received with ID=%s, Resource=%s, ValueType=%s, Value=`%s`",
lc.Infof("Reading #%d received in pipeline '%s' with ID=%s, Resource=%s, ValueType=%s, Value=`%s`",
index+1,
ctx.PipelineId(),
reading.Id,
reading.ResourceName,
reading.ValueType,
Expand All @@ -83,29 +88,29 @@ func (s *Sample) LogEventDetails(ctx interfaces.AppFunctionContext, data interfa
return true, event
}

// ConvertEventToXML is example of transforming an Event and passing the transformed data to to next function in the pipeline
// ConvertEventToXML is example of transforming an Event and passing the transformed data to next function in the pipeline
func (s *Sample) ConvertEventToXML(ctx interfaces.AppFunctionContext, data interface{}) (bool, interface{}) {
lc := ctx.LoggingClient()
lc.Debug("ConvertEventToXML called")
lc.Debugf("ConvertEventToXML called in pipeline '%s'", ctx.PipelineId())

if data == nil {
return false, errors.New("no Event Received")
return false, fmt.Errorf("function ConvertEventToXML in pipeline '%s': No Data Received", ctx.PipelineId())
}

event, ok := data.(dtos.Event)
if !ok {
return false, errors.New("type received is not an Event")
return false, fmt.Errorf("function ConvertEventToXML in pipeline '%s': type received is not an Event", ctx.PipelineId())
}

xml, err := event.ToXML()
if err != nil {
return false, errors.New("failed to convert event to XML")
return false, fmt.Errorf("function ConvertEventToXML in pipeline '%s': failed to convert event to XML", ctx.PipelineId())
}

// Example of DEBUG message which by default you don't want to be logged.
// To see debug log messages, Set WRITABLE_LOGLEVEL=DEBUG environment variable or
// change LogLevel in configuration.toml before running app service.
lc.Debug("Event converted to XML: " + xml)
lc.Debugf("Event converted to XML in pipeline '%s': %s", ctx.PipelineId(), xml)

// Returning true indicates that the pipeline execution should continue with the next function
// using the event passed as input in this case.
Expand All @@ -115,18 +120,18 @@ func (s *Sample) ConvertEventToXML(ctx interfaces.AppFunctionContext, data inter
// OutputXML is an example of processing transformed data
func (s *Sample) OutputXML(ctx interfaces.AppFunctionContext, data interface{}) (bool, interface{}) {
lc := ctx.LoggingClient()
lc.Debug("OutputXML called")
lc.Debugf("OutputXML called in pipeline '%s'", ctx.PipelineId())

if data == nil {
return false, errors.New("no XML Received")
return false, fmt.Errorf("function OutputXML in pipeline '%s': No Data Received", ctx.PipelineId())
}

xml, ok := data.(string)
if !ok {
return false, errors.New("type received is not an string")
return false, fmt.Errorf("function ConvertEventToXML in pipeline '%s': type received is not an string", ctx.PipelineId())
}

lc.Debugf("Outputting the following XML: %s", xml)
lc.Debugf("Outputting the following XML in pipeline '%s': %s", ctx.PipelineId(), xml)

// This sends the XML as a response. i.e. publish for MessageBus/MQTT triggers as configured or
// HTTP response to for the HTTP Trigger
Expand Down
2 changes: 1 addition & 1 deletion app-service-template/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ go 1.15
require (
github.com/edgexfoundry/app-functions-sdk-go/v2 v2.0.1
github.com/edgexfoundry/go-mod-core-contracts/v2 v2.0.0
github.com/google/uuid v1.2.0
github.com/google/uuid v1.3.0
github.com/stretchr/testify v1.7.0
)

Expand Down
6 changes: 4 additions & 2 deletions app-service-template/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ github.com/fatih/color v1.9.0 h1:8xPHl4/q1VyqGIPif1F+1V3Y3lSmrq01EabUW3CoW5s=
github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fxamacker/cbor/v2 v2.2.0 h1:6eXqdDDe588rSYAi1HfZKbx6YYQO4mxQ9eC6xYpU/JQ=
github.com/fxamacker/cbor/v2 v2.2.0/go.mod h1:TA1xS00nchWmaBnEIxPSE5oHLuJBAVvqrtAnWBwBCVo=
github.com/fxamacker/cbor/v2 v2.3.0 h1:aM45YGMctNakddNNAezPxDUpv38j44Abh+hifNuqXik=
github.com/fxamacker/cbor/v2 v2.3.0/go.mod h1:TA1xS00nchWmaBnEIxPSE5oHLuJBAVvqrtAnWBwBCVo=
github.com/go-kit/kit v0.9.0 h1:wDJmvq38kDhkVxi50ni9ykkdUr1PKgqKOoi01fa0Mdk=
github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-logfmt/logfmt v0.4.0 h1:MP4Eh7ZCb31lleYCFuwm0oe4/YGak+5l1vA2NOE80nA=
Expand All @@ -57,8 +58,9 @@ github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNu
github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c h1:964Od4U6p2jUkFxvCydnIczKteheJEzHRToSGK3Bnlw=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
Expand Down
38 changes: 35 additions & 3 deletions app-service-template/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,11 @@ func (app *myApp) CreateAndRunAppService(serviceKey string, newServiceFactory fu
return -1
}

// TODO: Replace below functions with built in and/or your custom functions for your use case.
// See https://docs.edgexfoundry.org/2.0/microservices/application/BuiltIn/ for list of built-in functions
sample := functions.NewSample()

// TODO: Replace below functions with built in and/or your custom functions for your use case
// or remove is using Pipeline By Topic below.
// See https://docs.edgexfoundry.org/2.0/microservices/application/BuiltIn/ for list of built-in functions
err = app.service.SetFunctionsPipeline(
transforms.NewFilterFor(deviceNames).FilterByDeviceName,
sample.LogEventDetails,
Expand All @@ -110,6 +112,36 @@ func (app *myApp) CreateAndRunAppService(serviceKey string, newServiceFactory fu
return -1
}

// TODO: Remove adding functions pipelines by topic if default pipeline above is all your Use Case needs.
// Or remove default above if your use case needs multiple pipelines by topic.
// Example of adding functions pipelines by topic.
// These pipelines will only execute if the specified topic match the incoming topic.
// Note: Device services publish to the 'edgex/events/device/<profile-name>/<device-name>/<source-name>' topic
// Core Data publishes to the 'edgex/events/core/<profile-name>/<device-name>/<source-name>' topic
// Note: This example with default above causes Events from Random-Float-Device device to be processed twice
// resulting in the XML to be published back to the MessageBus twice.
// See <Pipeline By Topic documentation url TBD> for more details.
err = app.service.AddFunctionsPipelineForTopic("Floats", "edgex/events/#/#/Random-Float-Device/#",
transforms.NewFilterFor(deviceNames).FilterByDeviceName,
sample.LogEventDetails,
sample.ConvertEventToXML,
sample.OutputXML)
if err != nil {
app.lc.Errorf("AddFunctionsPipelineForTopic returned error: %s", err.Error())
return -1
}
// Note: This example with default above causes Events from Int32 source to be processed twice
// resulting in the XML to be published back to the MessageBus twice.
err = app.service.AddFunctionsPipelineForTopic("Int32s", "edgex/events/#/#/#/Int32",
transforms.NewFilterFor(deviceNames).FilterByDeviceName,
sample.LogEventDetails,
sample.ConvertEventToXML,
sample.OutputXML)
if err != nil {
app.lc.Errorf("AddFunctionsPipelineForTopic returned error: %s", err.Error())
return -1
}

if err := app.service.MakeItRun(); err != nil {
app.lc.Errorf("MakeItRun returned error: %s", err.Error())
return -1
Expand All @@ -120,10 +152,10 @@ func (app *myApp) CreateAndRunAppService(serviceKey string, newServiceFactory fu
return 0
}

// TODO: Update using your Custom configuration 'writeable' type or remove if not using ListenForCustomConfigChanges
// ProcessConfigUpdates processes the updated configuration for the service's writable configuration.
// At a minimum it must copy the updated configuration into the service's current configuration. Then it can
// do any special processing for changes that require more.
// TODO: Update using your Custom configuration 'writeable' type or remove if not using ListenForCustomConfigChanges
func (app *myApp) ProcessConfigUpdates(rawWritableConfig interface{}) {
updated, ok := rawWritableConfig.(*config.AppCustomConfig)
if !ok {
Expand Down
4 changes: 4 additions & 0 deletions app-service-template/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ func TestCreateAndRunService_Success(t *testing.T) {
Return([]string{"Random-Boolean-Device, Random-Integer-Device"}, nil)
mockAppService.On("SetFunctionsPipeline", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(nil)
mockAppService.On("AddFunctionsPipelineForTopic", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(nil)
mockAppService.On("LoadCustomConfig", mock.Anything, mock.Anything, mock.Anything).
Return(nil).Run(func(args mock.Arguments) {
// set the required configuration so validation passes
Expand Down Expand Up @@ -148,6 +150,8 @@ func TestCreateAndRunService_MakeItRun_Failed(t *testing.T) {
Return(nil)
mockAppService.On("SetFunctionsPipeline", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(nil)
mockAppService.On("AddFunctionsPipelineForTopic", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(nil)
mockAppService.On("MakeItRun").Return(fmt.Errorf("Failed")).Run(func(args mock.Arguments) {
makeItRunCalled = true
})
Expand Down
7 changes: 2 additions & 5 deletions internal/app/configupdates.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,6 @@ func (processor *ConfigUpdateProcessor) processConfigChangedStoreForwardEnabled(
return storeClient
},
})

sdk.runtime.Initialize(sdk.dic)
}

sdk.startStoreForward()
Expand All @@ -149,9 +147,8 @@ func (processor *ConfigUpdateProcessor) processConfigChangedPipeline() {
transforms, err := sdk.LoadConfigurablePipeline()
if err != nil {
sdk.LoggingClient().Error("unable to reload Configurable Pipeline from new configuration: " + err.Error())
// Reset the transforms so error occurs when attempting to execute the pipeline.
sdk.transforms = nil
sdk.runtime.SetTransforms(nil)
// Reset the default pipeline transforms to nil so error occurs when attempting to execute the pipeline.
_ = sdk.runtime.SetFunctionsPipeline(nil)
return
}

Expand Down
Loading