Skip to content

Commit

Permalink
apmotel: follow APM OTel spec and prefer delta temporality (#1437)
Browse files Browse the repository at this point in the history
Make apmotel follow apm otel spec:
* Prefer delta temporality on certain instruments
* Filter out zero values on delta temporality
  • Loading branch information
carsonip authored May 16, 2023
1 parent 2aee364 commit ea8c3f6
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 12 deletions.
6 changes: 6 additions & 0 deletions module/apmotel/gatherer.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,16 @@ func (e Gatherer) GatherMetrics(ctx context.Context, out *apm.Metrics) error {
addHistogramMetric(out, sm, m)
case metricdata.Sum[int64]:
for _, dp := range m.DataPoints {
if m.Temporality == metricdata.DeltaTemporality && dp.Value == 0 {
continue
}
out.Add(sm.Name, makeLabels(dp.Attributes), float64(dp.Value))
}
case metricdata.Sum[float64]:
for _, dp := range m.DataPoints {
if m.Temporality == metricdata.DeltaTemporality && dp.Value == 0 {
continue
}
out.Add(sm.Name, makeLabels(dp.Attributes), dp.Value)
}
case metricdata.Gauge[int64]:
Expand Down
34 changes: 30 additions & 4 deletions module/apmotel/gatherer_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package apmotel // import "go.elastic.co/apm/module/apmotel/v2"
import (
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

var customHistogramBoundaries = []float64{
Expand All @@ -36,13 +37,15 @@ var customHistogramBoundaries = []float64{

type gathererConfig struct {
aggregation metric.AggregationSelector
temporality metric.TemporalitySelector
}

type GathererOption func(gathererConfig) gathererConfig

func newGathererConfig(opts ...GathererOption) gathererConfig {
cfg := gathererConfig{
aggregation: customAggregationSelector,
aggregation: defaultAggregationSelector,
temporality: defaultTemporalitySelector,
}
for _, opt := range opts {
cfg = opt(cfg)
Expand All @@ -52,8 +55,13 @@ func newGathererConfig(opts ...GathererOption) gathererConfig {
}

func (cfg gathererConfig) manualReaderOptions() []metric.ManualReaderOption {
opts := []metric.ManualReaderOption{}
opts = append(opts, metric.WithAggregationSelector(cfg.aggregation))
var opts []metric.ManualReaderOption
if cfg.aggregation != nil {
opts = append(opts, metric.WithAggregationSelector(cfg.aggregation))
}
if cfg.temporality != nil {
opts = append(opts, metric.WithTemporalitySelector(cfg.temporality))
}
return opts
}

Expand All @@ -67,7 +75,7 @@ func WithAggregationSelector(agg metric.AggregationSelector) GathererOption {
}
}

func customAggregationSelector(ik metric.InstrumentKind) aggregation.Aggregation {
func defaultAggregationSelector(ik metric.InstrumentKind) aggregation.Aggregation {
switch ik {
case metric.InstrumentKindHistogram:
return aggregation.ExplicitBucketHistogram{
Expand All @@ -78,3 +86,21 @@ func customAggregationSelector(ik metric.InstrumentKind) aggregation.Aggregation
return metric.DefaultAggregationSelector(ik)
}
}

// WithTemporalitySelector configure the Aggregation Selector the exporter will
// use. If no AggregationSelector is provided the DefaultAggregationSelector is
// used.
func WithTemporalitySelector(temporality metric.TemporalitySelector) GathererOption {
return func(cfg gathererConfig) gathererConfig {
cfg.temporality = temporality
return cfg
}
}

func defaultTemporalitySelector(ik metric.InstrumentKind) metricdata.Temporality {
switch ik {
case metric.InstrumentKindCounter, metric.InstrumentKindObservableCounter, metric.InstrumentKindHistogram:
return metricdata.DeltaTemporality
}
return metric.DefaultTemporalitySelector(ik)
}
24 changes: 22 additions & 2 deletions module/apmotel/gatherer_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ import (
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func TestNewGathererConfig(t *testing.T) {
aggregationSelector := func(metric.InstrumentKind) aggregation.Aggregation { return nil }
temporalitySelector := func(metric.InstrumentKind) metricdata.Temporality { return metricdata.CumulativeTemporality }

testCases := []struct {
name string
Expand All @@ -48,12 +50,20 @@ func TestNewGathererConfig(t *testing.T) {
},
wantConfig: gathererConfig{},
},
{
name: "WithTemporalitySelector",
options: []GathererOption{
WithTemporalitySelector(temporalitySelector),
},
wantConfig: gathererConfig{},
},
}
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
cfg := newGathererConfig(tt.options...)
// tested by TestConfigManualReaderOptions
cfg.aggregation = nil
cfg.temporality = nil

assert.Equal(t, tt.wantConfig, cfg)
})
Expand All @@ -62,6 +72,7 @@ func TestNewGathererConfig(t *testing.T) {

func TestConfigManualReaderOptions(t *testing.T) {
aggregationSelector := func(metric.InstrumentKind) aggregation.Aggregation { return nil }
temporalitySelector := func(metric.InstrumentKind) metricdata.Temporality { return metricdata.CumulativeTemporality }

testCases := []struct {
name string
Expand All @@ -71,14 +82,23 @@ func TestConfigManualReaderOptions(t *testing.T) {
{
name: "Default",
config: gathererConfig{},
wantOptionCount: 1,
wantOptionCount: 0,
},

{
name: "WithAggregationSelector",
config: gathererConfig{aggregation: aggregationSelector},
wantOptionCount: 1,
},
{
name: "WithTemporalitySelector",
config: gathererConfig{temporality: temporalitySelector},
wantOptionCount: 1,
},
{
name: "WithAggregationSelectorWithTemporalitySelector",
config: gathererConfig{aggregation: aggregationSelector, temporality: temporalitySelector},
wantOptionCount: 2,
},
}
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
Expand Down
126 changes: 120 additions & 6 deletions module/apmotel/gatherer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ package apmotel
import (
"context"
"strings"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel/attribute"
metric "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"

Expand Down Expand Up @@ -345,18 +347,130 @@ func gatherMetrics(g apm.MetricsGatherer) []model.Metrics {
metrics[i].Timestamp = model.Time{}
}

// Remove internal metrics
for i, m := range metrics {
removeInternalMetrics(&metrics)
return metrics
}

func TestDeltaTemporalityFilterOutZero(t *testing.T) {
gatherer, err := NewGatherer()
assert.NoError(t, err)
provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(gatherer))
meter := provider.Meter("apmotel_test")

// Make callback run 2 times and make sure in the 2nd time delta temporality zero values are filtered out
wg := sync.WaitGroup{}
wg.Add(2)

i64Histogram, err := meter.Int64Histogram("i64Histogram")
require.NoError(t, err)
i64Histogram.Record(context.Background(), 1)
i64Counter, err := meter.Int64Counter("i64Counter")
require.NoError(t, err)
i64Counter.Add(context.Background(), 1)
i64UDCounter, err := meter.Int64UpDownCounter("i64UDCounter")
require.NoError(t, err)
i64UDCounter.Add(context.Background(), 1)
i64ObservableCounter, err := meter.Int64ObservableCounter("i64ObservableCounter")
require.NoError(t, err)
i64ObservableUDCounter, err := meter.Int64ObservableUpDownCounter("i64ObservableUDCounter")
require.NoError(t, err)
i64ObservableGauge, err := meter.Int64ObservableGauge("i64ObservableGauge")
require.NoError(t, err)
f64Histogram, err := meter.Float64Histogram("f64Histogram")
require.NoError(t, err)
f64Histogram.Record(context.Background(), 1)
f64Counter, err := meter.Float64Counter("f64Counter")
require.NoError(t, err)
f64Counter.Add(context.Background(), 1)
f64UDCounter, err := meter.Float64UpDownCounter("f64UDCounter")
require.NoError(t, err)
f64UDCounter.Add(context.Background(), 1)
f64ObservableCounter, err := meter.Float64ObservableCounter("f64ObservableCounter")
require.NoError(t, err)
f64ObservableUDCounter, err := meter.Float64ObservableUpDownCounter("f64ObservableUDCounter")
require.NoError(t, err)
f64ObservableGauge, err := meter.Float64ObservableGauge("f64ObservableGauge")
require.NoError(t, err)
registration, err := meter.RegisterCallback(
func(_ context.Context, obs metric.Observer) error {
wg.Done()
obs.ObserveInt64(i64ObservableCounter, 1)
obs.ObserveInt64(i64ObservableUDCounter, 1)
obs.ObserveInt64(i64ObservableGauge, 1)
obs.ObserveFloat64(f64ObservableCounter, 1)
obs.ObserveFloat64(f64ObservableUDCounter, 1)
obs.ObserveFloat64(f64ObservableGauge, 1)
return nil
},
i64ObservableCounter,
i64ObservableUDCounter,
i64ObservableGauge,
f64ObservableCounter,
f64ObservableUDCounter,
f64ObservableGauge,
)
require.NoError(t, err)
defer registration.Unregister()

t.Setenv("ELASTIC_APM_METRICS_INTERVAL", "1s")
tracer := apmtest.NewRecordingTracer()
defer tracer.Close()
tracer.RegisterMetricsGatherer(gatherer)
tracer.SendMetrics(nil)
metrics := tracer.Payloads().Metrics
removeInternalMetrics(&metrics)
var names []string
for _, m := range metrics {
for k := range m.Samples {
names = append(names, k)
}
}
assert.ElementsMatch(t, []string{"i64Histogram",
"i64Counter",
"i64UDCounter",
"i64ObservableCounter",
"i64ObservableUDCounter",
"i64ObservableGauge",
"f64Histogram",
"f64Counter",
"f64UDCounter",
"f64ObservableCounter",
"f64ObservableUDCounter",
"f64ObservableGauge",
}, names)

tracer.ResetPayloads()
wg.Wait()
tracer.SendMetrics(nil)
metrics = tracer.Payloads().Metrics
removeInternalMetrics(&metrics)
names = names[:0]
for _, m := range metrics {
for k := range m.Samples {
names = append(names, k)
}
}
assert.ElementsMatch(t, []string{
"i64UDCounter",
"f64UDCounter",
"i64ObservableUDCounter",
"f64ObservableUDCounter",
"i64ObservableGauge",
"f64ObservableGauge",
}, names)
}

func removeInternalMetrics(metrics *[]model.Metrics) {
for i, m := range *metrics {
for k := range m.Samples {
if strings.HasPrefix(k, "golang.") || strings.HasPrefix(k, "system.") {
delete(m.Samples, k)
}
}

if len(m.Samples) == 0 {
metrics[i] = metrics[len(metrics)-1]
metrics = metrics[:len(metrics)-1]
(*metrics)[i] = (*metrics)[len(*metrics)-1]
*metrics = (*metrics)[:len(*metrics)-1]
}
}
return metrics
}

0 comments on commit ea8c3f6

Please sign in to comment.