diff --git a/bom/test/pom.xml b/bom/test/pom.xml
index 2933c5fb998de..bda70c0af5435 100644
--- a/bom/test/pom.xml
+++ b/bom/test/pom.xml
@@ -21,6 +21,8 @@
1.3.8
0.100.0
+
+ 0.20.0-alpha
@@ -66,6 +68,13 @@
jaxb-api
${jaxb-api.version}
+
+
+ io.opentelemetry.proto
+ opentelemetry-proto
+ ${opentelemetry-proto.version}
+
+
diff --git a/extensions/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/OpenTelemetryProcessor.java b/extensions/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/OpenTelemetryProcessor.java
index e307913e61bec..d7ff085e7d956 100644
--- a/extensions/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/OpenTelemetryProcessor.java
+++ b/extensions/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/OpenTelemetryProcessor.java
@@ -1,5 +1,6 @@
package io.quarkus.opentelemetry.deployment;
+import static io.quarkus.deployment.builditem.nativeimage.ServiceProviderBuildItem.SPI_ROOT;
import static io.quarkus.opentelemetry.runtime.OpenTelemetryRecorder.OPEN_TELEMETRY_DRIVER;
import static java.util.stream.Collectors.toList;
@@ -17,6 +18,7 @@
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.exporter.otlp.internal.OtlpSpanExporterProvider;
import io.opentelemetry.instrumentation.annotations.SpanAttribute;
import io.opentelemetry.instrumentation.annotations.WithSpan;
import io.opentelemetry.sdk.autoconfigure.spi.AutoConfigurationCustomizerProvider;
@@ -40,10 +42,14 @@
import io.quarkus.deployment.annotations.Produce;
import io.quarkus.deployment.annotations.Record;
import io.quarkus.deployment.builditem.LaunchModeBuildItem;
+import io.quarkus.deployment.builditem.RemovedResourceBuildItem;
import io.quarkus.deployment.builditem.ShutdownContextBuildItem;
import io.quarkus.deployment.builditem.nativeimage.NativeImageResourceBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
+import io.quarkus.deployment.builditem.nativeimage.RuntimeReinitializedClassBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ServiceProviderBuildItem;
+import io.quarkus.deployment.util.ServiceUtil;
+import io.quarkus.maven.dependency.ArtifactKey;
import io.quarkus.opentelemetry.runtime.OpenTelemetryProducer;
import io.quarkus.opentelemetry.runtime.OpenTelemetryRecorder;
import io.quarkus.opentelemetry.runtime.QuarkusContextStorage;
@@ -73,11 +79,30 @@ AdditionalBeanBuildItem ensureProducerIsRetained() {
}
@BuildStep
- void registerNativeImageResources(BuildProducer services) throws IOException {
- services.produce(ServiceProviderBuildItem.allProvidersFromClassPath(
- ConfigurableSpanExporterProvider.class.getName()));
+ void registerNativeImageResources(BuildProducer services,
+ BuildProducer removedResources,
+ BuildProducer runtimeReinitialized) throws IOException {
+
+ List spanExporterProviders = ServiceUtil.classNamesNamedIn(
+ Thread.currentThread().getContextClassLoader(),
+ SPI_ROOT + ConfigurableSpanExporterProvider.class.getName())
+ .stream()
+ .filter(p -> !OtlpSpanExporterProvider.class.getName().equals(p)).collect(toList()); // filter out OtlpSpanExporterProvider since it depends on OkHttp
+ if (!spanExporterProviders.isEmpty()) {
+ services.produce(
+ new ServiceProviderBuildItem(ConfigurableSpanExporterProvider.class.getName(), spanExporterProviders));
+ }
+ // remove the service file that contains OtlpSpanExporterProvider
+ removedResources.produce(new RemovedResourceBuildItem(
+ ArtifactKey.fromString("io.opentelemetry:opentelemetry-exporter-otlp"),
+ Set.of("META-INF/services/io.opentelemetry.sdk.autoconfigure.spi.traces.ConfigurableSpanExporterProvider")));
+
+ runtimeReinitialized.produce(
+ new RuntimeReinitializedClassBuildItem("io.opentelemetry.sdk.autoconfigure.TracerProviderConfiguration"));
+
services.produce(ServiceProviderBuildItem.allProvidersFromClassPath(
ConfigurableSamplerProvider.class.getName()));
+
// The following are added but not officially supported, yet.
services.produce(ServiceProviderBuildItem.allProvidersFromClassPath(
AutoConfigurationCustomizerProvider.class.getName()));
diff --git a/extensions/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/exporter/otlp/OtlpExporterProcessor.java b/extensions/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/exporter/otlp/OtlpExporterProcessor.java
index 519b1e33d8794..92d2d12e9d604 100644
--- a/extensions/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/exporter/otlp/OtlpExporterProcessor.java
+++ b/extensions/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/exporter/otlp/OtlpExporterProcessor.java
@@ -17,6 +17,7 @@
import io.quarkus.opentelemetry.runtime.config.runtime.exporter.OtlpExporterRuntimeConfig;
import io.quarkus.opentelemetry.runtime.exporter.otlp.OtlpExporterProvider;
import io.quarkus.opentelemetry.runtime.exporter.otlp.OtlpRecorder;
+import io.quarkus.vertx.deployment.VertxBuildItem;
@BuildSteps(onlyIf = OtlpExporterProcessor.OtlpExporterEnabled.class)
public class OtlpExporterProcessor {
@@ -47,9 +48,11 @@ void installBatchSpanProcessorForOtlp(
LaunchModeBuildItem launchModeBuildItem,
OTelRuntimeConfig otelRuntimeConfig,
OtlpExporterRuntimeConfig exporterRuntimeConfig,
+ VertxBuildItem vertxBuildItem,
BeanContainerBuildItem beanContainerBuildItem) {
recorder.installBatchSpanProcessorForOtlp(otelRuntimeConfig,
exporterRuntimeConfig,
+ vertxBuildItem.getVertx(),
launchModeBuildItem.getLaunchMode());
}
}
diff --git a/extensions/opentelemetry/runtime/pom.xml b/extensions/opentelemetry/runtime/pom.xml
index cce632cc789ce..0e0dcf59af816 100644
--- a/extensions/opentelemetry/runtime/pom.xml
+++ b/extensions/opentelemetry/runtime/pom.xml
@@ -111,6 +111,10 @@
org.checkerframework
checker-qual
+
+ com.squareup.okhttp3
+ okhttp
+
@@ -125,6 +129,36 @@
org.checkerframework
checker-qual
+
+ com.squareup.okhttp3
+ okhttp
+
+
+
+
+ io.vertx
+ vertx-grpc-client
+
+
+ org.codehaus.mojo
+ animal-sniffer-annotations
+
+
+ com.google.code.findbugs
+ jsr305
+
+
+ org.checkerframework
+ checker-qual
+
+
+ javax.annotation
+ javax.annotation-api
+
+
+ com.google.android
+ annotations
+
diff --git a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/config/runtime/exporter/OtlpExporterTracesConfig.java b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/config/runtime/exporter/OtlpExporterTracesConfig.java
index 5cc4daa9c2bf5..95ccbd654753c 100644
--- a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/config/runtime/exporter/OtlpExporterTracesConfig.java
+++ b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/config/runtime/exporter/OtlpExporterTracesConfig.java
@@ -31,21 +31,6 @@ public interface OtlpExporterTracesConfig {
@WithDefault(DEFAULT_GRPC_BASE_URI)
Optional legacyEndpoint();
- // /**
- // * Sets the certificate chain to use for verifying servers when TLS is enabled. The {@code byte[]}
- // * should contain an X.509 certificate collection in PEM format. If not set, TLS connections will
- // * use the system default trusted certificates.
- // */
- // @ConfigItem()
- // public Optional certificate;
-
- // /**
- // * Sets ths client key and the certificate chain to use for verifying client when TLS is enabled.
- // * The key must be PKCS8, and both must be in PEM format.
- // */
- // @ConfigItem()
- // public Optional client;
-
/**
* Key-value pairs to be used as headers associated with gRPC requests.
* The format is similar to the {@code OTEL_EXPORTER_OTLP_HEADERS} environment variable,
@@ -73,6 +58,37 @@ public interface OtlpExporterTracesConfig {
@WithDefault(Protocol.HTTP_PROTOBUF)
Optional protocol();
+ /**
+ * Key/cert configuration in the PEM format.
+ */
+ @WithName("key-cert")
+ KeyCert keyCert();
+
+ /**
+ * Trust configuration in the PEM format.
+ */
+ @WithName("trust-cert")
+ TrustCert trustCert();
+
+ interface KeyCert {
+ /**
+ * Comma-separated list of the path to the key files (Pem format).
+ */
+ Optional> keys();
+
+ /**
+ * Comma-separated list of the path to the certificate files (Pem format).
+ */
+ Optional> certs();
+ }
+
+ interface TrustCert {
+ /**
+ * Comma-separated list of the trust certificate files (Pem format).
+ */
+ Optional> certs();
+ }
+
public static class Protocol {
public static final String GRPC = "grpc";
public static final String HTTP_PROTOBUF = "http/protobuf";
diff --git a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/OtlpRecorder.java b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/OtlpRecorder.java
index 088bd5ec4849c..7704736dcc051 100644
--- a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/OtlpRecorder.java
+++ b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/OtlpRecorder.java
@@ -3,20 +3,33 @@
import static io.quarkus.opentelemetry.runtime.config.runtime.exporter.OtlpExporterRuntimeConfig.DEFAULT_GRPC_BASE_URI;
import static io.quarkus.opentelemetry.runtime.config.runtime.exporter.OtlpExporterTracesConfig.Protocol.HTTP_PROTOBUF;
+import java.net.URI;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
import jakarta.enterprise.inject.Any;
import jakarta.enterprise.inject.spi.CDI;
-import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
-import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporterBuilder;
+import io.opentelemetry.api.metrics.MeterProvider;
+import io.opentelemetry.exporter.internal.ExporterBuilderUtil;
+import io.opentelemetry.exporter.internal.otlp.OtlpUserAgent;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessorBuilder;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import io.quarkus.opentelemetry.runtime.config.runtime.OTelRuntimeConfig;
+import io.quarkus.opentelemetry.runtime.config.runtime.exporter.CompressionType;
import io.quarkus.opentelemetry.runtime.config.runtime.exporter.OtlpExporterRuntimeConfig;
+import io.quarkus.opentelemetry.runtime.config.runtime.exporter.OtlpExporterTracesConfig;
import io.quarkus.runtime.LaunchMode;
+import io.quarkus.runtime.RuntimeValue;
import io.quarkus.runtime.annotations.Recorder;
+import io.vertx.core.Vertx;
+import io.vertx.core.http.HttpClientOptions;
+import io.vertx.core.net.KeyCertOptions;
+import io.vertx.core.net.PemKeyCertOptions;
+import io.vertx.core.net.PemTrustOptions;
@Recorder
public class OtlpRecorder {
@@ -39,22 +52,23 @@ private static boolean excludeDefaultEndpoint(String endpoint) {
public void installBatchSpanProcessorForOtlp(
OTelRuntimeConfig otelRuntimeConfig,
OtlpExporterRuntimeConfig exporterRuntimeConfig,
+ RuntimeValue vertx,
LaunchMode launchMode) {
if (otelRuntimeConfig.sdkDisabled()) {
return;
}
- String endpoint = resolveEndpoint(exporterRuntimeConfig).trim();
+ String grpcBaseUri = resolveEndpoint(exporterRuntimeConfig).trim();
// Only create the OtlpGrpcSpanExporter if an endpoint was set in runtime config
- if (endpoint.length() > 0) {
+ if (grpcBaseUri.length() > 0) {
try {
// Load span exporter if provided by user
SpanExporter spanExporter = CDI.current()
.select(SpanExporter.class, Any.Literal.INSTANCE).stream().findFirst().orElse(null);
// CDI exporter was already added to a processor by OTEL
if (spanExporter == null) {
- spanExporter = createOtlpGrpcSpanExporter(exporterRuntimeConfig, endpoint);
+ spanExporter = createOtlpGrpcSpanExporter(exporterRuntimeConfig, grpcBaseUri, vertx.getValue());
// Create BatchSpanProcessor for OTLP and install into LateBoundBatchSpanProcessor
LateBoundBatchSpanProcessor delayedProcessor = CDI.current()
@@ -76,17 +90,26 @@ public void installBatchSpanProcessorForOtlp(
}
}
- private OtlpGrpcSpanExporter createOtlpGrpcSpanExporter(OtlpExporterRuntimeConfig exporterRuntimeConfig, String endpoint) {
- OtlpGrpcSpanExporterBuilder exporterBuilder = OtlpGrpcSpanExporter.builder()
- .setEndpoint(endpoint)
- .setTimeout(exporterRuntimeConfig.traces().timeout());
+ private SpanExporter createOtlpGrpcSpanExporter(OtlpExporterRuntimeConfig exporterRuntimeConfig, String endpoint,
+ Vertx vertx) {
- // FIXME TLS Support. Was not available before but will be available soon.
- // exporterRuntimeConfig.traces.certificate.ifPresent(exporterBuilder::setTrustedCertificates);
- // exporterRuntimeConfig.client.ifPresent(exporterBuilder::setClientTls);
+ OtlpExporterTracesConfig tracesConfig = exporterRuntimeConfig.traces();
+ if (tracesConfig.protocol().isPresent()) {
+ if (!tracesConfig.protocol().get().equals(HTTP_PROTOBUF)) {
+ throw new IllegalStateException("Only the GRPC Exporter is currently supported. " +
+ "Please check `quarkus.otel.exporter.otlp.traces.protocol` property");
+ }
+ }
- if (exporterRuntimeConfig.traces().headers().isPresent()) {
- List headers = exporterRuntimeConfig.traces().headers().get();
+ boolean compressionEnabled = false;
+ if (tracesConfig.compression().isPresent()) {
+ compressionEnabled = (tracesConfig.compression().get() == CompressionType.GZIP);
+ }
+
+ Map headersMap = new HashMap<>();
+ OtlpUserAgent.addUserAgentHeader(headersMap::put);
+ if (tracesConfig.headers().isPresent()) {
+ List headers = tracesConfig.headers().get();
if (!headers.isEmpty()) {
for (String header : headers) {
if (header.isEmpty()) {
@@ -95,22 +118,65 @@ private OtlpGrpcSpanExporter createOtlpGrpcSpanExporter(OtlpExporterRuntimeConfi
String[] parts = header.split("=", 2);
String key = parts[0].trim();
String value = parts[1].trim();
- exporterBuilder.addHeader(key, value);
+ headersMap.put(key, value);
}
}
}
- if (exporterRuntimeConfig.traces().compression().isPresent()) {
- exporterBuilder.setCompression(exporterRuntimeConfig.traces().compression().get().getValue());
- }
+ URI grpcBaseUri = ExporterBuilderUtil.validateEndpoint(endpoint);
+ return new VertxGrpcExporter(
+ "otlp", // use the same as OTel does
+ "span", // use the same as OTel does
+ MeterProvider::noop,
+ grpcBaseUri,
+ compressionEnabled,
+ tracesConfig.timeout(),
+ headersMap,
+ new Consumer<>() {
+ @Override
+ public void accept(HttpClientOptions options) {
+ configureTLS(options);
+ }
- if (exporterRuntimeConfig.traces().protocol().isPresent()) {
- if (!exporterRuntimeConfig.traces().protocol().get().equals(HTTP_PROTOBUF)) {
- throw new IllegalStateException("Only the GRPC Exporter is currently supported. " +
- "Please check `quarkus.otel.exporter.otlp.traces.protocol` property");
- }
- }
+ private void configureTLS(HttpClientOptions options) {
+ // TODO: this can reuse existing stuff when https://github.com/quarkusio/quarkus/pull/33228 is in
+ options.setKeyCertOptions(toPemKeyCertOptions(tracesConfig));
+ options.setPemTrustOptions(toPemTrustOptions(tracesConfig));
+
+ if (VertxGrpcExporter.isHttps(grpcBaseUri)) {
+ options.setSsl(true);
+ options.setUseAlpn(true);
+ }
+ }
+
+ private KeyCertOptions toPemKeyCertOptions(OtlpExporterTracesConfig configuration) {
+ PemKeyCertOptions pemKeyCertOptions = new PemKeyCertOptions();
+ OtlpExporterTracesConfig.KeyCert keyCert = configuration.keyCert();
+ if (keyCert.certs().isPresent()) {
+ for (String cert : keyCert.certs().get()) {
+ pemKeyCertOptions.addCertPath(cert);
+ }
+ }
+ if (keyCert.keys().isPresent()) {
+ for (String cert : keyCert.keys().get()) {
+ pemKeyCertOptions.addKeyPath(cert);
+ }
+ }
+ return pemKeyCertOptions;
+ }
+
+ private PemTrustOptions toPemTrustOptions(OtlpExporterTracesConfig configuration) {
+ PemTrustOptions pemTrustOptions = new PemTrustOptions();
+ OtlpExporterTracesConfig.TrustCert trustCert = configuration.trustCert();
+ if (trustCert.certs().isPresent()) {
+ for (String cert : trustCert.certs().get()) {
+ pemTrustOptions.addCertPath(cert);
+ }
+ }
+ return pemTrustOptions;
+ }
+ },
+ vertx);
- return exporterBuilder.build();
}
}
diff --git a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/VertxGrpcExporter.java b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/VertxGrpcExporter.java
new file mode 100644
index 0000000000000..6d4a4e54cea87
--- /dev/null
+++ b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/VertxGrpcExporter.java
@@ -0,0 +1,371 @@
+package io.quarkus.opentelemetry.runtime.exporter.otlp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import io.netty.handler.codec.http.QueryStringDecoder;
+import io.opentelemetry.api.metrics.MeterProvider;
+import io.opentelemetry.exporter.internal.ExporterMetrics;
+import io.opentelemetry.exporter.internal.otlp.traces.TraceRequestMarshaler;
+import io.opentelemetry.sdk.common.CompletableResultCode;
+import io.opentelemetry.sdk.internal.ThrottlingLogger;
+import io.opentelemetry.sdk.trace.data.SpanData;
+import io.opentelemetry.sdk.trace.export.SpanExporter;
+import io.vertx.core.Handler;
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.HttpClientOptions;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.core.tracing.TracingPolicy;
+import io.vertx.grpc.client.GrpcClient;
+import io.vertx.grpc.client.GrpcClientRequest;
+import io.vertx.grpc.client.GrpcClientResponse;
+import io.vertx.grpc.common.GrpcStatus;
+import io.vertx.grpc.common.ServiceName;
+
+final class VertxGrpcExporter implements SpanExporter {
+
+ private static final String GRPC_SERVICE_NAME = "opentelemetry.proto.collector.trace.v1.TraceService";
+ private static final String GRPC_METHOD_NAME = "Export";
+
+ private static final String GRPC_STATUS = "grpc-status";
+ private static final String GRPC_MESSAGE = "grpc-message";
+
+ private static final Logger internalLogger = Logger.getLogger(VertxGrpcExporter.class.getName());
+
+ private final ThrottlingLogger logger = new ThrottlingLogger(internalLogger); // TODO: is there something in JBoss Logging we can use?
+
+ // We only log unimplemented once since it's a configuration issue that won't be recovered.
+ private final AtomicBoolean loggedUnimplemented = new AtomicBoolean();
+ private final AtomicBoolean isShutdown = new AtomicBoolean();
+ private final String type;
+ private final ExporterMetrics exporterMetrics;
+ private final SocketAddress server;
+ private final boolean compressionEnabled;
+ private final Map headers;
+
+ private final GrpcClient client;
+
+ VertxGrpcExporter(
+ String exporterName,
+ String type,
+ Supplier meterProviderSupplier,
+ URI grpcBaseUri, boolean compressionEnabled,
+ Duration timeout,
+ Map headersMap,
+ Consumer clientOptionsCustomizer,
+ Vertx vertx) {
+ this.type = type;
+ this.exporterMetrics = ExporterMetrics.createGrpcOkHttp(exporterName, type, meterProviderSupplier);
+ this.server = SocketAddress.inetSocketAddress(getPort(grpcBaseUri), grpcBaseUri.getHost());
+ this.compressionEnabled = compressionEnabled;
+ this.headers = headersMap;
+ var httpClientOptions = new HttpClientOptions()
+ .setHttp2ClearTextUpgrade(false) // needed otherwise connections get closed immediately
+ .setReadIdleTimeout((int) timeout.getSeconds())
+ .setTracingPolicy(TracingPolicy.IGNORE); // needed to avoid tracing the calls from this gRPC client
+ clientOptionsCustomizer.accept(httpClientOptions);
+ this.client = GrpcClient.client(vertx, httpClientOptions);
+ }
+
+ private static int getPort(URI uri) {
+ int originalPort = uri.getPort();
+ if (originalPort > -1) {
+ return originalPort;
+ }
+
+ if (isHttps(uri)) {
+ return 443;
+ }
+ return 80;
+ }
+
+ static boolean isHttps(URI uri) {
+ return "https".equals(uri.getScheme().toLowerCase(Locale.ROOT));
+ }
+
+ private CompletableResultCode export(TraceRequestMarshaler marshaler, int numItems) {
+ if (isShutdown.get()) {
+ return CompletableResultCode.ofFailure();
+ }
+
+ exporterMetrics.addSeen(numItems);
+
+ var result = new CompletableResultCode();
+ var onSuccessHandler = new ClientRequestOnSuccessHandler(headers, compressionEnabled, exporterMetrics, marshaler,
+ loggedUnimplemented, logger, type, numItems, result);
+ client.request(server)
+ .onSuccess(onSuccessHandler)
+ .onFailure(new Handler<>() {
+ @Override
+ public void handle(Throwable t) {
+ // TODO: is there a better way todo retry?
+ // TODO: should we only retry on a specific errors?
+
+ client.request(server)
+ .onSuccess(onSuccessHandler)
+ .onFailure(new Handler<>() {
+ @Override
+ public void handle(Throwable event) {
+ failOnClientRequest(numItems, t, result);
+ }
+ });
+ }
+ });
+
+ return result;
+ }
+
+ private void failOnClientRequest(int numItems, Throwable t, CompletableResultCode result) {
+ exporterMetrics.addFailed(numItems);
+ logger.log(
+ Level.SEVERE,
+ "Failed to export "
+ + type
+ + "s. The request could not be executed. Full error message: "
+ + t.getMessage());
+ result.fail();
+ }
+
+ @Override
+ public CompletableResultCode export(Collection spans) {
+ TraceRequestMarshaler request = TraceRequestMarshaler.create(spans);
+
+ return export(request, spans.size());
+ }
+
+ @Override
+ public CompletableResultCode flush() {
+ return CompletableResultCode.ofSuccess();
+ }
+
+ @Override
+ public CompletableResultCode shutdown() {
+ if (!isShutdown.compareAndSet(false, true)) {
+ logger.log(Level.INFO, "Calling shutdown() multiple times.");
+ return CompletableResultCode.ofSuccess();
+ }
+ client.close();
+ return CompletableResultCode.ofSuccess();
+ }
+
+ private static final class NonCopyingByteArrayOutputStream extends ByteArrayOutputStream {
+
+ public NonCopyingByteArrayOutputStream(int size) {
+ super(size);
+ }
+
+ @Override
+ public byte[] toByteArray() {
+ return buf;
+ }
+ }
+
+ private static final class ClientRequestOnSuccessHandler implements Handler> {
+
+ private final Map headers;
+ private final boolean compressionEnabled;
+ private final ExporterMetrics exporterMetrics;
+
+ private final TraceRequestMarshaler marshaler;
+ private final AtomicBoolean loggedUnimplemented;
+ private final ThrottlingLogger logger;
+ private final String type;
+ private final int numItems;
+ private final CompletableResultCode result;
+
+ public ClientRequestOnSuccessHandler(Map headers,
+ boolean compressionEnabled,
+ ExporterMetrics exporterMetrics,
+ TraceRequestMarshaler marshaler,
+ AtomicBoolean loggedUnimplemented,
+ ThrottlingLogger logger,
+ String type,
+ int numItems,
+ CompletableResultCode result) {
+ this.headers = headers;
+ this.compressionEnabled = compressionEnabled;
+ this.exporterMetrics = exporterMetrics;
+ this.marshaler = marshaler;
+ this.loggedUnimplemented = loggedUnimplemented;
+ this.logger = logger;
+ this.type = type;
+ this.numItems = numItems;
+ this.result = result;
+ }
+
+ @Override
+ public void handle(GrpcClientRequest request) {
+ if (compressionEnabled) {
+ request.encoding("gzip");
+ }
+
+ // Set the service name and the method to call
+ request.serviceName(ServiceName.create(GRPC_SERVICE_NAME));
+ request.methodName(GRPC_METHOD_NAME);
+
+ if (!headers.isEmpty()) {
+ var vertxHeaders = request.headers();
+ for (var entry : headers.entrySet()) {
+ vertxHeaders.set(entry.getKey(), entry.getValue());
+ }
+ }
+
+ try {
+ int messageSize = marshaler.getBinarySerializedSize();
+ var baos = new NonCopyingByteArrayOutputStream(messageSize);
+ marshaler.writeBinaryTo(baos);
+ Buffer buffer = Buffer.buffer(messageSize);
+ buffer.appendBytes(baos.toByteArray());
+ request.send(buffer).onSuccess(new Handler<>() {
+ @Override
+ public void handle(GrpcClientResponse response) {
+ GrpcStatus status = getStatus(response);
+ if (status == GrpcStatus.OK) {
+ exporterMetrics.addSuccess(numItems);
+ result.succeed();
+ return;
+ }
+ String statusMessage = getStatusMessage(response);
+ if (statusMessage == null) {
+ // TODO: this needs investigation, when this happened, the spans actually got to the server, but for some reason no status code was present in the result
+ exporterMetrics.addSuccess(numItems);
+ result.succeed();
+ return;
+ }
+
+ logAppropriateWarning(status, statusMessage);
+ exporterMetrics.addFailed(numItems);
+ result.fail();
+ }
+
+ private void logAppropriateWarning(GrpcStatus status,
+ String statusMessage) {
+ if (status == GrpcStatus.UNIMPLEMENTED) {
+ if (loggedUnimplemented.compareAndSet(false, true)) {
+ logUnimplemented(internalLogger, type, statusMessage);
+ }
+ } else if (status == GrpcStatus.UNAVAILABLE) {
+ logger.log(
+ Level.SEVERE,
+ "Failed to export "
+ + type
+ + "s. Server is UNAVAILABLE. "
+ + "Make sure your collector is running and reachable from this network. "
+ + "Full error message:"
+ + statusMessage);
+ } else {
+ if (status == null) {
+ logger.log(
+ Level.WARNING,
+ "Failed to export "
+ + type
+ + "s. Server responded with error message: "
+ + statusMessage);
+ } else {
+ logger.log(
+ Level.WARNING,
+ "Failed to export "
+ + type
+ + "s. Server responded with "
+ + status.code
+ + ". Error message: "
+ + statusMessage);
+ }
+ }
+ }
+
+ private void logUnimplemented(Logger logger, String type, String fullErrorMessage) {
+ String envVar;
+ switch (type) {
+ case "span":
+ envVar = "OTEL_TRACES_EXPORTER";
+ break;
+ case "metric":
+ envVar = "OTEL_METRICS_EXPORTER";
+ break;
+ case "log":
+ envVar = "OTEL_LOGS_EXPORTER";
+ break;
+ default:
+ throw new IllegalStateException(
+ "Unrecognized type, this is a programming bug in the OpenTelemetry SDK");
+ }
+
+ logger.log(
+ Level.SEVERE,
+ "Failed to export "
+ + type
+ + "s. Server responded with UNIMPLEMENTED. "
+ + "This usually means that your collector is not configured with an otlp "
+ + "receiver in the \"pipelines\" section of the configuration. "
+ + "If export is not desired and you are using OpenTelemetry autoconfiguration or the javaagent, "
+ + "disable export by setting "
+ + envVar
+ + "=none. "
+ + "Full error message: "
+ + fullErrorMessage);
+ }
+
+ private GrpcStatus getStatus(GrpcClientResponse, ?> response) {
+ // Status can either be in the headers or trailers depending on error
+ GrpcStatus result = response.status();
+ if (result == null) {
+ String statusFromTrailer = response.trailers().get(GRPC_STATUS);
+ if (statusFromTrailer != null) {
+ result = GrpcStatus.valueOf(Integer.parseInt(statusFromTrailer));
+ }
+ }
+ return result;
+ }
+
+ private String getStatusMessage(GrpcClientResponse response) {
+ // Status message can either be in the headers or trailers depending on error
+ String result = response.statusMessage();
+ if (result == null) {
+ result = response.trailers().get(GRPC_MESSAGE);
+ if (result != null) {
+ result = QueryStringDecoder.decodeComponent(result, StandardCharsets.UTF_8);
+ }
+
+ }
+ return result;
+ }
+
+ }).onFailure(new Handler<>() {
+ @Override
+ public void handle(Throwable t) {
+ exporterMetrics.addFailed(numItems);
+ logger.log(
+ Level.SEVERE,
+ "Failed to export "
+ + type
+ + "s. The request could not be executed. Full error message: "
+ + t.getMessage());
+ result.fail();
+ }
+ });
+ } catch (IOException e) {
+ exporterMetrics.addFailed(numItems);
+ logger.log(
+ Level.SEVERE,
+ "Failed to export "
+ + type
+ + "s. Unable to serialize payload. Full error message: "
+ + e.getMessage());
+ result.fail();
+ }
+ }
+ }
+}
diff --git a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/graal/Target_io_opentelemetry_exporter_otlp_internal_OtlpSpanExporterProvider.java b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/graal/Target_io_opentelemetry_exporter_otlp_internal_OtlpSpanExporterProvider.java
new file mode 100644
index 0000000000000..5ce28d2793845
--- /dev/null
+++ b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/graal/Target_io_opentelemetry_exporter_otlp_internal_OtlpSpanExporterProvider.java
@@ -0,0 +1,9 @@
+package io.quarkus.opentelemetry.runtime.exporter.otlp.graal;
+
+import com.oracle.svm.core.annotate.Delete;
+import com.oracle.svm.core.annotate.TargetClass;
+
+@TargetClass(className = "io.opentelemetry.exporter.otlp.internal.OtlpSpanExporterProvider")
+@Delete
+public final class Target_io_opentelemetry_exporter_otlp_internal_OtlpSpanExporterProvider {
+}
diff --git a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/intrumentation/vertx/InstrumenterVertxTracer.java b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/intrumentation/vertx/InstrumenterVertxTracer.java
index c1ec2a629ef98..e4c88ada61308 100644
--- a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/intrumentation/vertx/InstrumenterVertxTracer.java
+++ b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/intrumentation/vertx/InstrumenterVertxTracer.java
@@ -29,6 +29,10 @@ default SpanOperation receiveRequest(
final Iterable> headers,
final TagExtractor tagExtractor) {
+ if (TracingPolicy.IGNORE == policy) {
+ return null;
+ }
+
Instrumenter instrumenter = getReceiveRequestInstrumenter();
io.opentelemetry.context.Context parentContext = QuarkusContextStorage.getContext(context);
if (parentContext == null) {
@@ -80,6 +84,10 @@ default SpanOperation sendRequest(
final BiConsumer headers,
final TagExtractor tagExtractor) {
+ if (TracingPolicy.IGNORE == policy) {
+ return null;
+ }
+
Instrumenter instrumenter = getSendRequestInstrumenter();
io.opentelemetry.context.Context parentContext = QuarkusContextStorage.getContext(context);
if (parentContext == null) {
diff --git a/extensions/opentelemetry/runtime/src/test/java/io/quarkus/opentelemetry/runtime/exporter/otlp/OtlpRecorderTest.java b/extensions/opentelemetry/runtime/src/test/java/io/quarkus/opentelemetry/runtime/exporter/otlp/OtlpRecorderTest.java
index 8fdb47c4c8e96..643667e8a3a1b 100644
--- a/extensions/opentelemetry/runtime/src/test/java/io/quarkus/opentelemetry/runtime/exporter/otlp/OtlpRecorderTest.java
+++ b/extensions/opentelemetry/runtime/src/test/java/io/quarkus/opentelemetry/runtime/exporter/otlp/OtlpRecorderTest.java
@@ -109,6 +109,31 @@ public Duration timeout() {
public Optional protocol() {
return Optional.empty();
}
+
+ @Override
+ public KeyCert keyCert() {
+ return new KeyCert() {
+ @Override
+ public Optional> keys() {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional> certs() {
+ return Optional.empty();
+ }
+ };
+ }
+
+ @Override
+ public TrustCert trustCert() {
+ return new TrustCert() {
+ @Override
+ public Optional> certs() {
+ return Optional.empty();
+ }
+ };
+ }
};
}
};
diff --git a/integration-tests/opentelemetry-vertx-grpc-exporter/pom.xml b/integration-tests/opentelemetry-vertx-grpc-exporter/pom.xml
new file mode 100644
index 0000000000000..0635e60e990d6
--- /dev/null
+++ b/integration-tests/opentelemetry-vertx-grpc-exporter/pom.xml
@@ -0,0 +1,177 @@
+
+
+ 4.0.0
+
+
+ io.quarkus
+ quarkus-integration-tests-parent
+ 999-SNAPSHOT
+
+
+ quarkus-integration-test-opentelemetry-vertx-grpc-exporter
+ Quarkus - Integration Tests - OpenTelemetry Vert.x gRPC exporter
+
+
+
+ io.quarkus
+ quarkus-opentelemetry
+
+
+ io.quarkus
+ quarkus-resteasy-reactive-jackson
+
+
+
+
+ io.quarkus
+ quarkus-junit5
+ test
+
+
+ io.rest-assured
+ rest-assured
+ test
+
+
+ org.awaitility
+ awaitility
+ test
+
+
+ org.testcontainers
+ testcontainers
+ test
+
+
+ org.assertj
+ assertj-core
+ test
+
+
+
+ org.bouncycastle
+ bcpkix-jdk18on
+ test
+
+
+
+ io.vertx
+ vertx-grpc-server
+ test
+
+
+ org.codehaus.mojo
+ animal-sniffer-annotations
+
+
+ com.google.code.findbugs
+ jsr305
+
+
+ org.checkerframework
+ checker-qual
+
+
+ javax.annotation
+ javax.annotation-api
+
+
+ com.google.android
+ annotations
+
+
+
+
+
+ io.opentelemetry.proto
+ opentelemetry-proto
+ test
+
+
+
+
+ io.quarkus
+ quarkus-opentelemetry-deployment
+ ${project.version}
+ pom
+ test
+
+
+ *
+ *
+
+
+
+
+ io.quarkus
+ quarkus-resteasy-reactive-jackson-deployment
+ ${project.version}
+ pom
+ test
+
+
+ *
+ *
+
+
+
+
+
+
+
+
+ maven-surefire-plugin
+
+ true
+
+
+
+ maven-failsafe-plugin
+
+ true
+
+
+
+ io.quarkus
+ quarkus-maven-plugin
+
+
+
+ build
+
+
+
+
+
+
+
+
+
+ test-otel-grpc-exporter
+
+
+ test-containers
+
+
+
+
+
+ maven-surefire-plugin
+
+ false
+
+
+
+ maven-failsafe-plugin
+
+ false
+
+
+
+
+
+
+
+
diff --git a/integration-tests/opentelemetry-vertx-grpc-exporter/src/main/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/HelloResource.java b/integration-tests/opentelemetry-vertx-grpc-exporter/src/main/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/HelloResource.java
new file mode 100644
index 0000000000000..7e2021e2ed5fe
--- /dev/null
+++ b/integration-tests/opentelemetry-vertx-grpc-exporter/src/main/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/HelloResource.java
@@ -0,0 +1,14 @@
+package io.quarkus.it.opentelemetry.vertx.grpc.exporter;
+
+import jakarta.ws.rs.GET;
+import jakarta.ws.rs.Path;
+
+@Path("hello")
+public class HelloResource {
+
+ @GET
+ public String get() {
+ return "get";
+ }
+
+}
diff --git a/integration-tests/opentelemetry-vertx-grpc-exporter/src/main/resources/application.properties b/integration-tests/opentelemetry-vertx-grpc-exporter/src/main/resources/application.properties
new file mode 100644
index 0000000000000..edb424258ea0a
--- /dev/null
+++ b/integration-tests/opentelemetry-vertx-grpc-exporter/src/main/resources/application.properties
@@ -0,0 +1 @@
+quarkus.application.name=integration test
diff --git a/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/AbstractExporterTest.java b/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/AbstractExporterTest.java
new file mode 100644
index 0000000000000..1b8805686c2a1
--- /dev/null
+++ b/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/AbstractExporterTest.java
@@ -0,0 +1,79 @@
+package io.quarkus.it.opentelemetry.vertx.grpc.exporter;
+
+import static io.restassured.RestAssured.when;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+import java.time.Duration;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
+import io.opentelemetry.proto.common.v1.AnyValue;
+import io.opentelemetry.proto.common.v1.KeyValue;
+import io.opentelemetry.proto.trace.v1.ResourceSpans;
+import io.opentelemetry.proto.trace.v1.ScopeSpans;
+import io.opentelemetry.proto.trace.v1.Span;
+import io.opentelemetry.semconv.resource.attributes.ResourceAttributes;
+
+public abstract class AbstractExporterTest {
+
+ Traces traces;
+
+ @BeforeEach
+ @AfterEach
+ void setUp() {
+ traces.reset();
+ }
+
+ @Test
+ void test() {
+ verifyHttpResponse();
+ verifyTraces();
+ }
+
+ private void verifyHttpResponse() {
+ when()
+ .get("/hello")
+ .then()
+ .statusCode(200);
+ }
+
+ private void verifyTraces() {
+ await()
+ .atMost(Duration.ofSeconds(30))
+ .untilAsserted(() -> assertThat(traces.getTraceRequests()).hasSize(1));
+
+ ExportTraceServiceRequest request = traces.getTraceRequests().get(0);
+ assertThat(request.getResourceSpansCount()).isEqualTo(1);
+
+ ResourceSpans resourceSpans = request.getResourceSpans(0);
+ assertThat(resourceSpans.getResource().getAttributesList())
+ .contains(
+ KeyValue.newBuilder()
+ .setKey(ResourceAttributes.SERVICE_NAME.getKey())
+ .setValue(AnyValue.newBuilder()
+ .setStringValue("integration test").build())
+ .build())
+ .contains(
+ KeyValue.newBuilder()
+ .setKey(ResourceAttributes.WEBENGINE_NAME.getKey())
+ .setValue(AnyValue.newBuilder()
+ .setStringValue("Quarkus").build())
+ .build());
+ assertThat(resourceSpans.getScopeSpansCount()).isEqualTo(1);
+ ScopeSpans scopeSpans = resourceSpans.getScopeSpans(0);
+ assertThat(scopeSpans.getSpansCount()).isEqualTo(1);
+ Span span = scopeSpans.getSpans(0);
+ assertThat(span.getName()).isEqualTo("GET /hello");
+ assertThat(span.getAttributesList())
+ .contains(
+ KeyValue.newBuilder()
+ .setKey("http.method")
+ .setValue(AnyValue.newBuilder()
+ .setStringValue("GET").build())
+ .build());
+ }
+}
diff --git a/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/NoTLSNoCompressionTest.java b/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/NoTLSNoCompressionTest.java
new file mode 100644
index 0000000000000..47f6d31a98d4e
--- /dev/null
+++ b/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/NoTLSNoCompressionTest.java
@@ -0,0 +1,10 @@
+package io.quarkus.it.opentelemetry.vertx.grpc.exporter;
+
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.junit.QuarkusTest;
+
+@QuarkusTest
+@QuarkusTestResource(value = OtelCollectorLifecycleManager.class, restrictToAnnotatedClass = true)
+public class NoTLSNoCompressionTest extends AbstractExporterTest {
+
+}
diff --git a/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/NoTLSWithCompressionTest.java b/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/NoTLSWithCompressionTest.java
new file mode 100644
index 0000000000000..65f67e19d9f6e
--- /dev/null
+++ b/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/NoTLSWithCompressionTest.java
@@ -0,0 +1,11 @@
+package io.quarkus.it.opentelemetry.vertx.grpc.exporter;
+
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.common.ResourceArg;
+import io.quarkus.test.junit.QuarkusTest;
+
+@QuarkusTest
+@QuarkusTestResource(value = OtelCollectorLifecycleManager.class, initArgs = @ResourceArg(name = "enableCompression", value = "true"), restrictToAnnotatedClass = true)
+public class NoTLSWithCompressionTest extends AbstractExporterTest {
+
+}
diff --git a/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/OtelCollectorLifecycleManager.java b/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/OtelCollectorLifecycleManager.java
new file mode 100644
index 0000000000000..fdcf042e8d973
--- /dev/null
+++ b/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/OtelCollectorLifecycleManager.java
@@ -0,0 +1,186 @@
+package io.quarkus.it.opentelemetry.vertx.grpc.exporter;
+
+import static org.testcontainers.Testcontainers.exposeHostPorts;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.BindMode;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.images.PullPolicy;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.MountableFile;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
+import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse;
+import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
+import io.vertx.core.Vertx;
+import io.vertx.core.VertxOptions;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.HttpServer;
+import io.vertx.core.http.HttpServerOptions;
+import io.vertx.core.net.SelfSignedCertificate;
+import io.vertx.grpc.common.GrpcStatus;
+import io.vertx.grpc.common.ServiceName;
+import io.vertx.grpc.server.GrpcServer;
+
+public class OtelCollectorLifecycleManager implements QuarkusTestResourceLifecycleManager {
+
+ private static final String COLLECTOR_IMAGE = "ghcr.io/open-telemetry/opentelemetry-java/otel-collector";
+ private static final Integer COLLECTOR_OTLP_GRPC_PORT = 4317;
+ private static final Integer COLLECTOR_OTLP_HTTP_PORT = 4318;
+ private static final Integer COLLECTOR_OTLP_GRPC_MTLS_PORT = 5317;
+ private static final Integer COLLECTOR_OTLP_HTTP_MTLS_PORT = 5318;
+ private static final Integer COLLECTOR_HEALTH_CHECK_PORT = 13133;
+ private static final ServiceName TRACE_SERVICE_NAME = ServiceName
+ .create("opentelemetry.proto.collector.trace.v1.TraceService");
+ private static final String TRACE_METHOD_NAME = "Export";
+
+ private SelfSignedCertificate serverTls;
+ private SelfSignedCertificate clientTlS;
+ private boolean enableTLS = false;
+ private boolean enableCompression = false;
+ private Vertx vertx;
+
+ private HttpServer server;
+
+ private GenericContainer> collector;
+ private Traces collectedTraces;
+
+ @Override
+ public void init(Map initArgs) {
+ var enableTLSStr = initArgs.get("enableTLS");
+ if (enableTLSStr != null && !enableTLSStr.isEmpty()) {
+ enableTLS = Boolean.parseBoolean(enableTLSStr);
+ }
+
+ var enableCompressionStr = initArgs.get("enableCompression");
+ if (enableCompressionStr != null && !enableCompressionStr.isEmpty()) {
+ enableCompression = Boolean.parseBoolean(enableCompressionStr);
+ }
+ }
+
+ @Override
+ public Map start() {
+ setupVertxGrpcServer();
+ int vertxGrpcServerPort = server.actualPort();
+
+ // Expose the port the in-process OTLP gRPC server will run on before the collector is
+ // initialized so the collector can connect to it.
+ exposeHostPorts(vertxGrpcServerPort);
+
+ serverTls = SelfSignedCertificate.create();
+ clientTlS = SelfSignedCertificate.create();
+
+ collector = new GenericContainer<>(DockerImageName.parse(COLLECTOR_IMAGE))
+ .withImagePullPolicy(PullPolicy.alwaysPull())
+ .withEnv("LOGGING_EXPORTER_VERBOSITY_LEVEL", "normal")
+ .withCopyFileToContainer(
+ MountableFile.forHostPath(serverTls.certificatePath(), 0555),
+ "/server.cert")
+ .withCopyFileToContainer(
+ MountableFile.forHostPath(serverTls.privateKeyPath(), 0555), "/server.key")
+ .withCopyFileToContainer(
+ MountableFile.forHostPath(clientTlS.certificatePath(), 0555),
+ "/client.cert")
+ .withEnv(
+ "OTLP_EXPORTER_ENDPOINT", "host.testcontainers.internal:" + vertxGrpcServerPort)
+ .withEnv("MTLS_CLIENT_CERTIFICATE", "/client.cert")
+ .withEnv("MTLS_SERVER_CERTIFICATE", "/server.cert")
+ .withEnv("MTLS_SERVER_KEY", "/server.key")
+ .withClasspathResourceMapping(
+ "otel-config.yaml", "/otel-config.yaml", BindMode.READ_ONLY)
+ .withCommand("--config", "/otel-config.yaml")
+ .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("otel-collector")))
+ .withExposedPorts(
+ COLLECTOR_OTLP_GRPC_PORT,
+ COLLECTOR_OTLP_HTTP_PORT,
+ COLLECTOR_OTLP_GRPC_MTLS_PORT,
+ COLLECTOR_OTLP_HTTP_MTLS_PORT,
+ COLLECTOR_HEALTH_CHECK_PORT)
+ .waitingFor(Wait.forHttp("/").forPort(COLLECTOR_HEALTH_CHECK_PORT));
+ collector.start();
+
+ Map result = new HashMap<>();
+
+ if (enableTLS) {
+ result.put("quarkus.otel.exporter.otlp.traces.endpoint",
+ "https://" + collector.getHost() + ":" + collector.getMappedPort(COLLECTOR_OTLP_GRPC_MTLS_PORT));
+ result.put("quarkus.otel.exporter.otlp.traces.trust-cert.certs", serverTls.certificatePath());
+ result.put("quarkus.otel.exporter.otlp.traces.key-cert.certs", clientTlS.certificatePath());
+ result.put("quarkus.otel.exporter.otlp.traces.key-cert.keys", clientTlS.privateKeyPath());
+ } else {
+ result.put("quarkus.otel.exporter.otlp.traces.endpoint",
+ "http://" + collector.getHost() + ":" + collector.getMappedPort(COLLECTOR_OTLP_GRPC_PORT));
+ }
+
+ if (enableCompression) {
+ result.put("quarkus.otel.exporter.otlp.traces.compression", "gzip");
+ }
+
+ return result;
+ }
+
+ @Override
+ public void inject(TestInjector testInjector) {
+ testInjector.injectIntoFields(collectedTraces, f -> f.getType().equals(Traces.class));
+ }
+
+ private void setupVertxGrpcServer() {
+ vertx = Vertx.vertx(new VertxOptions().setWorkerPoolSize(1).setEventLoopPoolSize(1));
+ GrpcServer grpcServer = GrpcServer.server(vertx);
+ collectedTraces = new Traces();
+ grpcServer.callHandler(request -> {
+
+ if (request.serviceName().equals(TRACE_SERVICE_NAME) && request.methodName().equals(TRACE_METHOD_NAME)) {
+
+ request.handler(message -> {
+ try {
+ collectedTraces.getTraceRequests().add(ExportTraceServiceRequest.parseFrom(message.getBytes()));
+ request.response().end(Buffer.buffer(ExportTraceServiceResponse.getDefaultInstance().toByteArray()));
+ } catch (InvalidProtocolBufferException e) {
+ request.response()
+ .status(GrpcStatus.INVALID_ARGUMENT)
+ .end();
+ }
+ });
+ } else {
+ request.response()
+ .status(GrpcStatus.NOT_FOUND)
+ .end();
+ }
+ });
+ server = vertx.createHttpServer(new HttpServerOptions().setPort(0));
+ try {
+ server.requestHandler(grpcServer).listen().toCompletionStage().toCompletableFuture().get(20, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void stop() {
+ if (server != null) {
+ server.close().andThen(ar -> {
+ if (vertx != null) {
+ vertx.close();
+ }
+ });
+ }
+ if (serverTls != null) {
+ serverTls.delete();
+ }
+ if (clientTlS != null) {
+ clientTlS.delete();
+ }
+ if (collector != null) {
+ collector.stop();
+ }
+ }
+}
diff --git a/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/Traces.java b/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/Traces.java
new file mode 100644
index 0000000000000..30e46df57dbc8
--- /dev/null
+++ b/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/Traces.java
@@ -0,0 +1,19 @@
+package io.quarkus.it.opentelemetry.vertx.grpc.exporter;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
+
+public final class Traces {
+
+ private final List traceRequests = new CopyOnWriteArrayList<>();
+
+ public List getTraceRequests() {
+ return traceRequests;
+ }
+
+ public void reset() {
+ traceRequests.clear();
+ }
+}
diff --git a/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/WithTLSNoCompressionTest.java b/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/WithTLSNoCompressionTest.java
new file mode 100644
index 0000000000000..23d8aea2581b4
--- /dev/null
+++ b/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/WithTLSNoCompressionTest.java
@@ -0,0 +1,11 @@
+package io.quarkus.it.opentelemetry.vertx.grpc.exporter;
+
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.common.ResourceArg;
+import io.quarkus.test.junit.QuarkusTest;
+
+@QuarkusTest
+@QuarkusTestResource(value = OtelCollectorLifecycleManager.class, initArgs = @ResourceArg(name = "enableTLS", value = "true"), restrictToAnnotatedClass = true)
+public class WithTLSNoCompressionTest extends AbstractExporterTest {
+
+}
diff --git a/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/WithTLSWithCompressionTest.java b/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/WithTLSWithCompressionTest.java
new file mode 100644
index 0000000000000..42b958965d3c6
--- /dev/null
+++ b/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/WithTLSWithCompressionTest.java
@@ -0,0 +1,14 @@
+package io.quarkus.it.opentelemetry.vertx.grpc.exporter;
+
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.common.ResourceArg;
+import io.quarkus.test.junit.QuarkusTest;
+
+@QuarkusTest
+@QuarkusTestResource(value = OtelCollectorLifecycleManager.class, initArgs = {
+ @ResourceArg(name = "enableTLS", value = "true"),
+ @ResourceArg(name = "enableCompression", value = "true")
+}, restrictToAnnotatedClass = true)
+public class WithTLSWithCompressionTest extends AbstractExporterTest {
+
+}
diff --git a/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/resources/otel-config.yaml b/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/resources/otel-config.yaml
new file mode 100644
index 0000000000000..24e1239387835
--- /dev/null
+++ b/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/resources/otel-config.yaml
@@ -0,0 +1,43 @@
+extensions:
+ health_check: {}
+receivers:
+ otlp:
+ protocols:
+ grpc:
+ endpoint: 0.0.0.0:4317
+# http:
+# endpoint: 0.0.0.0:4318
+ otlp/mtls:
+ protocols:
+ grpc:
+ endpoint: 0.0.0.0:5317
+ tls:
+ client_ca_file: $MTLS_CLIENT_CERTIFICATE
+ cert_file: $MTLS_SERVER_CERTIFICATE
+ key_file: $MTLS_SERVER_KEY
+# http:
+# endpoint: 0.0.0.0:5318
+# tls:
+# client_ca_file: $MTLS_CLIENT_CERTIFICATE
+# cert_file: $MTLS_SERVER_CERTIFICATE
+# key_file: $MTLS_SERVER_KEY
+exporters:
+ logging:
+ verbosity: $LOGGING_EXPORTER_VERBOSITY_LEVEL
+ otlp:
+ endpoint: $OTLP_EXPORTER_ENDPOINT
+ tls:
+ insecure: true
+ compression: none
+service:
+ extensions: [health_check]
+ pipelines:
+# metrics:
+# receivers: [otlp, otlp/mtls]
+# exporters: [logging, otlp]
+ traces:
+ receivers: [otlp, otlp/mtls]
+ exporters: [logging, otlp]
+# logs:
+# receivers: [otlp, otlp/mtls]
+# exporters: [logging, otlp]
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index 7f2e5546f937f..0718dc18ff9c8 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -351,6 +351,7 @@
opentelemetry-vertx
opentelemetry-reactive
opentelemetry-grpc
+ opentelemetry-vertx-grpc-exporter
opentelemetry-reactive-messaging
logging-json
jaxb