From 7fa8659dd214b3f05e5bc6fda5fda98e7104a42b Mon Sep 17 00:00:00 2001 From: Georgios Andrianakis Date: Mon, 10 Jul 2023 16:06:18 +0300 Subject: [PATCH 1/4] Take TracingPolicy.IGNORE into account --- .../intrumentation/vertx/InstrumenterVertxTracer.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/intrumentation/vertx/InstrumenterVertxTracer.java b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/intrumentation/vertx/InstrumenterVertxTracer.java index c1ec2a629ef98..e4c88ada61308 100644 --- a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/intrumentation/vertx/InstrumenterVertxTracer.java +++ b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/intrumentation/vertx/InstrumenterVertxTracer.java @@ -29,6 +29,10 @@ default SpanOperation receiveRequest( final Iterable> headers, final TagExtractor tagExtractor) { + if (TracingPolicy.IGNORE == policy) { + return null; + } + Instrumenter instrumenter = getReceiveRequestInstrumenter(); io.opentelemetry.context.Context parentContext = QuarkusContextStorage.getContext(context); if (parentContext == null) { @@ -80,6 +84,10 @@ default SpanOperation sendRequest( final BiConsumer headers, final TagExtractor tagExtractor) { + if (TracingPolicy.IGNORE == policy) { + return null; + } + Instrumenter instrumenter = getSendRequestInstrumenter(); io.opentelemetry.context.Context parentContext = QuarkusContextStorage.getContext(context); if (parentContext == null) { From c3e43e9680201e82be107e4d981c06d6f6979331 Mon Sep 17 00:00:00 2001 From: Georgios Andrianakis Date: Mon, 10 Jul 2023 09:10:48 +0300 Subject: [PATCH 2/4] Replace OkHttp tracing backend with Vert.x --- .../exporter/otlp/OtlpExporterProcessor.java | 3 + extensions/opentelemetry/runtime/pom.xml | 26 ++ .../runtime/exporter/otlp/OtlpRecorder.java | 59 ++- .../exporter/otlp/VertxGrpcExporter.java | 364 ++++++++++++++++++ 4 files changed, 431 insertions(+), 21 deletions(-) create mode 100644 extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/VertxGrpcExporter.java diff --git a/extensions/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/exporter/otlp/OtlpExporterProcessor.java b/extensions/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/exporter/otlp/OtlpExporterProcessor.java index 519b1e33d8794..92d2d12e9d604 100644 --- a/extensions/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/exporter/otlp/OtlpExporterProcessor.java +++ b/extensions/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/exporter/otlp/OtlpExporterProcessor.java @@ -17,6 +17,7 @@ import io.quarkus.opentelemetry.runtime.config.runtime.exporter.OtlpExporterRuntimeConfig; import io.quarkus.opentelemetry.runtime.exporter.otlp.OtlpExporterProvider; import io.quarkus.opentelemetry.runtime.exporter.otlp.OtlpRecorder; +import io.quarkus.vertx.deployment.VertxBuildItem; @BuildSteps(onlyIf = OtlpExporterProcessor.OtlpExporterEnabled.class) public class OtlpExporterProcessor { @@ -47,9 +48,11 @@ void installBatchSpanProcessorForOtlp( LaunchModeBuildItem launchModeBuildItem, OTelRuntimeConfig otelRuntimeConfig, OtlpExporterRuntimeConfig exporterRuntimeConfig, + VertxBuildItem vertxBuildItem, BeanContainerBuildItem beanContainerBuildItem) { recorder.installBatchSpanProcessorForOtlp(otelRuntimeConfig, exporterRuntimeConfig, + vertxBuildItem.getVertx(), launchModeBuildItem.getLaunchMode()); } } diff --git a/extensions/opentelemetry/runtime/pom.xml b/extensions/opentelemetry/runtime/pom.xml index cce632cc789ce..2b03eb5fae352 100644 --- a/extensions/opentelemetry/runtime/pom.xml +++ b/extensions/opentelemetry/runtime/pom.xml @@ -127,6 +127,32 @@ + + io.vertx + vertx-grpc-client + + + org.codehaus.mojo + animal-sniffer-annotations + + + com.google.code.findbugs + jsr305 + + + org.checkerframework + checker-qual + + + javax.annotation + javax.annotation-api + + + com.google.android + annotations + + + diff --git a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/OtlpRecorder.java b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/OtlpRecorder.java index 088bd5ec4849c..ed3d6148177e0 100644 --- a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/OtlpRecorder.java +++ b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/OtlpRecorder.java @@ -3,20 +3,26 @@ import static io.quarkus.opentelemetry.runtime.config.runtime.exporter.OtlpExporterRuntimeConfig.DEFAULT_GRPC_BASE_URI; import static io.quarkus.opentelemetry.runtime.config.runtime.exporter.OtlpExporterTracesConfig.Protocol.HTTP_PROTOBUF; +import java.util.HashMap; import java.util.List; +import java.util.Map; import jakarta.enterprise.inject.Any; import jakarta.enterprise.inject.spi.CDI; -import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter; -import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporterBuilder; +import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.exporter.internal.ExporterBuilderUtil; +import io.opentelemetry.exporter.internal.otlp.OtlpUserAgent; import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; import io.opentelemetry.sdk.trace.export.BatchSpanProcessorBuilder; import io.opentelemetry.sdk.trace.export.SpanExporter; import io.quarkus.opentelemetry.runtime.config.runtime.OTelRuntimeConfig; +import io.quarkus.opentelemetry.runtime.config.runtime.exporter.CompressionType; import io.quarkus.opentelemetry.runtime.config.runtime.exporter.OtlpExporterRuntimeConfig; import io.quarkus.runtime.LaunchMode; +import io.quarkus.runtime.RuntimeValue; import io.quarkus.runtime.annotations.Recorder; +import io.vertx.core.Vertx; @Recorder public class OtlpRecorder { @@ -39,22 +45,23 @@ private static boolean excludeDefaultEndpoint(String endpoint) { public void installBatchSpanProcessorForOtlp( OTelRuntimeConfig otelRuntimeConfig, OtlpExporterRuntimeConfig exporterRuntimeConfig, + RuntimeValue vertx, LaunchMode launchMode) { if (otelRuntimeConfig.sdkDisabled()) { return; } - String endpoint = resolveEndpoint(exporterRuntimeConfig).trim(); + String grpcBaseUri = resolveEndpoint(exporterRuntimeConfig).trim(); // Only create the OtlpGrpcSpanExporter if an endpoint was set in runtime config - if (endpoint.length() > 0) { + if (grpcBaseUri.length() > 0) { try { // Load span exporter if provided by user SpanExporter spanExporter = CDI.current() .select(SpanExporter.class, Any.Literal.INSTANCE).stream().findFirst().orElse(null); // CDI exporter was already added to a processor by OTEL if (spanExporter == null) { - spanExporter = createOtlpGrpcSpanExporter(exporterRuntimeConfig, endpoint); + spanExporter = createOtlpGrpcSpanExporter(exporterRuntimeConfig, grpcBaseUri, vertx.getValue()); // Create BatchSpanProcessor for OTLP and install into LateBoundBatchSpanProcessor LateBoundBatchSpanProcessor delayedProcessor = CDI.current() @@ -76,15 +83,27 @@ public void installBatchSpanProcessorForOtlp( } } - private OtlpGrpcSpanExporter createOtlpGrpcSpanExporter(OtlpExporterRuntimeConfig exporterRuntimeConfig, String endpoint) { - OtlpGrpcSpanExporterBuilder exporterBuilder = OtlpGrpcSpanExporter.builder() - .setEndpoint(endpoint) - .setTimeout(exporterRuntimeConfig.traces().timeout()); + private SpanExporter createOtlpGrpcSpanExporter(OtlpExporterRuntimeConfig exporterRuntimeConfig, String endpoint, + Vertx vertx) { + + if (exporterRuntimeConfig.traces().protocol().isPresent()) { + if (!exporterRuntimeConfig.traces().protocol().get().equals(HTTP_PROTOBUF)) { + throw new IllegalStateException("Only the GRPC Exporter is currently supported. " + + "Please check `quarkus.otel.exporter.otlp.traces.protocol` property"); + } + } + + boolean compressionEnabled = false; + if (exporterRuntimeConfig.traces().compression().isPresent()) { + compressionEnabled = (exporterRuntimeConfig.traces().compression().get() == CompressionType.GZIP); + } // FIXME TLS Support. Was not available before but will be available soon. // exporterRuntimeConfig.traces.certificate.ifPresent(exporterBuilder::setTrustedCertificates); // exporterRuntimeConfig.client.ifPresent(exporterBuilder::setClientTls); + Map headersMap = new HashMap<>(); + OtlpUserAgent.addUserAgentHeader(headersMap::put); if (exporterRuntimeConfig.traces().headers().isPresent()) { List headers = exporterRuntimeConfig.traces().headers().get(); if (!headers.isEmpty()) { @@ -95,22 +114,20 @@ private OtlpGrpcSpanExporter createOtlpGrpcSpanExporter(OtlpExporterRuntimeConfi String[] parts = header.split("=", 2); String key = parts[0].trim(); String value = parts[1].trim(); - exporterBuilder.addHeader(key, value); + headersMap.put(key, value); } } } - if (exporterRuntimeConfig.traces().compression().isPresent()) { - exporterBuilder.setCompression(exporterRuntimeConfig.traces().compression().get().getValue()); - } - - if (exporterRuntimeConfig.traces().protocol().isPresent()) { - if (!exporterRuntimeConfig.traces().protocol().get().equals(HTTP_PROTOBUF)) { - throw new IllegalStateException("Only the GRPC Exporter is currently supported. " + - "Please check `quarkus.otel.exporter.otlp.traces.protocol` property"); - } - } + return new VertxGrpcExporter( + "otlp", // use the same as OTel does + "span", // use the same as OTel does + MeterProvider::noop, + ExporterBuilderUtil.validateEndpoint(endpoint), + compressionEnabled, + exporterRuntimeConfig.traces().timeout(), + headersMap, + vertx); - return exporterBuilder.build(); } } diff --git a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/VertxGrpcExporter.java b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/VertxGrpcExporter.java new file mode 100644 index 0000000000000..47c50654c823d --- /dev/null +++ b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/VertxGrpcExporter.java @@ -0,0 +1,364 @@ +package io.quarkus.opentelemetry.runtime.exporter.otlp; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Collection; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; +import java.util.logging.Level; +import java.util.logging.Logger; + +import io.netty.handler.codec.http.QueryStringDecoder; +import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.exporter.internal.ExporterMetrics; +import io.opentelemetry.exporter.internal.otlp.traces.TraceRequestMarshaler; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.internal.ThrottlingLogger; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpClientOptions; +import io.vertx.core.net.SocketAddress; +import io.vertx.core.tracing.TracingPolicy; +import io.vertx.grpc.client.GrpcClient; +import io.vertx.grpc.client.GrpcClientRequest; +import io.vertx.grpc.client.GrpcClientResponse; +import io.vertx.grpc.common.GrpcStatus; +import io.vertx.grpc.common.ServiceName; + +final class VertxGrpcExporter implements SpanExporter { + + private static final String GRPC_SERVICE_NAME = "opentelemetry.proto.collector.trace.v1.TraceService"; + private static final String GRPC_METHOD_NAME = "Export"; + + private static final String GRPC_STATUS = "grpc-status"; + private static final String GRPC_MESSAGE = "grpc-message"; + + private static final Logger internalLogger = Logger.getLogger(VertxGrpcExporter.class.getName()); + + private final ThrottlingLogger logger = new ThrottlingLogger(internalLogger); // TODO: is there something in JBoss Logging we can use? + + // We only log unimplemented once since it's a configuration issue that won't be recovered. + private final AtomicBoolean loggedUnimplemented = new AtomicBoolean(); + private final AtomicBoolean isShutdown = new AtomicBoolean(); + private final String type; + private final ExporterMetrics exporterMetrics; + private final SocketAddress server; + private final boolean compressionEnabled; + private final Map headers; + + private final GrpcClient client; + + VertxGrpcExporter( + String exporterName, + String type, + Supplier meterProviderSupplier, + URI grpcBaseUri, boolean compressionEnabled, + Duration timeout, + Map headersMap, + Vertx vertx) { + this.type = type; + this.exporterMetrics = ExporterMetrics.createGrpcOkHttp(exporterName, type, meterProviderSupplier); + this.server = SocketAddress.inetSocketAddress(getPort(grpcBaseUri), grpcBaseUri.getHost()); + this.compressionEnabled = compressionEnabled; + this.headers = headersMap; + var httpClientOptions = new HttpClientOptions() + .setHttp2ClearTextUpgrade(false) // needed otherwise connections get closed immediately + .setReadIdleTimeout((int) timeout.getSeconds()) + .setTracingPolicy(TracingPolicy.IGNORE); // needed to avoid tracing the calls from this gRPC client + this.client = GrpcClient.client(vertx, httpClientOptions); + } + + private static int getPort(URI uri) { + int originalPort = uri.getPort(); + if (originalPort > -1) { + return originalPort; + } + + if ("https".equals(uri.getScheme().toLowerCase(Locale.ROOT))) { + return 443; + } + return 80; + } + + private CompletableResultCode export(TraceRequestMarshaler marshaler, int numItems) { + if (isShutdown.get()) { + return CompletableResultCode.ofFailure(); + } + + exporterMetrics.addSeen(numItems); + + var result = new CompletableResultCode(); + var onSuccessHandler = new ClientRequestOnSuccessHandler(headers, compressionEnabled, exporterMetrics, marshaler, + loggedUnimplemented, logger, type, numItems, result); + client.request(server) + .onSuccess(onSuccessHandler) + .onFailure(new Handler<>() { + @Override + public void handle(Throwable t) { + // TODO: is there a better way todo retry? + // TODO: should we only retry on a specific errors? + + client.request(server) + .onSuccess(onSuccessHandler) + .onFailure(new Handler<>() { + @Override + public void handle(Throwable event) { + failOnClientRequest(numItems, t, result); + } + }); + } + }); + + return result; + } + + private void failOnClientRequest(int numItems, Throwable t, CompletableResultCode result) { + exporterMetrics.addFailed(numItems); + logger.log( + Level.SEVERE, + "Failed to export " + + type + + "s. The request could not be executed. Full error message: " + + t.getMessage()); + result.fail(); + } + + @Override + public CompletableResultCode export(Collection spans) { + TraceRequestMarshaler request = TraceRequestMarshaler.create(spans); + + return export(request, spans.size()); + } + + @Override + public CompletableResultCode flush() { + return CompletableResultCode.ofSuccess(); + } + + @Override + public CompletableResultCode shutdown() { + if (!isShutdown.compareAndSet(false, true)) { + logger.log(Level.INFO, "Calling shutdown() multiple times."); + return CompletableResultCode.ofSuccess(); + } + client.close(); + return CompletableResultCode.ofSuccess(); + } + + private static final class NonCopyingByteArrayOutputStream extends ByteArrayOutputStream { + + public NonCopyingByteArrayOutputStream(int size) { + super(size); + } + + @Override + public byte[] toByteArray() { + return buf; + } + } + + private static final class ClientRequestOnSuccessHandler implements Handler> { + + private final Map headers; + private final boolean compressionEnabled; + private final ExporterMetrics exporterMetrics; + + private final TraceRequestMarshaler marshaler; + private final AtomicBoolean loggedUnimplemented; + private final ThrottlingLogger logger; + private final String type; + private final int numItems; + private final CompletableResultCode result; + + public ClientRequestOnSuccessHandler(Map headers, + boolean compressionEnabled, + ExporterMetrics exporterMetrics, + TraceRequestMarshaler marshaler, + AtomicBoolean loggedUnimplemented, + ThrottlingLogger logger, + String type, + int numItems, + CompletableResultCode result) { + this.headers = headers; + this.compressionEnabled = compressionEnabled; + this.exporterMetrics = exporterMetrics; + this.marshaler = marshaler; + this.loggedUnimplemented = loggedUnimplemented; + this.logger = logger; + this.type = type; + this.numItems = numItems; + this.result = result; + } + + @Override + public void handle(GrpcClientRequest request) { + if (compressionEnabled) { + request.encoding("gzip"); + } + + // Set the service name and the method to call + request.serviceName(ServiceName.create(GRPC_SERVICE_NAME)); + request.methodName(GRPC_METHOD_NAME); + + if (!headers.isEmpty()) { + var vertxHeaders = request.headers(); + for (var entry : headers.entrySet()) { + vertxHeaders.set(entry.getKey(), entry.getValue()); + } + } + + try { + int messageSize = marshaler.getBinarySerializedSize(); + var baos = new NonCopyingByteArrayOutputStream(messageSize); + marshaler.writeBinaryTo(baos); + Buffer buffer = Buffer.buffer(messageSize); + buffer.appendBytes(baos.toByteArray()); + request.send(buffer).onSuccess(new Handler<>() { + @Override + public void handle(GrpcClientResponse response) { + GrpcStatus status = getStatus(response); + if (status == GrpcStatus.OK) { + exporterMetrics.addSuccess(numItems); + result.succeed(); + return; + } + String statusMessage = getStatusMessage(response); + if (statusMessage == null) { + // TODO: this needs investigation, when this happened, the spans actually got to the server, but for some reason no status code was present in the result + exporterMetrics.addSuccess(numItems); + result.succeed(); + return; + } + + logAppropriateWarning(status, statusMessage); + exporterMetrics.addFailed(numItems); + result.fail(); + } + + private void logAppropriateWarning(GrpcStatus status, + String statusMessage) { + if (status == GrpcStatus.UNIMPLEMENTED) { + if (loggedUnimplemented.compareAndSet(false, true)) { + logUnimplemented(internalLogger, type, statusMessage); + } + } else if (status == GrpcStatus.UNAVAILABLE) { + logger.log( + Level.SEVERE, + "Failed to export " + + type + + "s. Server is UNAVAILABLE. " + + "Make sure your collector is running and reachable from this network. " + + "Full error message:" + + statusMessage); + } else { + if (status == null) { + logger.log( + Level.WARNING, + "Failed to export " + + type + + "s. Server responded with error message: " + + statusMessage); + } else { + logger.log( + Level.WARNING, + "Failed to export " + + type + + "s. Server responded with " + + status.code + + ". Error message: " + + statusMessage); + } + } + } + + private void logUnimplemented(Logger logger, String type, String fullErrorMessage) { + String envVar; + switch (type) { + case "span": + envVar = "OTEL_TRACES_EXPORTER"; + break; + case "metric": + envVar = "OTEL_METRICS_EXPORTER"; + break; + case "log": + envVar = "OTEL_LOGS_EXPORTER"; + break; + default: + throw new IllegalStateException( + "Unrecognized type, this is a programming bug in the OpenTelemetry SDK"); + } + + logger.log( + Level.SEVERE, + "Failed to export " + + type + + "s. Server responded with UNIMPLEMENTED. " + + "This usually means that your collector is not configured with an otlp " + + "receiver in the \"pipelines\" section of the configuration. " + + "If export is not desired and you are using OpenTelemetry autoconfiguration or the javaagent, " + + "disable export by setting " + + envVar + + "=none. " + + "Full error message: " + + fullErrorMessage); + } + + private GrpcStatus getStatus(GrpcClientResponse response) { + // Status can either be in the headers or trailers depending on error + GrpcStatus result = response.status(); + if (result == null) { + String statusFromTrailer = response.trailers().get(GRPC_STATUS); + if (statusFromTrailer != null) { + result = GrpcStatus.valueOf(Integer.parseInt(statusFromTrailer)); + } + } + return result; + } + + private String getStatusMessage(GrpcClientResponse response) { + // Status message can either be in the headers or trailers depending on error + String result = response.statusMessage(); + if (result == null) { + result = response.trailers().get(GRPC_MESSAGE); + if (result != null) { + result = QueryStringDecoder.decodeComponent(result, StandardCharsets.UTF_8); + } + + } + return result; + } + + }).onFailure(new Handler<>() { + @Override + public void handle(Throwable t) { + exporterMetrics.addFailed(numItems); + logger.log( + Level.SEVERE, + "Failed to export " + + type + + "s. The request could not be executed. Full error message: " + + t.getMessage()); + result.fail(); + } + }); + } catch (IOException e) { + exporterMetrics.addFailed(numItems); + logger.log( + Level.SEVERE, + "Failed to export " + + type + + "s. Unable to serialize payload. Full error message: " + + e.getMessage()); + result.fail(); + } + } + } +} From 1602ee122009a4e7b28d24400fad48439328faf9 Mon Sep 17 00:00:00 2001 From: Georgios Andrianakis Date: Mon, 10 Jul 2023 17:03:40 +0300 Subject: [PATCH 3/4] Remove OkHttp dependency from OpenTelemetry extension --- .../deployment/OpenTelemetryProcessor.java | 31 +++++++++++++++++-- extensions/opentelemetry/runtime/pom.xml | 8 +++++ ...tlp_internal_OtlpSpanExporterProvider.java | 9 ++++++ 3 files changed, 45 insertions(+), 3 deletions(-) create mode 100644 extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/graal/Target_io_opentelemetry_exporter_otlp_internal_OtlpSpanExporterProvider.java diff --git a/extensions/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/OpenTelemetryProcessor.java b/extensions/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/OpenTelemetryProcessor.java index e307913e61bec..d7ff085e7d956 100644 --- a/extensions/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/OpenTelemetryProcessor.java +++ b/extensions/opentelemetry/deployment/src/main/java/io/quarkus/opentelemetry/deployment/OpenTelemetryProcessor.java @@ -1,5 +1,6 @@ package io.quarkus.opentelemetry.deployment; +import static io.quarkus.deployment.builditem.nativeimage.ServiceProviderBuildItem.SPI_ROOT; import static io.quarkus.opentelemetry.runtime.OpenTelemetryRecorder.OPEN_TELEMETRY_DRIVER; import static java.util.stream.Collectors.toList; @@ -17,6 +18,7 @@ import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.exporter.otlp.internal.OtlpSpanExporterProvider; import io.opentelemetry.instrumentation.annotations.SpanAttribute; import io.opentelemetry.instrumentation.annotations.WithSpan; import io.opentelemetry.sdk.autoconfigure.spi.AutoConfigurationCustomizerProvider; @@ -40,10 +42,14 @@ import io.quarkus.deployment.annotations.Produce; import io.quarkus.deployment.annotations.Record; import io.quarkus.deployment.builditem.LaunchModeBuildItem; +import io.quarkus.deployment.builditem.RemovedResourceBuildItem; import io.quarkus.deployment.builditem.ShutdownContextBuildItem; import io.quarkus.deployment.builditem.nativeimage.NativeImageResourceBuildItem; import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem; +import io.quarkus.deployment.builditem.nativeimage.RuntimeReinitializedClassBuildItem; import io.quarkus.deployment.builditem.nativeimage.ServiceProviderBuildItem; +import io.quarkus.deployment.util.ServiceUtil; +import io.quarkus.maven.dependency.ArtifactKey; import io.quarkus.opentelemetry.runtime.OpenTelemetryProducer; import io.quarkus.opentelemetry.runtime.OpenTelemetryRecorder; import io.quarkus.opentelemetry.runtime.QuarkusContextStorage; @@ -73,11 +79,30 @@ AdditionalBeanBuildItem ensureProducerIsRetained() { } @BuildStep - void registerNativeImageResources(BuildProducer services) throws IOException { - services.produce(ServiceProviderBuildItem.allProvidersFromClassPath( - ConfigurableSpanExporterProvider.class.getName())); + void registerNativeImageResources(BuildProducer services, + BuildProducer removedResources, + BuildProducer runtimeReinitialized) throws IOException { + + List spanExporterProviders = ServiceUtil.classNamesNamedIn( + Thread.currentThread().getContextClassLoader(), + SPI_ROOT + ConfigurableSpanExporterProvider.class.getName()) + .stream() + .filter(p -> !OtlpSpanExporterProvider.class.getName().equals(p)).collect(toList()); // filter out OtlpSpanExporterProvider since it depends on OkHttp + if (!spanExporterProviders.isEmpty()) { + services.produce( + new ServiceProviderBuildItem(ConfigurableSpanExporterProvider.class.getName(), spanExporterProviders)); + } + // remove the service file that contains OtlpSpanExporterProvider + removedResources.produce(new RemovedResourceBuildItem( + ArtifactKey.fromString("io.opentelemetry:opentelemetry-exporter-otlp"), + Set.of("META-INF/services/io.opentelemetry.sdk.autoconfigure.spi.traces.ConfigurableSpanExporterProvider"))); + + runtimeReinitialized.produce( + new RuntimeReinitializedClassBuildItem("io.opentelemetry.sdk.autoconfigure.TracerProviderConfiguration")); + services.produce(ServiceProviderBuildItem.allProvidersFromClassPath( ConfigurableSamplerProvider.class.getName())); + // The following are added but not officially supported, yet. services.produce(ServiceProviderBuildItem.allProvidersFromClassPath( AutoConfigurationCustomizerProvider.class.getName())); diff --git a/extensions/opentelemetry/runtime/pom.xml b/extensions/opentelemetry/runtime/pom.xml index 2b03eb5fae352..0e0dcf59af816 100644 --- a/extensions/opentelemetry/runtime/pom.xml +++ b/extensions/opentelemetry/runtime/pom.xml @@ -111,6 +111,10 @@ org.checkerframework checker-qual + + com.squareup.okhttp3 + okhttp + @@ -125,6 +129,10 @@ org.checkerframework checker-qual + + com.squareup.okhttp3 + okhttp + diff --git a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/graal/Target_io_opentelemetry_exporter_otlp_internal_OtlpSpanExporterProvider.java b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/graal/Target_io_opentelemetry_exporter_otlp_internal_OtlpSpanExporterProvider.java new file mode 100644 index 0000000000000..5ce28d2793845 --- /dev/null +++ b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/graal/Target_io_opentelemetry_exporter_otlp_internal_OtlpSpanExporterProvider.java @@ -0,0 +1,9 @@ +package io.quarkus.opentelemetry.runtime.exporter.otlp.graal; + +import com.oracle.svm.core.annotate.Delete; +import com.oracle.svm.core.annotate.TargetClass; + +@TargetClass(className = "io.opentelemetry.exporter.otlp.internal.OtlpSpanExporterProvider") +@Delete +public final class Target_io_opentelemetry_exporter_otlp_internal_OtlpSpanExporterProvider { +} From bd6cfd63e367ed407c83b9fe95d1b46fcb14e59d Mon Sep 17 00:00:00 2001 From: Georgios Andrianakis Date: Mon, 17 Jul 2023 09:35:51 +0300 Subject: [PATCH 4/4] Add integration test for Vert.x gRPC exporter Also add TLS support to the exporter --- bom/test/pom.xml | 9 + .../exporter/OtlpExporterTracesConfig.java | 46 +++-- .../runtime/exporter/otlp/OtlpRecorder.java | 73 +++++-- .../exporter/otlp/VertxGrpcExporter.java | 9 +- .../exporter/otlp/OtlpRecorderTest.java | 25 +++ .../opentelemetry-vertx-grpc-exporter/pom.xml | 177 +++++++++++++++++ .../vertx/grpc/exporter/HelloResource.java | 14 ++ .../src/main/resources/application.properties | 1 + .../grpc/exporter/AbstractExporterTest.java | 79 ++++++++ .../grpc/exporter/NoTLSNoCompressionTest.java | 10 + .../exporter/NoTLSWithCompressionTest.java | 11 ++ .../OtelCollectorLifecycleManager.java | 186 ++++++++++++++++++ .../vertx/grpc/exporter/Traces.java | 19 ++ .../exporter/WithTLSNoCompressionTest.java | 11 ++ .../exporter/WithTLSWithCompressionTest.java | 14 ++ .../src/test/resources/otel-config.yaml | 43 ++++ integration-tests/pom.xml | 1 + 17 files changed, 700 insertions(+), 28 deletions(-) create mode 100644 integration-tests/opentelemetry-vertx-grpc-exporter/pom.xml create mode 100644 integration-tests/opentelemetry-vertx-grpc-exporter/src/main/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/HelloResource.java create mode 100644 integration-tests/opentelemetry-vertx-grpc-exporter/src/main/resources/application.properties create mode 100644 integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/AbstractExporterTest.java create mode 100644 integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/NoTLSNoCompressionTest.java create mode 100644 integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/NoTLSWithCompressionTest.java create mode 100644 integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/OtelCollectorLifecycleManager.java create mode 100644 integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/Traces.java create mode 100644 integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/WithTLSNoCompressionTest.java create mode 100644 integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/WithTLSWithCompressionTest.java create mode 100644 integration-tests/opentelemetry-vertx-grpc-exporter/src/test/resources/otel-config.yaml diff --git a/bom/test/pom.xml b/bom/test/pom.xml index 2933c5fb998de..bda70c0af5435 100644 --- a/bom/test/pom.xml +++ b/bom/test/pom.xml @@ -21,6 +21,8 @@ 1.3.8 0.100.0 + + 0.20.0-alpha @@ -66,6 +68,13 @@ jaxb-api ${jaxb-api.version} + + + io.opentelemetry.proto + opentelemetry-proto + ${opentelemetry-proto.version} + + diff --git a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/config/runtime/exporter/OtlpExporterTracesConfig.java b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/config/runtime/exporter/OtlpExporterTracesConfig.java index 5cc4daa9c2bf5..95ccbd654753c 100644 --- a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/config/runtime/exporter/OtlpExporterTracesConfig.java +++ b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/config/runtime/exporter/OtlpExporterTracesConfig.java @@ -31,21 +31,6 @@ public interface OtlpExporterTracesConfig { @WithDefault(DEFAULT_GRPC_BASE_URI) Optional legacyEndpoint(); - // /** - // * Sets the certificate chain to use for verifying servers when TLS is enabled. The {@code byte[]} - // * should contain an X.509 certificate collection in PEM format. If not set, TLS connections will - // * use the system default trusted certificates. - // */ - // @ConfigItem() - // public Optional certificate; - - // /** - // * Sets ths client key and the certificate chain to use for verifying client when TLS is enabled. - // * The key must be PKCS8, and both must be in PEM format. - // */ - // @ConfigItem() - // public Optional client; - /** * Key-value pairs to be used as headers associated with gRPC requests. * The format is similar to the {@code OTEL_EXPORTER_OTLP_HEADERS} environment variable, @@ -73,6 +58,37 @@ public interface OtlpExporterTracesConfig { @WithDefault(Protocol.HTTP_PROTOBUF) Optional protocol(); + /** + * Key/cert configuration in the PEM format. + */ + @WithName("key-cert") + KeyCert keyCert(); + + /** + * Trust configuration in the PEM format. + */ + @WithName("trust-cert") + TrustCert trustCert(); + + interface KeyCert { + /** + * Comma-separated list of the path to the key files (Pem format). + */ + Optional> keys(); + + /** + * Comma-separated list of the path to the certificate files (Pem format). + */ + Optional> certs(); + } + + interface TrustCert { + /** + * Comma-separated list of the trust certificate files (Pem format). + */ + Optional> certs(); + } + public static class Protocol { public static final String GRPC = "grpc"; public static final String HTTP_PROTOBUF = "http/protobuf"; diff --git a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/OtlpRecorder.java b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/OtlpRecorder.java index ed3d6148177e0..7704736dcc051 100644 --- a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/OtlpRecorder.java +++ b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/OtlpRecorder.java @@ -3,9 +3,11 @@ import static io.quarkus.opentelemetry.runtime.config.runtime.exporter.OtlpExporterRuntimeConfig.DEFAULT_GRPC_BASE_URI; import static io.quarkus.opentelemetry.runtime.config.runtime.exporter.OtlpExporterTracesConfig.Protocol.HTTP_PROTOBUF; +import java.net.URI; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import jakarta.enterprise.inject.Any; import jakarta.enterprise.inject.spi.CDI; @@ -19,10 +21,15 @@ import io.quarkus.opentelemetry.runtime.config.runtime.OTelRuntimeConfig; import io.quarkus.opentelemetry.runtime.config.runtime.exporter.CompressionType; import io.quarkus.opentelemetry.runtime.config.runtime.exporter.OtlpExporterRuntimeConfig; +import io.quarkus.opentelemetry.runtime.config.runtime.exporter.OtlpExporterTracesConfig; import io.quarkus.runtime.LaunchMode; import io.quarkus.runtime.RuntimeValue; import io.quarkus.runtime.annotations.Recorder; import io.vertx.core.Vertx; +import io.vertx.core.http.HttpClientOptions; +import io.vertx.core.net.KeyCertOptions; +import io.vertx.core.net.PemKeyCertOptions; +import io.vertx.core.net.PemTrustOptions; @Recorder public class OtlpRecorder { @@ -86,26 +93,23 @@ public void installBatchSpanProcessorForOtlp( private SpanExporter createOtlpGrpcSpanExporter(OtlpExporterRuntimeConfig exporterRuntimeConfig, String endpoint, Vertx vertx) { - if (exporterRuntimeConfig.traces().protocol().isPresent()) { - if (!exporterRuntimeConfig.traces().protocol().get().equals(HTTP_PROTOBUF)) { + OtlpExporterTracesConfig tracesConfig = exporterRuntimeConfig.traces(); + if (tracesConfig.protocol().isPresent()) { + if (!tracesConfig.protocol().get().equals(HTTP_PROTOBUF)) { throw new IllegalStateException("Only the GRPC Exporter is currently supported. " + "Please check `quarkus.otel.exporter.otlp.traces.protocol` property"); } } boolean compressionEnabled = false; - if (exporterRuntimeConfig.traces().compression().isPresent()) { - compressionEnabled = (exporterRuntimeConfig.traces().compression().get() == CompressionType.GZIP); + if (tracesConfig.compression().isPresent()) { + compressionEnabled = (tracesConfig.compression().get() == CompressionType.GZIP); } - // FIXME TLS Support. Was not available before but will be available soon. - // exporterRuntimeConfig.traces.certificate.ifPresent(exporterBuilder::setTrustedCertificates); - // exporterRuntimeConfig.client.ifPresent(exporterBuilder::setClientTls); - Map headersMap = new HashMap<>(); OtlpUserAgent.addUserAgentHeader(headersMap::put); - if (exporterRuntimeConfig.traces().headers().isPresent()) { - List headers = exporterRuntimeConfig.traces().headers().get(); + if (tracesConfig.headers().isPresent()) { + List headers = tracesConfig.headers().get(); if (!headers.isEmpty()) { for (String header : headers) { if (header.isEmpty()) { @@ -119,14 +123,59 @@ private SpanExporter createOtlpGrpcSpanExporter(OtlpExporterRuntimeConfig export } } + URI grpcBaseUri = ExporterBuilderUtil.validateEndpoint(endpoint); return new VertxGrpcExporter( "otlp", // use the same as OTel does "span", // use the same as OTel does MeterProvider::noop, - ExporterBuilderUtil.validateEndpoint(endpoint), + grpcBaseUri, compressionEnabled, - exporterRuntimeConfig.traces().timeout(), + tracesConfig.timeout(), headersMap, + new Consumer<>() { + @Override + public void accept(HttpClientOptions options) { + configureTLS(options); + } + + private void configureTLS(HttpClientOptions options) { + // TODO: this can reuse existing stuff when https://github.com/quarkusio/quarkus/pull/33228 is in + options.setKeyCertOptions(toPemKeyCertOptions(tracesConfig)); + options.setPemTrustOptions(toPemTrustOptions(tracesConfig)); + + if (VertxGrpcExporter.isHttps(grpcBaseUri)) { + options.setSsl(true); + options.setUseAlpn(true); + } + } + + private KeyCertOptions toPemKeyCertOptions(OtlpExporterTracesConfig configuration) { + PemKeyCertOptions pemKeyCertOptions = new PemKeyCertOptions(); + OtlpExporterTracesConfig.KeyCert keyCert = configuration.keyCert(); + if (keyCert.certs().isPresent()) { + for (String cert : keyCert.certs().get()) { + pemKeyCertOptions.addCertPath(cert); + } + } + if (keyCert.keys().isPresent()) { + for (String cert : keyCert.keys().get()) { + pemKeyCertOptions.addKeyPath(cert); + } + } + return pemKeyCertOptions; + } + + private PemTrustOptions toPemTrustOptions(OtlpExporterTracesConfig configuration) { + PemTrustOptions pemTrustOptions = new PemTrustOptions(); + OtlpExporterTracesConfig.TrustCert trustCert = configuration.trustCert(); + if (trustCert.certs().isPresent()) { + for (String cert : trustCert.certs().get()) { + pemTrustOptions.addCertPath(cert); + } + } + return pemTrustOptions; + } + }, vertx); } diff --git a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/VertxGrpcExporter.java b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/VertxGrpcExporter.java index 47c50654c823d..6d4a4e54cea87 100644 --- a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/VertxGrpcExporter.java +++ b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/VertxGrpcExporter.java @@ -9,6 +9,7 @@ import java.util.Locale; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; @@ -63,6 +64,7 @@ final class VertxGrpcExporter implements SpanExporter { URI grpcBaseUri, boolean compressionEnabled, Duration timeout, Map headersMap, + Consumer clientOptionsCustomizer, Vertx vertx) { this.type = type; this.exporterMetrics = ExporterMetrics.createGrpcOkHttp(exporterName, type, meterProviderSupplier); @@ -73,6 +75,7 @@ final class VertxGrpcExporter implements SpanExporter { .setHttp2ClearTextUpgrade(false) // needed otherwise connections get closed immediately .setReadIdleTimeout((int) timeout.getSeconds()) .setTracingPolicy(TracingPolicy.IGNORE); // needed to avoid tracing the calls from this gRPC client + clientOptionsCustomizer.accept(httpClientOptions); this.client = GrpcClient.client(vertx, httpClientOptions); } @@ -82,12 +85,16 @@ private static int getPort(URI uri) { return originalPort; } - if ("https".equals(uri.getScheme().toLowerCase(Locale.ROOT))) { + if (isHttps(uri)) { return 443; } return 80; } + static boolean isHttps(URI uri) { + return "https".equals(uri.getScheme().toLowerCase(Locale.ROOT)); + } + private CompletableResultCode export(TraceRequestMarshaler marshaler, int numItems) { if (isShutdown.get()) { return CompletableResultCode.ofFailure(); diff --git a/extensions/opentelemetry/runtime/src/test/java/io/quarkus/opentelemetry/runtime/exporter/otlp/OtlpRecorderTest.java b/extensions/opentelemetry/runtime/src/test/java/io/quarkus/opentelemetry/runtime/exporter/otlp/OtlpRecorderTest.java index 8fdb47c4c8e96..643667e8a3a1b 100644 --- a/extensions/opentelemetry/runtime/src/test/java/io/quarkus/opentelemetry/runtime/exporter/otlp/OtlpRecorderTest.java +++ b/extensions/opentelemetry/runtime/src/test/java/io/quarkus/opentelemetry/runtime/exporter/otlp/OtlpRecorderTest.java @@ -109,6 +109,31 @@ public Duration timeout() { public Optional protocol() { return Optional.empty(); } + + @Override + public KeyCert keyCert() { + return new KeyCert() { + @Override + public Optional> keys() { + return Optional.empty(); + } + + @Override + public Optional> certs() { + return Optional.empty(); + } + }; + } + + @Override + public TrustCert trustCert() { + return new TrustCert() { + @Override + public Optional> certs() { + return Optional.empty(); + } + }; + } }; } }; diff --git a/integration-tests/opentelemetry-vertx-grpc-exporter/pom.xml b/integration-tests/opentelemetry-vertx-grpc-exporter/pom.xml new file mode 100644 index 0000000000000..0635e60e990d6 --- /dev/null +++ b/integration-tests/opentelemetry-vertx-grpc-exporter/pom.xml @@ -0,0 +1,177 @@ + + + 4.0.0 + + + io.quarkus + quarkus-integration-tests-parent + 999-SNAPSHOT + + + quarkus-integration-test-opentelemetry-vertx-grpc-exporter + Quarkus - Integration Tests - OpenTelemetry Vert.x gRPC exporter + + + + io.quarkus + quarkus-opentelemetry + + + io.quarkus + quarkus-resteasy-reactive-jackson + + + + + io.quarkus + quarkus-junit5 + test + + + io.rest-assured + rest-assured + test + + + org.awaitility + awaitility + test + + + org.testcontainers + testcontainers + test + + + org.assertj + assertj-core + test + + + + org.bouncycastle + bcpkix-jdk18on + test + + + + io.vertx + vertx-grpc-server + test + + + org.codehaus.mojo + animal-sniffer-annotations + + + com.google.code.findbugs + jsr305 + + + org.checkerframework + checker-qual + + + javax.annotation + javax.annotation-api + + + com.google.android + annotations + + + + + + io.opentelemetry.proto + opentelemetry-proto + test + + + + + io.quarkus + quarkus-opentelemetry-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-resteasy-reactive-jackson-deployment + ${project.version} + pom + test + + + * + * + + + + + + + + + maven-surefire-plugin + + true + + + + maven-failsafe-plugin + + true + + + + io.quarkus + quarkus-maven-plugin + + + + build + + + + + + + + + + test-otel-grpc-exporter + + + test-containers + + + + + + maven-surefire-plugin + + false + + + + maven-failsafe-plugin + + false + + + + + + + + diff --git a/integration-tests/opentelemetry-vertx-grpc-exporter/src/main/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/HelloResource.java b/integration-tests/opentelemetry-vertx-grpc-exporter/src/main/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/HelloResource.java new file mode 100644 index 0000000000000..7e2021e2ed5fe --- /dev/null +++ b/integration-tests/opentelemetry-vertx-grpc-exporter/src/main/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/HelloResource.java @@ -0,0 +1,14 @@ +package io.quarkus.it.opentelemetry.vertx.grpc.exporter; + +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; + +@Path("hello") +public class HelloResource { + + @GET + public String get() { + return "get"; + } + +} diff --git a/integration-tests/opentelemetry-vertx-grpc-exporter/src/main/resources/application.properties b/integration-tests/opentelemetry-vertx-grpc-exporter/src/main/resources/application.properties new file mode 100644 index 0000000000000..edb424258ea0a --- /dev/null +++ b/integration-tests/opentelemetry-vertx-grpc-exporter/src/main/resources/application.properties @@ -0,0 +1 @@ +quarkus.application.name=integration test diff --git a/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/AbstractExporterTest.java b/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/AbstractExporterTest.java new file mode 100644 index 0000000000000..1b8805686c2a1 --- /dev/null +++ b/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/AbstractExporterTest.java @@ -0,0 +1,79 @@ +package io.quarkus.it.opentelemetry.vertx.grpc.exporter; + +import static io.restassured.RestAssured.when; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.time.Duration; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.trace.v1.ResourceSpans; +import io.opentelemetry.proto.trace.v1.ScopeSpans; +import io.opentelemetry.proto.trace.v1.Span; +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes; + +public abstract class AbstractExporterTest { + + Traces traces; + + @BeforeEach + @AfterEach + void setUp() { + traces.reset(); + } + + @Test + void test() { + verifyHttpResponse(); + verifyTraces(); + } + + private void verifyHttpResponse() { + when() + .get("/hello") + .then() + .statusCode(200); + } + + private void verifyTraces() { + await() + .atMost(Duration.ofSeconds(30)) + .untilAsserted(() -> assertThat(traces.getTraceRequests()).hasSize(1)); + + ExportTraceServiceRequest request = traces.getTraceRequests().get(0); + assertThat(request.getResourceSpansCount()).isEqualTo(1); + + ResourceSpans resourceSpans = request.getResourceSpans(0); + assertThat(resourceSpans.getResource().getAttributesList()) + .contains( + KeyValue.newBuilder() + .setKey(ResourceAttributes.SERVICE_NAME.getKey()) + .setValue(AnyValue.newBuilder() + .setStringValue("integration test").build()) + .build()) + .contains( + KeyValue.newBuilder() + .setKey(ResourceAttributes.WEBENGINE_NAME.getKey()) + .setValue(AnyValue.newBuilder() + .setStringValue("Quarkus").build()) + .build()); + assertThat(resourceSpans.getScopeSpansCount()).isEqualTo(1); + ScopeSpans scopeSpans = resourceSpans.getScopeSpans(0); + assertThat(scopeSpans.getSpansCount()).isEqualTo(1); + Span span = scopeSpans.getSpans(0); + assertThat(span.getName()).isEqualTo("GET /hello"); + assertThat(span.getAttributesList()) + .contains( + KeyValue.newBuilder() + .setKey("http.method") + .setValue(AnyValue.newBuilder() + .setStringValue("GET").build()) + .build()); + } +} diff --git a/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/NoTLSNoCompressionTest.java b/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/NoTLSNoCompressionTest.java new file mode 100644 index 0000000000000..47f6d31a98d4e --- /dev/null +++ b/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/NoTLSNoCompressionTest.java @@ -0,0 +1,10 @@ +package io.quarkus.it.opentelemetry.vertx.grpc.exporter; + +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.QuarkusTest; + +@QuarkusTest +@QuarkusTestResource(value = OtelCollectorLifecycleManager.class, restrictToAnnotatedClass = true) +public class NoTLSNoCompressionTest extends AbstractExporterTest { + +} diff --git a/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/NoTLSWithCompressionTest.java b/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/NoTLSWithCompressionTest.java new file mode 100644 index 0000000000000..65f67e19d9f6e --- /dev/null +++ b/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/NoTLSWithCompressionTest.java @@ -0,0 +1,11 @@ +package io.quarkus.it.opentelemetry.vertx.grpc.exporter; + +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.common.ResourceArg; +import io.quarkus.test.junit.QuarkusTest; + +@QuarkusTest +@QuarkusTestResource(value = OtelCollectorLifecycleManager.class, initArgs = @ResourceArg(name = "enableCompression", value = "true"), restrictToAnnotatedClass = true) +public class NoTLSWithCompressionTest extends AbstractExporterTest { + +} diff --git a/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/OtelCollectorLifecycleManager.java b/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/OtelCollectorLifecycleManager.java new file mode 100644 index 0000000000000..fdcf042e8d973 --- /dev/null +++ b/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/OtelCollectorLifecycleManager.java @@ -0,0 +1,186 @@ +package io.quarkus.it.opentelemetry.vertx.grpc.exporter; + +import static org.testcontainers.Testcontainers.exposeHostPorts; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.BindMode; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.images.PullPolicy; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.MountableFile; + +import com.google.protobuf.InvalidProtocolBufferException; + +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse; +import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; +import io.vertx.core.Vertx; +import io.vertx.core.VertxOptions; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpServer; +import io.vertx.core.http.HttpServerOptions; +import io.vertx.core.net.SelfSignedCertificate; +import io.vertx.grpc.common.GrpcStatus; +import io.vertx.grpc.common.ServiceName; +import io.vertx.grpc.server.GrpcServer; + +public class OtelCollectorLifecycleManager implements QuarkusTestResourceLifecycleManager { + + private static final String COLLECTOR_IMAGE = "ghcr.io/open-telemetry/opentelemetry-java/otel-collector"; + private static final Integer COLLECTOR_OTLP_GRPC_PORT = 4317; + private static final Integer COLLECTOR_OTLP_HTTP_PORT = 4318; + private static final Integer COLLECTOR_OTLP_GRPC_MTLS_PORT = 5317; + private static final Integer COLLECTOR_OTLP_HTTP_MTLS_PORT = 5318; + private static final Integer COLLECTOR_HEALTH_CHECK_PORT = 13133; + private static final ServiceName TRACE_SERVICE_NAME = ServiceName + .create("opentelemetry.proto.collector.trace.v1.TraceService"); + private static final String TRACE_METHOD_NAME = "Export"; + + private SelfSignedCertificate serverTls; + private SelfSignedCertificate clientTlS; + private boolean enableTLS = false; + private boolean enableCompression = false; + private Vertx vertx; + + private HttpServer server; + + private GenericContainer collector; + private Traces collectedTraces; + + @Override + public void init(Map initArgs) { + var enableTLSStr = initArgs.get("enableTLS"); + if (enableTLSStr != null && !enableTLSStr.isEmpty()) { + enableTLS = Boolean.parseBoolean(enableTLSStr); + } + + var enableCompressionStr = initArgs.get("enableCompression"); + if (enableCompressionStr != null && !enableCompressionStr.isEmpty()) { + enableCompression = Boolean.parseBoolean(enableCompressionStr); + } + } + + @Override + public Map start() { + setupVertxGrpcServer(); + int vertxGrpcServerPort = server.actualPort(); + + // Expose the port the in-process OTLP gRPC server will run on before the collector is + // initialized so the collector can connect to it. + exposeHostPorts(vertxGrpcServerPort); + + serverTls = SelfSignedCertificate.create(); + clientTlS = SelfSignedCertificate.create(); + + collector = new GenericContainer<>(DockerImageName.parse(COLLECTOR_IMAGE)) + .withImagePullPolicy(PullPolicy.alwaysPull()) + .withEnv("LOGGING_EXPORTER_VERBOSITY_LEVEL", "normal") + .withCopyFileToContainer( + MountableFile.forHostPath(serverTls.certificatePath(), 0555), + "/server.cert") + .withCopyFileToContainer( + MountableFile.forHostPath(serverTls.privateKeyPath(), 0555), "/server.key") + .withCopyFileToContainer( + MountableFile.forHostPath(clientTlS.certificatePath(), 0555), + "/client.cert") + .withEnv( + "OTLP_EXPORTER_ENDPOINT", "host.testcontainers.internal:" + vertxGrpcServerPort) + .withEnv("MTLS_CLIENT_CERTIFICATE", "/client.cert") + .withEnv("MTLS_SERVER_CERTIFICATE", "/server.cert") + .withEnv("MTLS_SERVER_KEY", "/server.key") + .withClasspathResourceMapping( + "otel-config.yaml", "/otel-config.yaml", BindMode.READ_ONLY) + .withCommand("--config", "/otel-config.yaml") + .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("otel-collector"))) + .withExposedPorts( + COLLECTOR_OTLP_GRPC_PORT, + COLLECTOR_OTLP_HTTP_PORT, + COLLECTOR_OTLP_GRPC_MTLS_PORT, + COLLECTOR_OTLP_HTTP_MTLS_PORT, + COLLECTOR_HEALTH_CHECK_PORT) + .waitingFor(Wait.forHttp("/").forPort(COLLECTOR_HEALTH_CHECK_PORT)); + collector.start(); + + Map result = new HashMap<>(); + + if (enableTLS) { + result.put("quarkus.otel.exporter.otlp.traces.endpoint", + "https://" + collector.getHost() + ":" + collector.getMappedPort(COLLECTOR_OTLP_GRPC_MTLS_PORT)); + result.put("quarkus.otel.exporter.otlp.traces.trust-cert.certs", serverTls.certificatePath()); + result.put("quarkus.otel.exporter.otlp.traces.key-cert.certs", clientTlS.certificatePath()); + result.put("quarkus.otel.exporter.otlp.traces.key-cert.keys", clientTlS.privateKeyPath()); + } else { + result.put("quarkus.otel.exporter.otlp.traces.endpoint", + "http://" + collector.getHost() + ":" + collector.getMappedPort(COLLECTOR_OTLP_GRPC_PORT)); + } + + if (enableCompression) { + result.put("quarkus.otel.exporter.otlp.traces.compression", "gzip"); + } + + return result; + } + + @Override + public void inject(TestInjector testInjector) { + testInjector.injectIntoFields(collectedTraces, f -> f.getType().equals(Traces.class)); + } + + private void setupVertxGrpcServer() { + vertx = Vertx.vertx(new VertxOptions().setWorkerPoolSize(1).setEventLoopPoolSize(1)); + GrpcServer grpcServer = GrpcServer.server(vertx); + collectedTraces = new Traces(); + grpcServer.callHandler(request -> { + + if (request.serviceName().equals(TRACE_SERVICE_NAME) && request.methodName().equals(TRACE_METHOD_NAME)) { + + request.handler(message -> { + try { + collectedTraces.getTraceRequests().add(ExportTraceServiceRequest.parseFrom(message.getBytes())); + request.response().end(Buffer.buffer(ExportTraceServiceResponse.getDefaultInstance().toByteArray())); + } catch (InvalidProtocolBufferException e) { + request.response() + .status(GrpcStatus.INVALID_ARGUMENT) + .end(); + } + }); + } else { + request.response() + .status(GrpcStatus.NOT_FOUND) + .end(); + } + }); + server = vertx.createHttpServer(new HttpServerOptions().setPort(0)); + try { + server.requestHandler(grpcServer).listen().toCompletionStage().toCompletableFuture().get(20, TimeUnit.SECONDS); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void stop() { + if (server != null) { + server.close().andThen(ar -> { + if (vertx != null) { + vertx.close(); + } + }); + } + if (serverTls != null) { + serverTls.delete(); + } + if (clientTlS != null) { + clientTlS.delete(); + } + if (collector != null) { + collector.stop(); + } + } +} diff --git a/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/Traces.java b/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/Traces.java new file mode 100644 index 0000000000000..30e46df57dbc8 --- /dev/null +++ b/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/Traces.java @@ -0,0 +1,19 @@ +package io.quarkus.it.opentelemetry.vertx.grpc.exporter; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; + +public final class Traces { + + private final List traceRequests = new CopyOnWriteArrayList<>(); + + public List getTraceRequests() { + return traceRequests; + } + + public void reset() { + traceRequests.clear(); + } +} diff --git a/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/WithTLSNoCompressionTest.java b/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/WithTLSNoCompressionTest.java new file mode 100644 index 0000000000000..23d8aea2581b4 --- /dev/null +++ b/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/WithTLSNoCompressionTest.java @@ -0,0 +1,11 @@ +package io.quarkus.it.opentelemetry.vertx.grpc.exporter; + +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.common.ResourceArg; +import io.quarkus.test.junit.QuarkusTest; + +@QuarkusTest +@QuarkusTestResource(value = OtelCollectorLifecycleManager.class, initArgs = @ResourceArg(name = "enableTLS", value = "true"), restrictToAnnotatedClass = true) +public class WithTLSNoCompressionTest extends AbstractExporterTest { + +} diff --git a/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/WithTLSWithCompressionTest.java b/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/WithTLSWithCompressionTest.java new file mode 100644 index 0000000000000..42b958965d3c6 --- /dev/null +++ b/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/java/io/quarkus/it/opentelemetry/vertx/grpc/exporter/WithTLSWithCompressionTest.java @@ -0,0 +1,14 @@ +package io.quarkus.it.opentelemetry.vertx.grpc.exporter; + +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.common.ResourceArg; +import io.quarkus.test.junit.QuarkusTest; + +@QuarkusTest +@QuarkusTestResource(value = OtelCollectorLifecycleManager.class, initArgs = { + @ResourceArg(name = "enableTLS", value = "true"), + @ResourceArg(name = "enableCompression", value = "true") +}, restrictToAnnotatedClass = true) +public class WithTLSWithCompressionTest extends AbstractExporterTest { + +} diff --git a/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/resources/otel-config.yaml b/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/resources/otel-config.yaml new file mode 100644 index 0000000000000..24e1239387835 --- /dev/null +++ b/integration-tests/opentelemetry-vertx-grpc-exporter/src/test/resources/otel-config.yaml @@ -0,0 +1,43 @@ +extensions: + health_check: {} +receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 +# http: +# endpoint: 0.0.0.0:4318 + otlp/mtls: + protocols: + grpc: + endpoint: 0.0.0.0:5317 + tls: + client_ca_file: $MTLS_CLIENT_CERTIFICATE + cert_file: $MTLS_SERVER_CERTIFICATE + key_file: $MTLS_SERVER_KEY +# http: +# endpoint: 0.0.0.0:5318 +# tls: +# client_ca_file: $MTLS_CLIENT_CERTIFICATE +# cert_file: $MTLS_SERVER_CERTIFICATE +# key_file: $MTLS_SERVER_KEY +exporters: + logging: + verbosity: $LOGGING_EXPORTER_VERBOSITY_LEVEL + otlp: + endpoint: $OTLP_EXPORTER_ENDPOINT + tls: + insecure: true + compression: none +service: + extensions: [health_check] + pipelines: +# metrics: +# receivers: [otlp, otlp/mtls] +# exporters: [logging, otlp] + traces: + receivers: [otlp, otlp/mtls] + exporters: [logging, otlp] +# logs: +# receivers: [otlp, otlp/mtls] +# exporters: [logging, otlp] diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 7f2e5546f937f..0718dc18ff9c8 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -351,6 +351,7 @@ opentelemetry-vertx opentelemetry-reactive opentelemetry-grpc + opentelemetry-vertx-grpc-exporter opentelemetry-reactive-messaging logging-json jaxb