Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experimental metric reader and view cardinality limits #5494

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ void stringRepresentation() {
+ "clock=SystemClock{}, "
+ "resource=Resource{schemaUrl=null, attributes={service.name=\"otel-test\"}}, "
+ "metricReaders=[PeriodicMetricReader{exporter=MockMetricExporter{}, intervalNanos=60000000000}], "
+ "views=[RegisteredView{instrumentSelector=InstrumentSelector{instrumentName=instrument}, view=View{name=new-instrument, aggregation=DefaultAggregation, attributesProcessor=NoopAttributesProcessor{}}}]"
+ "views=[RegisteredView{instrumentSelector=InstrumentSelector{instrumentName=instrument}, view=View{name=new-instrument, aggregation=DefaultAggregation, attributesProcessor=NoopAttributesProcessor{}, cardinalityLimit=2000}}]"
+ "}, "
+ "loggerProvider=SdkLoggerProvider{"
+ "clock=SystemClock{}, "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.opentelemetry.sdk.metrics.export.MetricReader;
import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilter;
import io.opentelemetry.sdk.metrics.internal.export.CardinalityLimitSelector;
import io.opentelemetry.sdk.metrics.internal.export.MetricProducer;
import io.opentelemetry.sdk.metrics.internal.export.RegisteredReader;
import io.opentelemetry.sdk.metrics.internal.state.MeterProviderSharedState;
Expand All @@ -26,6 +27,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -54,17 +56,19 @@ public static SdkMeterProviderBuilder builder() {

SdkMeterProvider(
List<RegisteredView> registeredViews,
List<MetricReader> metricReaders,
IdentityHashMap<MetricReader, CardinalityLimitSelector> metricReaders,
Clock clock,
Resource resource,
ExemplarFilter exemplarFilter) {
long startEpochNanos = clock.now();
this.registeredViews = registeredViews;
this.registeredReaders =
metricReaders.stream()
metricReaders.entrySet().stream()
.map(
reader ->
RegisteredReader.create(reader, ViewRegistry.create(reader, registeredViews)))
entry ->
RegisteredReader.create(
entry.getKey(),
ViewRegistry.create(entry.getKey(), entry.getValue(), registeredViews)))
.collect(toList());
this.sharedState =
MeterProviderSharedState.create(clock, resource, exemplarFilter, startEpochNanos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil;
import io.opentelemetry.sdk.metrics.internal.debug.SourceInfo;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilter;
import io.opentelemetry.sdk.metrics.internal.export.CardinalityLimitSelector;
import io.opentelemetry.sdk.metrics.internal.view.RegisteredView;
import io.opentelemetry.sdk.resources.Resource;
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Objects;

Expand All @@ -32,7 +34,8 @@ public final class SdkMeterProviderBuilder {

private Clock clock = Clock.getDefault();
private Resource resource = Resource.getDefault();
private final List<MetricReader> metricReaders = new ArrayList<>();
private final IdentityHashMap<MetricReader, CardinalityLimitSelector> metricReaders =
new IdentityHashMap<>();
private final List<RegisteredView> registeredViews = new ArrayList<>();
private ExemplarFilter exemplarFilter = DEFAULT_EXEMPLAR_FILTER;

Expand Down Expand Up @@ -96,7 +99,11 @@ public SdkMeterProviderBuilder registerView(InstrumentSelector selector, View vi
Objects.requireNonNull(view, "view");
registeredViews.add(
RegisteredView.create(
selector, view, view.getAttributesProcessor(), SourceInfo.fromCurrentStack()));
selector,
view,
view.getAttributesProcessor(),
view.getCardinalityLimit(),
SourceInfo.fromCurrentStack()));
return this;
}

Expand All @@ -106,7 +113,20 @@ public SdkMeterProviderBuilder registerView(InstrumentSelector selector, View vi
* <p>Note: custom implementations of {@link MetricReader} are not currently supported.
*/
public SdkMeterProviderBuilder registerMetricReader(MetricReader reader) {
metricReaders.add(reader);
metricReaders.put(reader, CardinalityLimitSelector.defaultCardinalityLimitSelector());
return this;
}

/**
* Registers a {@link MetricReader} with a {@link CardinalityLimitSelector}.
*
* <p>Note: not currently stable but available for experimental use via {@link
* SdkMeterProviderUtil#registerMetricReaderWithCardinalitySelector(SdkMeterProviderBuilder,
* MetricReader, CardinalityLimitSelector)}.
*/
SdkMeterProviderBuilder registerMetricReader(
MetricReader reader, CardinalityLimitSelector cardinalityLimitSelector) {
metricReaders.put(reader, cardinalityLimitSelector);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ static View create(
@Nullable String name,
@Nullable String description,
Aggregation aggregation,
AttributesProcessor attributesProcessor) {
return new AutoValue_View(name, description, aggregation, attributesProcessor);
AttributesProcessor attributesProcessor,
int cardinalityLimit) {
return new AutoValue_View(
name, description, aggregation, attributesProcessor, cardinalityLimit);
}

View() {}
Expand All @@ -58,6 +60,9 @@ static View create(
/** Returns the attribute processor used for this view. */
abstract AttributesProcessor getAttributesProcessor();

/** Returns the cardinality limit for this view. */
abstract int getCardinalityLimit();

@Override
public final String toString() {
StringJoiner joiner = new StringJoiner(", ", "View{", "}");
Expand All @@ -69,6 +74,7 @@ public final String toString() {
}
joiner.add("aggregation=" + getAggregation());
joiner.add("attributesProcessor=" + getAttributesProcessor());
joiner.add("cardinalityLimit=" + getCardinalityLimit());
return joiner.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil;
import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory;
import io.opentelemetry.sdk.metrics.internal.state.MetricStorage;
import io.opentelemetry.sdk.metrics.internal.view.AttributesProcessor;
import java.util.Objects;
import java.util.function.Predicate;
Expand All @@ -23,6 +24,7 @@ public final class ViewBuilder {
@Nullable private String description;
private Aggregation aggregation = Aggregation.defaultAggregation();
private AttributesProcessor processor = AttributesProcessor.noop();
private int cardinalityLimit = MetricStorage.DEFAULT_MAX_CARDINALITY;

ViewBuilder() {}

Expand Down Expand Up @@ -85,8 +87,24 @@ ViewBuilder addAttributesProcessor(AttributesProcessor attributesProcessor) {
return this;
}

/**
* Set the cardinality limit.
*
* <p>Note: not currently stable but cardinality limit can be configured via
* SdkMeterProviderUtil#setCardinalityLimit(ViewBuilder, int)}.
*
* @param cardinalityLimit the maximum number of series for a metric
*/
ViewBuilder setCardinalityLimit(int cardinalityLimit) {
if (cardinalityLimit <= 0) {
throw new IllegalArgumentException("cardinalityLimit must be > 0");
}
this.cardinalityLimit = cardinalityLimit;
return this;
}

/** Returns a {@link View} with the configuration of this builder. */
public View build() {
return View.create(name, description, aggregation, processor);
return View.create(name, description, aggregation, processor, cardinalityLimit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
import io.opentelemetry.sdk.metrics.ViewBuilder;
import io.opentelemetry.sdk.metrics.export.MetricReader;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilter;
import io.opentelemetry.sdk.metrics.internal.export.CardinalityLimitSelector;
import io.opentelemetry.sdk.metrics.internal.view.AttributesProcessor;
import io.opentelemetry.sdk.metrics.internal.view.StringPredicates;
import java.lang.reflect.InvocationTargetException;
Expand All @@ -26,7 +28,7 @@ private SdkMeterProviderUtil() {}
/**
* Reflectively assign the {@link ExemplarFilter} to the {@link SdkMeterProviderBuilder}.
*
* @param sdkMeterProviderBuilder the
* @param sdkMeterProviderBuilder the builder
*/
public static void setExemplarFilter(
SdkMeterProviderBuilder sdkMeterProviderBuilder, ExemplarFilter exemplarFilter) {
Expand All @@ -42,6 +44,28 @@ public static void setExemplarFilter(
}
}

/**
* Reflectively add a {@link MetricReader} with the {@link CardinalityLimitSelector} to the {@link
* SdkMeterProviderBuilder}.
*
* @param sdkMeterProviderBuilder the builder
*/
public static void registerMetricReaderWithCardinalitySelector(
SdkMeterProviderBuilder sdkMeterProviderBuilder,
MetricReader metricReader,
CardinalityLimitSelector cardinalityLimitSelector) {
try {
Method method =
SdkMeterProviderBuilder.class.getDeclaredMethod(
"registerMetricReader", MetricReader.class, CardinalityLimitSelector.class);
method.setAccessible(true);
method.invoke(sdkMeterProviderBuilder, metricReader, cardinalityLimitSelector);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
throw new IllegalStateException(
"Error calling addMetricReader on SdkMeterProviderBuilder", e);
}
}

/**
* Reflectively add an {@link AttributesProcessor} to the {@link ViewBuilder} which appends
* key-values from baggage to all measurements.
Expand Down Expand Up @@ -81,6 +105,21 @@ private static void addAttributesProcessor(
}
}

/**
* Reflectively set the {@code cardinalityLimit} on the {@link ViewBuilder}.
*
* @param viewBuilder the builder
*/
public static void setCardinalityLimit(ViewBuilder viewBuilder, int cardinalityLimit) {
try {
Method method = ViewBuilder.class.getDeclaredMethod("setCardinalityLimit", int.class);
method.setAccessible(true);
method.invoke(viewBuilder, cardinalityLimit);
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
throw new IllegalStateException("Error setting cardinalityLimit on ViewBuilder", e);
}
}

/** Reflectively reset the {@link SdkMeterProvider}, clearing all registered instruments. */
public static void resetForTest(SdkMeterProvider sdkMeterProvider) {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.internal.export;

import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
import io.opentelemetry.sdk.metrics.export.MetricReader;
import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil;
import io.opentelemetry.sdk.metrics.internal.state.MetricStorage;

/**
* Customize the {@link io.opentelemetry.sdk.metrics.export.MetricReader} cardinality limit as a
* function of {@link InstrumentType}. Register via {@link
* SdkMeterProviderUtil#registerMetricReaderWithCardinalitySelector(SdkMeterProviderBuilder,
* MetricReader, CardinalityLimitSelector)}.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
@FunctionalInterface
public interface CardinalityLimitSelector {

/**
* The default {@link CardinalityLimitSelector}, allowing each metric to have {@code 2000} points.
*/
static CardinalityLimitSelector defaultCardinalityLimitSelector() {
return unused -> MetricStorage.DEFAULT_MAX_CARDINALITY;
}

/**
* Return the default cardinality limit for metrics from instruments of type {@code
* instrumentType}. The cardinality limit dictates the maximum number of distinct points (or time
* series) for the metric.
*/
int getCardinalityLimit(InstrumentType instrumentType);
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ final class AsynchronousMetricStorage<T extends PointData, U extends ExemplarDat
private final AggregationTemporality aggregationTemporality;
private final Aggregator<T, U> aggregator;
private final AttributesProcessor attributesProcessor;
private final int maxCardinality;
private Map<Attributes, T> points = new HashMap<>();
private Map<Attributes, T> lastPoints =
new HashMap<>(); // Only populated if aggregationTemporality == DELTA
Expand All @@ -54,7 +55,8 @@ private AsynchronousMetricStorage(
RegisteredReader registeredReader,
MetricDescriptor metricDescriptor,
Aggregator<T, U> aggregator,
AttributesProcessor attributesProcessor) {
AttributesProcessor attributesProcessor,
int maxCardinality) {
this.registeredReader = registeredReader;
this.metricDescriptor = metricDescriptor;
this.aggregationTemporality =
Expand All @@ -63,6 +65,7 @@ private AsynchronousMetricStorage(
.getAggregationTemporality(metricDescriptor.getSourceInstrument().getType());
this.aggregator = aggregator;
this.attributesProcessor = attributesProcessor;
this.maxCardinality = maxCardinality;
}

/**
Expand All @@ -83,7 +86,8 @@ static <T extends PointData, U extends ExemplarData> AsynchronousMetricStorage<T
registeredReader,
metricDescriptor,
aggregator,
registeredView.getViewAttributesProcessor());
registeredView.getViewAttributesProcessor(),
registeredView.getCardinalityLimit());
}

/**
Expand All @@ -109,13 +113,13 @@ void record(Measurement measurement) {
private void recordPoint(T point) {
Attributes attributes = point.getAttributes();

if (points.size() >= MetricStorage.MAX_CARDINALITY) {
if (points.size() >= maxCardinality) {
throttlingLogger.log(
Level.WARNING,
"Instrument "
+ metricDescriptor.getSourceInstrument().getName()
+ " has exceeded the maximum allowed cardinality ("
+ MetricStorage.MAX_CARDINALITY
+ maxCardinality
+ ").");
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,16 @@ public final class DefaultSynchronousMetricStorage<T extends PointData, U extend
private final ConcurrentHashMap<Attributes, AggregatorHandle<T, U>> aggregatorHandles =
new ConcurrentHashMap<>();
private final AttributesProcessor attributesProcessor;
private final int maxCardinality;
private final ConcurrentLinkedQueue<AggregatorHandle<T, U>> aggregatorHandlePool =
new ConcurrentLinkedQueue<>();

DefaultSynchronousMetricStorage(
RegisteredReader registeredReader,
MetricDescriptor metricDescriptor,
Aggregator<T, U> aggregator,
AttributesProcessor attributesProcessor) {
AttributesProcessor attributesProcessor,
int maxCardinality) {
this.registeredReader = registeredReader;
this.metricDescriptor = metricDescriptor;
this.aggregationTemporality =
Expand All @@ -66,6 +68,7 @@ public final class DefaultSynchronousMetricStorage<T extends PointData, U extend
.getAggregationTemporality(metricDescriptor.getSourceInstrument().getType());
this.aggregator = aggregator;
this.attributesProcessor = attributesProcessor;
this.maxCardinality = maxCardinality;
}

// Visible for testing
Expand Down Expand Up @@ -97,13 +100,13 @@ private AggregatorHandle<T, U> getAggregatorHandle(Attributes attributes, Contex
if (handle != null) {
return handle;
}
if (aggregatorHandles.size() >= MAX_CARDINALITY) {
if (aggregatorHandles.size() >= maxCardinality) {
logger.log(
Level.WARNING,
"Instrument "
+ metricDescriptor.getSourceInstrument().getName()
+ " has exceeded the maximum allowed cardinality ("
+ MAX_CARDINALITY
+ maxCardinality
+ ").");
return null;
}
Expand Down Expand Up @@ -143,9 +146,9 @@ public MetricData collect(
}
});

// Trim pool down if needed. pool.size() will only exceed MAX_CARDINALITY if new handles are
// Trim pool down if needed. pool.size() will only exceed maxCardinality if new handles are
// created during collection.
int toDelete = aggregatorHandlePool.size() - MAX_CARDINALITY;
int toDelete = aggregatorHandlePool.size() - maxCardinality;
for (int i = 0; i < toDelete; i++) {
aggregatorHandlePool.poll();
}
Expand Down
Loading