Skip to content

Commit

Permalink
PrometheusHttpServer prevent concurrent reads when reusable memory mo…
Browse files Browse the repository at this point in the history
…de (#6371)
  • Loading branch information
jack-berg authored Apr 22, 2024
1 parent a5fc312 commit 8f791f2
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -70,6 +71,12 @@ public static PrometheusHttpServerBuilder builder() {
this.memoryMode = memoryMode;
this.prometheusRegistry = prometheusRegistry;
prometheusRegistry.register(prometheusMetricReader);
// When memory mode is REUSABLE_DATA, concurrent reads lead to data corruption. To prevent this,
// we configure prometheus with a single thread executor such that requests are handled
// sequentially.
if (memoryMode == MemoryMode.REUSABLE_DATA) {
executor = Executors.newSingleThreadExecutor();
}
try {
this.httpServer =
HTTPServer.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.prometheus.metrics.model.registry.PrometheusRegistry;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Predicate;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -93,7 +94,13 @@ public PrometheusHttpServerBuilder setAllowedResourceAttributesFilter(
return this;
}

/** Set the {@link MemoryMode}. */
/**
* Set the {@link MemoryMode}.
*
* <p>If set to {@link MemoryMode#REUSABLE_DATA}, requests are served sequentially which is
* accomplished by overriding {@link #setExecutor(ExecutorService)} to {@link
* Executors#newSingleThreadExecutor()}.
*/
public PrometheusHttpServerBuilder setMemoryMode(MemoryMode memoryMode) {
requireNonNull(memoryMode, "memoryMode");
this.memoryMode = memoryMode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.internal.testing.slf4j.SuppressLogger;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.CollectionRegistration;
Expand All @@ -39,6 +40,7 @@
import java.io.IOException;
import java.net.ServerSocket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand All @@ -47,6 +49,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.zip.GZIPInputStream;
Expand Down Expand Up @@ -129,6 +132,68 @@ void fetchPrometheus() {
+ "target_info{kr=\"vr\"} 1\n");
}

@Test
void fetch_ReusableMemoryMode() throws InterruptedException {
try (PrometheusHttpServer prometheusServer =
PrometheusHttpServer.builder()
.setHost("localhost")
.setPort(0)
.setMemoryMode(MemoryMode.REUSABLE_DATA)
.build()) {
AtomicBoolean collectInProgress = new AtomicBoolean();
AtomicBoolean concurrentRead = new AtomicBoolean();
prometheusServer.register(
new CollectionRegistration() {
@Override
public Collection<MetricData> collectAllMetrics() {
if (!collectInProgress.compareAndSet(false, true)) {
concurrentRead.set(true);
}
Collection<MetricData> response = metricData.get();
try {
Thread.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (!collectInProgress.compareAndSet(true, false)) {
concurrentRead.set(true);
}
return response;
}
});

WebClient client =
WebClient.builder("http://localhost:" + prometheusServer.getAddress().getPort())
.decorator(RetryingClient.newDecorator(RetryRule.failsafe()))
.build();

// Spin up 4 threads calling /metrics simultaneously. If concurrent read happens,
// collectAllMetrics will set concurrentRead to true and the test will fail.
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 4; i++) {
Thread thread =
new Thread(
() -> {
for (int j = 0; j < 10; j++) {
AggregatedHttpResponse response = client.get("/metrics").aggregate().join();
assertThat(response.status()).isEqualTo(HttpStatus.OK);
}
});
thread.setDaemon(true);
thread.start();
threads.add(thread);
}

// Wait for threads to complete
for (Thread thread : threads) {
thread.join();
}

// Confirm no concurrent reads took place
assertThat(concurrentRead.get()).isFalse();
}
}

@Test
void fetchOpenMetrics() {
AggregatedHttpResponse response =
Expand Down

0 comments on commit 8f791f2

Please sign in to comment.