From 331c6af8d6e12de52e8125b9776f2b94f4a73234 Mon Sep 17 00:00:00 2001 From: jack-berg <34418638+jack-berg@users.noreply.github.com> Date: Thu, 8 Jun 2023 13:20:19 -0500 Subject: [PATCH] Experimental metric reader and view cardinality limits (#5494) --- .../sdk/OpenTelemetrySdkTest.java | 2 +- .../sdk/metrics/SdkMeterProvider.java | 12 +- .../sdk/metrics/SdkMeterProviderBuilder.java | 26 +- .../io/opentelemetry/sdk/metrics/View.java | 10 +- .../sdk/metrics/ViewBuilder.java | 20 +- .../internal/SdkMeterProviderUtil.java | 41 ++- .../export/CardinalityLimitSelector.java | 39 +++ .../state/AsynchronousMetricStorage.java | 12 +- .../DefaultSynchronousMetricStorage.java | 13 +- .../metrics/internal/state/MetricStorage.java | 4 +- .../state/SynchronousMetricStorage.java | 3 +- .../metrics/internal/view/RegisteredView.java | 7 +- .../metrics/internal/view/ViewRegistry.java | 22 +- .../sdk/metrics/CardinalityTest.java | 248 ++++++++++++++++-- .../opentelemetry/sdk/metrics/ViewTest.java | 10 +- .../state/AsynchronousMetricStorageTest.java | 14 +- .../state/SynchronousMetricStorageTest.java | 67 +++-- .../internal/view/RegisteredViewTest.java | 4 +- .../internal/view/ViewRegistryTest.java | 34 ++- 19 files changed, 499 insertions(+), 89 deletions(-) create mode 100644 sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/export/CardinalityLimitSelector.java diff --git a/sdk/all/src/test/java/io/opentelemetry/sdk/OpenTelemetrySdkTest.java b/sdk/all/src/test/java/io/opentelemetry/sdk/OpenTelemetrySdkTest.java index d3ec82b5f8c..09e1a402671 100644 --- a/sdk/all/src/test/java/io/opentelemetry/sdk/OpenTelemetrySdkTest.java +++ b/sdk/all/src/test/java/io/opentelemetry/sdk/OpenTelemetrySdkTest.java @@ -421,7 +421,7 @@ void stringRepresentation() { + "clock=SystemClock{}, " + "resource=Resource{schemaUrl=null, attributes={service.name=\"otel-test\"}}, " + "metricReaders=[PeriodicMetricReader{exporter=MockMetricExporter{}, intervalNanos=60000000000}], " - + "views=[RegisteredView{instrumentSelector=InstrumentSelector{instrumentName=instrument}, view=View{name=new-instrument, aggregation=DefaultAggregation, attributesProcessor=NoopAttributesProcessor{}}}]" + + "views=[RegisteredView{instrumentSelector=InstrumentSelector{instrumentName=instrument}, view=View{name=new-instrument, aggregation=DefaultAggregation, attributesProcessor=NoopAttributesProcessor{}, cardinalityLimit=2000}}]" + "}, " + "loggerProvider=SdkLoggerProvider{" + "clock=SystemClock{}, " diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeterProvider.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeterProvider.java index 5061c55ff69..431fc41ab30 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeterProvider.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeterProvider.java @@ -16,6 +16,7 @@ import io.opentelemetry.sdk.metrics.export.MetricReader; import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil; import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilter; +import io.opentelemetry.sdk.metrics.internal.export.CardinalityLimitSelector; import io.opentelemetry.sdk.metrics.internal.export.MetricProducer; import io.opentelemetry.sdk.metrics.internal.export.RegisteredReader; import io.opentelemetry.sdk.metrics.internal.state.MeterProviderSharedState; @@ -26,6 +27,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.IdentityHashMap; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -54,17 +56,19 @@ public static SdkMeterProviderBuilder builder() { SdkMeterProvider( List registeredViews, - List metricReaders, + IdentityHashMap metricReaders, Clock clock, Resource resource, ExemplarFilter exemplarFilter) { long startEpochNanos = clock.now(); this.registeredViews = registeredViews; this.registeredReaders = - metricReaders.stream() + metricReaders.entrySet().stream() .map( - reader -> - RegisteredReader.create(reader, ViewRegistry.create(reader, registeredViews))) + entry -> + RegisteredReader.create( + entry.getKey(), + ViewRegistry.create(entry.getKey(), entry.getValue(), registeredViews))) .collect(toList()); this.sharedState = MeterProviderSharedState.create(clock, resource, exemplarFilter, startEpochNanos); diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeterProviderBuilder.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeterProviderBuilder.java index 145ebd567f2..736ba9597c2 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeterProviderBuilder.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeterProviderBuilder.java @@ -10,9 +10,11 @@ import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil; import io.opentelemetry.sdk.metrics.internal.debug.SourceInfo; import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilter; +import io.opentelemetry.sdk.metrics.internal.export.CardinalityLimitSelector; import io.opentelemetry.sdk.metrics.internal.view.RegisteredView; import io.opentelemetry.sdk.resources.Resource; import java.util.ArrayList; +import java.util.IdentityHashMap; import java.util.List; import java.util.Objects; @@ -32,7 +34,8 @@ public final class SdkMeterProviderBuilder { private Clock clock = Clock.getDefault(); private Resource resource = Resource.getDefault(); - private final List metricReaders = new ArrayList<>(); + private final IdentityHashMap metricReaders = + new IdentityHashMap<>(); private final List registeredViews = new ArrayList<>(); private ExemplarFilter exemplarFilter = DEFAULT_EXEMPLAR_FILTER; @@ -96,7 +99,11 @@ public SdkMeterProviderBuilder registerView(InstrumentSelector selector, View vi Objects.requireNonNull(view, "view"); registeredViews.add( RegisteredView.create( - selector, view, view.getAttributesProcessor(), SourceInfo.fromCurrentStack())); + selector, + view, + view.getAttributesProcessor(), + view.getCardinalityLimit(), + SourceInfo.fromCurrentStack())); return this; } @@ -106,7 +113,20 @@ public SdkMeterProviderBuilder registerView(InstrumentSelector selector, View vi *

Note: custom implementations of {@link MetricReader} are not currently supported. */ public SdkMeterProviderBuilder registerMetricReader(MetricReader reader) { - metricReaders.add(reader); + metricReaders.put(reader, CardinalityLimitSelector.defaultCardinalityLimitSelector()); + return this; + } + + /** + * Registers a {@link MetricReader} with a {@link CardinalityLimitSelector}. + * + *

Note: not currently stable but available for experimental use via {@link + * SdkMeterProviderUtil#registerMetricReaderWithCardinalitySelector(SdkMeterProviderBuilder, + * MetricReader, CardinalityLimitSelector)}. + */ + SdkMeterProviderBuilder registerMetricReader( + MetricReader reader, CardinalityLimitSelector cardinalityLimitSelector) { + metricReaders.put(reader, cardinalityLimitSelector); return this; } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/View.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/View.java index 5f1e2d71277..e545b716459 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/View.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/View.java @@ -32,8 +32,10 @@ static View create( @Nullable String name, @Nullable String description, Aggregation aggregation, - AttributesProcessor attributesProcessor) { - return new AutoValue_View(name, description, aggregation, attributesProcessor); + AttributesProcessor attributesProcessor, + int cardinalityLimit) { + return new AutoValue_View( + name, description, aggregation, attributesProcessor, cardinalityLimit); } View() {} @@ -58,6 +60,9 @@ static View create( /** Returns the attribute processor used for this view. */ abstract AttributesProcessor getAttributesProcessor(); + /** Returns the cardinality limit for this view. */ + abstract int getCardinalityLimit(); + @Override public final String toString() { StringJoiner joiner = new StringJoiner(", ", "View{", "}"); @@ -69,6 +74,7 @@ public final String toString() { } joiner.add("aggregation=" + getAggregation()); joiner.add("attributesProcessor=" + getAttributesProcessor()); + joiner.add("cardinalityLimit=" + getCardinalityLimit()); return joiner.toString(); } } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/ViewBuilder.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/ViewBuilder.java index 4150292d271..7db445651c9 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/ViewBuilder.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/ViewBuilder.java @@ -7,6 +7,7 @@ import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil; import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory; +import io.opentelemetry.sdk.metrics.internal.state.MetricStorage; import io.opentelemetry.sdk.metrics.internal.view.AttributesProcessor; import java.util.Objects; import java.util.function.Predicate; @@ -23,6 +24,7 @@ public final class ViewBuilder { @Nullable private String description; private Aggregation aggregation = Aggregation.defaultAggregation(); private AttributesProcessor processor = AttributesProcessor.noop(); + private int cardinalityLimit = MetricStorage.DEFAULT_MAX_CARDINALITY; ViewBuilder() {} @@ -85,8 +87,24 @@ ViewBuilder addAttributesProcessor(AttributesProcessor attributesProcessor) { return this; } + /** + * Set the cardinality limit. + * + *

Note: not currently stable but cardinality limit can be configured via + * SdkMeterProviderUtil#setCardinalityLimit(ViewBuilder, int)}. + * + * @param cardinalityLimit the maximum number of series for a metric + */ + ViewBuilder setCardinalityLimit(int cardinalityLimit) { + if (cardinalityLimit <= 0) { + throw new IllegalArgumentException("cardinalityLimit must be > 0"); + } + this.cardinalityLimit = cardinalityLimit; + return this; + } + /** Returns a {@link View} with the configuration of this builder. */ public View build() { - return View.create(name, description, aggregation, processor); + return View.create(name, description, aggregation, processor, cardinalityLimit); } } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/SdkMeterProviderUtil.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/SdkMeterProviderUtil.java index eb89bc35435..ecaf5e388ca 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/SdkMeterProviderUtil.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/SdkMeterProviderUtil.java @@ -8,7 +8,9 @@ import io.opentelemetry.sdk.metrics.SdkMeterProvider; import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder; import io.opentelemetry.sdk.metrics.ViewBuilder; +import io.opentelemetry.sdk.metrics.export.MetricReader; import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilter; +import io.opentelemetry.sdk.metrics.internal.export.CardinalityLimitSelector; import io.opentelemetry.sdk.metrics.internal.view.AttributesProcessor; import io.opentelemetry.sdk.metrics.internal.view.StringPredicates; import java.lang.reflect.InvocationTargetException; @@ -26,7 +28,7 @@ private SdkMeterProviderUtil() {} /** * Reflectively assign the {@link ExemplarFilter} to the {@link SdkMeterProviderBuilder}. * - * @param sdkMeterProviderBuilder the + * @param sdkMeterProviderBuilder the builder */ public static void setExemplarFilter( SdkMeterProviderBuilder sdkMeterProviderBuilder, ExemplarFilter exemplarFilter) { @@ -42,6 +44,28 @@ public static void setExemplarFilter( } } + /** + * Reflectively add a {@link MetricReader} with the {@link CardinalityLimitSelector} to the {@link + * SdkMeterProviderBuilder}. + * + * @param sdkMeterProviderBuilder the builder + */ + public static void registerMetricReaderWithCardinalitySelector( + SdkMeterProviderBuilder sdkMeterProviderBuilder, + MetricReader metricReader, + CardinalityLimitSelector cardinalityLimitSelector) { + try { + Method method = + SdkMeterProviderBuilder.class.getDeclaredMethod( + "registerMetricReader", MetricReader.class, CardinalityLimitSelector.class); + method.setAccessible(true); + method.invoke(sdkMeterProviderBuilder, metricReader, cardinalityLimitSelector); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { + throw new IllegalStateException( + "Error calling addMetricReader on SdkMeterProviderBuilder", e); + } + } + /** * Reflectively add an {@link AttributesProcessor} to the {@link ViewBuilder} which appends * key-values from baggage to all measurements. @@ -81,6 +105,21 @@ private static void addAttributesProcessor( } } + /** + * Reflectively set the {@code cardinalityLimit} on the {@link ViewBuilder}. + * + * @param viewBuilder the builder + */ + public static void setCardinalityLimit(ViewBuilder viewBuilder, int cardinalityLimit) { + try { + Method method = ViewBuilder.class.getDeclaredMethod("setCardinalityLimit", int.class); + method.setAccessible(true); + method.invoke(viewBuilder, cardinalityLimit); + } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) { + throw new IllegalStateException("Error setting cardinalityLimit on ViewBuilder", e); + } + } + /** Reflectively reset the {@link SdkMeterProvider}, clearing all registered instruments. */ public static void resetForTest(SdkMeterProvider sdkMeterProvider) { try { diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/export/CardinalityLimitSelector.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/export/CardinalityLimitSelector.java new file mode 100644 index 00000000000..44f155162c9 --- /dev/null +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/export/CardinalityLimitSelector.java @@ -0,0 +1,39 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.internal.export; + +import io.opentelemetry.sdk.metrics.InstrumentType; +import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder; +import io.opentelemetry.sdk.metrics.export.MetricReader; +import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil; +import io.opentelemetry.sdk.metrics.internal.state.MetricStorage; + +/** + * Customize the {@link io.opentelemetry.sdk.metrics.export.MetricReader} cardinality limit as a + * function of {@link InstrumentType}. Register via {@link + * SdkMeterProviderUtil#registerMetricReaderWithCardinalitySelector(SdkMeterProviderBuilder, + * MetricReader, CardinalityLimitSelector)}. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +@FunctionalInterface +public interface CardinalityLimitSelector { + + /** + * The default {@link CardinalityLimitSelector}, allowing each metric to have {@code 2000} points. + */ + static CardinalityLimitSelector defaultCardinalityLimitSelector() { + return unused -> MetricStorage.DEFAULT_MAX_CARDINALITY; + } + + /** + * Return the default cardinality limit for metrics from instruments of type {@code + * instrumentType}. The cardinality limit dictates the maximum number of distinct points (or time + * series) for the metric. + */ + int getCardinalityLimit(InstrumentType instrumentType); +} diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java index 050a794725f..cd3ae6d4162 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java @@ -46,6 +46,7 @@ final class AsynchronousMetricStorage aggregator; private final AttributesProcessor attributesProcessor; + private final int maxCardinality; private Map points = new HashMap<>(); private Map lastPoints = new HashMap<>(); // Only populated if aggregationTemporality == DELTA @@ -54,7 +55,8 @@ private AsynchronousMetricStorage( RegisteredReader registeredReader, MetricDescriptor metricDescriptor, Aggregator aggregator, - AttributesProcessor attributesProcessor) { + AttributesProcessor attributesProcessor, + int maxCardinality) { this.registeredReader = registeredReader; this.metricDescriptor = metricDescriptor; this.aggregationTemporality = @@ -63,6 +65,7 @@ private AsynchronousMetricStorage( .getAggregationTemporality(metricDescriptor.getSourceInstrument().getType()); this.aggregator = aggregator; this.attributesProcessor = attributesProcessor; + this.maxCardinality = maxCardinality; } /** @@ -83,7 +86,8 @@ static AsynchronousMetricStorage= MetricStorage.MAX_CARDINALITY) { + if (points.size() >= maxCardinality) { throttlingLogger.log( Level.WARNING, "Instrument " + metricDescriptor.getSourceInstrument().getName() + " has exceeded the maximum allowed cardinality (" - + MetricStorage.MAX_CARDINALITY + + maxCardinality + ")."); return; } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java index b329af3c892..d4b285ce435 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java @@ -50,6 +50,7 @@ public final class DefaultSynchronousMetricStorage> aggregatorHandles = new ConcurrentHashMap<>(); private final AttributesProcessor attributesProcessor; + private final int maxCardinality; private final ConcurrentLinkedQueue> aggregatorHandlePool = new ConcurrentLinkedQueue<>(); @@ -57,7 +58,8 @@ public final class DefaultSynchronousMetricStorage aggregator, - AttributesProcessor attributesProcessor) { + AttributesProcessor attributesProcessor, + int maxCardinality) { this.registeredReader = registeredReader; this.metricDescriptor = metricDescriptor; this.aggregationTemporality = @@ -66,6 +68,7 @@ public final class DefaultSynchronousMetricStorage getAggregatorHandle(Attributes attributes, Contex if (handle != null) { return handle; } - if (aggregatorHandles.size() >= MAX_CARDINALITY) { + if (aggregatorHandles.size() >= maxCardinality) { logger.log( Level.WARNING, "Instrument " + metricDescriptor.getSourceInstrument().getName() + " has exceeded the maximum allowed cardinality (" - + MAX_CARDINALITY + + maxCardinality + ")."); return null; } @@ -143,9 +146,9 @@ public MetricData collect( } }); - // Trim pool down if needed. pool.size() will only exceed MAX_CARDINALITY if new handles are + // Trim pool down if needed. pool.size() will only exceed maxCardinality if new handles are // created during collection. - int toDelete = aggregatorHandlePool.size() - MAX_CARDINALITY; + int toDelete = aggregatorHandlePool.size() - maxCardinality; for (int i = 0; i < toDelete; i++) { aggregatorHandlePool.poll(); } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorage.java index 55c99a3e878..f2e394eb87f 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorage.java @@ -19,8 +19,8 @@ */ public interface MetricStorage { - /** The max number of distinct metric points for a particular {@link MetricStorage}. */ - int MAX_CARDINALITY = 2000; + /** The default max number of distinct metric points for a particular {@link MetricStorage}. */ + int DEFAULT_MAX_CARDINALITY = 2000; /** Returns a description of the metric produced in this storage. */ MetricDescriptor getMetricDescriptor(); diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorage.java index fc8cf3ff3e9..3b821367343 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorage.java @@ -55,6 +55,7 @@ static SynchronousMetricStorage cr registeredReader, metricDescriptor, aggregator, - registeredView.getViewAttributesProcessor()); + registeredView.getViewAttributesProcessor(), + registeredView.getCardinalityLimit()); } } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/RegisteredView.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/RegisteredView.java index 43384eaaa62..df63d59f997 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/RegisteredView.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/RegisteredView.java @@ -25,8 +25,10 @@ public static RegisteredView create( InstrumentSelector selector, View view, AttributesProcessor viewAttributesProcessor, + int cardinalityLimit, SourceInfo viewSourceInfo) { - return new AutoValue_RegisteredView(selector, view, viewAttributesProcessor, viewSourceInfo); + return new AutoValue_RegisteredView( + selector, view, viewAttributesProcessor, cardinalityLimit, viewSourceInfo); } RegisteredView() {} @@ -40,6 +42,9 @@ public static RegisteredView create( /** The view's {@link AttributesProcessor}. */ public abstract AttributesProcessor getViewAttributesProcessor(); + /** The view's cardinality limit. */ + public abstract int getCardinalityLimit(); + /** The {@link SourceInfo} from where the view was registered. */ public abstract SourceInfo getViewSourceInfo(); diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/ViewRegistry.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/ViewRegistry.java index b95a6524a4f..3bba1153488 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/ViewRegistry.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/ViewRegistry.java @@ -18,6 +18,8 @@ import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory; import io.opentelemetry.sdk.metrics.internal.debug.SourceInfo; import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor; +import io.opentelemetry.sdk.metrics.internal.export.CardinalityLimitSelector; +import io.opentelemetry.sdk.metrics.internal.state.MetricStorage; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -45,6 +47,7 @@ public final class ViewRegistry { InstrumentSelector.builder().setName("*").build(), DEFAULT_VIEW, NOOP, + MetricStorage.DEFAULT_MAX_CARDINALITY, SourceInfo.noSourceInfo()); private static final Logger logger = Logger.getLogger(ViewRegistry.class.getName()); @@ -52,7 +55,9 @@ public final class ViewRegistry { private final List registeredViews; ViewRegistry( - DefaultAggregationSelector defaultAggregationSelector, List registeredViews) { + DefaultAggregationSelector defaultAggregationSelector, + CardinalityLimitSelector cardinalityLimitSelector, + List registeredViews) { instrumentDefaultRegisteredView = new HashMap<>(); for (InstrumentType instrumentType : InstrumentType.values()) { instrumentDefaultRegisteredView.put( @@ -63,6 +68,7 @@ public final class ViewRegistry { .setAggregation(defaultAggregationSelector.getDefaultAggregation(instrumentType)) .build(), AttributesProcessor.noop(), + cardinalityLimitSelector.getCardinalityLimit(instrumentType), SourceInfo.noSourceInfo())); } this.registeredViews = registeredViews; @@ -70,13 +76,19 @@ public final class ViewRegistry { /** Returns a {@link ViewRegistry}. */ public static ViewRegistry create( - DefaultAggregationSelector defaultAggregationSelector, List registeredViews) { - return new ViewRegistry(defaultAggregationSelector, new ArrayList<>(registeredViews)); + DefaultAggregationSelector defaultAggregationSelector, + CardinalityLimitSelector cardinalityLimitSelector, + List registeredViews) { + return new ViewRegistry( + defaultAggregationSelector, cardinalityLimitSelector, new ArrayList<>(registeredViews)); } /** Return a {@link ViewRegistry} using the default aggregation and no views registered. */ public static ViewRegistry create() { - return create(unused -> Aggregation.defaultAggregation(), Collections.emptyList()); + return create( + unused -> Aggregation.defaultAggregation(), + CardinalityLimitSelector.defaultCardinalityLimitSelector(), + Collections.emptyList()); } /** @@ -113,7 +125,7 @@ public List findViews( return Collections.unmodifiableList(result); } - // Not views matched, use default view + // No views matched, use default view RegisteredView instrumentDefaultView = Objects.requireNonNull(instrumentDefaultRegisteredView.get(descriptor.getType())); AggregatorFactory viewAggregatorFactory = diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/CardinalityTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/CardinalityTest.java index edc1de95dc3..605fe315ce9 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/CardinalityTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/CardinalityTest.java @@ -5,16 +5,24 @@ package io.opentelemetry.sdk.metrics; +import static io.opentelemetry.sdk.metrics.internal.state.MetricStorage.DEFAULT_MAX_CARDINALITY; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; +import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.LongHistogram; import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.api.metrics.ObservableLongMeasurement; import io.opentelemetry.internal.testing.slf4j.SuppressLogger; +import io.opentelemetry.sdk.metrics.data.Data; import io.opentelemetry.sdk.metrics.data.LongPointData; import io.opentelemetry.sdk.metrics.data.SumData; +import io.opentelemetry.sdk.metrics.export.MetricReader; +import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil; +import io.opentelemetry.sdk.metrics.internal.export.CardinalityLimitSelector; import io.opentelemetry.sdk.metrics.internal.state.DefaultSynchronousMetricStorage; +import io.opentelemetry.sdk.metrics.internal.state.MetricStorage; import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; @@ -26,9 +34,6 @@ @SuppressLogger(DefaultSynchronousMetricStorage.class) class CardinalityTest { - /** Traces {@code MetricStorageUtils#MAX_CARDINALITY}. */ - private static final int MAX_CARDINALITY = 2000; - private InMemoryMetricReader deltaReader; private InMemoryMetricReader cumulativeReader; private Meter meter; @@ -50,14 +55,14 @@ void setup() { * are dropped for delta and cumulative readers. Stale metrics are those with attributes that did * not receive recordings in the most recent collection. * - *

Effectively, we make sure we cap-out at attribute size = 2000 (constant in - * MetricStorageutils). + *

Effectively, we make sure we cap-out at attribute size = {@link + * MetricStorage#DEFAULT_MAX_CARDINALITY}. */ @Test void staleMetricsDropped_synchronousInstrument() { LongCounter syncCounter = meter.counterBuilder("sync-counter").build(); // Note: This constant comes from MetricStorageUtils, but it's package-private. - for (int i = 1; i <= 2000; i++) { + for (int i = 1; i <= DEFAULT_MAX_CARDINALITY; i++) { syncCounter.add(1, Attributes.builder().put("key", "num_" + i).build()); // DELTA reader only has latest @@ -87,7 +92,7 @@ void staleMetricsDropped_synchronousInstrument() { .isEqualTo(currentSize)))); } // Now punch the limit and ONLY metrics we just recorded stay, due to simplistic GC. - for (int i = 2001; i <= 2010; i++) { + for (int i = DEFAULT_MAX_CARDINALITY + 1; i <= DEFAULT_MAX_CARDINALITY + 10; i++) { syncCounter.add(1, Attributes.builder().put("key", "num_" + i).build()); } assertThat(deltaReader.collectAllMetrics()) @@ -118,7 +123,7 @@ void staleMetricsDropped_synchronousInstrument() { (Consumer>) sumPointData -> assertThat(sumPointData.getPoints().size()) - .isEqualTo(2000)))); + .isEqualTo(DEFAULT_MAX_CARDINALITY)))); } /** @@ -165,7 +170,7 @@ void staleMetricsDropped_asynchronousInstrument() { void cardinalityLimits_synchronousInstrument() { LongCounter syncCounter1 = meter.counterBuilder("sync-counter1").build(); LongCounter syncCounter2 = meter.counterBuilder("sync-counter2").build(); - for (int i = 0; i < MAX_CARDINALITY + 1; i++) { + for (int i = 0; i < DEFAULT_MAX_CARDINALITY + 1; i++) { syncCounter1.add(1, Attributes.builder().put("key", "value" + i).build()); syncCounter2.add(1, Attributes.builder().put("key", "value" + i).build()); } @@ -183,7 +188,7 @@ void cardinalityLimits_synchronousInstrument() { (Consumer>) sumPointData -> assertThat(sumPointData.getPoints().size()) - .isEqualTo(MAX_CARDINALITY))), + .isEqualTo(DEFAULT_MAX_CARDINALITY))), metricData -> assertThat(metricData) .hasName("sync-counter2") @@ -194,7 +199,7 @@ void cardinalityLimits_synchronousInstrument() { (Consumer>) sumPointData -> assertThat(sumPointData.getPoints().size()) - .isEqualTo(MAX_CARDINALITY)))); + .isEqualTo(DEFAULT_MAX_CARDINALITY)))); assertThat(cumulativeReader.collectAllMetrics()) .as("Cumulative collection") @@ -209,7 +214,7 @@ void cardinalityLimits_synchronousInstrument() { (Consumer>) sumPointData -> assertThat(sumPointData.getPoints().size()) - .isEqualTo(MAX_CARDINALITY))), + .isEqualTo(DEFAULT_MAX_CARDINALITY))), metricData -> assertThat(metricData) .hasName("sync-counter2") @@ -220,7 +225,7 @@ void cardinalityLimits_synchronousInstrument() { (Consumer>) sumPointData -> assertThat(sumPointData.getPoints().size()) - .isEqualTo(MAX_CARDINALITY)))); + .isEqualTo(DEFAULT_MAX_CARDINALITY)))); } /** @@ -231,7 +236,7 @@ void cardinalityLimits_synchronousInstrument() { void cardinalityLimits_asynchronousInstrument() { Consumer callback = measurement -> { - for (int i = 0; i < MAX_CARDINALITY + 1; i++) { + for (int i = 0; i < DEFAULT_MAX_CARDINALITY + 1; i++) { measurement.record(1, Attributes.builder().put("key", "value" + i).build()); } }; @@ -251,7 +256,7 @@ void cardinalityLimits_asynchronousInstrument() { (Consumer>) sumPointData -> assertThat(sumPointData.getPoints().size()) - .isEqualTo(MAX_CARDINALITY))), + .isEqualTo(DEFAULT_MAX_CARDINALITY))), metricData -> assertThat(metricData) .hasName("async-counter2") @@ -262,7 +267,7 @@ void cardinalityLimits_asynchronousInstrument() { (Consumer>) sumPointData -> assertThat(sumPointData.getPoints().size()) - .isEqualTo(MAX_CARDINALITY)))); + .isEqualTo(DEFAULT_MAX_CARDINALITY)))); assertThat(cumulativeReader.collectAllMetrics()) .as("Cumulative collection") @@ -277,7 +282,7 @@ void cardinalityLimits_asynchronousInstrument() { (Consumer>) sumPointData -> assertThat(sumPointData.getPoints().size()) - .isEqualTo(MAX_CARDINALITY))), + .isEqualTo(DEFAULT_MAX_CARDINALITY))), metricData -> assertThat(metricData) .hasName("async-counter2") @@ -288,6 +293,213 @@ void cardinalityLimits_asynchronousInstrument() { (Consumer>) sumPointData -> assertThat(sumPointData.getPoints().size()) - .isEqualTo(MAX_CARDINALITY)))); + .isEqualTo(DEFAULT_MAX_CARDINALITY)))); + } + + /** + * Validate ability to customize metric reader cardinality limits via {@link + * SdkMeterProviderBuilder#registerMetricReader(MetricReader, CardinalityLimitSelector)}, and view + * cardinality limits via {@link ViewBuilder#setCardinalityLimit(int)}. + */ + @Test + void readerAndViewCardinalityConfiguration() { + int counterLimit = 10; + int generalLimit = 20; + int counter2Limit = 30; + + // Define a cardinality selector which has one limit for counters, and another general limit for + // other instrument kinds + CardinalityLimitSelector cardinalityLimitSelector = + instrumentType -> instrumentType == InstrumentType.COUNTER ? counterLimit : generalLimit; + SdkMeterProviderBuilder builder = SdkMeterProvider.builder(); + + // Register both the delta and cumulative reader with the customized cardinality selector + SdkMeterProviderUtil.registerMetricReaderWithCardinalitySelector( + builder, deltaReader, cardinalityLimitSelector); + SdkMeterProviderUtil.registerMetricReaderWithCardinalitySelector( + builder, cumulativeReader, cardinalityLimitSelector); + + // Register a view which defines a custom cardinality limit for instrumented named "counter2" + ViewBuilder viewBuilder = View.builder(); + SdkMeterProviderUtil.setCardinalityLimit(viewBuilder, counter2Limit); + builder.registerView( + InstrumentSelector.builder().setName("counter2").build(), viewBuilder.build()); + + SdkMeterProvider sdkMeterProvider = builder.build(); + meter = sdkMeterProvider.get(CardinalityTest.class.getName()); + + LongCounter counter1 = meter.counterBuilder("counter1").build(); + LongCounter counter2 = meter.counterBuilder("counter2").build(); + LongHistogram histogram = meter.histogramBuilder("histogram").ofLongs().build(); + + // Record enough measurements to exceed cardinality threshold + for (int i = 0; i < DEFAULT_MAX_CARDINALITY; i++) { + counter1.add(1, Attributes.builder().put("key", i).build()); + counter2.add(1, Attributes.builder().put("key", i).build()); + histogram.record(1, Attributes.builder().put("key", i).build()); + } + + // Assert that each instrument has the appropriate number of points based on cardinality limits: + // - counter1 should have counterLimit points + // - counter2 should have counter2Limit points + // - histogram should have generalLimit points + assertThat(deltaReader.collectAllMetrics()) + .as("delta collection") + .satisfiesExactlyInAnyOrder( + metricData -> + assertThat(metricData) + .hasName("counter1") + .hasLongSumSatisfying( + sum -> + sum.isDelta() + .satisfies( + data -> pointsAssert(data, counterLimit, 0, counterLimit))), + metricData -> + assertThat(metricData) + .hasName("counter2") + .hasLongSumSatisfying( + sum -> + sum.isDelta() + .satisfies( + data -> pointsAssert(data, counter2Limit, 0, counter2Limit))), + metricData -> + assertThat(metricData) + .hasName("histogram") + .hasHistogramSatisfying( + histogramMetric -> + histogramMetric + .isDelta() + .satisfies( + data -> pointsAssert(data, generalLimit, 0, generalLimit)))); + assertThat(cumulativeReader.collectAllMetrics()) + .as("cumulative collection") + .satisfiesExactlyInAnyOrder( + metricData -> + assertThat(metricData) + .hasName("counter1") + .hasLongSumSatisfying( + sum -> + sum.isCumulative() + .satisfies( + data -> pointsAssert(data, counterLimit, 0, counterLimit))), + metricData -> + assertThat(metricData) + .hasName("counter2") + .hasLongSumSatisfying( + sum -> + sum.isCumulative() + .satisfies( + data -> pointsAssert(data, counter2Limit, 0, counter2Limit))), + metricData -> + assertThat(metricData) + .hasName("histogram") + .hasHistogramSatisfying( + histogramMetric -> + histogramMetric + .isCumulative() + .satisfies( + data -> pointsAssert(data, generalLimit, 0, generalLimit)))); + + // Record another round of measurements, again exceeding cardinality limits + for (int i = DEFAULT_MAX_CARDINALITY; i < DEFAULT_MAX_CARDINALITY * 2; i++) { + counter1.add(1, Attributes.builder().put("key", i).build()); + counter2.add(1, Attributes.builder().put("key", i).build()); + histogram.record(1, Attributes.builder().put("key", i).build()); + } + + // Delta reader should have new points, forgetting the points with measurements recorded prior + // to last collection + assertThat(deltaReader.collectAllMetrics()) + .as("delta collection") + .satisfiesExactlyInAnyOrder( + metricData -> + assertThat(metricData) + .hasName("counter1") + .hasLongSumSatisfying( + sum -> + sum.isDelta() + .satisfies( + data -> + pointsAssert( + data, + counterLimit, + DEFAULT_MAX_CARDINALITY, + DEFAULT_MAX_CARDINALITY + counterLimit))), + metricData -> + assertThat(metricData) + .hasName("counter2") + .hasLongSumSatisfying( + sum -> + sum.isDelta() + .satisfies( + data -> + pointsAssert( + data, + counter2Limit, + DEFAULT_MAX_CARDINALITY, + DEFAULT_MAX_CARDINALITY + counter2Limit))), + metricData -> + assertThat(metricData) + .hasName("histogram") + .hasHistogramSatisfying( + histogramMetric -> + histogramMetric + .isDelta() + .satisfies( + data -> + pointsAssert( + data, + generalLimit, + DEFAULT_MAX_CARDINALITY, + DEFAULT_MAX_CARDINALITY + generalLimit)))); + + // Cumulative reader should retain old points, dropping the new measurements + assertThat(cumulativeReader.collectAllMetrics()) + .as("cumulative collection") + .satisfiesExactlyInAnyOrder( + metricData -> + assertThat(metricData) + .hasName("counter1") + .hasLongSumSatisfying( + sum -> + sum.isCumulative() + .satisfies( + data -> pointsAssert(data, counterLimit, 0, counterLimit))), + metricData -> + assertThat(metricData) + .hasName("counter2") + .hasLongSumSatisfying( + sum -> + sum.isCumulative() + .satisfies( + data -> pointsAssert(data, counter2Limit, 0, counter2Limit))), + metricData -> + assertThat(metricData) + .hasName("histogram") + .hasHistogramSatisfying( + histogramMetric -> + histogramMetric + .isCumulative() + .satisfies( + data -> pointsAssert(data, generalLimit, 0, generalLimit)))); + } + + /** + * Helper function for {@link #readerAndViewCardinalityConfiguration()}. Asserts that the {@code + * data} contains the {@code expectedNumPoints}, and has the attribute "key" values in the range + * [{@code minAttributeValueInclusive}, {@code maxAttributeValueExclusive}). + */ + private static void pointsAssert( + Data data, + int expectedNumPoints, + int minAttributeValueInclusive, + int maxAttributeValueExclusive) { + assertThat(data.getPoints()) + .hasSize(expectedNumPoints) + .allSatisfy( + point -> + assertThat(point.getAttributes().get(AttributeKey.longKey("key"))) + .isGreaterThanOrEqualTo(minAttributeValueInclusive) + .isLessThan(maxAttributeValueExclusive)); } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/ViewTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/ViewTest.java index b81b46cba13..0dfaeafcfa1 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/ViewTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/ViewTest.java @@ -15,12 +15,17 @@ class ViewTest { void stringRepresentation() { assertThat(View.builder().build().toString()) .isEqualTo( - "View{aggregation=DefaultAggregation, attributesProcessor=NoopAttributesProcessor{}}"); + "View{" + + "aggregation=DefaultAggregation, " + + "attributesProcessor=NoopAttributesProcessor{}, " + + "cardinalityLimit=2000" + + "}"); assertThat( View.builder() .setName("name") .setDescription("description") .setAggregation(Aggregation.sum()) + .setCardinalityLimit(10) .build() .toString()) .isEqualTo( @@ -28,7 +33,8 @@ void stringRepresentation() { + "name=name, " + "description=description, " + "aggregation=SumAggregation, " - + "attributesProcessor=NoopAttributesProcessor{}" + + "attributesProcessor=NoopAttributesProcessor{}, " + + "cardinalityLimit=10" + "}"); } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java index cdd38a9c9d1..da9008056aa 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java @@ -42,6 +42,8 @@ @ExtendWith(MockitoExtension.class) class AsynchronousMetricStorageTest { + private static final int CARDINALITY_LIMIT = 25; + @RegisterExtension LogCapturer logs = LogCapturer.create().captureForType(AsynchronousMetricStorage.class); @@ -51,7 +53,11 @@ class AsynchronousMetricStorageTest { private final InstrumentSelector selector = InstrumentSelector.builder().setName("*").build(); private final RegisteredView registeredView = RegisteredView.create( - selector, View.builder().build(), AttributesProcessor.noop(), SourceInfo.noSourceInfo()); + selector, + View.builder().build(), + AttributesProcessor.noop(), + CARDINALITY_LIMIT, + SourceInfo.noSourceInfo()); @Mock private MetricReader reader; private RegisteredReader registeredReader; @@ -149,6 +155,7 @@ void record_ProcessesAttributes() { selector, View.builder().build(), AttributesProcessor.filterByKeyName(key -> key.equals("key1")), + CARDINALITY_LIMIT, SourceInfo.noSourceInfo()), InstrumentDescriptor.create( "name", @@ -175,7 +182,7 @@ void record_ProcessesAttributes() { @Test void record_MaxCardinality() { - for (int i = 0; i <= MetricStorage.MAX_CARDINALITY + 1; i++) { + for (int i = 0; i <= CARDINALITY_LIMIT + 1; i++) { longCounterStorage.record( longMeasurement(0, 1, 1, Attributes.builder().put("key" + i, "val").build())); } @@ -183,8 +190,7 @@ void record_MaxCardinality() { assertThat(longCounterStorage.collect(resource, scope, 0, testClock.nanoTime())) .satisfies( metricData -> - assertThat(metricData.getLongSumData().getPoints()) - .hasSize(MetricStorage.MAX_CARDINALITY)); + assertThat(metricData.getLongSumData().getPoints()).hasSize(CARDINALITY_LIMIT)); logs.assertContains("Instrument long-counter has exceeded the maximum allowed cardinality"); } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java index 3104f387806..ebb9adfee2e 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java @@ -53,6 +53,7 @@ public class SynchronousMetricStorageTest { Advice.empty()); private static final MetricDescriptor METRIC_DESCRIPTOR = MetricDescriptor.create("name", "description", "unit"); + private static final int CARDINALITY_LIMIT = 25; @RegisterExtension LogCapturer logs = LogCapturer.create().captureForType(DefaultSynchronousMetricStorage.class); @@ -76,7 +77,11 @@ void attributesProcessor_applied() { AttributesProcessor spyAttributesProcessor = spy(attributesProcessor); SynchronousMetricStorage storage = new DefaultSynchronousMetricStorage<>( - cumulativeReader, METRIC_DESCRIPTOR, aggregator, spyAttributesProcessor); + cumulativeReader, + METRIC_DESCRIPTOR, + aggregator, + spyAttributesProcessor, + CARDINALITY_LIMIT); storage.recordDouble(1, attributes, Context.root()); MetricData md = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, testClock.now()); assertThat(md) @@ -92,7 +97,11 @@ void attributesProcessor_applied() { void recordAndCollect_CumulativeDoesNotReset() { DefaultSynchronousMetricStorage storage = new DefaultSynchronousMetricStorage<>( - cumulativeReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor); + cumulativeReader, + METRIC_DESCRIPTOR, + aggregator, + attributesProcessor, + CARDINALITY_LIMIT); // Record measurement and collect at time 10 storage.recordDouble(3, Attributes.empty(), Context.current()); @@ -134,7 +143,7 @@ void recordAndCollect_CumulativeDoesNotReset() { void recordAndCollect_DeltaResets() { DefaultSynchronousMetricStorage storage = new DefaultSynchronousMetricStorage<>( - deltaReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor); + deltaReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor, CARDINALITY_LIMIT); // Record measurement and collect at time 10 storage.recordDouble(3, Attributes.empty(), Context.current()); @@ -181,14 +190,18 @@ void recordAndCollect_DeltaResets() { void recordAndCollect_CumulativeAtLimit() { DefaultSynchronousMetricStorage storage = new DefaultSynchronousMetricStorage<>( - cumulativeReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor); + cumulativeReader, + METRIC_DESCRIPTOR, + aggregator, + attributesProcessor, + CARDINALITY_LIMIT); // Record measurements for max number of attributes - for (int i = 0; i < MetricStorage.MAX_CARDINALITY; i++) { + for (int i = 0; i < CARDINALITY_LIMIT; i++) { storage.recordDouble( 3, Attributes.builder().put("key", "value" + i).build(), Context.current()); } - verify(aggregator, times(MetricStorage.MAX_CARDINALITY)).createHandle(); + verify(aggregator, times(CARDINALITY_LIMIT)).createHandle(); assertThat(storage.getAggregatorHandlePool()).hasSize(0); assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10)) .hasDoubleSumSatisfying( @@ -196,7 +209,7 @@ void recordAndCollect_CumulativeAtLimit() { sum.satisfies( sumData -> assertThat(sumData.getPoints()) - .hasSize(MetricStorage.MAX_CARDINALITY) + .hasSize(CARDINALITY_LIMIT) .allSatisfy( point -> { assertThat(point.getStartEpochNanos()).isEqualTo(0); @@ -210,10 +223,10 @@ void recordAndCollect_CumulativeAtLimit() { // Record measurement for additional attribute, exceeding limit storage.recordDouble( 3, - Attributes.builder().put("key", "value" + MetricStorage.MAX_CARDINALITY + 1).build(), + Attributes.builder().put("key", "value" + CARDINALITY_LIMIT + 1).build(), Context.current()); - // Should not create additional handles after MAX_CARDINALITY is reached - verify(aggregator, times(MetricStorage.MAX_CARDINALITY)).createHandle(); + // Should not create additional handles after CARDINALITY_LIMIT is reached + verify(aggregator, times(CARDINALITY_LIMIT)).createHandle(); assertThat(storage.getAggregatorHandlePool()).hasSize(0); assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 20)) .hasDoubleSumSatisfying( @@ -221,7 +234,7 @@ void recordAndCollect_CumulativeAtLimit() { sum.satisfies( sumData -> assertThat(sumData.getPoints()) - .hasSize(MetricStorage.MAX_CARDINALITY) + .hasSize(CARDINALITY_LIMIT) .allSatisfy( point -> { assertThat(point.getStartEpochNanos()).isEqualTo(0); @@ -233,7 +246,7 @@ void recordAndCollect_CumulativeAtLimit() { point .getAttributes() .get(AttributeKey.stringKey("key")) - .equals("value" + MetricStorage.MAX_CARDINALITY + 1)))); + .equals("value" + CARDINALITY_LIMIT + 1)))); assertThat(storage.getAggregatorHandlePool()).hasSize(0); logs.assertContains("Instrument name has exceeded the maximum allowed cardinality"); } @@ -242,14 +255,14 @@ void recordAndCollect_CumulativeAtLimit() { void recordAndCollect_DeltaAtLimit() { DefaultSynchronousMetricStorage storage = new DefaultSynchronousMetricStorage<>( - deltaReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor); + deltaReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor, CARDINALITY_LIMIT); // Record measurements for max number of attributes - for (int i = 0; i < MetricStorage.MAX_CARDINALITY; i++) { + for (int i = 0; i < CARDINALITY_LIMIT; i++) { storage.recordDouble( 3, Attributes.builder().put("key", "value" + i).build(), Context.current()); } - verify(aggregator, times(MetricStorage.MAX_CARDINALITY)).createHandle(); + verify(aggregator, times(CARDINALITY_LIMIT)).createHandle(); assertThat(storage.getAggregatorHandlePool()).hasSize(0); assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10)) .hasDoubleSumSatisfying( @@ -257,25 +270,25 @@ void recordAndCollect_DeltaAtLimit() { sum.satisfies( sumData -> assertThat(sumData.getPoints()) - .hasSize(MetricStorage.MAX_CARDINALITY) + .hasSize(CARDINALITY_LIMIT) .allSatisfy( point -> { assertThat(point.getStartEpochNanos()).isEqualTo(0); assertThat(point.getEpochNanos()).isEqualTo(10); assertThat(point.getValue()).isEqualTo(3); }))); - assertThat(storage.getAggregatorHandlePool()).hasSize(MetricStorage.MAX_CARDINALITY); + assertThat(storage.getAggregatorHandlePool()).hasSize(CARDINALITY_LIMIT); assertThat(logs.getEvents()).isEmpty(); deltaReader.setLastCollectEpochNanos(10); // Record measurement for additional attribute, should not exceed limit due to reset storage.recordDouble( 3, - Attributes.builder().put("key", "value" + MetricStorage.MAX_CARDINALITY + 1).build(), + Attributes.builder().put("key", "value" + CARDINALITY_LIMIT + 1).build(), Context.current()); // Should use handle returned to pool instead of creating new ones - verify(aggregator, times(MetricStorage.MAX_CARDINALITY)).createHandle(); - assertThat(storage.getAggregatorHandlePool()).hasSize(MetricStorage.MAX_CARDINALITY - 1); + verify(aggregator, times(CARDINALITY_LIMIT)).createHandle(); + assertThat(storage.getAggregatorHandlePool()).hasSize(CARDINALITY_LIMIT - 1); assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 20)) .hasDoubleSumSatisfying( sum -> @@ -288,19 +301,19 @@ void recordAndCollect_DeltaAtLimit() { .hasValue(3) .hasAttributes( Attributes.builder() - .put("key", "value" + MetricStorage.MAX_CARDINALITY + 1) + .put("key", "value" + CARDINALITY_LIMIT + 1) .build()))); - assertThat(storage.getAggregatorHandlePool()).hasSize(MetricStorage.MAX_CARDINALITY); + assertThat(storage.getAggregatorHandlePool()).hasSize(CARDINALITY_LIMIT); assertThat(logs.getEvents()).isEmpty(); deltaReader.setLastCollectEpochNanos(20); // Record measurements exceeding max number of attributes. Last measurement should be dropped - for (int i = 0; i < MetricStorage.MAX_CARDINALITY + 1; i++) { + for (int i = 0; i < CARDINALITY_LIMIT + 1; i++) { storage.recordDouble( 3, Attributes.builder().put("key", "value" + i).build(), Context.current()); } // Should use handles returned to pool instead of creating new ones - verify(aggregator, times(MetricStorage.MAX_CARDINALITY)).createHandle(); + verify(aggregator, times(CARDINALITY_LIMIT)).createHandle(); assertThat(storage.getAggregatorHandlePool()).hasSize(0); assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 30)) .hasDoubleSumSatisfying( @@ -308,7 +321,7 @@ void recordAndCollect_DeltaAtLimit() { sum.satisfies( sumData -> assertThat(sumData.getPoints()) - .hasSize(MetricStorage.MAX_CARDINALITY) + .hasSize(CARDINALITY_LIMIT) .allSatisfy( point -> { assertThat(point.getStartEpochNanos()).isEqualTo(20); @@ -320,8 +333,8 @@ void recordAndCollect_DeltaAtLimit() { point .getAttributes() .get(AttributeKey.stringKey("key")) - .equals("value" + MetricStorage.MAX_CARDINALITY + 1)))); - assertThat(storage.getAggregatorHandlePool()).hasSize(MetricStorage.MAX_CARDINALITY); + .equals("value" + CARDINALITY_LIMIT + 1)))); + assertThat(storage.getAggregatorHandlePool()).hasSize(CARDINALITY_LIMIT); logs.assertContains("Instrument name has exceeded the maximum allowed cardinality"); } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/view/RegisteredViewTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/view/RegisteredViewTest.java index 1cbd79b4d0d..634347067d2 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/view/RegisteredViewTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/view/RegisteredViewTest.java @@ -12,6 +12,7 @@ import io.opentelemetry.sdk.metrics.InstrumentType; import io.opentelemetry.sdk.metrics.View; import io.opentelemetry.sdk.metrics.internal.debug.SourceInfo; +import io.opentelemetry.sdk.metrics.internal.state.MetricStorage; import org.junit.jupiter.api.Test; class RegisteredViewTest { @@ -33,12 +34,13 @@ void stringRepresentation() { .setAggregation(Aggregation.sum()) .build(), AttributesProcessor.noop(), + MetricStorage.DEFAULT_MAX_CARDINALITY, SourceInfo.fromCurrentStack()) .toString()) .isEqualTo( "RegisteredView{" + "instrumentSelector=InstrumentSelector{instrumentType=COUNTER, instrumentName=name, meterName=meter-name, meterVersion=meter-version, meterSchemaUrl=meter-schema-url}, " - + "view=View{name=name, description=description, aggregation=SumAggregation, attributesProcessor=NoopAttributesProcessor{}}" + + "view=View{name=name, description=description, aggregation=SumAggregation, attributesProcessor=NoopAttributesProcessor{}, cardinalityLimit=2000}" + "}"); } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/view/ViewRegistryTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/view/ViewRegistryTest.java index 08234ebef47..906a3e0856f 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/view/ViewRegistryTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/view/ViewRegistryTest.java @@ -21,6 +21,8 @@ import io.opentelemetry.sdk.metrics.internal.debug.SourceInfo; import io.opentelemetry.sdk.metrics.internal.descriptor.Advice; import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor; +import io.opentelemetry.sdk.metrics.internal.export.CardinalityLimitSelector; +import io.opentelemetry.sdk.metrics.internal.state.MetricStorage; import java.util.Arrays; import java.util.Collections; import org.junit.jupiter.api.Test; @@ -38,7 +40,11 @@ class ViewRegistryTest { private static RegisteredView registeredView(InstrumentSelector instrumentSelector, View view) { return RegisteredView.create( - instrumentSelector, view, AttributesProcessor.noop(), SourceInfo.fromCurrentStack()); + instrumentSelector, + view, + AttributesProcessor.noop(), + MetricStorage.DEFAULT_MAX_CARDINALITY, + SourceInfo.fromCurrentStack()); } @Test @@ -49,7 +55,9 @@ void findViews_SelectionOnType() { View.builder().setDescription("description").build()); ViewRegistry viewRegistry = ViewRegistry.create( - DefaultAggregationSelector.getDefault(), Collections.singletonList(registeredView)); + DefaultAggregationSelector.getDefault(), + CardinalityLimitSelector.defaultCardinalityLimitSelector(), + Collections.singletonList(registeredView)); assertThat( viewRegistry.findViews( @@ -81,7 +89,9 @@ void findViews_SelectionOnUnit() { View.builder().setDescription("description").build()); ViewRegistry viewRegistry = ViewRegistry.create( - DefaultAggregationSelector.getDefault(), Collections.singletonList(registeredView)); + DefaultAggregationSelector.getDefault(), + CardinalityLimitSelector.defaultCardinalityLimitSelector(), + Collections.singletonList(registeredView)); assertThat( viewRegistry.findViews( @@ -113,7 +123,9 @@ void findViews_SelectionOnName() { View.builder().setDescription("description").build()); ViewRegistry viewRegistry = ViewRegistry.create( - DefaultAggregationSelector.getDefault(), Collections.singletonList(registeredView)); + DefaultAggregationSelector.getDefault(), + CardinalityLimitSelector.defaultCardinalityLimitSelector(), + Collections.singletonList(registeredView)); assertThat( viewRegistry.findViews( @@ -156,6 +168,7 @@ void findViews_MultipleMatchingViews() { ViewRegistry viewRegistry = ViewRegistry.create( DefaultAggregationSelector.getDefault(), + CardinalityLimitSelector.defaultCardinalityLimitSelector(), Arrays.asList(registeredView1, registeredView2)); assertThat( @@ -195,7 +208,9 @@ void findViews_SelectionTypeAndName() { View.builder().setAggregation(Aggregation.explicitBucketHistogram()).build()); ViewRegistry viewRegistry = ViewRegistry.create( - DefaultAggregationSelector.getDefault(), Collections.singletonList(registeredView)); + DefaultAggregationSelector.getDefault(), + CardinalityLimitSelector.defaultCardinalityLimitSelector(), + Collections.singletonList(registeredView)); assertThat( viewRegistry.findViews( @@ -254,7 +269,10 @@ void findViews_DefaultAggregationSelector() { : Aggregation.defaultAggregation(); ViewRegistry viewRegistry = - ViewRegistry.create(defaultAggregationSelector, Collections.singletonList(registeredView)); + ViewRegistry.create( + defaultAggregationSelector, + CardinalityLimitSelector.defaultCardinalityLimitSelector(), + Collections.singletonList(registeredView)); // Counter instrument should result in default view assertThat( @@ -330,7 +348,9 @@ void findViews_IncompatibleViewIgnored() { View.builder().setAggregation(Aggregation.explicitBucketHistogram()).build()); ViewRegistry viewRegistry = ViewRegistry.create( - DefaultAggregationSelector.getDefault(), Collections.singletonList(registeredView)); + DefaultAggregationSelector.getDefault(), + CardinalityLimitSelector.defaultCardinalityLimitSelector(), + Collections.singletonList(registeredView)); assertThat( viewRegistry.findViews(