Skip to content

Commit

Permalink
[connector/spanmetrics] disable exemplars by default (#24048)
Browse files Browse the repository at this point in the history
**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->

Breaking change! Allows enabling / disabling Exemplars.

**Link to tracking Issue:** <Issue number if applicable>

#23872
**Testing:** <Describe what testing was performed and which tests were
added.>

- Added unit test
**Documentation:** <Describe the documentation added.>
- Added docs

---------

Co-authored-by: Albert <[email protected]>
Co-authored-by: Pablo Baeyens <[email protected]>
  • Loading branch information
3 people authored Jul 13, 2023
1 parent 71e5297 commit 1005a5c
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 15 deletions.
22 changes: 22 additions & 0 deletions .chloggen/spanmetricsconnector-disabled.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Use this changelog template to create an entry for release notes.
# If your change doesn't affect end users, such as a test fix or a tooling change,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: spanmetricsconnector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Histograms will not have exemplars by default"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [23872]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
Previously spanmetricsconnector would attach every single span as an exemplar to the histogram.
Exemplars are now disabled by default. To enable them set `exemplars::enabled=true`
6 changes: 5 additions & 1 deletion connector/spanmetricsconnector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ The following settings can be optionally configured:
One of either `AGGREGATION_TEMPORALITY_CUMULATIVE` or `AGGREGATION_TEMPORALITY_DELTA`.
- `namespace`: Defines the namespace of the generated metrics. If `namespace` provided, generated metric name will be added `namespace.` prefix.
- `metrics_flush_interval` (default: `15s`): Defines the flush interval of the generated metrics.

- `exemplars`: Use to configure how to attach exemplars to histograms
- `enabled` (default: `false`): enabling will add spans as Exemplars.

## Examples

The following is a simple example usage of the `spanmetrics` connector.
Expand All @@ -123,6 +125,8 @@ connectors:
- name: http.method
default: GET
- name: http.status_code
exemplars:
enabled: true
exclude_dimensions: ['status.code']
dimensions_cache_size: 1000
aggregation_temporality: "AGGREGATION_TEMPORALITY_CUMULATIVE"
Expand Down
7 changes: 7 additions & 0 deletions connector/spanmetricsconnector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ type Config struct {

// Namespace is the namespace of the metrics emitted by the connector.
Namespace string `mapstructure:"namespace"`

// Exemplars defines the configuration for exemplars.
Exemplars ExemplarsConfig `mapstructure:"exemplars"`
}

type HistogramConfig struct {
Expand All @@ -64,6 +67,10 @@ type HistogramConfig struct {
Explicit *ExplicitHistogramConfig `mapstructure:"explicit"`
}

type ExemplarsConfig struct {
Enabled bool `mapstructure:"enabled"`
}

type ExponentialHistogramConfig struct {
MaxSize int32 `mapstructure:"max_size"`
}
Expand Down
16 changes: 14 additions & 2 deletions connector/spanmetricsconnector/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ func TestLoadConfig(t *testing.T) {
},
DimensionsCacheSize: 1500,
MetricsFlushInterval: 30 * time.Second,
Exemplars: ExemplarsConfig{
Enabled: true,
},
Histogram: HistogramConfig{
Unit: metrics.Seconds,
Explicit: &ExplicitHistogramConfig{
Expand All @@ -59,8 +62,7 @@ func TestLoadConfig(t *testing.T) {
},
},
},
},
},
}},
{
id: component.NewIDWithName(metadata.Type, "exponential_histogram"),
expected: &Config{
Expand All @@ -83,6 +85,16 @@ func TestLoadConfig(t *testing.T) {
id: component.NewIDWithName(metadata.Type, "invalid_histogram_unit"),
errorMessage: "unknown Unit \"h\"",
},
{
id: component.NewIDWithName(metadata.Type, "exemplars_enabled"),
expected: &Config{
AggregationTemporality: "AGGREGATION_TEMPORALITY_CUMULATIVE",
DimensionsCacheSize: defaultDimensionsCacheSize,
MetricsFlushInterval: 15 * time.Second,
Histogram: HistogramConfig{Disable: false, Unit: defaultUnit},
Exemplars: ExemplarsConfig{Enabled: true},
},
},
}

for _, tt := range tests {
Expand Down
16 changes: 13 additions & 3 deletions connector/spanmetricsconnector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,10 +314,9 @@ func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) {
if !p.config.Histogram.Disable {
// aggregate histogram metrics
h := histograms.GetOrCreate(key, attributes)
p.addExemplar(span, duration, h)
h.Observe(duration)
if !span.TraceID().IsEmpty() {
h.AddExemplar(span.TraceID(), span.SpanID(), duration)
}

}
// aggregate sums metrics
s := sums.GetOrCreate(key, attributes)
Expand All @@ -327,6 +326,17 @@ func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) {
}
}

func (p *connectorImp) addExemplar(span ptrace.Span, duration float64, h metrics.Histogram) {
if !p.config.Exemplars.Enabled {
return
}
if span.TraceID().IsEmpty() {
return
}

h.AddExemplar(span.TraceID(), span.SpanID(), duration)
}

type resourceKey [16]byte

func (p *connectorImp) getOrCreateResourceMetrics(attr pcommon.Map) *resourceMetrics {
Expand Down
79 changes: 71 additions & 8 deletions connector/spanmetricsconnector/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,33 @@ func verifyDisabledHistogram(t testing.TB, input pmetric.Metrics) bool {
return true
}

func verifyExemplarsExist(t testing.TB, input pmetric.Metrics) bool {
for i := 0; i < input.ResourceMetrics().Len(); i++ {
rm := input.ResourceMetrics().At(i)
ism := rm.ScopeMetrics()

// Checking all metrics, naming notice: ismC/mC - C here is for Counter.
for ismC := 0; ismC < ism.Len(); ismC++ {

m := ism.At(ismC).Metrics()

for mC := 0; mC < m.Len(); mC++ {
metric := m.At(mC)

if metric.Type() != pmetric.MetricTypeHistogram {
continue
}
dps := metric.Histogram().DataPoints()
for dp := 0; dp < dps.Len(); dp++ {
d := dps.At(dp)
assert.True(t, d.Exemplars().Len() > 0)
}
}
}
}
return true
}

// verifyConsumeMetricsInputCumulative expects one accumulation of metrics, and marked as cumulative
func verifyConsumeMetricsInputCumulative(t testing.TB, input pmetric.Metrics) bool {
return verifyConsumeMetricsInput(t, input, pmetric.AggregationTemporalityCumulative, 1)
Expand Down Expand Up @@ -371,6 +398,18 @@ func initSpan(span span, s ptrace.Span) {
s.SetSpanID(pcommon.SpanID(span.spanID))
}

func disabledExemplarsConfig() ExemplarsConfig {
return ExemplarsConfig{
Enabled: false,
}
}

func enabledExemplarsConfig() ExemplarsConfig {
return ExemplarsConfig{
Enabled: true,
}
}

func explicitHistogramsConfig() HistogramConfig {
return HistogramConfig{
Unit: defaultUnit,
Expand Down Expand Up @@ -544,7 +583,7 @@ func TestConcurrentShutdown(t *testing.T) {
ticker := mockClock.NewTicker(time.Nanosecond)

// Test
p := newConnectorImp(t, new(consumertest.MetricsSink), nil, explicitHistogramsConfig, cumulative, logger, ticker)
p := newConnectorImp(t, new(consumertest.MetricsSink), nil, explicitHistogramsConfig, disabledExemplarsConfig, cumulative, logger, ticker)
err := p.Start(ctx, componenttest.NewNopHost())
require.NoError(t, err)

Expand Down Expand Up @@ -612,7 +651,7 @@ func TestConsumeMetricsErrors(t *testing.T) {

mockClock := clock.NewMock(time.Now())
ticker := mockClock.NewTicker(time.Nanosecond)
p := newConnectorImp(t, mcon, nil, explicitHistogramsConfig, cumulative, logger, ticker)
p := newConnectorImp(t, mcon, nil, explicitHistogramsConfig, disabledExemplarsConfig, cumulative, logger, ticker)

ctx := metadata.NewIncomingContext(context.Background(), nil)
err := p.Start(ctx, componenttest.NewNopHost())
Expand Down Expand Up @@ -649,6 +688,7 @@ func TestConsumeTraces(t *testing.T) {
name string
aggregationTemporality string
histogramConfig func() HistogramConfig
exemplarConfig func() ExemplarsConfig
verifier func(t testing.TB, input pmetric.Metrics) bool
traces []ptrace.Traces
}{
Expand All @@ -657,6 +697,7 @@ func TestConsumeTraces(t *testing.T) {
name: "Test histogram metrics are disabled",
aggregationTemporality: cumulative,
histogramConfig: disabledHistogramsConfig,
exemplarConfig: disabledExemplarsConfig,
verifier: verifyDisabledHistogram,
traces: []ptrace.Traces{buildSampleTrace()},
},
Expand All @@ -665,13 +706,15 @@ func TestConsumeTraces(t *testing.T) {
name: "Test single consumption, three spans (Cumulative), using exp. histogram",
aggregationTemporality: cumulative,
histogramConfig: exponentialHistogramsConfig,
exemplarConfig: disabledExemplarsConfig,
verifier: verifyConsumeMetricsInputCumulative,
traces: []ptrace.Traces{buildSampleTrace()},
},
{
name: "Test single consumption, three spans (Delta), using exp. histogram",
aggregationTemporality: delta,
histogramConfig: exponentialHistogramsConfig,
exemplarConfig: disabledExemplarsConfig,
verifier: verifyConsumeMetricsInputDelta,
traces: []ptrace.Traces{buildSampleTrace()},
},
Expand All @@ -680,6 +723,7 @@ func TestConsumeTraces(t *testing.T) {
name: "Test two consumptions (Cumulative), using exp. histogram",
aggregationTemporality: cumulative,
histogramConfig: exponentialHistogramsConfig,
exemplarConfig: disabledExemplarsConfig,
verifier: verifyMultipleCumulativeConsumptions(),
traces: []ptrace.Traces{buildSampleTrace(), buildSampleTrace()},
},
Expand All @@ -688,6 +732,7 @@ func TestConsumeTraces(t *testing.T) {
name: "Test two consumptions (Delta), using exp. histogram",
aggregationTemporality: delta,
histogramConfig: exponentialHistogramsConfig,
exemplarConfig: disabledExemplarsConfig,
verifier: verifyConsumeMetricsInputDelta,
traces: []ptrace.Traces{buildSampleTrace(), buildSampleTrace()},
},
Expand All @@ -696,6 +741,7 @@ func TestConsumeTraces(t *testing.T) {
name: "Test bad consumptions (Delta), using exp. histogram",
aggregationTemporality: cumulative,
histogramConfig: exponentialHistogramsConfig,
exemplarConfig: disabledExemplarsConfig,
verifier: verifyBadMetricsOkay,
traces: []ptrace.Traces{buildBadSampleTrace()},
},
Expand All @@ -705,13 +751,15 @@ func TestConsumeTraces(t *testing.T) {
name: "Test single consumption, three spans (Cumulative).",
aggregationTemporality: cumulative,
histogramConfig: explicitHistogramsConfig,
exemplarConfig: disabledExemplarsConfig,
verifier: verifyConsumeMetricsInputCumulative,
traces: []ptrace.Traces{buildSampleTrace()},
},
{
name: "Test single consumption, three spans (Delta).",
aggregationTemporality: delta,
histogramConfig: explicitHistogramsConfig,
exemplarConfig: disabledExemplarsConfig,
verifier: verifyConsumeMetricsInputDelta,
traces: []ptrace.Traces{buildSampleTrace()},
},
Expand All @@ -720,6 +768,7 @@ func TestConsumeTraces(t *testing.T) {
name: "Test two consumptions (Cumulative).",
aggregationTemporality: cumulative,
histogramConfig: explicitHistogramsConfig,
exemplarConfig: disabledExemplarsConfig,
verifier: verifyMultipleCumulativeConsumptions(),
traces: []ptrace.Traces{buildSampleTrace(), buildSampleTrace()},
},
Expand All @@ -728,6 +777,7 @@ func TestConsumeTraces(t *testing.T) {
name: "Test two consumptions (Delta).",
aggregationTemporality: delta,
histogramConfig: explicitHistogramsConfig,
exemplarConfig: disabledExemplarsConfig,
verifier: verifyConsumeMetricsInputDelta,
traces: []ptrace.Traces{buildSampleTrace(), buildSampleTrace()},
},
Expand All @@ -736,9 +786,19 @@ func TestConsumeTraces(t *testing.T) {
name: "Test bad consumptions (Delta).",
aggregationTemporality: cumulative,
histogramConfig: explicitHistogramsConfig,
exemplarConfig: disabledExemplarsConfig,
verifier: verifyBadMetricsOkay,
traces: []ptrace.Traces{buildBadSampleTrace()},
},
// enabling exemplars
{
name: "Test Exemplars are enabled",
aggregationTemporality: cumulative,
histogramConfig: explicitHistogramsConfig,
exemplarConfig: enabledExemplarsConfig,
verifier: verifyExemplarsExist,
traces: []ptrace.Traces{buildSampleTrace()},
},
}

for _, tc := range testcases {
Expand All @@ -754,13 +814,14 @@ func TestConsumeTraces(t *testing.T) {
// Mocked metric exporter will perform validation on metrics, during p.ConsumeMetrics()
mcon.On("ConsumeMetrics", mock.Anything, mock.MatchedBy(func(input pmetric.Metrics) bool {
wg.Done()

return tc.verifier(t, input)
})).Return(nil)

mockClock := clock.NewMock(time.Now())
ticker := mockClock.NewTicker(time.Nanosecond)

p := newConnectorImp(t, mcon, stringp("defaultNullValue"), tc.histogramConfig, tc.aggregationTemporality, zaptest.NewLogger(t), ticker)
p := newConnectorImp(t, mcon, stringp("defaultNullValue"), tc.histogramConfig, tc.exemplarConfig, tc.aggregationTemporality, zaptest.NewLogger(t), ticker)

ctx := metadata.NewIncomingContext(context.Background(), nil)
err := p.Start(ctx, componenttest.NewNopHost())
Expand All @@ -785,7 +846,7 @@ func TestMetricKeyCache(t *testing.T) {
mcon := &mocks.MetricsConsumer{}
mcon.On("ConsumeMetrics", mock.Anything, mock.Anything).Return(nil)

p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, cumulative, zaptest.NewLogger(t), nil)
p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, cumulative, zaptest.NewLogger(t), nil)
traces := buildSampleTrace()

// Test
Expand Down Expand Up @@ -817,7 +878,7 @@ func BenchmarkConnectorConsumeTraces(b *testing.B) {
mcon := &mocks.MetricsConsumer{}
mcon.On("ConsumeMetrics", mock.Anything, mock.Anything).Return(nil)

conn := newConnectorImp(nil, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, cumulative, zaptest.NewLogger(b), nil)
conn := newConnectorImp(nil, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, cumulative, zaptest.NewLogger(b), nil)

traces := buildSampleTrace()

Expand All @@ -832,7 +893,7 @@ func TestExcludeDimensionsConsumeTraces(t *testing.T) {
mcon := &mocks.MetricsConsumer{}
mcon.On("ConsumeMetrics", mock.Anything, mock.Anything).Return(nil)
excludeDimensions := []string{"span.kind", "span.name", "totallyWrongNameDoesNotAffectAnything"}
p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, cumulative, zaptest.NewLogger(t), nil, excludeDimensions...)
p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, cumulative, zaptest.NewLogger(t), nil, excludeDimensions...)
traces := buildSampleTrace()

// Test
Expand Down Expand Up @@ -881,10 +942,12 @@ func TestExcludeDimensionsConsumeTraces(t *testing.T) {

}

func newConnectorImp(t *testing.T, mcon consumer.Metrics, defaultNullValue *string, histogramConfig func() HistogramConfig, temporality string, logger *zap.Logger, ticker *clock.Ticker, excludedDimensions ...string) *connectorImp {
func newConnectorImp(t *testing.T, mcon consumer.Metrics, defaultNullValue *string, histogramConfig func() HistogramConfig, exemplarsConfig func() ExemplarsConfig, temporality string, logger *zap.Logger, ticker *clock.Ticker, excludedDimensions ...string) *connectorImp {

cfg := &Config{
AggregationTemporality: temporality,
Histogram: histogramConfig(),
Exemplars: exemplarsConfig(),
ExcludeDimensions: excludedDimensions,
DimensionsCacheSize: DimensionsCacheSize,
Dimensions: []Dimension{
Expand Down Expand Up @@ -1014,7 +1077,7 @@ func TestConnectorConsumeTracesEvictedCacheKey(t *testing.T) {
ticker := mockClock.NewTicker(time.Nanosecond)

// Note: default dimension key cache size is 2.
p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, cumulative, zaptest.NewLogger(t), ticker)
p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, cumulative, zaptest.NewLogger(t), ticker)

ctx := metadata.NewIncomingContext(context.Background(), nil)
err := p.Start(ctx, componenttest.NewNopHost())
Expand Down
8 changes: 7 additions & 1 deletion connector/spanmetricsconnector/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ spanmetrics/full:
unit: "s"
explicit:
buckets: [ 10ms, 100ms, 250ms ]

exemplars:
enabled: true
dimensions_cache_size: 1500

# Additional list of dimensions on top of:
Expand Down Expand Up @@ -55,3 +56,8 @@ spanmetrics/exponential_and_explicit_histogram:
spanmetrics/invalid_histogram_unit:
histogram:
unit: "h"

# exemplars enabled
spanmetrics/exemplars_enabled:
exemplars:
enabled: true

0 comments on commit 1005a5c

Please sign in to comment.