From 8d88902a1d1a97114e9aa6c1a0486eca0c940aaa Mon Sep 17 00:00:00 2001 From: Mikel Blanchard Date: Tue, 27 Feb 2024 12:03:41 -0800 Subject: [PATCH 1/2] Improve exemplar tests. --- src/OpenTelemetry/Metrics/AggregatorStore.cs | 5 +- src/OpenTelemetry/Metrics/Metric.cs | 13 +- src/OpenTelemetry/Metrics/MetricPoint.cs | 20 ++- src/OpenTelemetry/Metrics/MetricReaderExt.cs | 16 +- .../Metrics/MetricStreamConfiguration.cs | 4 + .../Metrics/MetricExemplarTests.cs | 145 +++++++++++++----- 6 files changed, 147 insertions(+), 56 deletions(-) diff --git a/src/OpenTelemetry/Metrics/AggregatorStore.cs b/src/OpenTelemetry/Metrics/AggregatorStore.cs index fa4cefc7221..65c9c2c2df7 100644 --- a/src/OpenTelemetry/Metrics/AggregatorStore.cs +++ b/src/OpenTelemetry/Metrics/AggregatorStore.cs @@ -17,6 +17,7 @@ internal sealed class AggregatorStore internal readonly int CardinalityLimit; internal readonly bool EmitOverflowAttribute; internal readonly ConcurrentDictionary? TagsToMetricPointIndexDictionaryDelta; + internal readonly Func? ExemplarReservoirFactory; internal long DroppedMeasurements = 0; private static readonly string MetricPointCapHitFixMessage = "Consider opting in for the experimental SDK feature to emit all the throttled metrics under the overflow attribute by setting env variable OTEL_DOTNET_EXPERIMENTAL_METRICS_EMIT_OVERFLOW_ATTRIBUTE = true. You could also modify instrumentation to reduce the number of unique key/value pair combinations. Or use Views to drop unwanted tags. Or use MeterProviderBuilder.SetMaxMetricPointsPerMetricStream to set higher limit."; @@ -59,7 +60,8 @@ internal AggregatorStore( int cardinalityLimit, bool emitOverflowAttribute, bool shouldReclaimUnusedMetricPoints, - ExemplarFilter? exemplarFilter = null) + ExemplarFilter? exemplarFilter = null, + Func? exemplarReservoirFactory = null) { this.name = metricStreamIdentity.InstrumentName; this.CardinalityLimit = cardinalityLimit; @@ -74,6 +76,7 @@ internal AggregatorStore( this.exponentialHistogramMaxScale = metricStreamIdentity.ExponentialHistogramMaxScale; this.StartTimeExclusive = DateTimeOffset.UtcNow; this.exemplarFilter = exemplarFilter ?? DefaultExemplarFilter; + this.ExemplarReservoirFactory = exemplarReservoirFactory; if (metricStreamIdentity.TagKeys == null) { this.updateLongCallback = this.UpdateLong; diff --git a/src/OpenTelemetry/Metrics/Metric.cs b/src/OpenTelemetry/Metrics/Metric.cs index cfd6b4e3463..ca82f8d4eaa 100644 --- a/src/OpenTelemetry/Metrics/Metric.cs +++ b/src/OpenTelemetry/Metrics/Metric.cs @@ -49,7 +49,8 @@ internal Metric( int cardinalityLimit, bool emitOverflowAttribute, bool shouldReclaimUnusedMetricPoints, - ExemplarFilter? exemplarFilter = null) + ExemplarFilter? exemplarFilter = null, + Func? exemplarReservoirFactory = null) { this.InstrumentIdentity = instrumentIdentity; @@ -155,7 +156,15 @@ internal Metric( throw new NotSupportedException($"Unsupported Instrument Type: {instrumentIdentity.InstrumentType.FullName}"); } - this.AggregatorStore = new AggregatorStore(instrumentIdentity, aggType, temporality, cardinalityLimit, emitOverflowAttribute, shouldReclaimUnusedMetricPoints, exemplarFilter); + this.AggregatorStore = new AggregatorStore( + instrumentIdentity, + aggType, + temporality, + cardinalityLimit, + emitOverflowAttribute, + shouldReclaimUnusedMetricPoints, + exemplarFilter, + exemplarReservoirFactory); this.Temporality = temporality; } diff --git a/src/OpenTelemetry/Metrics/MetricPoint.cs b/src/OpenTelemetry/Metrics/MetricPoint.cs index 65a62c3eb4a..18656ef84c9 100644 --- a/src/OpenTelemetry/Metrics/MetricPoint.cs +++ b/src/OpenTelemetry/Metrics/MetricPoint.cs @@ -63,15 +63,27 @@ internal MetricPoint( this.ReferenceCount = 1; this.LookupData = lookupData; - ExemplarReservoir? reservoir = null; + var isExemplarEnabled = aggregatorStore!.IsExemplarEnabled(); + + ExemplarReservoir? reservoir; + try + { + reservoir = aggregatorStore.ExemplarReservoirFactory?.Invoke(); + } + catch + { + // todo: Log that the factory on view threw an exception + reservoir = null; + } + if (this.aggType == AggregationType.HistogramWithBuckets || this.aggType == AggregationType.HistogramWithMinMaxBuckets) { this.mpComponents = new MetricPointOptionalComponents(); this.mpComponents.HistogramBuckets = new HistogramBuckets(histogramExplicitBounds); - if (aggregatorStore!.IsExemplarEnabled()) + if (isExemplarEnabled) { - reservoir = new AlignedHistogramBucketExemplarReservoir(histogramExplicitBounds!.Length); + reservoir ??= new AlignedHistogramBucketExemplarReservoir(histogramExplicitBounds!.Length); } } else if (this.aggType == AggregationType.Histogram || @@ -91,7 +103,7 @@ internal MetricPoint( this.mpComponents = null; } - if (aggregatorStore!.IsExemplarEnabled() && reservoir == null) + if (isExemplarEnabled && reservoir == null) { reservoir = new SimpleFixedSizeExemplarReservoir(DefaultSimpleReservoirPoolSize); } diff --git a/src/OpenTelemetry/Metrics/MetricReaderExt.cs b/src/OpenTelemetry/Metrics/MetricReaderExt.cs index 9f3b6fa10d2..6b78958e6a5 100644 --- a/src/OpenTelemetry/Metrics/MetricReaderExt.cs +++ b/src/OpenTelemetry/Metrics/MetricReaderExt.cs @@ -147,14 +147,14 @@ internal virtual List AddMetricWithViews(Instrument instrument, List? ExemplarReservoirFactory { get; set; } + internal string[]? CopiedTagKeys { get; private set; } internal int? ViewId { get; set; } diff --git a/test/OpenTelemetry.Tests/Metrics/MetricExemplarTests.cs b/test/OpenTelemetry.Tests/Metrics/MetricExemplarTests.cs index 356e4ac8acb..885e1e725fd 100644 --- a/test/OpenTelemetry.Tests/Metrics/MetricExemplarTests.cs +++ b/test/OpenTelemetry.Tests/Metrics/MetricExemplarTests.cs @@ -1,23 +1,18 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 +#nullable enable + using System.Diagnostics; using System.Diagnostics.Metrics; using OpenTelemetry.Tests; using Xunit; -using Xunit.Abstractions; namespace OpenTelemetry.Metrics.Tests; public class MetricExemplarTests : MetricTestsBase { private const int MaxTimeToAllowForFlush = 10000; - private readonly ITestOutputHelper output; - - public MetricExemplarTests(ITestOutputHelper output) - { - this.output = output; - } [Theory] [InlineData(MetricReaderTemporalityPreference.Cumulative)] @@ -33,15 +28,21 @@ public void TestExemplarsCounter(MetricReaderTemporalityPreference temporality) using var container = this.BuildMeterProvider(out var meterProvider, builder => builder .AddMeter(meter.Name) .SetExemplarFilter(new AlwaysOnExemplarFilter()) + .AddView( + "testCounter", + new MetricStreamConfiguration + { + ExemplarReservoirFactory = () => new SimpleFixedSizeExemplarReservoir(3), + }) .AddInMemoryExporter(exportedItems, metricReaderOptions => { metricReaderOptions.TemporalityPreference = temporality; })); - var measurementValues = GenerateRandomValues(10); + var measurementValues = GenerateRandomValues(2, false, null); foreach (var value in measurementValues) { - counter.Add(value); + counter.Add(value.Value); } meterProvider.ForceFlush(MaxTimeToAllowForFlush); @@ -49,14 +50,9 @@ public void TestExemplarsCounter(MetricReaderTemporalityPreference temporality) Assert.NotNull(metricPoint); Assert.True(metricPoint.Value.StartTime >= testStartTime); Assert.True(metricPoint.Value.EndTime != default); - var exemplars = GetExemplars(metricPoint.Value); - // TODO: Modify the test to better test cumulative. - // In cumulative, where SimpleFixedSizeExemplarReservoir's size is - // more than the count of new measurements, it is possible - // that the exemplar value is for a measurement that was recorded in the prior - // cycle. The current ValidateExemplars() does not handle this case. - ValidateExemplars(exemplars, metricPoint.Value.StartTime, metricPoint.Value.EndTime, measurementValues, false); + var exemplars = GetExemplars(metricPoint.Value); + ValidateExemplars(exemplars, metricPoint.Value.StartTime, metricPoint.Value.EndTime, measurementValues); exportedItems.Clear(); @@ -64,12 +60,11 @@ public void TestExemplarsCounter(MetricReaderTemporalityPreference temporality) Thread.Sleep(10); // Compensates for low resolution timing in netfx. #endif - measurementValues = GenerateRandomValues(10); - foreach (var value in measurementValues) + var secondMeasurementValues = GenerateRandomValues(1, true, measurementValues); + foreach (var value in secondMeasurementValues) { - var act = new Activity("test").Start(); - counter.Add(value); - act.Stop(); + using var act = new Activity("test").Start(); + counter.Add(value.Value); } meterProvider.ForceFlush(MaxTimeToAllowForFlush); @@ -77,12 +72,29 @@ public void TestExemplarsCounter(MetricReaderTemporalityPreference temporality) Assert.NotNull(metricPoint); Assert.True(metricPoint.Value.StartTime >= testStartTime); Assert.True(metricPoint.Value.EndTime != default); + exemplars = GetExemplars(metricPoint.Value); - ValidateExemplars(exemplars, metricPoint.Value.StartTime, metricPoint.Value.EndTime, measurementValues, true); + + if (temporality == MetricReaderTemporalityPreference.Cumulative) + { + // Current design: + // First collect we saw Exemplar A & B + // Second collect we saw Exemplar C but B remained in the reservoir + Assert.Equal(2, exemplars.Count); + secondMeasurementValues = secondMeasurementValues.Concat(measurementValues.Skip(1).Take(1)).ToArray(); + } + else + { + Assert.Single(exemplars); + } + + ValidateExemplars(exemplars, metricPoint.Value.StartTime, metricPoint.Value.EndTime, secondMeasurementValues); } - [Fact] - public void TestExemplarsHistogram() + [Theory] + [InlineData(MetricReaderTemporalityPreference.Cumulative)] + [InlineData(MetricReaderTemporalityPreference.Delta)] + public void TestExemplarsHistogram(MetricReaderTemporalityPreference temporality) { DateTime testStartTime = DateTime.UtcNow; var exportedItems = new List(); @@ -90,18 +102,30 @@ public void TestExemplarsHistogram() using var meter = new Meter($"{Utils.GetCurrentMethodName()}"); var histogram = meter.CreateHistogram("testHistogram"); + var buckets = new double[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; + using var container = this.BuildMeterProvider(out var meterProvider, builder => builder .AddMeter(meter.Name) .SetExemplarFilter(new AlwaysOnExemplarFilter()) + .AddView( + "testHistogram", + new ExplicitBucketHistogramConfiguration + { + Boundaries = buckets, + }) .AddInMemoryExporter(exportedItems, metricReaderOptions => { - metricReaderOptions.TemporalityPreference = MetricReaderTemporalityPreference.Delta; + metricReaderOptions.TemporalityPreference = temporality; })); - var measurementValues = GenerateRandomValues(10); + var measurementValues = buckets + /* 2000 is here to test overflow measurement */ + .Concat(new double[] { 2000 }) + .Select(b => (Value: b, ExpectTraceId: false)) + .ToArray(); foreach (var value in measurementValues) { - histogram.Record(value); + histogram.Record(value.Value); } meterProvider.ForceFlush(MaxTimeToAllowForFlush); @@ -109,8 +133,9 @@ public void TestExemplarsHistogram() Assert.NotNull(metricPoint); Assert.True(metricPoint.Value.StartTime >= testStartTime); Assert.True(metricPoint.Value.EndTime != default); + var exemplars = GetExemplars(metricPoint.Value); - ValidateExemplars(exemplars, metricPoint.Value.StartTime, metricPoint.Value.EndTime, measurementValues, false); + ValidateExemplars(exemplars, metricPoint.Value.StartTime, metricPoint.Value.EndTime, measurementValues); exportedItems.Clear(); @@ -118,11 +143,11 @@ public void TestExemplarsHistogram() Thread.Sleep(10); // Compensates for low resolution timing in netfx. #endif - measurementValues = GenerateRandomValues(10); - foreach (var value in measurementValues) + var secondMeasurementValues = buckets.Take(1).Select(b => (Value: b, ExpectTraceId: true)).ToArray(); + foreach (var value in secondMeasurementValues) { using var act = new Activity("test").Start(); - histogram.Record(value); + histogram.Record(value.Value); } meterProvider.ForceFlush(MaxTimeToAllowForFlush); @@ -130,8 +155,20 @@ public void TestExemplarsHistogram() Assert.NotNull(metricPoint); Assert.True(metricPoint.Value.StartTime >= testStartTime); Assert.True(metricPoint.Value.EndTime != default); + exemplars = GetExemplars(metricPoint.Value); - ValidateExemplars(exemplars, metricPoint.Value.StartTime, metricPoint.Value.EndTime, measurementValues, true); + + if (temporality == MetricReaderTemporalityPreference.Cumulative) + { + Assert.Equal(11, exemplars.Count); + secondMeasurementValues = secondMeasurementValues.Concat(measurementValues.Skip(1)).ToArray(); + } + else + { + Assert.Single(exemplars); + } + + ValidateExemplars(exemplars, metricPoint.Value.StartTime, metricPoint.Value.EndTime, secondMeasurementValues); } [Fact] @@ -152,10 +189,14 @@ public void TestExemplarsFilterTags() metricReaderOptions.TemporalityPreference = MetricReaderTemporalityPreference.Delta; })); - var measurementValues = GenerateRandomValues(10); + var measurementValues = GenerateRandomValues(10, false, null); foreach (var value in measurementValues) { - histogram.Record(value, new("key1", "value1"), new("key2", "value1"), new("key3", "value1")); + histogram.Record( + value.Value, + new("key1", "value1"), + new("key2", "value1"), + new("key3", "value1")); } meterProvider.ForceFlush(MaxTimeToAllowForFlush); @@ -176,27 +217,45 @@ public void TestExemplarsFilterTags() } } - private static double[] GenerateRandomValues(int count) + private static (double Value, bool ExpectTraceId)[] GenerateRandomValues( + int count, + bool expectTraceId, + (double Value, bool ExpectTraceId)[]? previousValues) { var random = new Random(); - var values = new double[count]; + var values = new (double, bool)[count]; for (int i = 0; i < count; i++) { - values[i] = random.NextDouble(); + var nextValue = random.NextDouble(); + if (values.Any(m => m.Item1 == nextValue) + || previousValues?.Any(m => m.Value == nextValue) == true) + { + i--; + continue; + } + + values[i] = (nextValue, expectTraceId); } return values; } - private static void ValidateExemplars(IReadOnlyList exemplars, DateTimeOffset startTime, DateTimeOffset endTime, double[] measurementValues, bool traceContextExists) + private static void ValidateExemplars( + IReadOnlyList exemplars, + DateTimeOffset startTime, + DateTimeOffset endTime, + (double Value, bool ExpectTraceId)[] measurementValues) { - Assert.NotNull(exemplars); + int count = 0; + foreach (var exemplar in exemplars) { Assert.True(exemplar.Timestamp >= startTime && exemplar.Timestamp <= endTime, $"{startTime} < {exemplar.Timestamp} < {endTime}"); - Assert.Contains(exemplar.DoubleValue, measurementValues); Assert.Equal(0, exemplar.FilteredTags.MaximumCount); - if (traceContextExists) + + var measurement = measurementValues.FirstOrDefault(v => v.Value == exemplar.DoubleValue); + Assert.NotEqual(default, measurement); + if (measurement.ExpectTraceId) { Assert.NotEqual(default, exemplar.TraceId); Assert.NotEqual(default, exemplar.SpanId); @@ -206,6 +265,10 @@ private static void ValidateExemplars(IReadOnlyList exemplars, DateTim Assert.Equal(default, exemplar.TraceId); Assert.Equal(default, exemplar.SpanId); } + + count++; } + + Assert.Equal(measurementValues.Length, count); } } From 6f3070a69e02860d00f83fd2090a710ee3bdbee9 Mon Sep 17 00:00:00 2001 From: Mikel Blanchard Date: Tue, 27 Feb 2024 12:24:01 -0800 Subject: [PATCH 2/2] Nits --- src/OpenTelemetry/Metrics/MetricPoint.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/OpenTelemetry/Metrics/MetricPoint.cs b/src/OpenTelemetry/Metrics/MetricPoint.cs index 3766af73e44..8e76e83778c 100644 --- a/src/OpenTelemetry/Metrics/MetricPoint.cs +++ b/src/OpenTelemetry/Metrics/MetricPoint.cs @@ -72,7 +72,7 @@ internal MetricPoint( } catch { - // todo: Log that the factory on view threw an exception + // TODO : Log that the factory on view threw an exception, once view exposes that capability reservoir = null; } @@ -81,9 +81,9 @@ internal MetricPoint( { this.mpComponents = new MetricPointOptionalComponents(); this.mpComponents.HistogramBuckets = new HistogramBuckets(histogramExplicitBounds); - if (isExemplarEnabled) + if (isExemplarEnabled && reservoir == null) { - reservoir ??= new AlignedHistogramBucketExemplarReservoir(histogramExplicitBounds!.Length); + reservoir = new AlignedHistogramBucketExemplarReservoir(histogramExplicitBounds!.Length); } } else if (this.aggType == AggregationType.Histogram ||