Memory Mode: Adding first part support for synchronous instruments - storage #5998

Merged
MemoryMode.java
@@ -17,6 +17,21 @@ public enum MemoryMode {
*
* <p>In this mode, the SDK reuses objects to reduce allocations, at the expense of disallowing
* concurrent collections / exports.
 *
 * <p>Metric Signal: For DELTA aggregation temporality, the memory used for recording and
 * aggregating metric values is kept between MetricReader collect operations, to avoid memory
 * allocations. When the configured maximum cardinality of Attributes is reached, unused
 * Attributes are cleared from memory during the collect operation, at the cost of requiring new
 * memory allocations the next time those Attributes are used. Allocations can be minimized by
 * increasing the configured max cardinality. For example, suppose instrumentation has recorded
 * values for 1000 unique Attributes while the configured max cardinality was 2000. If, after a
 * collection, values are recorded for only 100 unique Attributes, the MetricReader's collect
 * operation would return 100 points, while in memory the Attributes data structure keeps 1000
 * unique Attributes. If a user recorded values for 3000 unique Attributes, the values for the
 * first 1999 Attributes would be recorded, and the remaining 1001 unique Attributes values would
 * be recorded in the CARDINALITY_OVERFLOW Attributes. If, after several collect operations, the
 * user records values for only 500 unique Attributes, the memory for the 1500 unused Attributes
 * would be cleared during the collect operation.
*/
REUSABLE_DATA,

@@ -25,6 +40,9 @@ public enum MemoryMode {
*
* <p>In this mode, the SDK passes immutable objects to exporters / readers, increasing
* allocations but ensuring safe concurrent exports.
*
 * <p>Metric Signal: For DELTA aggregation temporality, the memory used for recording and
 * aggregating metric values is cleared during a MetricReader collect operation.
*/
IMMUTABLE_DATA
}
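For readers unfamiliar with how a reader selects a mode, here is a minimal sketch of a DELTA reader that opts into REUSABLE_DATA by overriding getMemoryMode(), the accessor the storage below consults via registeredReader.getReader().getMemoryMode(). The class name and the no-op lifecycle bodies are illustrative, not part of this change.

import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.export.CollectionRegistration;
import io.opentelemetry.sdk.metrics.export.MetricReader;

// Hypothetical reader: opts into REUSABLE_DATA with DELTA temporality.
final class ReusableDeltaReader implements MetricReader {
  @Override
  public MemoryMode getMemoryMode() {
    // The synchronous storage keys its reuse behavior off this value.
    return MemoryMode.REUSABLE_DATA;
  }

  @Override
  public AggregationTemporality getAggregationTemporality(InstrumentType instrumentType) {
    // Handle reuse between collections only applies to DELTA.
    return AggregationTemporality.DELTA;
  }

  @Override
  public void register(CollectionRegistration registration) {
    // A real reader would store the registration and drive collection from it.
  }

  @Override
  public CompletableResultCode forceFlush() {
    return CompletableResultCode.ofSuccess();
  }

  @Override
  public CompletableResultCode shutdown() {
    return CompletableResultCode.ofSuccess();
  }
}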
ViewBuilder.java
@@ -7,6 +7,7 @@

import static io.opentelemetry.sdk.metrics.internal.view.AttributesProcessor.setIncludes;

import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil;
import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory;
import io.opentelemetry.sdk.metrics.internal.state.MetricStorage;
@@ -96,6 +97,7 @@ public ViewBuilder setAttributeFilter(Predicate<String> keyFilter) {
* <p>Note: not currently stable but additional attribute processors can be configured via {@link
* SdkMeterProviderUtil#appendAllBaggageAttributes(ViewBuilder)}.
*/
@SuppressWarnings("unused")
ViewBuilder addAttributesProcessor(AttributesProcessor attributesProcessor) {
this.processor = this.processor.then(attributesProcessor);
return this;
@@ -105,7 +107,10 @@ ViewBuilder addAttributesProcessor(AttributesProcessor attributesProcessor) {
* Set the cardinality limit.
*
* <p>Note: not currently stable but cardinality limit can be configured via
* SdkMeterProviderUtil#setCardinalityLimit(ViewBuilder, int)}.
* SdkMeterProviderUtil#setCardinalityLimit(ViewBuilder, int).
*
 * <p>Read {@link MemoryMode} to understand the memory usage behavior when the cardinality
 * limit is reached.
*
* @param cardinalityLimit the maximum number of series for a metric
*/
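A usage sketch for the experimental hook mentioned in the javadoc above. Only SdkMeterProviderUtil#setCardinalityLimit(ViewBuilder, int) is taken from that javadoc; the instrument name and limit value are hypothetical. Raising the limit leaves more headroom before REUSABLE_DATA has to evict unused handles at collect time.

import io.opentelemetry.sdk.metrics.InstrumentSelector;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.View;
import io.opentelemetry.sdk.metrics.ViewBuilder;
import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil;

class CardinalityLimitExample {
  static SdkMeterProvider buildProvider() {
    ViewBuilder viewBuilder = View.builder();
    // Experimental hook: raise the per-metric series limit so the
    // REUSABLE_DATA path rarely hits the eviction branch during collect.
    SdkMeterProviderUtil.setCardinalityLimit(viewBuilder, 2000);
    return SdkMeterProvider.builder()
        .registerView(
            InstrumentSelector.builder().setName("my.counter").build(), // hypothetical name
            viewBuilder.build())
        .build();
  }
}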
AggregatorHandle.java
@@ -28,6 +28,7 @@ public abstract class AggregatorHandle<T extends PointData, U extends ExemplarDa

// A reservoir of sampled exemplars for this time period.
private final ExemplarReservoir<U> exemplarReservoir;
private volatile boolean valuesRecorded = false;

protected AggregatorHandle(ExemplarReservoir<U> exemplarReservoir) {
this.exemplarReservoir = exemplarReservoir;
@@ -39,6 +40,10 @@ protected AggregatorHandle(ExemplarReservoir<U> exemplarReservoir) {
*/
public final T aggregateThenMaybeReset(
long startEpochNanos, long epochNanos, Attributes attributes, boolean reset) {
if (reset) {
valuesRecorded = false;
}

return doAggregateThenMaybeReset(
startEpochNanos,
epochNanos,
@@ -69,6 +74,7 @@ public final void recordLong(long value, Attributes attributes, Context context)
*/
public final void recordLong(long value) {
doRecordLong(value);
valuesRecorded = true;
}

/**
@@ -94,6 +100,7 @@ public final void recordDouble(double value, Attributes attributes, Context cont
*/
public final void recordDouble(double value) {
doRecordDouble(value);
valuesRecorded = true;
}

/**
@@ -104,4 +111,13 @@ protected void doRecordDouble(double value) {
throw new UnsupportedOperationException(
"This aggregator does not support recording double values.");
}

/**
 * Checks whether this handle has recorded values.
 *
 * @return true if values have been recorded to this handle
*/
public boolean hasRecordedValues() {
return valuesRecorded;
}
}
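The flag is easiest to see in isolation. Below is a self-contained analog of the valuesRecorded mechanism (FlaggedSumHandle is an illustrative stand-in, not SDK code): recording sets the flag, a resetting aggregation clears it, and collect-side code can skip handles whose flag is still false, which is exactly what the storage change below does.

import java.util.concurrent.atomic.LongAdder;

// Illustrative analog of AggregatorHandle's valuesRecorded tracking.
final class FlaggedSumHandle {
  private final LongAdder sum = new LongAdder();
  private volatile boolean valuesRecorded = false;

  void recordLong(long value) {
    sum.add(value);
    valuesRecorded = true; // mark the handle as active for this interval
  }

  boolean hasRecordedValues() {
    return valuesRecorded;
  }

  long aggregateThenMaybeReset(boolean reset) {
    if (reset) {
      // DELTA collection: clear the flag so an idle handle can be
      // skipped (or evicted) on the next collect.
      valuesRecorded = false;
      return sum.sumThenReset();
    }
    return sum.sum();
  }
}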
DefaultSynchronousMetricStorage.java
@@ -5,9 +5,14 @@

package io.opentelemetry.sdk.metrics.internal.state;

import static io.opentelemetry.sdk.common.export.MemoryMode.IMMUTABLE_DATA;
import static io.opentelemetry.sdk.common.export.MemoryMode.REUSABLE_DATA;
import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.DELTA;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.internal.ThrottlingLogger;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.ExemplarData;
@@ -50,6 +55,16 @@ public final class DefaultSynchronousMetricStorage<T extends PointData, U extend
private volatile AggregatorHolder<T, U> aggregatorHolder = new AggregatorHolder<>();
private final AttributesProcessor attributesProcessor;

private final MemoryMode memoryMode;

// Only populated if memoryMode == REUSABLE_DATA
private final ArrayList<T> reusableResultList = new ArrayList<>();

// Only populated if memoryMode == REUSABLE_DATA and
// aggregationTemporality is DELTA
private volatile ConcurrentHashMap<Attributes, AggregatorHandle<T, U>>
previousCollectionAggregatorHandles = new ConcurrentHashMap<>();

/**
* This field is set to 1 less than the actual intended cardinality limit, allowing the last slot
* to be filled by the {@link MetricStorage#CARDINALITY_OVERFLOW} series.
@@ -74,6 +89,7 @@ public final class DefaultSynchronousMetricStorage<T extends PointData, U extend
this.aggregator = aggregator;
this.attributesProcessor = attributesProcessor;
this.maxCardinality = maxCardinality - 1;
this.memoryMode = registeredReader.getReader().getMemoryMode();
}

// Visible for testing
@@ -139,7 +155,7 @@ private AggregatorHolder<T, U> getHolderForRecord() {

/**
* Called on the {@link AggregatorHolder} obtained from {@link #getHolderForRecord()} to indicate
* that recording is complete and it is safe to collect.
* that recording is complete, and it is safe to collect.
*/
private void releaseHolderForRecord(AggregatorHolder<T, U> aggregatorHolder) {
aggregatorHolder.activeRecordingThreads.addAndGet(-2);
@@ -185,16 +201,20 @@ public MetricData collect(
InstrumentationScopeInfo instrumentationScopeInfo,
long startEpochNanos,
long epochNanos) {
boolean reset = aggregationTemporality == AggregationTemporality.DELTA;
boolean reset = aggregationTemporality == DELTA;
long start =
aggregationTemporality == AggregationTemporality.DELTA
aggregationTemporality == DELTA
? registeredReader.getLastCollectEpochNanos()
: startEpochNanos;

ConcurrentHashMap<Attributes, AggregatorHandle<T, U>> aggregatorHandles;
if (reset) {
AggregatorHolder<T, U> holder = this.aggregatorHolder;
this.aggregatorHolder = new AggregatorHolder<>();
this.aggregatorHolder =
(memoryMode == REUSABLE_DATA)
? new AggregatorHolder<>(previousCollectionAggregatorHandles)
: new AggregatorHolder<>();

// Increment recordsInProgress by 1, which produces an odd number acting as a signal that
// record operations should re-read the volatile this.aggregatorHolder.
// Repeatedly grab recordsInProgress until it is <= 1, which signals all active record
@@ -208,15 +228,56 @@ public MetricData collect(
aggregatorHandles = this.aggregatorHolder.aggregatorHandles;
}

List<T> points;
if (memoryMode == REUSABLE_DATA) {
reusableResultList.clear();
points = reusableResultList;
} else {
points = new ArrayList<>(aggregatorHandles.size());
}

// In DELTA aggregation temporality, the value aggregated for each Attributes is reset to 0
// every time we perform a collection (by definition of DELTA).
// In IMMUTABLE_DATA memory mode, this is accomplished by removing all aggregator handles
// (into which the values are recorded), effectively starting from 0
// for each recorded Attributes.
// In REUSABLE_DATA memory mode, we strive for zero allocations. Since even removing
// a key-value from a map and putting it back on the next recording costs an allocation,
// we keep the aggregator handles in their map, and only reset their value once
// we finish collecting the aggregated value from each one.
// The SDK must adhere to keeping no more than maxCardinality unique Attributes in memory,
// hence during collect(), when the map is at full capacity, we try to clear away unused
// aggregator handles, so that on the next recording cycle using this map there is room for
// newly recorded Attributes. This comes at the expense of memory allocations, which can be
// avoided if the user chooses to increase the maxCardinality.
if (memoryMode == REUSABLE_DATA && reset) {
if (aggregatorHandles.size() >= maxCardinality) {
aggregatorHandles.forEach(
(attribute, handle) -> {
if (!handle.hasRecordedValues()) {
aggregatorHandles.remove(attribute);
}
});
}
}

// Grab aggregated points.
List<T> points = new ArrayList<>(aggregatorHandles.size());
aggregatorHandles.forEach(
(attributes, handle) -> {
if (!handle.hasRecordedValues()) {
return;
}
T point = handle.aggregateThenMaybeReset(start, epochNanos, attributes, reset);
if (reset) {

if (reset && memoryMode == IMMUTABLE_DATA) {
// Return the aggregator to the pool.
// The pool is only used in DELTA temporality (since in CUMULATIVE the handler is
// always used as it is the place accumulating the values and never resets)
// AND only in IMMUTABLE_DATA memory mode since in REUSABLE_DATA we avoid
// using the pool since it allocates memory internally on each put() or remove()
aggregatorHandlePool.offer(handle);
}

if (point != null) {
points.add(point);
}
@@ -229,6 +290,10 @@ public MetricData collect(
aggregatorHandlePool.poll();
}

if (reset && memoryMode == REUSABLE_DATA) {
previousCollectionAggregatorHandles = aggregatorHandles;
}

if (points.isEmpty()) {
return EmptyMetricData.getInstance();
}
@@ -243,8 +308,7 @@ public MetricDescriptor getMetricDescriptor() {
}

private static class AggregatorHolder<T extends PointData, U extends ExemplarData> {
private final ConcurrentHashMap<Attributes, AggregatorHandle<T, U>> aggregatorHandles =
new ConcurrentHashMap<>();
private final ConcurrentHashMap<Attributes, AggregatorHandle<T, U>> aggregatorHandles;
// Recording threads grab the current interval (AggregatorHolder) and atomically increment
// this by 2 before recording against it (and then decrement by two when done).
//
@@ -260,5 +324,14 @@ private static class AggregatorHolder<T extends PointData, U extends ExemplarDat
// all it needs to do is release the "read lock" it just obtained (decrementing by 2),
// and then grab and record against the new current interval (AggregatorHolder).
private final AtomicInteger activeRecordingThreads = new AtomicInteger(0);

private AggregatorHolder() {
aggregatorHandles = new ConcurrentHashMap<>();
}

private AggregatorHolder(
ConcurrentHashMap<Attributes, AggregatorHandle<T, U>> aggregatorHandles) {
this.aggregatorHandles = aggregatorHandles;
}
}
}
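To make the even/odd protocol in the AggregatorHolder comment concrete, here is a standalone sketch under illustrative names (RecordGate is not SDK code): recorders add 2, the collector adds 1 to force the count odd, and a recorder that observes an odd value releases and retries against the new current holder.

import java.util.concurrent.atomic.AtomicInteger;

// Standalone sketch of the even/odd "read lock" described above.
final class RecordGate {
  private final AtomicInteger activeRecordingThreads = new AtomicInteger(0);

  // Recorder: returns true if this holder is still current. On false, the
  // caller must still call release(), then retry against the new holder.
  boolean tryAcquire() {
    return (activeRecordingThreads.addAndGet(2) & 1) == 0;
  }

  // Recorder: drop the "read lock" taken by tryAcquire().
  void release() {
    activeRecordingThreads.addAndGet(-2);
  }

  // Collector: retire this holder (count becomes odd), then spin until all
  // in-flight recorders have released; afterwards it is safe to collect.
  void drainForCollect() {
    activeRecordingThreads.addAndGet(1);
    while (activeRecordingThreads.get() > 1) {
      Thread.yield();
    }
  }
}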