diff --git a/beater/beater.go b/beater/beater.go index 89e364fb2d..1114ef37fe 100644 --- a/beater/beater.go +++ b/beater/beater.go @@ -30,6 +30,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common/transport" "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" + "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/go-ucfg" "github.com/pkg/errors" @@ -479,6 +480,7 @@ func (s *serverRunner) wrapRunServerWithPreprocessors(runServer RunServerFunc) R modelprocessor.SetErrorMessage{}, newObserverBatchProcessor(s.beat.Info), model.ProcessBatchFunc(ecsVersionBatchProcessor), + modelprocessor.NewEventCounter(monitoring.Default.GetRegistry("apm-server")), } if s.config.DefaultServiceEnvironment != "" { processors = append(processors, &modelprocessor.SetDefaultServiceEnvironment{ diff --git a/model/error.go b/model/error.go index 0c3a8c61e4..5f726c676b 100644 --- a/model/error.go +++ b/model/error.go @@ -19,15 +19,9 @@ package model import ( "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/monitoring" ) var ( - errorMetrics = monitoring.Default.NewRegistry("apm-server.processor.error") - errorTransformations = monitoring.NewInt(errorMetrics, "transformations") - errorStacktraceCounter = monitoring.NewInt(errorMetrics, "stacktraces") - errorFrameCounter = monitoring.NewInt(errorMetrics, "frames") - // ErrorProcessor is the Processor value that should be assigned to error events. ErrorProcessor = Processor{Name: "error", Event: "error"} ) @@ -78,15 +72,6 @@ type Log struct { } func (e *Error) fields() common.MapStr { - errorTransformations.Inc() - - if e.Exception != nil { - addStacktraceCounter(e.Exception.Stacktrace) - } - if e.Log != nil { - addStacktraceCounter(e.Log.Stacktrace) - } - var fields mapStr if e.HTTP != nil { fields.maybeSetMapStr("http", e.HTTP.transactionTopLevelFields()) @@ -165,13 +150,6 @@ func (e *Error) logFields() common.MapStr { return common.MapStr(log) } -func addStacktraceCounter(st Stacktrace) { - if frames := len(st); frames > 0 { - errorStacktraceCounter.Inc() - errorFrameCounter.Add(int64(frames)) - } -} - // flattenExceptionTree recursively traverses the causes of an exception to return a slice of exceptions. // Tree traversal is Depth First. // The parent of a exception in the resulting slice is at the position indicated by the `parent` property diff --git a/model/metricset.go b/model/metricset.go index 4e78c516cd..94f9724a77 100644 --- a/model/metricset.go +++ b/model/metricset.go @@ -19,7 +19,6 @@ package model import ( "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/monitoring" ) const ( @@ -28,9 +27,6 @@ const ( ) var ( - metricsetMetrics = monitoring.Default.NewRegistry("apm-server.processor.metric") - metricsetTransformations = monitoring.NewInt(metricsetMetrics, "transformations") - // MetricsetProcessor is the Processor value that should be assigned to metricset events. MetricsetProcessor = Processor{Name: "metric", Event: "metric"} ) @@ -138,8 +134,6 @@ type MetricsetSpan struct { } func (me *Metricset) fields() common.MapStr { - metricsetTransformations.Inc() - var fields mapStr fields.maybeSetMapStr("transaction", me.Transaction.fields()) fields.maybeSetMapStr("span", me.Span.fields()) diff --git a/model/modelprocessor/eventcounter.go b/model/modelprocessor/eventcounter.go new file mode 100644 index 0000000000..5f2294e8f6 --- /dev/null +++ b/model/modelprocessor/eventcounter.go @@ -0,0 +1,75 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package modelprocessor + +import ( + "context" + "sync" + + "github.com/elastic/beats/v7/libbeat/monitoring" + + "github.com/elastic/apm-server/model" +) + +// EventCounter is a model.BatchProcessor that counts the number of events processed, +// recording the counts as metrics in a monitoring.Registry. +// +// Metrics are named after the event type: `processor..transformations`. +// These metrics are used to populate the "Processed Events" graphs in Stack Monitoring. +type EventCounter struct { + registry *monitoring.Registry + + mu sync.RWMutex + eventCounters map[string]*monitoring.Int +} + +// NewEventCounter returns an EventCounter that counts events processed, recording +// them as `.transformations` under the given registry. +func NewEventCounter(registry *monitoring.Registry) *EventCounter { + return &EventCounter{ + registry: registry, + eventCounters: make(map[string]*monitoring.Int), + } +} + +// ProcessBatch counts events in b, grouping by APMEvent.Processor.Event. +func (c *EventCounter) ProcessBatch(ctx context.Context, b *model.Batch) error { + for _, event := range *b { + pe := event.Processor.Event + if pe == "" { + continue + } + c.mu.RLock() + eventCounter := c.eventCounters[pe] + c.mu.RUnlock() + if eventCounter == nil { + c.mu.Lock() + eventCounter = c.eventCounters[pe] + if eventCounter == nil { + eventCounter = monitoring.NewInt( + c.registry, + "processor."+pe+".transformations", + ) + c.eventCounters[pe] = eventCounter + } + c.mu.Unlock() + } + eventCounter.Inc() + } + return nil +} diff --git a/model/modelprocessor/eventcounter_test.go b/model/modelprocessor/eventcounter_test.go new file mode 100644 index 0000000000..92b073e79a --- /dev/null +++ b/model/modelprocessor/eventcounter_test.go @@ -0,0 +1,51 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package modelprocessor_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/libbeat/monitoring" + + "github.com/elastic/apm-server/model" + "github.com/elastic/apm-server/model/modelprocessor" +) + +func TestEventCounter(t *testing.T) { + batch := model.Batch{ + {}, + {Processor: model.TransactionProcessor}, + {Processor: model.SpanProcessor}, + {Processor: model.TransactionProcessor}, + } + + expected := monitoring.MakeFlatSnapshot() + expected.Ints["processor.span.transformations"] = 1 + expected.Ints["processor.transaction.transformations"] = 2 + + registry := monitoring.NewRegistry() + processor := modelprocessor.NewEventCounter(registry) + err := processor.ProcessBatch(context.Background(), &batch) + assert.NoError(t, err) + snapshot := monitoring.CollectFlatSnapshot(registry, monitoring.Full, false) + assert.Equal(t, expected, snapshot) + +} diff --git a/model/span.go b/model/span.go index c1852f8452..af0f7ab91d 100644 --- a/model/span.go +++ b/model/span.go @@ -19,17 +19,11 @@ package model import ( "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/apm-server/utility" ) var ( - spanMetrics = monitoring.Default.NewRegistry("apm-server.processor.span") - spanTransformations = monitoring.NewInt(spanMetrics, "transformations") - spanStacktraceCounter = monitoring.NewInt(spanMetrics, "stacktraces") - spanFrameCounter = monitoring.NewInt(spanMetrics, "frames") - // SpanProcessor is the Processor value that should be assigned to span events. SpanProcessor = Processor{Name: "transaction", Event: "span"} ) @@ -131,12 +125,6 @@ func (c *Composite) fields() common.MapStr { } func (e *Span) fields(apmEvent *APMEvent) common.MapStr { - spanTransformations.Inc() - if frames := len(e.Stacktrace); frames > 0 { - spanStacktraceCounter.Inc() - spanFrameCounter.Add(int64(frames)) - } - var fields mapStr var trace, transaction, parent mapStr if trace.maybeSetString("id", e.TraceID) { diff --git a/model/transaction.go b/model/transaction.go index 9bd84dee56..879e979ae0 100644 --- a/model/transaction.go +++ b/model/transaction.go @@ -19,7 +19,6 @@ package model import ( "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/apm-server/utility" ) @@ -29,9 +28,6 @@ const ( ) var ( - transactionMetrics = monitoring.Default.NewRegistry("apm-server.processor.transaction") - transactionTransformations = monitoring.NewInt(transactionMetrics, "transformations") - // TransactionProcessor is the Processor value that should be assigned to transaction events. TransactionProcessor = Processor{Name: "transaction", Event: "transaction"} ) @@ -68,8 +64,6 @@ type SpanCount struct { } func (e *Transaction) fields() common.MapStr { - transactionTransformations.Inc() - var fields mapStr var parent, trace mapStr parent.maybeSetString("id", e.ParentID)