From 7359094fff9dbb4f9caf8fb058f30321568bf6c8 Mon Sep 17 00:00:00 2001
From: Sean Porter
Date: Thu, 21 Dec 2023 10:28:51 -0800
Subject: [PATCH] Fix broken spanmetrics counters after span producing service restart (#29711)

My spanmetrics counters (e.g. `calls_total`) break after restarting the span-producing service. For example:

![Screenshot from 2023-12-06 11-39-57](https://github.com/open-telemetry/opentelemetry-collector-contrib/assets/149630/abea1b72-392b-4f1f-a403-644c4e356f3d)

I discovered that the resource key used for the calculated metrics was a map hash of the resource attributes. This worked fine for some instrumented services; however, other services include attributes such as their process ID. Restarting one of these services would result in a new hash and a new set of calculated resource metrics (in addition to the existing ones).

This pull request filters the resource attributes used to produce the resource metrics key map hash. I am now able to restart services without breaking my counters.

---------

Signed-off-by: Sean Porter
Co-authored-by: Albert <26584478+albertteoh@users.noreply.github.com>
---
 .../spanmetrics-fix-resource-metrics-key.yaml | 27 +++++++
 connector/spanmetricsconnector/config.go      |  9 +++
 connector/spanmetricsconnector/config_test.go | 11 +++
 connector/spanmetricsconnector/connector.go   | 44 ++++++++---
 .../spanmetricsconnector/connector_test.go    | 75 +++++++++++++++----
 .../spanmetricsconnector/testdata/config.yaml |  7 ++
 6 files changed, 146 insertions(+), 27 deletions(-)
 create mode 100644 .chloggen/spanmetrics-fix-resource-metrics-key.yaml

diff --git a/.chloggen/spanmetrics-fix-resource-metrics-key.yaml b/.chloggen/spanmetrics-fix-resource-metrics-key.yaml
new file mode 100644
index 000000000000..9aeaa31920ee
--- /dev/null
+++ b/.chloggen/spanmetrics-fix-resource-metrics-key.yaml
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: connector/spanmetrics
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add `resource_metrics_key_attributes` configuration option to filter the resource attributes used to create the resource metrics key.
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [29711]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: This enhancement can be used to fix broken spanmetrics counters after a span-producing service restart, when resource attributes contain dynamic/ephemeral values (e.g. a process ID).
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: []
diff --git a/connector/spanmetricsconnector/config.go b/connector/spanmetricsconnector/config.go
index 72c3412c2bae..66174e34fccb 100644
--- a/connector/spanmetricsconnector/config.go
+++ b/connector/spanmetricsconnector/config.go
@@ -51,6 +51,15 @@ type Config struct {
     // Optional. See defaultResourceMetricsCacheSize in connector.go for the default value.
     ResourceMetricsCacheSize int `mapstructure:"resource_metrics_cache_size"`

+    // ResourceMetricsKeyAttributes filters the resource attributes used to create the resource metrics key hash.
+    // This can be used to avoid situations where resource attributes may change across service restarts, causing
+    // metric counters to break (and duplicate). A resource does not need to have all of the attributes. The list
+    // must include enough attributes to properly identify unique resources or risk aggregating data from more
+    // than one service and span.
+    // e.g. ["service.name", "telemetry.sdk.language", "telemetry.sdk.name"]
+    // See https://opentelemetry.io/docs/specs/semconv/resource/ for possible attributes.
+    ResourceMetricsKeyAttributes []string `mapstructure:"resource_metrics_key_attributes"`
+
     AggregationTemporality string `mapstructure:"aggregation_temporality"`

     Histogram HistogramConfig `mapstructure:"histogram"`
diff --git a/connector/spanmetricsconnector/config_test.go b/connector/spanmetricsconnector/config_test.go
index 281e33e4b6ea..18458fc7cd98 100644
--- a/connector/spanmetricsconnector/config_test.go
+++ b/connector/spanmetricsconnector/config_test.go
@@ -110,6 +110,17 @@ func TestLoadConfig(t *testing.T) {
                 Exemplars: ExemplarsConfig{Enabled: true, MaxPerDataPoint: &defaultMaxPerDatapoint},
             },
         },
+        {
+            id: component.NewIDWithName(metadata.Type, "resource_metrics_key_attributes"),
+            expected: &Config{
+                AggregationTemporality:       "AGGREGATION_TEMPORALITY_CUMULATIVE",
+                DimensionsCacheSize:          defaultDimensionsCacheSize,
+                ResourceMetricsCacheSize:     defaultResourceMetricsCacheSize,
+                ResourceMetricsKeyAttributes: []string{"service.name", "telemetry.sdk.language", "telemetry.sdk.name"},
+                MetricsFlushInterval:         15 * time.Second,
+                Histogram:                    HistogramConfig{Disable: false, Unit: defaultUnit},
+            },
+        },
     }

     for _, tt := range tests {
diff --git a/connector/spanmetricsconnector/connector.go b/connector/spanmetricsconnector/connector.go
index 4d3b0e101b6d..5a001c42dd38 100644
--- a/connector/spanmetricsconnector/connector.go
+++ b/connector/spanmetricsconnector/connector.go
@@ -54,6 +54,8 @@ type connectorImp struct {
     resourceMetrics *cache.Cache[resourceKey, *resourceMetrics]

+    resourceMetricsKeyAttributes map[string]struct{}
+
     keyBuf *bytes.Buffer

     // An LRU cache of dimension key-value maps keyed by a unique identifier formed by a concatenation of its values:
@@ -115,17 +117,24 @@ func newConnector(logger *zap.Logger, config component.Config, ticker *clock.Tic
         return nil, err
     }

+    resourceMetricsKeyAttributes := make(map[string]struct{}, len(cfg.ResourceMetricsKeyAttributes))
+    var s struct{}
+    for _, attr := range cfg.ResourceMetricsKeyAttributes {
+        resourceMetricsKeyAttributes[attr] = s
+    }
+
     return &connectorImp{
-        logger:                logger,
-        config:                *cfg,
-        resourceMetrics:       resourceMetricsCache,
-        dimensions:            newDimensions(cfg.Dimensions),
-        keyBuf:                bytes.NewBuffer(make([]byte, 0, 1024)),
-        metricKeyToDimensions: metricKeyToDimensionsCache,
-        ticker:                ticker,
-        done:                  make(chan struct{}),
-        eDimensions:           newDimensions(cfg.Events.Dimensions),
-        events:                cfg.Events,
+        logger:                       logger,
+        config:                       *cfg,
+        resourceMetrics:              resourceMetricsCache,
+        resourceMetricsKeyAttributes: resourceMetricsKeyAttributes,
+        dimensions:                   newDimensions(cfg.Dimensions),
+        keyBuf:                       bytes.NewBuffer(make([]byte, 0, 1024)),
+        metricKeyToDimensions:        metricKeyToDimensionsCache,
+        ticker:                       ticker,
+        done:                         make(chan struct{}),
+        eDimensions:                  newDimensions(cfg.Events.Dimensions),
+        events:                       cfg.Events,
     }, nil
 }
@@ -390,8 +399,21 @@ func (p *connectorImp) addExemplar(span ptrace.Span, duration float64, h metrics

 type resourceKey [16]byte

+func (p *connectorImp) createResourceKey(attr pcommon.Map) resourceKey {
+    if len(p.resourceMetricsKeyAttributes) == 0 {
+        return pdatautil.MapHash(attr)
+    }
+    m := pcommon.NewMap()
+    attr.CopyTo(m)
+    m.RemoveIf(func(k string, _ pcommon.Value) bool {
+        _, ok := p.resourceMetricsKeyAttributes[k]
+        return !ok
+    })
+    return pdatautil.MapHash(m)
+}
+
 func (p *connectorImp) getOrCreateResourceMetrics(attr pcommon.Map) *resourceMetrics {
-    key := resourceKey(pdatautil.MapHash(attr))
+    key := p.createResourceKey(attr)
     v, ok := p.resourceMetrics.Get(key)
     if !ok {
         v = &resourceMetrics{
diff --git a/connector/spanmetricsconnector/connector_test.go b/connector/spanmetricsconnector/connector_test.go
index 236c02cc2543..98a2377dfa80 100644
--- a/connector/spanmetricsconnector/connector_test.go
+++ b/connector/spanmetricsconnector/connector_test.go
@@ -600,7 +600,7 @@ func TestConcurrentShutdown(t *testing.T) {
     ticker := mockClock.NewTicker(time.Nanosecond)

     // Test
-    p := newConnectorImp(t, new(consumertest.MetricsSink), nil, explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, logger, ticker)
+    p := newConnectorImp(t, new(consumertest.MetricsSink), nil, explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, []string{}, logger, ticker)
     err := p.Start(ctx, componenttest.NewNopHost())
     require.NoError(t, err)
@@ -680,7 +680,7 @@ func TestConsumeMetricsErrors(t *testing.T) {
     }
     mockClock := clock.NewMock(time.Now())
     ticker := mockClock.NewTicker(time.Nanosecond)
-    p := newConnectorImp(t, mcon, nil, explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, logger, ticker)
+    p := newConnectorImp(t, mcon, nil, explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, []string{}, logger, ticker)

     ctx := metadata.NewIncomingContext(context.Background(), nil)
     err := p.Start(ctx, componenttest.NewNopHost())
@@ -842,7 +842,7 @@ func TestConsumeTraces(t *testing.T) {
             mockClock := clock.NewMock(time.Now())
             ticker := mockClock.NewTicker(time.Nanosecond)

-            p := newConnectorImp(t, mcon, stringp("defaultNullValue"), tc.histogramConfig, tc.exemplarConfig, disabledEventsConfig, tc.aggregationTemporality, zaptest.NewLogger(t), ticker)
+            p := newConnectorImp(t, mcon, stringp("defaultNullValue"), tc.histogramConfig, tc.exemplarConfig, disabledEventsConfig, tc.aggregationTemporality, []string{}, zaptest.NewLogger(t), ticker)

             ctx := metadata.NewIncomingContext(context.Background(), nil)
             err := p.Start(ctx, componenttest.NewNopHost())
@@ -868,7 +868,7 @@ func TestConsumeTraces(t *testing.T) {
 func TestMetricKeyCache(t *testing.T) {
     mcon := consumertest.NewNop()

-    p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, zaptest.NewLogger(t), nil)
+    p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, []string{}, zaptest.NewLogger(t), nil)
     traces := buildSampleTrace()

     // Test
@@ -898,7 +898,7 @@ func TestMetricKeyCache(t *testing.T) {
 func TestResourceMetricsCache(t *testing.T) {
     mcon := consumertest.NewNop()

-    p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, zaptest.NewLogger(t), nil)
+    p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, []string{}, zaptest.NewLogger(t), nil)

     // Test
     ctx := metadata.NewIncomingContext(context.Background(), nil)
@@ -933,11 +933,53 @@ func TestResourceMetricsCache(t *testing.T) {
     assert.Equal(t, resourceMetricsCacheSize, p.resourceMetrics.Len())
 }

+func TestResourceMetricsKeyAttributes(t *testing.T) {
+    mcon := consumertest.NewNop()
+
+    resourceMetricsKeyAttributes := []string{
+        "service.name",
+    }
+
+    p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, resourceMetricsKeyAttributes, zaptest.NewLogger(t), nil)
+
+    // Test
+    ctx := metadata.NewIncomingContext(context.Background(), nil)
+
+    // 0 resources in the beginning
+    assert.Zero(t, p.resourceMetrics.Len())
+
+    err := p.ConsumeTraces(ctx, buildSampleTrace())
+    // Validate
+    require.NoError(t, err)
+    assert.Equal(t, 2, p.resourceMetrics.Len())
+
+    // consume another batch of traces for the same resources
+    err = p.ConsumeTraces(ctx, buildSampleTrace())
+    require.NoError(t, err)
+    assert.Equal(t, 2, p.resourceMetrics.Len())
+
+    // consume more batches, each with an extra resource attribute that is not part of the configured key attributes; the filtered key maps them to the existing entries
+    for i := 0; i < resourceMetricsCacheSize; i++ {
+        traces := buildSampleTrace()
+
+        // add resource attributes to simulate additional resources providing data
+        for j := 0; j < traces.ResourceSpans().Len(); j++ {
+            traces.ResourceSpans().At(j).Resource().Attributes().PutStr("not included in resource key attributes", fmt.Sprintf("%d", i))
+        }
+
+        err = p.ConsumeTraces(ctx, traces)
+        require.NoError(t, err)
+    }
+
+    // validate that the additional resources providing data did not result in additional resource metrics
+    assert.Equal(t, 2, p.resourceMetrics.Len())
+}
+
 func BenchmarkConnectorConsumeTraces(b *testing.B) {
     // Prepare
     mcon := consumertest.NewNop()

-    conn := newConnectorImp(nil, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, zaptest.NewLogger(b), nil)
+    conn := newConnectorImp(nil, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, []string{}, zaptest.NewLogger(b), nil)

     traces := buildSampleTrace()

@@ -951,7 +993,7 @@ func BenchmarkConnectorConsumeTraces(b *testing.B) {
 func TestExcludeDimensionsConsumeTraces(t *testing.T) {
     mcon := consumertest.NewNop()
     excludeDimensions := []string{"span.kind", "span.name", "totallyWrongNameDoesNotAffectAnything"}
-    p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, zaptest.NewLogger(t), nil, excludeDimensions...)
+    p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, []string{}, zaptest.NewLogger(t), nil, excludeDimensions...)
     traces := buildSampleTrace()

     // Test
@@ -1000,15 +1042,16 @@ func TestExcludeDimensionsConsumeTraces(t *testing.T) {
 }

-func newConnectorImp(t *testing.T, mcon consumer.Metrics, defaultNullValue *string, histogramConfig func() HistogramConfig, exemplarsConfig func() ExemplarsConfig, eventsConfig func() EventsConfig, temporality string, logger *zap.Logger, ticker *clock.Ticker, excludedDimensions ...string) *connectorImp {
+func newConnectorImp(t *testing.T, mcon consumer.Metrics, defaultNullValue *string, histogramConfig func() HistogramConfig, exemplarsConfig func() ExemplarsConfig, eventsConfig func() EventsConfig, temporality string, resourceMetricsKeyAttributes []string, logger *zap.Logger, ticker *clock.Ticker, excludedDimensions ...string) *connectorImp {
     cfg := &Config{
-        AggregationTemporality:   temporality,
-        Histogram:                histogramConfig(),
-        Exemplars:                exemplarsConfig(),
-        ExcludeDimensions:        excludedDimensions,
-        DimensionsCacheSize:      dimensionsCacheSize,
-        ResourceMetricsCacheSize: resourceMetricsCacheSize,
+        AggregationTemporality:       temporality,
+        Histogram:                    histogramConfig(),
+        Exemplars:                    exemplarsConfig(),
+        ExcludeDimensions:            excludedDimensions,
+        DimensionsCacheSize:          dimensionsCacheSize,
+        ResourceMetricsCacheSize:     resourceMetricsCacheSize,
+        ResourceMetricsKeyAttributes: resourceMetricsKeyAttributes,
         Dimensions: []Dimension{
             // Set nil defaults to force a lookup for the attribute in the span.
             {stringAttrName, nil},
@@ -1120,7 +1163,7 @@ func TestConnectorConsumeTracesEvictedCacheKey(t *testing.T) {
     ticker := mockClock.NewTicker(time.Nanosecond)

     // Note: default dimension key cache size is 2.
-    p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, zaptest.NewLogger(t), ticker)
+    p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, []string{}, zaptest.NewLogger(t), ticker)

     ctx := metadata.NewIncomingContext(context.Background(), nil)
     err := p.Start(ctx, componenttest.NewNopHost())
@@ -1374,7 +1417,7 @@ func TestSpanMetrics_Events(t *testing.T) {
 }

 func TestExemplarsForSumMetrics(t *testing.T) {
     mcon := consumertest.NewNop()
-    p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, enabledExemplarsConfig, enabledEventsConfig, cumulative, zaptest.NewLogger(t), nil)
+    p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, enabledExemplarsConfig, enabledEventsConfig, cumulative, []string{}, zaptest.NewLogger(t), nil)
     traces := buildSampleTrace()

     // Test
diff --git a/connector/spanmetricsconnector/testdata/config.yaml b/connector/spanmetricsconnector/testdata/config.yaml
index 07b89c1841ea..ae6faab493b9 100644
--- a/connector/spanmetricsconnector/testdata/config.yaml
+++ b/connector/spanmetricsconnector/testdata/config.yaml
@@ -68,3 +68,10 @@ spanmetrics/exemplars_enabled_with_max_per_datapoint:
   exemplars:
     enabled: true
     max_per_data_point: 5
+
+# resource metrics key attributes filter
+spanmetrics/resource_metrics_key_attributes:
+  resource_metrics_key_attributes:
+    - service.name
+    - telemetry.sdk.language
+    - telemetry.sdk.name
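Below is a minimal, self-contained sketch (not part of the patch) of why filtering the resource attributes stabilizes the resource metrics key. It reuses the `pcommon` and `pdatautil` packages the connector already imports; the `filteredKey` helper, the attribute names `service.name` and `process.pid`, and the `checkout` service name are illustrative assumptions rather than code from this change. The connector performs the equivalent filtering in `createResourceKey`, driven by the `resource_metrics_key_attributes` list.

```go
package main

import (
	"fmt"

	// assumption: same hashing helper package the connector itself imports
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil"
	"go.opentelemetry.io/collector/pdata/pcommon"
)

// filteredKey mirrors the idea behind createResourceKey: copy the resource
// attributes, drop everything that is not a configured key attribute, and
// hash what remains.
func filteredKey(attr pcommon.Map, keyAttrs map[string]struct{}) [16]byte {
	m := pcommon.NewMap()
	attr.CopyTo(m)
	m.RemoveIf(func(k string, _ pcommon.Value) bool {
		_, ok := keyAttrs[k]
		return !ok
	})
	return pdatautil.MapHash(m)
}

func main() {
	keyAttrs := map[string]struct{}{"service.name": {}}

	// The same (hypothetical) service before and after a restart; only the
	// ephemeral process id differs.
	before := pcommon.NewMap()
	before.PutStr("service.name", "checkout")
	before.PutInt("process.pid", 1234)

	after := pcommon.NewMap()
	after.PutStr("service.name", "checkout")
	after.PutInt("process.pid", 5678)

	// Hashing all attributes yields different keys, so cumulative counters reset.
	fmt.Println("unfiltered keys equal:", pdatautil.MapHash(before) == pdatautil.MapHash(after)) // false
	// Hashing only the configured key attributes keeps the key stable across the restart.
	fmt.Println("filtered keys equal:  ", filteredKey(before, keyAttrs) == filteredKey(after, keyAttrs)) // true
}
```

With a configuration like the `spanmetrics/resource_metrics_key_attributes` entry in `testdata/config.yaml`, spans from a restarted service therefore keep feeding the same `calls_total` series instead of starting a new one.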