diff --git a/.chloggen/spanmetrics-fix-resource-metrics-key.yaml b/.chloggen/spanmetrics-fix-resource-metrics-key.yaml new file mode 100644 index 000000000000..9aeaa31920ee --- /dev/null +++ b/.chloggen/spanmetrics-fix-resource-metrics-key.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: connector/spanmetrics + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Configurable resource metrics key attributes, filter the resource attributes used to create the resource metrics key. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [29711] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: This enhancement can be used to fix broken spanmetrics counters after a span producing service restart, when resource attributes contain dynamic/ephemeral values (e.g. process id). + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/connector/spanmetricsconnector/config.go b/connector/spanmetricsconnector/config.go index 72c3412c2bae..66174e34fccb 100644 --- a/connector/spanmetricsconnector/config.go +++ b/connector/spanmetricsconnector/config.go @@ -51,6 +51,15 @@ type Config struct { // Optional. See defaultResourceMetricsCacheSize in connector.go for the default value. ResourceMetricsCacheSize int `mapstructure:"resource_metrics_cache_size"` + // ResourceMetricsKeyAttributes filters the resource attributes used to create the resource metrics key hash. + // This can be used to avoid situations where resource attributes may change across service restarts, causing + // metric counters to break (and duplicate). A resource does not need to have all of the attributes. The list + // must include enough attributes to properly identify unique resources or risk aggregating data from more + // than one service and span. + // e.g. ["service.name", "telemetry.sdk.language", "telemetry.sdk.name"] + // See https://opentelemetry.io/docs/specs/semconv/resource/ for possible attributes. + ResourceMetricsKeyAttributes []string `mapstructure:"resource_metrics_key_attributes"` + AggregationTemporality string `mapstructure:"aggregation_temporality"` Histogram HistogramConfig `mapstructure:"histogram"` diff --git a/connector/spanmetricsconnector/config_test.go b/connector/spanmetricsconnector/config_test.go index 281e33e4b6ea..18458fc7cd98 100644 --- a/connector/spanmetricsconnector/config_test.go +++ b/connector/spanmetricsconnector/config_test.go @@ -110,6 +110,17 @@ func TestLoadConfig(t *testing.T) { Exemplars: ExemplarsConfig{Enabled: true, MaxPerDataPoint: &defaultMaxPerDatapoint}, }, }, + { + id: component.NewIDWithName(metadata.Type, "resource_metrics_key_attributes"), + expected: &Config{ + AggregationTemporality: "AGGREGATION_TEMPORALITY_CUMULATIVE", + DimensionsCacheSize: defaultDimensionsCacheSize, + ResourceMetricsCacheSize: defaultResourceMetricsCacheSize, + ResourceMetricsKeyAttributes: []string{"service.name", "telemetry.sdk.language", "telemetry.sdk.name"}, + MetricsFlushInterval: 15 * time.Second, + Histogram: HistogramConfig{Disable: false, Unit: defaultUnit}, + }, + }, } for _, tt := range tests { diff --git a/connector/spanmetricsconnector/connector.go b/connector/spanmetricsconnector/connector.go index 4d3b0e101b6d..5a001c42dd38 100644 --- a/connector/spanmetricsconnector/connector.go +++ b/connector/spanmetricsconnector/connector.go @@ -54,6 +54,8 @@ type connectorImp struct { resourceMetrics *cache.Cache[resourceKey, *resourceMetrics] + resourceMetricsKeyAttributes map[string]struct{} + keyBuf *bytes.Buffer // An LRU cache of dimension key-value maps keyed by a unique identifier formed by a concatenation of its values: @@ -115,17 +117,24 @@ func newConnector(logger *zap.Logger, config component.Config, ticker *clock.Tic return nil, err } + resourceMetricsKeyAttributes := make(map[string]struct{}, len(cfg.ResourceMetricsKeyAttributes)) + var s struct{} + for _, attr := range cfg.ResourceMetricsKeyAttributes { + resourceMetricsKeyAttributes[attr] = s + } + return &connectorImp{ - logger: logger, - config: *cfg, - resourceMetrics: resourceMetricsCache, - dimensions: newDimensions(cfg.Dimensions), - keyBuf: bytes.NewBuffer(make([]byte, 0, 1024)), - metricKeyToDimensions: metricKeyToDimensionsCache, - ticker: ticker, - done: make(chan struct{}), - eDimensions: newDimensions(cfg.Events.Dimensions), - events: cfg.Events, + logger: logger, + config: *cfg, + resourceMetrics: resourceMetricsCache, + resourceMetricsKeyAttributes: resourceMetricsKeyAttributes, + dimensions: newDimensions(cfg.Dimensions), + keyBuf: bytes.NewBuffer(make([]byte, 0, 1024)), + metricKeyToDimensions: metricKeyToDimensionsCache, + ticker: ticker, + done: make(chan struct{}), + eDimensions: newDimensions(cfg.Events.Dimensions), + events: cfg.Events, }, nil } @@ -390,8 +399,21 @@ func (p *connectorImp) addExemplar(span ptrace.Span, duration float64, h metrics type resourceKey [16]byte +func (p *connectorImp) createResourceKey(attr pcommon.Map) resourceKey { + if len(p.resourceMetricsKeyAttributes) == 0 { + return pdatautil.MapHash(attr) + } + m := pcommon.NewMap() + attr.CopyTo(m) + m.RemoveIf(func(k string, _ pcommon.Value) bool { + _, ok := p.resourceMetricsKeyAttributes[k] + return !ok + }) + return pdatautil.MapHash(m) +} + func (p *connectorImp) getOrCreateResourceMetrics(attr pcommon.Map) *resourceMetrics { - key := resourceKey(pdatautil.MapHash(attr)) + key := p.createResourceKey(attr) v, ok := p.resourceMetrics.Get(key) if !ok { v = &resourceMetrics{ diff --git a/connector/spanmetricsconnector/connector_test.go b/connector/spanmetricsconnector/connector_test.go index 236c02cc2543..98a2377dfa80 100644 --- a/connector/spanmetricsconnector/connector_test.go +++ b/connector/spanmetricsconnector/connector_test.go @@ -600,7 +600,7 @@ func TestConcurrentShutdown(t *testing.T) { ticker := mockClock.NewTicker(time.Nanosecond) // Test - p := newConnectorImp(t, new(consumertest.MetricsSink), nil, explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, logger, ticker) + p := newConnectorImp(t, new(consumertest.MetricsSink), nil, explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, []string{}, logger, ticker) err := p.Start(ctx, componenttest.NewNopHost()) require.NoError(t, err) @@ -680,7 +680,7 @@ func TestConsumeMetricsErrors(t *testing.T) { } mockClock := clock.NewMock(time.Now()) ticker := mockClock.NewTicker(time.Nanosecond) - p := newConnectorImp(t, mcon, nil, explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, logger, ticker) + p := newConnectorImp(t, mcon, nil, explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, []string{}, logger, ticker) ctx := metadata.NewIncomingContext(context.Background(), nil) err := p.Start(ctx, componenttest.NewNopHost()) @@ -842,7 +842,7 @@ func TestConsumeTraces(t *testing.T) { mockClock := clock.NewMock(time.Now()) ticker := mockClock.NewTicker(time.Nanosecond) - p := newConnectorImp(t, mcon, stringp("defaultNullValue"), tc.histogramConfig, tc.exemplarConfig, disabledEventsConfig, tc.aggregationTemporality, zaptest.NewLogger(t), ticker) + p := newConnectorImp(t, mcon, stringp("defaultNullValue"), tc.histogramConfig, tc.exemplarConfig, disabledEventsConfig, tc.aggregationTemporality, []string{}, zaptest.NewLogger(t), ticker) ctx := metadata.NewIncomingContext(context.Background(), nil) err := p.Start(ctx, componenttest.NewNopHost()) @@ -868,7 +868,7 @@ func TestConsumeTraces(t *testing.T) { func TestMetricKeyCache(t *testing.T) { mcon := consumertest.NewNop() - p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, zaptest.NewLogger(t), nil) + p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, []string{}, zaptest.NewLogger(t), nil) traces := buildSampleTrace() // Test @@ -898,7 +898,7 @@ func TestMetricKeyCache(t *testing.T) { func TestResourceMetricsCache(t *testing.T) { mcon := consumertest.NewNop() - p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, zaptest.NewLogger(t), nil) + p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, []string{}, zaptest.NewLogger(t), nil) // Test ctx := metadata.NewIncomingContext(context.Background(), nil) @@ -933,11 +933,53 @@ func TestResourceMetricsCache(t *testing.T) { assert.Equal(t, resourceMetricsCacheSize, p.resourceMetrics.Len()) } +func TestResourceMetricsKeyAttributes(t *testing.T) { + mcon := consumertest.NewNop() + + resourceMetricsKeyAttributes := []string{ + "service.name", + } + + p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, resourceMetricsKeyAttributes, zaptest.NewLogger(t), nil) + + // Test + ctx := metadata.NewIncomingContext(context.Background(), nil) + + // 0 resources in the beginning + assert.Zero(t, p.resourceMetrics.Len()) + + err := p.ConsumeTraces(ctx, buildSampleTrace()) + // Validate + require.NoError(t, err) + assert.Equal(t, 2, p.resourceMetrics.Len()) + + // consume another batch of traces for the same resources + err = p.ConsumeTraces(ctx, buildSampleTrace()) + require.NoError(t, err) + assert.Equal(t, 2, p.resourceMetrics.Len()) + + // consume more batches for new resources. Max size is exceeded causing old resource entries to be discarded + for i := 0; i < resourceMetricsCacheSize; i++ { + traces := buildSampleTrace() + + // add resource attributes to simulate additional resources providing data + for j := 0; j < traces.ResourceSpans().Len(); j++ { + traces.ResourceSpans().At(j).Resource().Attributes().PutStr("not included in resource key attributes", fmt.Sprintf("%d", i)) + } + + err = p.ConsumeTraces(ctx, traces) + require.NoError(t, err) + } + + // validate that the additional resources providing data did not result in additional resource metrics + assert.Equal(t, 2, p.resourceMetrics.Len()) +} + func BenchmarkConnectorConsumeTraces(b *testing.B) { // Prepare mcon := consumertest.NewNop() - conn := newConnectorImp(nil, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, zaptest.NewLogger(b), nil) + conn := newConnectorImp(nil, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, []string{}, zaptest.NewLogger(b), nil) traces := buildSampleTrace() @@ -951,7 +993,7 @@ func BenchmarkConnectorConsumeTraces(b *testing.B) { func TestExcludeDimensionsConsumeTraces(t *testing.T) { mcon := consumertest.NewNop() excludeDimensions := []string{"span.kind", "span.name", "totallyWrongNameDoesNotAffectAnything"} - p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, zaptest.NewLogger(t), nil, excludeDimensions...) + p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, []string{}, zaptest.NewLogger(t), nil, excludeDimensions...) traces := buildSampleTrace() // Test @@ -1000,15 +1042,16 @@ func TestExcludeDimensionsConsumeTraces(t *testing.T) { } -func newConnectorImp(t *testing.T, mcon consumer.Metrics, defaultNullValue *string, histogramConfig func() HistogramConfig, exemplarsConfig func() ExemplarsConfig, eventsConfig func() EventsConfig, temporality string, logger *zap.Logger, ticker *clock.Ticker, excludedDimensions ...string) *connectorImp { +func newConnectorImp(t *testing.T, mcon consumer.Metrics, defaultNullValue *string, histogramConfig func() HistogramConfig, exemplarsConfig func() ExemplarsConfig, eventsConfig func() EventsConfig, temporality string, resourceMetricsKeyAttributes []string, logger *zap.Logger, ticker *clock.Ticker, excludedDimensions ...string) *connectorImp { cfg := &Config{ - AggregationTemporality: temporality, - Histogram: histogramConfig(), - Exemplars: exemplarsConfig(), - ExcludeDimensions: excludedDimensions, - DimensionsCacheSize: dimensionsCacheSize, - ResourceMetricsCacheSize: resourceMetricsCacheSize, + AggregationTemporality: temporality, + Histogram: histogramConfig(), + Exemplars: exemplarsConfig(), + ExcludeDimensions: excludedDimensions, + DimensionsCacheSize: dimensionsCacheSize, + ResourceMetricsCacheSize: resourceMetricsCacheSize, + ResourceMetricsKeyAttributes: resourceMetricsKeyAttributes, Dimensions: []Dimension{ // Set nil defaults to force a lookup for the attribute in the span. {stringAttrName, nil}, @@ -1120,7 +1163,7 @@ func TestConnectorConsumeTracesEvictedCacheKey(t *testing.T) { ticker := mockClock.NewTicker(time.Nanosecond) // Note: default dimension key cache size is 2. - p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, zaptest.NewLogger(t), ticker) + p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, []string{}, zaptest.NewLogger(t), ticker) ctx := metadata.NewIncomingContext(context.Background(), nil) err := p.Start(ctx, componenttest.NewNopHost()) @@ -1374,7 +1417,7 @@ func TestSpanMetrics_Events(t *testing.T) { } func TestExemplarsForSumMetrics(t *testing.T) { mcon := consumertest.NewNop() - p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, enabledExemplarsConfig, enabledEventsConfig, cumulative, zaptest.NewLogger(t), nil) + p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, enabledExemplarsConfig, enabledEventsConfig, cumulative, []string{}, zaptest.NewLogger(t), nil) traces := buildSampleTrace() // Test diff --git a/connector/spanmetricsconnector/testdata/config.yaml b/connector/spanmetricsconnector/testdata/config.yaml index 07b89c1841ea..ae6faab493b9 100644 --- a/connector/spanmetricsconnector/testdata/config.yaml +++ b/connector/spanmetricsconnector/testdata/config.yaml @@ -68,3 +68,10 @@ spanmetrics/exemplars_enabled_with_max_per_datapoint: exemplars: enabled: true max_per_data_point: 5 + +# resource metrics key attributes filter +spanmetrics/resource_metrics_key_attributes: + resource_metrics_key_attributes: + - service.name + - telemetry.sdk.language + - telemetry.sdk.name