Skip to content

Commit

Permalink
Move monitoring event counters to model processor
Browse files Browse the repository at this point in the history
Also, remove unused metrics.
  • Loading branch information
axw committed Aug 20, 2021
1 parent e21922b commit b09a075
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 46 deletions.
2 changes: 2 additions & 0 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"github.com/elastic/beats/v7/libbeat/common/transport"
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/go-ucfg"

"github.com/pkg/errors"
Expand Down Expand Up @@ -479,6 +480,7 @@ func (s *serverRunner) wrapRunServerWithPreprocessors(runServer RunServerFunc) R
modelprocessor.SetErrorMessage{},
newObserverBatchProcessor(s.beat.Info),
model.ProcessBatchFunc(ecsVersionBatchProcessor),
modelprocessor.NewEventCounter(monitoring.Default.GetRegistry("apm-server")),
}
if s.config.DefaultServiceEnvironment != "" {
processors = append(processors, &modelprocessor.SetDefaultServiceEnvironment{
Expand Down
22 changes: 0 additions & 22 deletions model/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,9 @@ package model

import (
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/monitoring"
)

var (
errorMetrics = monitoring.Default.NewRegistry("apm-server.processor.error")
errorTransformations = monitoring.NewInt(errorMetrics, "transformations")
errorStacktraceCounter = monitoring.NewInt(errorMetrics, "stacktraces")
errorFrameCounter = monitoring.NewInt(errorMetrics, "frames")

// ErrorProcessor is the Processor value that should be assigned to error events.
ErrorProcessor = Processor{Name: "error", Event: "error"}
)
Expand Down Expand Up @@ -78,15 +72,6 @@ type Log struct {
}

func (e *Error) fields() common.MapStr {
errorTransformations.Inc()

if e.Exception != nil {
addStacktraceCounter(e.Exception.Stacktrace)
}
if e.Log != nil {
addStacktraceCounter(e.Log.Stacktrace)
}

var fields mapStr
if e.HTTP != nil {
fields.maybeSetMapStr("http", e.HTTP.transactionTopLevelFields())
Expand Down Expand Up @@ -165,13 +150,6 @@ func (e *Error) logFields() common.MapStr {
return common.MapStr(log)
}

func addStacktraceCounter(st Stacktrace) {
if frames := len(st); frames > 0 {
errorStacktraceCounter.Inc()
errorFrameCounter.Add(int64(frames))
}
}

// flattenExceptionTree recursively traverses the causes of an exception to return a slice of exceptions.
// Tree traversal is Depth First.
// The parent of a exception in the resulting slice is at the position indicated by the `parent` property
Expand Down
6 changes: 0 additions & 6 deletions model/metricset.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package model

import (
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/monitoring"
)

const (
Expand All @@ -28,9 +27,6 @@ const (
)

var (
metricsetMetrics = monitoring.Default.NewRegistry("apm-server.processor.metric")
metricsetTransformations = monitoring.NewInt(metricsetMetrics, "transformations")

// MetricsetProcessor is the Processor value that should be assigned to metricset events.
MetricsetProcessor = Processor{Name: "metric", Event: "metric"}
)
Expand Down Expand Up @@ -138,8 +134,6 @@ type MetricsetSpan struct {
}

func (me *Metricset) fields() common.MapStr {
metricsetTransformations.Inc()

var fields mapStr
fields.maybeSetMapStr("transaction", me.Transaction.fields())
fields.maybeSetMapStr("span", me.Span.fields())
Expand Down
75 changes: 75 additions & 0 deletions model/modelprocessor/eventcounter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package modelprocessor

import (
"context"
"sync"

"github.com/elastic/beats/v7/libbeat/monitoring"

"github.com/elastic/apm-server/model"
)

// EventCounter is a model.BatchProcessor that counts the number of events processed,
// recording the counts as metrics in a monitoring.Registry.
//
// Metrics are named after the event type: `processor.<processor.event>.transformations`.
// These metrics are used to populate the "Processed Events" graphs in Stack Monitoring.
type EventCounter struct {
registry *monitoring.Registry

mu sync.RWMutex
eventCounters map[string]*monitoring.Int
}

// NewEventCounter returns an EventCounter that counts events processed, recording
// them as `<processor.event>.transformations` under the given registry.
func NewEventCounter(registry *monitoring.Registry) *EventCounter {
return &EventCounter{
registry: registry,
eventCounters: make(map[string]*monitoring.Int),
}
}

// ProcessBatch counts events in b, grouping by APMEvent.Processor.Event.
func (c *EventCounter) ProcessBatch(ctx context.Context, b *model.Batch) error {
for _, event := range *b {
pe := event.Processor.Event
if pe == "" {
continue
}
c.mu.RLock()
eventCounter := c.eventCounters[pe]
c.mu.RUnlock()
if eventCounter == nil {
c.mu.Lock()
eventCounter = c.eventCounters[pe]
if eventCounter == nil {
eventCounter = monitoring.NewInt(
c.registry,
"processor."+pe+".transformations",
)
c.eventCounters[pe] = eventCounter
}
c.mu.Unlock()
}
eventCounter.Inc()
}
return nil
}
51 changes: 51 additions & 0 deletions model/modelprocessor/eventcounter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package modelprocessor_test

import (
"context"
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/monitoring"

"github.com/elastic/apm-server/model"
"github.com/elastic/apm-server/model/modelprocessor"
)

func TestEventCounter(t *testing.T) {
batch := model.Batch{
{},
{Processor: model.TransactionProcessor},
{Processor: model.SpanProcessor},
{Processor: model.TransactionProcessor},
}

expected := monitoring.MakeFlatSnapshot()
expected.Ints["processor.span.transformations"] = 1
expected.Ints["processor.transaction.transformations"] = 2

registry := monitoring.NewRegistry()
processor := modelprocessor.NewEventCounter(registry)
err := processor.ProcessBatch(context.Background(), &batch)
assert.NoError(t, err)
snapshot := monitoring.CollectFlatSnapshot(registry, monitoring.Full, false)
assert.Equal(t, expected, snapshot)

}
12 changes: 0 additions & 12 deletions model/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,11 @@ package model

import (
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/monitoring"

"github.com/elastic/apm-server/utility"
)

var (
spanMetrics = monitoring.Default.NewRegistry("apm-server.processor.span")
spanTransformations = monitoring.NewInt(spanMetrics, "transformations")
spanStacktraceCounter = monitoring.NewInt(spanMetrics, "stacktraces")
spanFrameCounter = monitoring.NewInt(spanMetrics, "frames")

// SpanProcessor is the Processor value that should be assigned to span events.
SpanProcessor = Processor{Name: "transaction", Event: "span"}
)
Expand Down Expand Up @@ -131,12 +125,6 @@ func (c *Composite) fields() common.MapStr {
}

func (e *Span) fields(apmEvent *APMEvent) common.MapStr {
spanTransformations.Inc()
if frames := len(e.Stacktrace); frames > 0 {
spanStacktraceCounter.Inc()
spanFrameCounter.Add(int64(frames))
}

var fields mapStr
var trace, transaction, parent mapStr
if trace.maybeSetString("id", e.TraceID) {
Expand Down
6 changes: 0 additions & 6 deletions model/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package model

import (
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/monitoring"

"github.com/elastic/apm-server/utility"
)
Expand All @@ -29,9 +28,6 @@ const (
)

var (
transactionMetrics = monitoring.Default.NewRegistry("apm-server.processor.transaction")
transactionTransformations = monitoring.NewInt(transactionMetrics, "transformations")

// TransactionProcessor is the Processor value that should be assigned to transaction events.
TransactionProcessor = Processor{Name: "transaction", Event: "transaction"}
)
Expand Down Expand Up @@ -68,8 +64,6 @@ type SpanCount struct {
}

func (e *Transaction) fields() common.MapStr {
transactionTransformations.Inc()

var fields mapStr
var parent, trace mapStr
parent.maybeSetString("id", e.ParentID)
Expand Down

0 comments on commit b09a075

Please sign in to comment.