Skip to content

Commit

Permalink
Merge pull request quarkusio#38925 from jimkont/vertx-otel-exporters-…
Browse files Browse the repository at this point in the history
…shutdown

Improve shutdown of VertxHttpExporter and VertxGrpcExporter
  • Loading branch information
brunobat authored Feb 21, 2024
2 parents a7bcd02 + 295395d commit 266f49b
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ final class VertxGrpcExporter implements SpanExporter {
// We only log unimplemented once since it's a configuration issue that won't be recovered.
private final AtomicBoolean loggedUnimplemented = new AtomicBoolean();
private final AtomicBoolean isShutdown = new AtomicBoolean();
private final CompletableResultCode shutdownResult = new CompletableResultCode();
private final String type;
private final ExporterMetrics exporterMetrics;
private final SocketAddress server;
Expand Down Expand Up @@ -145,11 +146,25 @@ public CompletableResultCode flush() {
@Override
public CompletableResultCode shutdown() {
if (!isShutdown.compareAndSet(false, true)) {
logger.log(Level.INFO, "Calling shutdown() multiple times.");
return CompletableResultCode.ofSuccess();
logger.log(Level.FINE, "Calling shutdown() multiple times.");
return shutdownResult;
}
client.close();
return CompletableResultCode.ofSuccess();

client.close()
.onSuccess(
new Handler<>() {
@Override
public void handle(Void event) {
shutdownResult.succeed();
}
})
.onFailure(new Handler<>() {
@Override
public void handle(Throwable event) {
shutdownResult.fail();
}
});
return shutdownResult;
}

private static final class ClientRequestOnSuccessHandler implements Handler<GrpcClientRequest<Buffer, Buffer>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@
import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.zip.GZIPOutputStream;

import io.opentelemetry.exporter.internal.http.HttpExporter;
import io.opentelemetry.exporter.internal.http.HttpSender;
import io.opentelemetry.exporter.internal.otlp.traces.TraceRequestMarshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.internal.ThrottlingLogger;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import io.quarkus.vertx.core.runtime.BufferOutputStream;
Expand All @@ -31,6 +35,9 @@

final class VertxHttpExporter implements SpanExporter {

private static final Logger internalLogger = Logger.getLogger(VertxHttpExporter.class.getName());
private static final ThrottlingLogger logger = new ThrottlingLogger(internalLogger);

private final HttpExporter<TraceRequestMarshaler> delegate;

VertxHttpExporter(HttpExporter<TraceRequestMarshaler> delegate) {
Expand Down Expand Up @@ -83,6 +90,9 @@ static final class VertxHttpSender implements HttpSender {
this.client = vertx.createHttpClient(httpClientOptions);
}

private final AtomicBoolean isShutdown = new AtomicBoolean();
private final CompletableResultCode shutdownResult = new CompletableResultCode();

private static String determineBasePath(URI baseUri) {
String path = baseUri.getPath();
if (path.isEmpty() || path.equals("/")) {
Expand Down Expand Up @@ -173,8 +183,26 @@ public byte[] responseBody() {

@Override
public CompletableResultCode shutdown() {
client.close();
return CompletableResultCode.ofSuccess();
if (!isShutdown.compareAndSet(false, true)) {
logger.log(Level.FINE, "Calling shutdown() multiple times.");
return shutdownResult;
}

client.close()
.onSuccess(
new Handler<>() {
@Override
public void handle(Void event) {
shutdownResult.succeed();
}
})
.onFailure(new Handler<>() {
@Override
public void handle(Throwable event) {
shutdownResult.fail();
}
});
return shutdownResult;
}
}
}

0 comments on commit 266f49b

Please sign in to comment.