Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix delta metric storage concurrency bug #5932

Merged
merged 6 commits into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ Meter build() {
.get("io.opentelemetry.sdk.metrics");
}
}),
SDK(
SDK_CUMULATIVE(
new SdkBuilder() {
@Override
Meter build() {
Expand All @@ -50,6 +50,19 @@ Meter build() {
.build()
.get("io.opentelemetry.sdk.metrics");
}
}),
SDK_DELTA(
new SdkBuilder() {
@Override
Meter build() {
return SdkMeterProvider.builder()
.setClock(Clock.getDefault())
.setResource(Resource.empty())
// Must register reader for real SDK.
.registerMetricReader(InMemoryMetricReader.createDelta())
.build()
.get("io.opentelemetry.sdk.metrics");
}
});

private final SdkBuilder sdkBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.StampedLock;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand All @@ -46,7 +47,8 @@ public final class DefaultSynchronousMetricStorage<T extends PointData, U extend
private final MetricDescriptor metricDescriptor;
private final AggregationTemporality aggregationTemporality;
private final Aggregator<T, U> aggregator;
private final ConcurrentHashMap<Attributes, AggregatorHandle<T, U>> aggregatorHandles =
private final StampedLock sl = new StampedLock();
private ConcurrentHashMap<Attributes, AggregatorHandle<T, U>> aggregatorHandles =
new ConcurrentHashMap<>();
private final AttributesProcessor attributesProcessor;

Expand Down Expand Up @@ -83,8 +85,13 @@ Queue<AggregatorHandle<T, U>> getAggregatorHandlePool() {

@Override
public void recordLong(long value, Attributes attributes, Context context) {
AggregatorHandle<T, U> handle = getAggregatorHandle(attributes, context);
handle.recordLong(value, attributes, context);
long stamp = sl.readLock();
Copy link
Contributor

@asafm asafm Oct 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have an important note and a question.

Important note: We must never block upon recording, as we deteriorate performance (recording must wait for write lock to finish).
I suggest we create a data structure called AggregatorHandles:

AggregatorHandles {
    ConcurrentHashMap<Attributes, AggregatorHandle<T, U>> aggregatorHandles
    ReadWriteLock lock;
}

we should have an activeAggregatorHandles and standbyAggregatorHandles of that type.

When we record, we call readLock().tryLock. If we get false it means the write lock has been taken and we need to refresh the value from activeAggregatorHandles (explained below why) hence upon false we re-read the value at activeAggregatorHandles and call tryLock again - this should never fail - if it does, fail.

Upon collecting:

  1. First we switch between the activeAggregatorHandles and standbyAggregatorHandles - saving the value in active as the AggregatorHandles we will work on.
  2. We make sure to define both active and standby variable as volatile, updating them will be immediately visible.
  3. We take the the write lock writeLock().lock() - This will cause us to block until all readers which took a handle will finish recording. Since it's not user-dependant, it should be immediate and guaranteed to happen. It's ok to block in collect() as it's not as latency sensitive as record().
  4. Since we first made the switch and then obtained the write lock it means, right after the assignment to active variable, all record() will use that AggregatorHandles and obtain and use another lock (the newly active lock). The only "left-overs" we have are handles which retrieves the previous AggregatorHandles, and haven't yet managed to call readLock.lock(). There are two options for them:
    a. writeLock() was already called hence they will get "false" - this is a signal for them that the activeAggregatorHandles was switched - they need to re-read its value and obtain a read lock. We can wrap with a while loop. I tried thinking about it a lot I can't see it spinning forever - doesn't seem like a realistic option.
    b. readLock.lock() returns true and they continue to use it, and writeLock will wait for them.

Question.
I wonder why use StampedLock vs using ReentrantReadWriteLock which seems much easier to reason about, and doesn't require persisting a stamp in memory. It's harder to reason about the code when you see the stamp from lock without understanding why need the stamp.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm switching to ReentrantReadWriteLock for reasons described here.

We must never block upon recording, as we deteriorate performance (recording must wait for write lock to finish).

Sure. The approach you outline is optimistic reads. There's a number of different ways to do this, and they appear to be simpler / higher performance with StampedLock. But in either case, I'll rework it so recording measurements doesn't have to block.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're still blocking :) Once a collection start you grab a writeLock, which blocks all readLock until collection has finished. Hence I suggested a different approach outlined above.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic you suggest does check out and does not block. I'm running the performance tests now to evaluate if reading a volatile variable every time we record on the hot path degrades performance in a serious way. It could be the case that reading a non-volatile variable and only blocking for an extremely short amount of time once per collection is better than reading a volatile variable every record but never blocking.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok so here's the JMH results for the plain read write lock solution which blocks for a narrow window when collecting vs. @asafm's proposal to always read from a volatile but never block. Source code for the volatile, never blocking solution here.

The solution to always read from a volatile and never block reduces performance on the record path by a modest ~4% versus the read write lock approach which blocks briefly during collection. Its also worth noting that as implemented, the volatile never block approach impacts the cumulative performance as well, which isn't strictly necessary since this concurrency bug only affects delta. Cumulative should be able to read a non-volatile variable safely since it never needs to change it.

try {
AggregatorHandle<T, U> handle = getAggregatorHandle(attributes, context);
handle.recordLong(value, attributes, context);
} finally {
sl.unlockRead(stamp);
}
}

@Override
Expand All @@ -99,8 +106,13 @@ public void recordDouble(double value, Attributes attributes, Context context) {
+ ". Dropping measurement.");
return;
}
AggregatorHandle<T, U> handle = getAggregatorHandle(attributes, context);
handle.recordDouble(value, attributes, context);
long stamp = sl.readLock();
try {
AggregatorHandle<T, U> handle = getAggregatorHandle(attributes, context);
handle.recordDouble(value, attributes, context);
} finally {
sl.unlockRead(stamp);
}
}

private AggregatorHandle<T, U> getAggregatorHandle(Attributes attributes, Context context) {
Expand Down Expand Up @@ -146,13 +158,25 @@ public MetricData collect(
? registeredReader.getLastCollectEpochNanos()
: startEpochNanos;

ConcurrentHashMap<Attributes, AggregatorHandle<T, U>> aggregatorHandles;
if (reset) {
long stamp = sl.writeLock();
try {
aggregatorHandles = this.aggregatorHandles;
this.aggregatorHandles = new ConcurrentHashMap<>();
} finally {
sl.unlockWrite(stamp);
}
} else {
aggregatorHandles = this.aggregatorHandles;
}

// Grab aggregated points.
List<T> points = new ArrayList<>(aggregatorHandles.size());
aggregatorHandles.forEach(
(attributes, handle) -> {
T point = handle.aggregateThenMaybeReset(start, epochNanos, attributes, reset);
if (reset) {
aggregatorHandles.remove(attributes, handle);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's the bug:

  • On line 153 we grab the aggregated data from the handle
  • On line 155 we remove the handle from the map
  • Removing the handle from the map prompts new measurements for the attributes to add a new handle to the map
  • Aggregating the data from the handle and removing it from the map is not atomic. This results in the possibility of lost writes: where another recording thread records to the handle after it has been aggregated but before it is removed from the map.

The solution is to guard the aggregatorHandles map with a read write lock. When collecting, we obtain a write lock and replace the aggregatorHandles map with a new instance. When recording, we obtain a read lock, which is cheap and blocks the collect thread from replacing the aggregatorHandles map until the record thread releases the read lock.

// Return the aggregator to the pool.
aggregatorHandlePool.offer(handle);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import com.google.common.util.concurrent.AtomicDouble;
import com.google.common.util.concurrent.Uninterruptibles;
import io.github.netmikey.logunit.api.LogCapturer;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
Expand All @@ -21,9 +23,11 @@
import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.InstrumentValueType;
import io.opentelemetry.sdk.metrics.data.ExemplarData;
import io.opentelemetry.sdk.metrics.data.LongExemplarData;
import io.opentelemetry.sdk.metrics.data.LongPointData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.data.PointData;
import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator;
import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory;
import io.opentelemetry.sdk.metrics.internal.aggregator.EmptyMetricData;
Expand All @@ -37,8 +41,17 @@
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
import io.opentelemetry.sdk.testing.time.TestClock;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.BiConsumer;
import java.util.stream.Stream;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.event.Level;

@SuppressLogger(DefaultSynchronousMetricStorage.class)
Expand Down Expand Up @@ -370,4 +383,79 @@ void recordAndCollect_DeltaAtLimit() {
assertThat(storage.getAggregatorHandlePool()).hasSize(CARDINALITY_LIMIT);
logs.assertContains("Instrument name has exceeded the maximum allowed cardinality");
}

@ParameterizedTest
@MethodSource("concurrentStressTestArguments")
void recordAndCollect_concurrentStressTest(
DefaultSynchronousMetricStorage<?, ?> storage, BiConsumer<Double, AtomicDouble> collect) {
// Define record threads. Each records a value of 1.0, 2000 times
List<Thread> threads = new ArrayList<>();
CountDownLatch latch = new CountDownLatch(4);
for (int i = 0; i < 4; i++) {
Thread thread =
new Thread(
() -> {
for (int j = 0; j < 2000; j++) {
storage.recordDouble(1.0, Attributes.empty(), Context.current());
Uninterruptibles.sleepUninterruptibly(Duration.ofMillis(1));
}
latch.countDown();
});
threads.add(thread);
}

// Define collect thread. Collect thread collects and aggregates the
AtomicDouble cumulativeSum = new AtomicDouble();
Thread collectThread =
new Thread(
() -> {
do {
Uninterruptibles.sleepUninterruptibly(Duration.ofMillis(1));
MetricData metricData =
storage.collect(Resource.empty(), InstrumentationScopeInfo.empty(), 0, 1);
if (metricData.isEmpty()) {
continue;
}
metricData.getDoubleSumData().getPoints().stream()
.findFirst()
.ifPresent(pointData -> collect.accept(pointData.getValue(), cumulativeSum));
} while (latch.getCount() != 0);
});

// Start all the threads
collectThread.start();
threads.forEach(Thread::start);

// Wait for the collect thread to end, which collects until the record threads are done
Uninterruptibles.joinUninterruptibly(collectThread);

assertThat(cumulativeSum.get()).isEqualTo(8000.0);
}

private static Stream<Arguments> concurrentStressTestArguments() {
Aggregator<PointData, ExemplarData> aggregator =
((AggregatorFactory) Aggregation.sum())
.createAggregator(DESCRIPTOR, ExemplarFilter.alwaysOff());
return Stream.of(
Arguments.of(
// Delta
new DefaultSynchronousMetricStorage<>(
RegisteredReader.create(InMemoryMetricReader.createDelta(), ViewRegistry.create()),
METRIC_DESCRIPTOR,
aggregator,
AttributesProcessor.noop(),
CARDINALITY_LIMIT),
(BiConsumer<Double, AtomicDouble>)
(value, cumulativeCount) -> cumulativeCount.addAndGet(value)),
Arguments.of(
// Cumulative
new DefaultSynchronousMetricStorage<>(
RegisteredReader.create(InMemoryMetricReader.create(), ViewRegistry.create()),
METRIC_DESCRIPTOR,
aggregator,
AttributesProcessor.noop(),
CARDINALITY_LIMIT),
(BiConsumer<Double, AtomicDouble>)
(value, cumulativeCount) -> cumulativeCount.set(value)));
}
}
Loading