diff --git a/app-service-template/go.mod b/app-service-template/go.mod index 6f9e79f0e..182fb415d 100644 --- a/app-service-template/go.mod +++ b/app-service-template/go.mod @@ -6,7 +6,7 @@ go 1.17 require ( github.com/edgexfoundry/app-functions-sdk-go/v2 v2.1.0 - github.com/edgexfoundry/go-mod-core-contracts/v2 v2.2.0-dev.25 + github.com/edgexfoundry/go-mod-core-contracts/v2 v2.2.0-dev.26 github.com/google/uuid v1.3.0 github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a github.com/stretchr/testify v1.7.1 @@ -19,7 +19,7 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/diegoholiveira/jsonlogic v1.0.1-0.20200220175622-ab7989be08b9 // indirect github.com/eclipse/paho.mqtt.golang v1.3.5 // indirect - github.com/edgexfoundry/go-mod-bootstrap/v2 v2.2.0-dev.13 // indirect + github.com/edgexfoundry/go-mod-bootstrap/v2 v2.2.0-dev.16 // indirect github.com/edgexfoundry/go-mod-configuration/v2 v2.2.0-dev.3 // indirect github.com/edgexfoundry/go-mod-messaging/v2 v2.2.0-dev.12 // indirect github.com/edgexfoundry/go-mod-registry/v2 v2.2.0-dev.3 // indirect diff --git a/app-service-template/go.sum b/app-service-template/go.sum index 1a529f4fa..ae6d10b33 100644 --- a/app-service-template/go.sum +++ b/app-service-template/go.sum @@ -32,12 +32,12 @@ github.com/diegoholiveira/jsonlogic v1.0.1-0.20200220175622-ab7989be08b9 h1:NAHC github.com/diegoholiveira/jsonlogic v1.0.1-0.20200220175622-ab7989be08b9/go.mod h1:9STzWAIpeXT1gYFvw0JM+BkyMmPKYv/ztBNgXX4hAOw= github.com/eclipse/paho.mqtt.golang v1.3.5 h1:sWtmgNxYM9P2sP+xEItMozsR3w0cqZFlqnNN1bdl41Y= github.com/eclipse/paho.mqtt.golang v1.3.5/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc= -github.com/edgexfoundry/go-mod-bootstrap/v2 v2.2.0-dev.13 h1:KqMprarS3TFgWoSqNBCI3omvBC8MnRL34Rr92aUN2JE= -github.com/edgexfoundry/go-mod-bootstrap/v2 v2.2.0-dev.13/go.mod h1:eolD76cvsIAUzn63YN7MGyeOWWMidsFKCJjIpo+DK14= +github.com/edgexfoundry/go-mod-bootstrap/v2 v2.2.0-dev.16 h1:e6/XBUP88TVPUpKW+CiHdi6WNQfJape1FQq2ygXTMb0= +github.com/edgexfoundry/go-mod-bootstrap/v2 v2.2.0-dev.16/go.mod h1:6Q+rUmzAizdpU3sqnzMzSHmX7Z69qN69xxl0up/mQQA= github.com/edgexfoundry/go-mod-configuration/v2 v2.2.0-dev.3 h1:dTTExUFHza9eJmTABr8G4KOE8JKBMzZCVC3wARiwIg4= github.com/edgexfoundry/go-mod-configuration/v2 v2.2.0-dev.3/go.mod h1:YP17JhMnXTitowXE13QJwFaKo0oc03iyoKLjWAYl4FE= -github.com/edgexfoundry/go-mod-core-contracts/v2 v2.2.0-dev.25 h1:EKkJf49mDUZF2N82bcHV//onD7qNTuZrNGWgP2byHqU= -github.com/edgexfoundry/go-mod-core-contracts/v2 v2.2.0-dev.25/go.mod h1:jyfVSx7mI3u/o/oo10COxBRBvJ8O/9I3z2xAwPmNt/Q= +github.com/edgexfoundry/go-mod-core-contracts/v2 v2.2.0-dev.26 h1:YebVI3gAJwFTMXKzB2erUonBC8eNrr+qLUGdtXFMjKs= +github.com/edgexfoundry/go-mod-core-contracts/v2 v2.2.0-dev.26/go.mod h1:jyfVSx7mI3u/o/oo10COxBRBvJ8O/9I3z2xAwPmNt/Q= github.com/edgexfoundry/go-mod-messaging/v2 v2.2.0-dev.12 h1:u4ndXe8j1xYR1+axSgcgbD2OGvx7Z9v9thv0KBZp2TI= github.com/edgexfoundry/go-mod-messaging/v2 v2.2.0-dev.12/go.mod h1:+X6C0h8ZTJe+lLU2AGJfiAzCJK3zL+yM6cej9VC+79E= github.com/edgexfoundry/go-mod-registry/v2 v2.2.0-dev.3 h1:Z1sR4g+guLsUNJw3z3gxaz472wt0gYu1XCQ4frqJl/o= diff --git a/app-service-template/res/configuration.toml b/app-service-template/res/configuration.toml index 68757b591..4679e1fa4 100644 --- a/app-service-template/res/configuration.toml +++ b/app-service-template/res/configuration.toml @@ -25,7 +25,10 @@ LogLevel = "INFO" Interval = "30s" PublishTopicPrefix = "edgex/telemetry" # // will be added to this Publish Topic prefix [Writable.Telemetry.Metrics] # All service's metric names must be present in this list. - # TODO: Remove sample metric and implement meaningful metrics if any needed. + MessagesReceived = true + PipelineMessagesProcessed = true # Your pipeline IDs are added to this name for the actual metric name reported + PipelineMessageProcessingTime = true # Your pipeline IDs are added to this name for the actual metric name reported + # TODO: Remove sample custom metric and implement meaningful custom metrics if any needed. EventsConvertedToXML = true [Writable.Telemetry.Tags] # Contains the service level tags to be attached to all the service's metrics # Gateway="my-iot-gateway" # Tag must be added here or via Consul Env Override can only chnage existing value, not added new ones. diff --git a/go.mod b/go.mod index 006e55114..6be7898e5 100644 --- a/go.mod +++ b/go.mod @@ -6,8 +6,8 @@ require ( bitbucket.org/bertimus9/systemstat v0.0.0-20180207000608-0eeff89b0690 github.com/diegoholiveira/jsonlogic v1.0.1-0.20200220175622-ab7989be08b9 github.com/eclipse/paho.mqtt.golang v1.3.5 - github.com/edgexfoundry/go-mod-bootstrap/v2 v2.2.0-dev.13 - github.com/edgexfoundry/go-mod-core-contracts/v2 v2.2.0-dev.25 + github.com/edgexfoundry/go-mod-bootstrap/v2 v2.2.0-dev.16 + github.com/edgexfoundry/go-mod-core-contracts/v2 v2.2.0-dev.26 github.com/edgexfoundry/go-mod-messaging/v2 v2.2.0-dev.12 github.com/edgexfoundry/go-mod-registry/v2 v2.2.0-dev.3 github.com/fxamacker/cbor/v2 v2.4.0 @@ -15,6 +15,7 @@ require ( github.com/google/uuid v1.3.0 github.com/gorilla/mux v1.8.0 github.com/hashicorp/go-multierror v1.1.1 + github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a github.com/stretchr/testify v1.7.1 ) @@ -52,7 +53,6 @@ require ( github.com/pebbe/zmq4 v1.2.7 // indirect github.com/pelletier/go-toml v1.9.4 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a // indirect github.com/spiffe/go-spiffe/v2 v2.0.0 // indirect github.com/stretchr/objx v0.1.1 // indirect github.com/x448/float16 v0.8.4 // indirect diff --git a/go.sum b/go.sum index 58093eb15..af3dc4a89 100644 --- a/go.sum +++ b/go.sum @@ -32,12 +32,12 @@ github.com/diegoholiveira/jsonlogic v1.0.1-0.20200220175622-ab7989be08b9 h1:NAHC github.com/diegoholiveira/jsonlogic v1.0.1-0.20200220175622-ab7989be08b9/go.mod h1:9STzWAIpeXT1gYFvw0JM+BkyMmPKYv/ztBNgXX4hAOw= github.com/eclipse/paho.mqtt.golang v1.3.5 h1:sWtmgNxYM9P2sP+xEItMozsR3w0cqZFlqnNN1bdl41Y= github.com/eclipse/paho.mqtt.golang v1.3.5/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc= -github.com/edgexfoundry/go-mod-bootstrap/v2 v2.2.0-dev.13 h1:KqMprarS3TFgWoSqNBCI3omvBC8MnRL34Rr92aUN2JE= -github.com/edgexfoundry/go-mod-bootstrap/v2 v2.2.0-dev.13/go.mod h1:eolD76cvsIAUzn63YN7MGyeOWWMidsFKCJjIpo+DK14= +github.com/edgexfoundry/go-mod-bootstrap/v2 v2.2.0-dev.16 h1:e6/XBUP88TVPUpKW+CiHdi6WNQfJape1FQq2ygXTMb0= +github.com/edgexfoundry/go-mod-bootstrap/v2 v2.2.0-dev.16/go.mod h1:6Q+rUmzAizdpU3sqnzMzSHmX7Z69qN69xxl0up/mQQA= github.com/edgexfoundry/go-mod-configuration/v2 v2.2.0-dev.3 h1:dTTExUFHza9eJmTABr8G4KOE8JKBMzZCVC3wARiwIg4= github.com/edgexfoundry/go-mod-configuration/v2 v2.2.0-dev.3/go.mod h1:YP17JhMnXTitowXE13QJwFaKo0oc03iyoKLjWAYl4FE= -github.com/edgexfoundry/go-mod-core-contracts/v2 v2.2.0-dev.25 h1:EKkJf49mDUZF2N82bcHV//onD7qNTuZrNGWgP2byHqU= -github.com/edgexfoundry/go-mod-core-contracts/v2 v2.2.0-dev.25/go.mod h1:jyfVSx7mI3u/o/oo10COxBRBvJ8O/9I3z2xAwPmNt/Q= +github.com/edgexfoundry/go-mod-core-contracts/v2 v2.2.0-dev.26 h1:YebVI3gAJwFTMXKzB2erUonBC8eNrr+qLUGdtXFMjKs= +github.com/edgexfoundry/go-mod-core-contracts/v2 v2.2.0-dev.26/go.mod h1:jyfVSx7mI3u/o/oo10COxBRBvJ8O/9I3z2xAwPmNt/Q= github.com/edgexfoundry/go-mod-messaging/v2 v2.2.0-dev.12 h1:u4ndXe8j1xYR1+axSgcgbD2OGvx7Z9v9thv0KBZp2TI= github.com/edgexfoundry/go-mod-messaging/v2 v2.2.0-dev.12/go.mod h1:+X6C0h8ZTJe+lLU2AGJfiAzCJK3zL+yM6cej9VC+79E= github.com/edgexfoundry/go-mod-registry/v2 v2.2.0-dev.3 h1:Z1sR4g+guLsUNJw3z3gxaz472wt0gYu1XCQ4frqJl/o= diff --git a/internal/app/service_test.go b/internal/app/service_test.go index c2211b091..63d1840ce 100644 --- a/internal/app/service_test.go +++ b/internal/app/service_test.go @@ -24,6 +24,7 @@ import ( "testing" "github.com/google/uuid" + "github.com/stretchr/testify/mock" "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/interfaces/mocks" @@ -57,6 +58,9 @@ var baseUrl = "http://localhost:" func TestMain(m *testing.M) { // No remote and no file results in STDOUT logging only lc = logger.NewMockClient() + mockMetricsManager := &mocks.MetricsManager{} + mockMetricsManager.On("Register", mock.Anything, mock.Anything, mock.Anything).Return(nil) + dic = di.NewContainer(di.ServiceConstructorMap{ bootstrapContainer.LoggingClientInterfaceName: func(get di.Get) interface{} { return lc @@ -64,6 +68,9 @@ func TestMain(m *testing.M) { container.ConfigurationName: func(get di.Get) interface{} { return &common.ConfigurationStruct{} }, + bootstrapContainer.MetricsManagerInterfaceName: func(get di.Get) interface{} { + return mockMetricsManager + }, }) target = NewService("unitTest", nil, "") @@ -236,7 +243,8 @@ func TestAddBackgroundPublisher_HTTP(t *testing.T) { func TestSetupHTTPTrigger(t *testing.T) { sdk := Service{ - lc: lc, + lc: lc, + dic: dic, config: &common.ConfigurationStruct{ Trigger: common.TriggerInfo{ Type: "htTp", @@ -256,7 +264,8 @@ func TestSetupHTTPTrigger(t *testing.T) { func TestSetupMessageBusTrigger(t *testing.T) { sdk := Service{ - lc: lc, + lc: lc, + dic: dic, config: &common.ConfigurationStruct{ Trigger: common.TriggerInfo{ Type: TriggerTypeMessageBus, @@ -275,7 +284,8 @@ func TestSetupMessageBusTrigger(t *testing.T) { func TestSetDefaultFunctionsPipelineNoTransforms(t *testing.T) { sdk := Service{ - lc: lc, + lc: lc, + dic: dic, config: &common.ConfigurationStruct{ Trigger: common.TriggerInfo{ Type: TriggerTypeMessageBus, @@ -290,6 +300,7 @@ func TestSetDefaultFunctionsPipelineNoTransforms(t *testing.T) { func TestSetDefaultFunctionsPipelineOneTransform(t *testing.T) { service := Service{ lc: lc, + dic: dic, runtime: runtime.NewGolangRuntime("", nil, dic), config: &common.ConfigurationStruct{ Trigger: common.TriggerInfo{ @@ -891,16 +902,8 @@ func TestService_SubscriptionClient(t *testing.T) { } func TestService_MetricsManager(t *testing.T) { + // MetricsManager Mock added to DIC in TestMain() actual := target.MetricsManager() - assert.Nil(t, actual) - - dic.Update(di.ServiceConstructorMap{ - bootstrapContainer.MetricsManagerInterfaceName: func(get di.Get) interface{} { - return &mocks.MetricsManager{} - }, - }) - - actual = target.MetricsManager() assert.NotNil(t, actual) } diff --git a/internal/app/triggerfactory.go b/internal/app/triggerfactory.go index deafc6b95..6973bf69e 100644 --- a/internal/app/triggerfactory.go +++ b/internal/app/triggerfactory.go @@ -22,6 +22,9 @@ import ( "fmt" "strings" + gometrics "github.com/rcrowley/go-metrics" + + "github.com/edgexfoundry/app-functions-sdk-go/v2/internal" "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/common" "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/trigger/http" "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/trigger/messagebus" @@ -40,7 +43,15 @@ func (svc *Service) setupTrigger(configuration *common.ConfigurationStruct) inte var t interfaces.Trigger bnd := NewTriggerServiceBinding(svc) - mp := &triggerMessageProcessor{bnd} + mp := &triggerMessageProcessor{ + bnd: bnd, + messagesReceived: gometrics.NewCounter()} + + if err := svc.MetricsManager().Register(internal.MessagesReceivedName, mp.messagesReceived, nil); err != nil { + svc.lc.Warnf("%s metric failed to register and will not be reported: %s", internal.MessagesReceivedName, err.Error()) + } else { + svc.lc.Infof("%s metric has been registered and will be reported", internal.MessagesReceivedName) + } switch triggerType := strings.ToUpper(configuration.Trigger.Type); triggerType { case TriggerTypeHTTP: @@ -88,7 +99,15 @@ func (svc *Service) RegisterCustomTriggerFactory(name string, svc.customTriggerFactories[nu] = func(sdk *Service) (interfaces.Trigger, error) { binding := NewTriggerServiceBinding(sdk) - messageProcessor := &triggerMessageProcessor{binding} + messageProcessor := &triggerMessageProcessor{ + bnd: binding, + messagesReceived: gometrics.NewCounter()} + + if err := svc.MetricsManager().Register(internal.MessagesReceivedName, messageProcessor.messagesReceived, nil); err != nil { + svc.lc.Warnf("%s metric failed to register and will not be reported: %s", internal.MessagesReceivedName, err.Error()) + } else { + svc.lc.Infof("%s metric has been registered and will be reported", internal.MessagesReceivedName) + } cfg := interfaces.TriggerConfig{ Logger: sdk.lc, diff --git a/internal/app/triggerfactory_test.go b/internal/app/triggerfactory_test.go index 3ffd73ea3..334bf739c 100644 --- a/internal/app/triggerfactory_test.go +++ b/internal/app/triggerfactory_test.go @@ -77,7 +77,10 @@ func TestRegisterCustomTrigger(t *testing.T) { builder := func(c interfaces.TriggerConfig) (interfaces.Trigger, error) { return &trig, nil } - sdk := Service{config: &common.ConfigurationStruct{}} + sdk := Service{ + config: &common.ConfigurationStruct{}, + lc: logger.NewMockClient(), + dic: dic} err := sdk.RegisterCustomTriggerFactory(name, builder) @@ -99,7 +102,8 @@ func TestSetupTrigger_HTTP(t *testing.T) { Type: TriggerTypeHTTP, }, }, - lc: logger.MockLogger{}, + lc: logger.MockLogger{}, + dic: dic, } trigger := sdk.setupTrigger(sdk.config) @@ -115,7 +119,8 @@ func TestSetupTrigger_EdgeXMessageBus(t *testing.T) { Type: TriggerTypeMessageBus, }, }, - lc: logger.MockLogger{}, + lc: logger.MockLogger{}, + dic: dic, } trigger := sdk.setupTrigger(sdk.config) @@ -165,7 +170,8 @@ func Test_Service_setupTrigger_CustomType(t *testing.T) { Type: triggerName, }, }, - lc: logger.MockLogger{}, + lc: logger.MockLogger{}, + dic: dic, } err := sdk.RegisterCustomTriggerFactory(triggerName, func(c interfaces.TriggerConfig) (interfaces.Trigger, error) { @@ -188,7 +194,8 @@ func Test_Service_SetupTrigger_CustomTypeError(t *testing.T) { Type: triggerName, }, }, - lc: logger.MockLogger{}, + lc: logger.MockLogger{}, + dic: dic, } err := sdk.RegisterCustomTriggerFactory(triggerName, func(c interfaces.TriggerConfig) (interfaces.Trigger, error) { @@ -210,7 +217,8 @@ func Test_Service_SetupTrigger_CustomTypeNotFound(t *testing.T) { Type: triggerName, }, }, - lc: logger.MockLogger{}, + lc: logger.MockLogger{}, + dic: dic, } trigger := sdk.setupTrigger(sdk.config) diff --git a/internal/app/triggermessageprocessor.go b/internal/app/triggermessageprocessor.go index 53d0d645e..2a21fa0d5 100644 --- a/internal/app/triggermessageprocessor.go +++ b/internal/app/triggermessageprocessor.go @@ -18,17 +18,20 @@ package app import ( "fmt" - "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/appfunction" - "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/common" - "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/runtime" - "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/trigger" - "github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/interfaces" + "sync" + "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container" "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/messaging" "github.com/edgexfoundry/go-mod-bootstrap/v2/di" "github.com/edgexfoundry/go-mod-messaging/v2/pkg/types" "github.com/hashicorp/go-multierror" - "sync" + gometrics "github.com/rcrowley/go-metrics" + + "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/appfunction" + "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/common" + "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/runtime" + "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/trigger" + "github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/interfaces" ) type simpleTriggerServiceBinding struct { @@ -61,12 +64,14 @@ func (b *simpleTriggerServiceBinding) Config() *common.ConfigurationStruct { // triggerMessageProcessor wraps the ServiceBinding interface so that we can attach methods type triggerMessageProcessor struct { - bnd trigger.ServiceBinding + bnd trigger.ServiceBinding + messagesReceived gometrics.Counter } // Process provides runtime orchestration to pass the envelope / context to the pipeline. // Deprecated: This does NOT support multi-pipeline usage. Will send a message to the default pipeline ONLY and throw if not configured. Use MessageReceived. func (mp *triggerMessageProcessor) Process(ctx interfaces.AppFunctionContext, envelope types.MessageEnvelope) error { + mp.messagesReceived.Inc(1) context, ok := ctx.(*appfunction.Context) if !ok { return fmt.Errorf("App Context must be an instance of internal appfunction.Context. Use NewAppContext to create instance.") @@ -89,8 +94,8 @@ func (mp *triggerMessageProcessor) Process(ctx interfaces.AppFunctionContext, en // MessageReceived provides runtime orchestration to pass the envelope / context to configured pipeline(s) along with a response callback to execute on each completion. func (mp *triggerMessageProcessor) MessageReceived(ctx interfaces.AppFunctionContext, envelope types.MessageEnvelope, responseHandler interfaces.PipelineResponseHandler) error { + mp.messagesReceived.Inc(1) lc := mp.bnd.LoggingClient() - lc.Debugf("trigger attempting to find pipeline(s) for topic %s", envelope.ReceivedTopic) // ensure we have a context established that we can safely cast to *appfunction.Context to pass to runtime @@ -109,6 +114,8 @@ func (mp *triggerMessageProcessor) MessageReceived(ctx interfaces.AppFunctionCon for _, pipeline := range pipelines { pipelinesWaitGroup.Add(1) + pipeline.MessagesProcessed.Inc(1) + go func(p *interfaces.FunctionPipeline, wg *sync.WaitGroup, errCollector func(error)) { defer wg.Done() diff --git a/internal/app/triggermessageprocessor_test.go b/internal/app/triggermessageprocessor_test.go index 2e883f32a..7f068d29b 100644 --- a/internal/app/triggermessageprocessor_test.go +++ b/internal/app/triggermessageprocessor_test.go @@ -18,18 +18,21 @@ package app import ( "fmt" - "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/appfunction" - "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/runtime" - triggerMocks "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/trigger/mocks" - "github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/interfaces" + "testing" + "github.com/edgexfoundry/go-mod-bootstrap/v2/di" "github.com/edgexfoundry/go-mod-messaging/v2/pkg/types" "github.com/google/uuid" "github.com/hashicorp/go-multierror" + gometrics "github.com/rcrowley/go-metrics" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "testing" + + "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/appfunction" + "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/runtime" + triggerMocks "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/trigger/mocks" + "github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/interfaces" ) func Test_simpleTriggerServiceBinding_BuildContext(t *testing.T) { @@ -76,7 +79,7 @@ func Test_triggerMessageProcessor_MessageReceived(t *testing.T) { { name: "single pipeline", setup: returns{ - pipelineMatcher: []*interfaces.FunctionPipeline{{}}, + pipelineMatcher: []*interfaces.FunctionPipeline{{MessagesProcessed: gometrics.NewCounter()}}, runtimeProcessor: nil, }, args: args{envelope: types.MessageEnvelope{CorrelationID: uuid.NewString(), ContentType: uuid.NewString(), ReceivedTopic: uuid.NewString()}, ctx: &appfunction.Context{}}, @@ -85,7 +88,7 @@ func Test_triggerMessageProcessor_MessageReceived(t *testing.T) { { name: "single pipeline error", setup: returns{ - pipelineMatcher: []*interfaces.FunctionPipeline{{}}, + pipelineMatcher: []*interfaces.FunctionPipeline{{MessagesProcessed: gometrics.NewCounter()}}, runtimeProcessor: &runtime.MessageError{Err: fmt.Errorf("some error")}, }, args: args{envelope: types.MessageEnvelope{CorrelationID: uuid.NewString(), ContentType: uuid.NewString(), ReceivedTopic: uuid.NewString()}, ctx: &appfunction.Context{}}, @@ -94,7 +97,7 @@ func Test_triggerMessageProcessor_MessageReceived(t *testing.T) { { name: "multi pipeline", setup: returns{ - pipelineMatcher: []*interfaces.FunctionPipeline{{}, {}, {}}, + pipelineMatcher: []*interfaces.FunctionPipeline{{MessagesProcessed: gometrics.NewCounter()}, {MessagesProcessed: gometrics.NewCounter()}, {MessagesProcessed: gometrics.NewCounter()}}, }, args: args{envelope: types.MessageEnvelope{CorrelationID: uuid.NewString(), ContentType: uuid.NewString(), ReceivedTopic: uuid.NewString()}, ctx: &appfunction.Context{}}, wantErr: 0, @@ -102,7 +105,7 @@ func Test_triggerMessageProcessor_MessageReceived(t *testing.T) { { name: "multi pipeline single err", setup: returns{ - pipelineMatcher: []*interfaces.FunctionPipeline{{}, {Id: "errorid"}, {}}, + pipelineMatcher: []*interfaces.FunctionPipeline{{MessagesProcessed: gometrics.NewCounter()}, {Id: "errorid", MessagesProcessed: gometrics.NewCounter()}, {MessagesProcessed: gometrics.NewCounter()}}, runtimeProcessor: func(appContext *appfunction.Context, envelope types.MessageEnvelope, pipeline *interfaces.FunctionPipeline) *runtime.MessageError { if pipeline.Id == "errorid" { return &runtime.MessageError{Err: fmt.Errorf("new error")} @@ -116,7 +119,7 @@ func Test_triggerMessageProcessor_MessageReceived(t *testing.T) { { name: "multi pipeline multi err", setup: returns{ - pipelineMatcher: []*interfaces.FunctionPipeline{{}, {}, {}}, + pipelineMatcher: []*interfaces.FunctionPipeline{{MessagesProcessed: gometrics.NewCounter()}, {MessagesProcessed: gometrics.NewCounter()}, {MessagesProcessed: gometrics.NewCounter()}}, runtimeProcessor: &runtime.MessageError{Err: fmt.Errorf("new error")}, }, args: args{envelope: types.MessageEnvelope{CorrelationID: uuid.NewString(), ContentType: uuid.NewString(), ReceivedTopic: uuid.NewString()}, ctx: &appfunction.Context{}}, @@ -133,7 +136,8 @@ func Test_triggerMessageProcessor_MessageReceived(t *testing.T) { tsb.On("LoggingClient").Return(lc) bnd := &triggerMessageProcessor{ - &tsb, + bnd: &tsb, + messagesReceived: gometrics.NewCounter(), } var rh interfaces.PipelineResponseHandler diff --git a/internal/constant.go b/internal/constant.go index f173480d1..ee16adaaf 100644 --- a/internal/constant.go +++ b/internal/constant.go @@ -32,3 +32,11 @@ var SDKVersion = "0.0.0" // ApplicationVersion indicates the version of the application itself, not the SDK - will be overwritten by build var ApplicationVersion = "0.0.0" + +// Names for the Common Application Service Metrics +const ( + MessagesReceivedName = "MessagesReceived" + PipelineIdTxt = "{PipelineId}" + PipelineMessagesProcessedName = "PipelineMessagesProcessed-" + PipelineIdTxt + PipelineMessageProcessingTimeName = "PipelineMessageProcessingTime-" + PipelineIdTxt +) diff --git a/internal/runtime/runtime.go b/internal/runtime/runtime.go index f24fc8ea6..cf07b4c6f 100644 --- a/internal/runtime/runtime.go +++ b/internal/runtime/runtime.go @@ -29,7 +29,9 @@ import ( "sync" bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container" + gometrics "github.com/rcrowley/go-metrics" + "github.com/edgexfoundry/app-functions-sdk-go/v2/internal" "github.com/edgexfoundry/app-functions-sdk-go/v2/internal/appfunction" "github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/interfaces" @@ -52,10 +54,12 @@ const ( func NewFunctionPipeline(id string, topics []string, transforms []interfaces.AppFunction) interfaces.FunctionPipeline { pipeline := interfaces.FunctionPipeline{ - Id: id, - Transforms: transforms, - Topics: topics, - Hash: calculatePipelineHash(transforms), + Id: id, + Transforms: transforms, + Topics: topics, + Hash: calculatePipelineHash(transforms), + MessagesProcessed: gometrics.NewCounter(), + MessageProcessingTime: gometrics.NewTimer(), } return pipeline @@ -131,7 +135,25 @@ func (gr *GolangRuntime) AddFunctionsPipeline(id string, topics []string, transf return fmt.Errorf("pipeline with Id='%s' already exists", id) } - gr.addFunctionsPipeline(id, topics, transforms) + pipeline := gr.addFunctionsPipeline(id, topics, transforms) + + lc := bootstrapContainer.LoggingClientFrom(gr.dic.Get) + metricManager := bootstrapContainer.MetricsManagerFrom(gr.dic.Get) + name := strings.Replace(internal.PipelineMessagesProcessedName, internal.PipelineIdTxt, pipeline.Id, 1) + err := metricManager.Register(name, pipeline.MessagesProcessed, nil) + if err != nil { + lc.Warnf("Unable to register %s metric. Metric will not be reported : %s", name, err.Error()) + } else { + lc.Infof("%s metric has been registered and will be reported", name) + } + + name = strings.Replace(internal.PipelineMessageProcessingTimeName, internal.PipelineIdTxt, pipeline.Id, 1) + err = metricManager.Register(name, pipeline.MessageProcessingTime, nil) + if err != nil { + lc.Warnf("Unable to register %s metric. Metric will not be reported : %s", name, err.Error()) + } else { + lc.Infof("%s metric has been registered and will be reported", name) + } return nil } diff --git a/internal/runtime/storeforward_test.go b/internal/runtime/storeforward_test.go index 26b00a5aa..88133e897 100644 --- a/internal/runtime/storeforward_test.go +++ b/internal/runtime/storeforward_test.go @@ -18,10 +18,13 @@ package runtime import ( "errors" - "github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/interfaces/mocks" "os" "testing" + mocks2 "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/interfaces/mocks" + + "github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/interfaces/mocks" + bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container" "github.com/edgexfoundry/go-mod-bootstrap/v2/di" "github.com/edgexfoundry/go-mod-core-contracts/v2/clients/logger" @@ -47,6 +50,9 @@ func TestMain(m *testing.M) { }, } + mockMetricsManager := &mocks2.MetricsManager{} + mockMetricsManager.On("Register", mock.Anything, mock.Anything, mock.Anything).Return(nil) + dic = di.NewContainer(di.ServiceConstructorMap{ container.ConfigurationName: func(get di.Get) interface{} { return &config @@ -54,6 +60,9 @@ func TestMain(m *testing.M) { bootstrapContainer.LoggingClientInterfaceName: func(get di.Get) interface{} { return logger.NewMockClient() }, + bootstrapContainer.MetricsManagerInterfaceName: func(get di.Get) interface{} { + return mockMetricsManager + }, }) os.Exit(m.Run()) diff --git a/pkg/interfaces/service.go b/pkg/interfaces/service.go index 07a86da8b..ee3a0696c 100644 --- a/pkg/interfaces/service.go +++ b/pkg/interfaces/service.go @@ -20,6 +20,7 @@ import ( "time" "github.com/edgexfoundry/go-mod-bootstrap/v2/config" + gometrics "github.com/rcrowley/go-metrics" bootstrapInterfaces "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/interfaces" "github.com/edgexfoundry/go-mod-core-contracts/v2/clients/interfaces" @@ -56,6 +57,9 @@ type FunctionPipeline struct { Topics []string // Hash of the list of transforms set and used internally for Store and Forward Hash string + + MessagesProcessed gometrics.Counter + MessageProcessingTime gometrics.Timer } // UpdatableConfig interface allows services to have custom configuration populated from configuration stored