Skip to content

Commit

Permalink
Metric cardinality limits (#3831)
Browse files Browse the repository at this point in the history
* Drop stale streams, make metrics minimum collection interval configurable

* Limit number of accumulations in async and sync metric storage

* PR feedback
  • Loading branch information
jack-berg authored Nov 9, 2021
1 parent 8f2b21b commit fc68a88
Show file tree
Hide file tree
Showing 11 changed files with 541 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,7 @@ public final class SdkMeterProvider implements MeterProvider, Closeable {
private final Map<CollectionHandle, CollectionInfo> collectionInfoMap;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final AtomicLong lastCollectionTimestamp;

// Minimum amount of time we allow between synchronous collections.
// This meant to reduce overhead when multiple exporters attempt to read metrics quickly.
// TODO: This should be configurable at the SDK level.
private static final long MINIMUM_COLLECTION_INTERVAL_NANOS = TimeUnit.MILLISECONDS.toNanos(100);
private final long minimumCollectionIntervalNanos;

/**
* Returns a new {@link SdkMeterProviderBuilder} for {@link SdkMeterProvider}.
Expand All @@ -65,14 +61,16 @@ public static SdkMeterProviderBuilder builder() {
Clock clock,
Resource resource,
ViewRegistry viewRegistry,
ExemplarFilter exemplarSampler) {
ExemplarFilter exemplarSampler,
long minimumCollectionIntervalNanos) {
this.sharedState =
MeterProviderSharedState.create(clock, resource, viewRegistry, exemplarSampler);
this.registry =
new ComponentRegistry<>(
instrumentationLibraryInfo -> new SdkMeter(sharedState, instrumentationLibraryInfo));
this.lastCollectionTimestamp =
new AtomicLong(clock.nanoTime() - MINIMUM_COLLECTION_INTERVAL_NANOS);
new AtomicLong(clock.nanoTime() - minimumCollectionIntervalNanos);
this.minimumCollectionIntervalNanos = minimumCollectionIntervalNanos;

// Here we construct our own unique handle ids for this SDK.
// These are guaranteed to be unique per-reader for this SDK, and only this SDK.
Expand Down Expand Up @@ -148,7 +146,7 @@ public Collection<MetricData> collectAllMetrics() {
long pastNanoTime = lastCollectionTimestamp.get();
// It hasn't been long enough since the last collection.
boolean disableSynchronousCollection =
(currentNanoTime - pastNanoTime) < MINIMUM_COLLECTION_INTERVAL_NANOS;
(currentNanoTime - pastNanoTime) < minimumCollectionIntervalNanos;
// If we're not disabling metrics, write the current collection time.
// We don't care if this happens in more than one thread, suppression is optimistic, and the
// interval is small enough some jitter isn't important.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package io.opentelemetry.sdk.metrics;

import static io.opentelemetry.api.internal.Utils.checkArgument;

import io.opentelemetry.api.metrics.GlobalMeterProvider;
import io.opentelemetry.sdk.common.Clock;
import io.opentelemetry.sdk.metrics.exemplar.ExemplarFilter;
Expand All @@ -15,9 +17,11 @@
import io.opentelemetry.sdk.metrics.view.InstrumentSelector;
import io.opentelemetry.sdk.metrics.view.View;
import io.opentelemetry.sdk.resources.Resource;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
* Builder class for the {@link SdkMeterProvider}. Has fully functional default implementations of
Expand All @@ -31,6 +35,7 @@ public final class SdkMeterProviderBuilder {
private final List<MetricReaderFactory> metricReaders = new ArrayList<>();
// Default the sampling strategy.
private ExemplarFilter exemplarFilter = ExemplarFilter.sampleWithTraces();
private long minimumCollectionIntervalNanos = TimeUnit.MILLISECONDS.toNanos(100);

SdkMeterProviderBuilder() {}

Expand Down Expand Up @@ -123,6 +128,20 @@ public SdkMeterProviderBuilder registerMetricReader(MetricReaderFactory reader)
return this;
}

/**
 * Sets the minimum amount of time that must separate two synchronous collections. A collection
 * requested sooner than this after the previous one has its synchronous collection suppressed.
 *
 * @param duration the minimum interval; must be non-null and must not be negative.
 * @return this
 */
public SdkMeterProviderBuilder setMinimumCollectionInterval(Duration duration) {
  Objects.requireNonNull(duration, "duration");
  boolean nonNegative = !duration.isNegative();
  checkArgument(nonNegative, "duration must not be negative");
  // The interval is stored in nanoseconds.
  this.minimumCollectionIntervalNanos = duration.toNanos();
  return this;
}

/**
* Returns a new {@link SdkMeterProvider} built with the configuration of this {@link
* SdkMeterProviderBuilder}. This provider is not registered as the global {@link
Expand All @@ -135,6 +154,11 @@ public SdkMeterProviderBuilder registerMetricReader(MetricReaderFactory reader)
*/
public SdkMeterProvider build() {
return new SdkMeterProvider(
metricReaders, clock, resource, viewRegistryBuilder.build(), exemplarFilter);
metricReaders,
clock,
resource,
viewRegistryBuilder.build(),
exemplarFilter,
minimumCollectionIntervalNanos);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.internal.ThrottlingLogger;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.exemplar.ExemplarFilter;
Expand All @@ -24,6 +25,8 @@
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;

/**
Expand All @@ -33,6 +36,8 @@
* at any time.
*/
public final class AsynchronousMetricStorage<T> implements MetricStorage {
// Name the logger after this class (not DeltaMetricStorage) so that warnings emitted here are
// attributed to, and can be filtered by, AsynchronousMetricStorage.
private static final ThrottlingLogger logger =
    new ThrottlingLogger(Logger.getLogger(AsynchronousMetricStorage.class.getName()));
private final MetricDescriptor metricDescriptor;
private final ReentrantLock collectLock = new ReentrantLock();
private final AsyncAccumulator<T> asyncAccumulator;
Expand All @@ -56,7 +61,7 @@ public static <T> MetricStorage doubleAsynchronousAccumulator(
Aggregator<T> aggregator =
view.getAggregation().createAggregator(instrument, ExemplarFilter.neverSample());

final AsyncAccumulator<T> measurementAccumulator = new AsyncAccumulator<>();
final AsyncAccumulator<T> measurementAccumulator = new AsyncAccumulator<>(instrument);
if (Aggregator.empty() == aggregator) {
return empty();
}
Expand Down Expand Up @@ -90,7 +95,7 @@ public static <T> MetricStorage longAsynchronousAccumulator(
final MetricDescriptor metricDescriptor = MetricDescriptor.create(view, instrument);
Aggregator<T> aggregator =
view.getAggregation().createAggregator(instrument, ExemplarFilter.neverSample());
final AsyncAccumulator<T> measurementAccumulator = new AsyncAccumulator<>();
final AsyncAccumulator<T> measurementAccumulator = new AsyncAccumulator<>(instrument);
final AttributesProcessor attributesProcessor = view.getAttributesProcessor();
// TODO: Find a way to grab the measurement JUST ONCE for all async metrics.
final ObservableLongMeasurement result =
Expand Down Expand Up @@ -159,9 +164,24 @@ public MetricDescriptor getMetricDescriptor() {

/** Helper class to record async measurements on demand. */
private static final class AsyncAccumulator<T> {
private final InstrumentDescriptor instrument;
private Map<Attributes, T> currentAccumulation = new HashMap<>();

/**
 * Creates an accumulator for the given instrument.
 *
 * @param instrument the instrument descriptor; its name is included in the warning that {@code
 *     record} logs when the accumulation limit is reached.
 */
AsyncAccumulator(InstrumentDescriptor instrument) {
this.instrument = instrument;
}

/**
 * Stores {@code accumulation} under {@code attributes} in the current accumulation map.
 *
 * <p>If the map already holds {@link MetricStorageUtils#MAX_ACCUMULATIONS} entries or more, the
 * measurement is dropped and a warning naming the instrument is logged instead.
 *
 * @param attributes the attribute set the measurement belongs to.
 * @param accumulation the value to store for that attribute set.
 */
public void record(Attributes attributes, T accumulation) {
  boolean hasCapacity = currentAccumulation.size() < MetricStorageUtils.MAX_ACCUMULATIONS;
  if (hasCapacity) {
    // TODO: error on metric overwrites
    currentAccumulation.put(attributes, accumulation);
    return;
  }
  // At the limit: report it and discard the measurement.
  String message =
      "Instrument "
          + instrument.getName()
          + " has exceeded the maximum allowed accumulations ("
          + MetricStorageUtils.MAX_ACCUMULATIONS
          + ").";
  logger.log(Level.WARNING, message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public final class DefaultSynchronousMetricStorage<T> implements SynchronousMetr
AttributesProcessor attributesProcessor) {
this.attributesProcessor = attributesProcessor;
this.metricDescriptor = metricDescriptor;
this.deltaMetricStorage = new DeltaMetricStorage<>(aggregator);
this.deltaMetricStorage =
new DeltaMetricStorage<>(aggregator, metricDescriptor.getSourceInstrument());
this.temporalMetricStorage = new TemporalMetricStorage<>(aggregator, /* isSynchronous= */ true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,23 @@

package io.opentelemetry.sdk.metrics.internal.state;

import static io.opentelemetry.sdk.metrics.internal.state.MetricStorageUtils.MAX_ACCUMULATIONS;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.internal.ThrottlingLogger;
import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator;
import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorHandle;
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.internal.export.CollectionHandle;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.concurrent.ThreadSafe;

/**
Expand All @@ -25,13 +32,20 @@
*/
@ThreadSafe
class DeltaMetricStorage<T> {

private static final ThrottlingLogger logger =
new ThrottlingLogger(Logger.getLogger(DeltaMetricStorage.class.getName()));
private static final BoundStorageHandle NOOP_STORAGE_HANDLE = new NoopBoundHandle();

private final Aggregator<T> aggregator;
private final InstrumentDescriptor instrument;
private final ConcurrentHashMap<Attributes, AggregatorHandle<T>> activeCollectionStorage =
new ConcurrentHashMap<>();
private final List<DeltaAccumulation<T>> unreportedDeltas = new ArrayList<>();

DeltaMetricStorage(Aggregator<T> aggregator) {
DeltaMetricStorage(Aggregator<T> aggregator, InstrumentDescriptor instrument) {
this.aggregator = aggregator;
this.instrument = instrument;
}

/**
Expand All @@ -47,9 +61,19 @@ public BoundStorageHandle bind(Attributes attributes) {
return aggregatorHandle;
}

// Missing entry or no longer mapped, try to add a new entry.
// Missing entry or no longer mapped. Try to add a new one unless the cardinality limit has been reached.
aggregatorHandle = aggregator.createHandle();
while (true) {
if (activeCollectionStorage.size() >= MAX_ACCUMULATIONS) {
logger.log(
Level.WARNING,
"Instrument "
+ instrument.getName()
+ " has exceeded the maximum allowed accumulations ("
+ MAX_ACCUMULATIONS
+ ").");
return NOOP_STORAGE_HANDLE;
}
AggregatorHandle<?> boundAggregatorHandle =
activeCollectionStorage.putIfAbsent(attributes, aggregatorHandle);
if (boundAggregatorHandle != null) {
Expand Down Expand Up @@ -121,4 +145,17 @@ private synchronized void collectSynchronousDeltaAccumulationAndReset() {
unreportedDeltas.add(new DeltaAccumulation<>(result));
}
}

/**
 * A {@link BoundStorageHandle} whose methods all have empty bodies: values passed to it are
 * discarded and releasing it does nothing.
 */
private static class NoopBoundHandle implements BoundStorageHandle {

  @Override
  public void recordLong(long value, Attributes attributes, Context context) {
    // Intentionally empty: the measurement is not recorded.
  }

  @Override
  public void recordDouble(double value, Attributes attributes, Context context) {
    // Intentionally empty: the measurement is not recorded.
  }

  @Override
  public void release() {
    // Intentionally empty: there is nothing to release.
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,30 +11,37 @@

/** Utilities to help deal with {@code Map<Attributes, Accumulation>} in metric storage. */
final class MetricStorageUtils {
/** The max number of metric accumulations for a particular {@link MetricStorage}. */
static final int MAX_ACCUMULATIONS = 2000;

private MetricStorageUtils() {}

/**
* Merges accumulations from {@code toMerge} into {@code result}.
* Merges accumulations from {@code toMerge} into {@code result}. Keys from {@code result} which
* don't appear in {@code toMerge} are removed.
*
* <p>Note: This mutates the result map.
*/
static <T> void mergeInPlace(
    Map<Attributes, T> result, Map<Attributes, T> toMerge, Aggregator<T> aggregator) {
  // First discard every key of result that has no counterpart in toMerge.
  result.keySet().removeIf(key -> !toMerge.containsKey(key));
  // Then fold each incoming value into result: merge with the existing value when there is
  // one, otherwise take the incoming value as-is.
  for (Map.Entry<Attributes, T> incoming : toMerge.entrySet()) {
    T addition = incoming.getValue();
    result.compute(
        incoming.getKey(),
        (key, existing) -> existing == null ? addition : aggregator.merge(existing, addition));
  }
}

/**
* Diffs accumulations from {@code toMerge} into {@code result}.
* Diffs accumulations from {@code toMerge} into {@code result}. Keys from {@code result} which
* don't appear in {@code toMerge} are removed.
*
* <p>If no prior value is found, then the value from {@code toDiff} is used.
*
* <p>Note: This mutates the result map.
*/
static <T> void diffInPlace(
Map<Attributes, T> result, Map<Attributes, T> toDiff, Aggregator<T> aggregator) {
result.entrySet().removeIf(entry -> !toDiff.containsKey(entry.getKey()));
toDiff.forEach(
(k, v) -> {
result.compute(k, (k2, v2) -> (v2 != null) ? aggregator.diff(v2, v) : v);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,17 +107,16 @@ synchronized MetricData buildMetricFor(

/** Remembers what was presented to a specific exporter. */
private static class LastReportedAccumulation<T> {
@Nullable private final Map<Attributes, T> accumulation;
private final Map<Attributes, T> accumulation;
private final long epochNanos;

/**
* Constructs a new reporting record.
*
* @param accumulation The last accumulation of metric data or {@code null} if the accumulator
* is not stateful.
* @param accumulation The last accumulation of metric data.
* @param epochNanos The timestamp the data was reported.
*/
LastReportedAccumulation(@Nullable Map<Attributes, T> accumulation, long epochNanos) {
LastReportedAccumulation(Map<Attributes, T> accumulation, long epochNanos) {
this.accumulation = accumulation;
this.epochNanos = epochNanos;
}
Expand All @@ -126,7 +125,6 @@ long getEpochNanos() {
return epochNanos;
}

@Nullable
Map<Attributes, T> getAccumulation() {
return accumulation;
}
Expand Down
Loading

0 comments on commit fc68a88

Please sign in to comment.