From 63bec0727e8a3217920202d7e6a7b43ee52aaa2a Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Wed, 6 Oct 2021 16:26:11 +0900 Subject: [PATCH] Allow OTLP gRPC exporters to work without grpc-java using okhttp directly. (#3684) * Allow OTLP HTTP exporter to also export in gRPC format. * detect classpath * Revert HTTP exporter * Apply to metrics / trace * Most * Fix * Integration tests * Clean * Fix log message * Assume validated --- dependencyManagement/build.gradle.kts | 4 +- .../internal/grpc/DefaultGrpcExporter.java | 142 +++++++++++ .../grpc/DefaultGrpcExporterBuilder.java | 149 +++++++++++ .../otlp/internal/grpc/GrpcExporter.java | 43 ++++ .../internal/grpc/GrpcExporterBuilder.java | 30 +++ .../otlp/internal/grpc/GrpcExporterUtil.java | 50 ++++ .../otlp/internal/grpc/GrpcRequestBody.java | 81 ++++++ .../internal/grpc/MarshalerServiceStub.java | 28 +++ .../internal/grpc/OkHttpGrpcExporter.java | 234 ++++++++++++++++++ .../grpc/OkHttpGrpcExporterBuilder.java | 137 ++++++++++ exporters/otlp/logs/build.gradle.kts | 17 +- .../otlp/logs/MarshalerLogsServiceGrpc.java | 20 +- .../otlp/logs/OtlpGrpcLogExporter.java | 161 ++---------- .../otlp/logs/OtlpGrpcLogExporterBuilder.java | 104 +++----- .../otlp/logs/OtlpGrpcLogsExporterTest.java | 10 +- .../exporter/otlp/logs/ExportTest.java | 7 + .../exporter/otlp/logs/ExportTest.java | 7 + .../exporter/otlp/logs/ExportTest.java | 7 + .../otlp/logs/OkHttpOnlyExportTest.java | 158 ++++++++++++ exporters/otlp/metrics/build.gradle.kts | 19 +- .../metrics/MarshalerMetricsServiceGrpc.java | 21 +- .../otlp/metrics/OtlpGrpcMetricExporter.java | 124 ++-------- .../OtlpGrpcMetricExporterBuilder.java | 102 +++----- .../metrics/OtlpGrpcMetricExporterTest.java | 10 +- .../exporter/otlp/metrics/ExportTest.java | 7 + .../exporter/otlp/metrics/ExportTest.java | 7 + .../exporter/otlp/metrics/ExportTest.java | 7 + .../otlp/metrics/OkHttpOnlyExportTest.java | 164 ++++++++++++ exporters/otlp/trace/build.gradle.kts | 21 +- .../otlp/trace/MarshalerTraceServiceGrpc.java | 7 +- .../otlp/trace/OtlpGrpcSpanExporter.java | 165 ++---------- .../trace/OtlpGrpcSpanExporterBuilder.java | 102 +++----- .../otlp/trace/OtlpGrpcSpanExporterTest.java | 10 +- .../exporter/otlp/trace/ExportTest.java | 7 + .../exporter/otlp/trace/ExportTest.java | 7 + .../exporter/otlp/trace/ExportTest.java | 7 + .../otlp/trace/OkHttpOnlyExportTest.java | 151 +++++++++++ integration-tests/build.gradle.kts | 74 ++++-- .../JaegerExporterIntegrationTest.java | 2 +- .../GrpcJavaOtlpIntegrationTest.java | 18 ++ .../OtlpExporterIntegrationTest.java | 137 +++++----- .../resources/otel-config.yaml | 0 .../NoGrpcJavaOtlpIntegrationTest.java | 19 ++ .../SpanExporterConfigurationTest.java | 2 +- .../sdk/autoconfigure/OtlpGrpcConfigTest.java | 16 +- 45 files changed, 1878 insertions(+), 717 deletions(-) create mode 100644 exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/DefaultGrpcExporter.java create mode 100644 exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/DefaultGrpcExporterBuilder.java create mode 100644 exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/GrpcExporter.java create mode 100644 exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/GrpcExporterBuilder.java create mode 100644 exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/GrpcExporterUtil.java create mode 100644 exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/GrpcRequestBody.java create mode 100644 exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/MarshalerServiceStub.java create mode 100644 exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/OkHttpGrpcExporter.java create mode 100644 exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/OkHttpGrpcExporterBuilder.java create mode 100644 exporters/otlp/logs/src/testOkHttpOnly/java/io/opentelemetry/exporter/otlp/logs/OkHttpOnlyExportTest.java create mode 100644 exporters/otlp/metrics/src/testOkHttpOnly/java/io/opentelemetry/exporter/otlp/metrics/OkHttpOnlyExportTest.java create mode 100644 exporters/otlp/trace/src/testOkhttpOnly/java/io/opentelemetry/exporter/otlp/trace/OkHttpOnlyExportTest.java rename integration-tests/src/{test/java/io/opentelemetry => testJaeger/java/io/opentelemetry/integrationtest}/JaegerExporterIntegrationTest.java (99%) create mode 100644 integration-tests/src/testOtlp/java/io/opentelemetry/integrationtest/GrpcJavaOtlpIntegrationTest.java rename integration-tests/src/{test/java/io/opentelemetry => testOtlpCommon/java/io/opentelemetry/integrationtest}/OtlpExporterIntegrationTest.java (83%) rename integration-tests/src/{test => testOtlpCommon}/resources/otel-config.yaml (100%) create mode 100644 integration-tests/src/testOtlpNoGrpcJava/java/io/opentelemetry/integrationtest/NoGrpcJavaOtlpIntegrationTest.java diff --git a/dependencyManagement/build.gradle.kts b/dependencyManagement/build.gradle.kts index f9544ccf8c0..90ce587bf37 100644 --- a/dependencyManagement/build.gradle.kts +++ b/dependencyManagement/build.gradle.kts @@ -75,8 +75,8 @@ val DEPENDENCIES = listOf( // using old version of okhttp to avoid pulling in kotlin stdlib // not using (old) okhttp bom because that is pulling in old guava version // and overriding the guava bom - "com.squareup.okhttp3:okhttp:3.12.13", - "com.squareup.okhttp3:okhttp-tls:3.12.13", + "com.squareup.okhttp3:okhttp:3.14.9", + "com.squareup.okhttp3:okhttp-tls:3.14.9", "com.sun.net.httpserver:http:20070405", "com.tngtech.archunit:archunit-junit5:0.21.0", "com.uber.nullaway:nullaway:0.9.2", diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/DefaultGrpcExporter.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/DefaultGrpcExporter.java new file mode 100644 index 00000000000..82598408ba3 --- /dev/null +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/DefaultGrpcExporter.java @@ -0,0 +1,142 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.otlp.internal.grpc; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; +import io.grpc.Codec; +import io.grpc.ManagedChannel; +import io.grpc.Status; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.BoundLongCounter; +import io.opentelemetry.api.metrics.GlobalMeterProvider; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.exporter.otlp.internal.Marshaler; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.internal.ThrottlingLogger; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * A {@link GrpcExporter} which uses the standard grpc-java library. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public final class DefaultGrpcExporter implements GrpcExporter { + + private static final Logger internalLogger = + Logger.getLogger(DefaultGrpcExporter.class.getName()); + + private final ThrottlingLogger logger = new ThrottlingLogger(internalLogger); + + private final String type; + private final ManagedChannel managedChannel; + private final MarshalerServiceStub stub; + private final long timeoutNanos; + + private final BoundLongCounter seen; + private final BoundLongCounter success; + private final BoundLongCounter failed; + + /** Creates a new {@link DefaultGrpcExporter}. */ + DefaultGrpcExporter( + String type, + ManagedChannel channel, + MarshalerServiceStub stub, + long timeoutNanos, + boolean compressionEnabled) { + this.type = type; + Meter meter = GlobalMeterProvider.get().get("io.opentelemetry.exporters.otlp-grpc"); + Attributes attributes = Attributes.builder().put("type", type).build(); + seen = meter.counterBuilder("otlp.exporter.seen").build().bind(attributes); + LongCounter exported = meter.counterBuilder("otlp.exported.exported").build(); + success = exported.bind(attributes.toBuilder().put("success", true).build()); + failed = exported.bind(attributes.toBuilder().put("success", false).build()); + + this.managedChannel = channel; + this.timeoutNanos = timeoutNanos; + Codec codec = compressionEnabled ? new Codec.Gzip() : Codec.Identity.NONE; + this.stub = stub.withCompression(codec.getMessageEncoding()); + } + + @Override + public CompletableResultCode export(T exportRequest, int numItems) { + seen.add(numItems); + + CompletableResultCode result = new CompletableResultCode(); + + MarshalerServiceStub stub = this.stub; + if (timeoutNanos > 0) { + stub = stub.withDeadlineAfter(timeoutNanos, TimeUnit.NANOSECONDS); + } + Futures.addCallback( + stub.export(exportRequest), + new FutureCallback() { + @Override + public void onSuccess(@Nullable Object unused) { + success.add(numItems); + result.succeed(); + } + + @Override + public void onFailure(Throwable t) { + failed.add(numItems); + Status status = Status.fromThrowable(t); + switch (status.getCode()) { + case UNIMPLEMENTED: + logger.log( + Level.SEVERE, + "Failed to export " + + type + + "s. Server responded with UNIMPLEMENTED. " + + "This usually means that your collector is not configured with an otlp " + + "receiver in the \"pipelines\" section of the configuration. " + + "Full error message: " + + t.getMessage()); + break; + case UNAVAILABLE: + logger.log( + Level.SEVERE, + "Failed to export " + + type + + "s. Server is UNAVAILABLE. " + + "Make sure your collector is running and reachable from this network. " + + "Full error message:" + + t.getMessage()); + break; + default: + logger.log( + Level.WARNING, + "Failed to export " + type + "s. Error message: " + t.getMessage()); + break; + } + if (logger.isLoggable(Level.FINEST)) { + logger.log(Level.FINEST, "Failed to export " + type + "s. Details follow: " + t); + } + result.fail(); + } + }, + MoreExecutors.directExecutor()); + + return result; + } + + @Override + public CompletableResultCode shutdown() { + if (managedChannel.isTerminated()) { + return CompletableResultCode.ofSuccess(); + } + seen.unbind(); + success.unbind(); + failed.unbind(); + return ManagedChannelUtil.shutdownChannel(managedChannel); + } +} diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/DefaultGrpcExporterBuilder.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/DefaultGrpcExporterBuilder.java new file mode 100644 index 00000000000..925d5a2f7c5 --- /dev/null +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/DefaultGrpcExporterBuilder.java @@ -0,0 +1,149 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.otlp.internal.grpc; + +import static io.grpc.Metadata.ASCII_STRING_MARSHALLER; + +import io.grpc.Codec; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Metadata; +import io.grpc.stub.MetadataUtils; +import io.opentelemetry.exporter.otlp.internal.Marshaler; +import java.net.URI; +import java.net.URISyntaxException; +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import javax.annotation.Nullable; +import javax.net.ssl.SSLException; + +/** + * A builder for {@link DefaultGrpcExporter}. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public final class DefaultGrpcExporterBuilder + implements GrpcExporterBuilder { + + private final String type; + private final Function> stubFactory; + + @Nullable private ManagedChannel channel; + private long timeoutNanos; + private URI endpoint; + private boolean compressionEnabled = false; + @Nullable private Metadata metadata; + @Nullable private byte[] trustedCertificatesPem; + + /** Creates a new {@link DefaultGrpcExporterBuilder}. */ + DefaultGrpcExporterBuilder( + String type, + Function> stubFactory, + long defaultTimeoutSecs, + URI defaultEndpoint) { + this.type = type; + this.stubFactory = stubFactory; + timeoutNanos = TimeUnit.SECONDS.toNanos(defaultTimeoutSecs); + endpoint = defaultEndpoint; + } + + @Override + public DefaultGrpcExporterBuilder setChannel(ManagedChannel channel) { + this.channel = channel; + return this; + } + + @Override + public DefaultGrpcExporterBuilder setTimeout(long timeout, TimeUnit unit) { + timeoutNanos = unit.toNanos(timeout); + return this; + } + + @Override + public DefaultGrpcExporterBuilder setTimeout(Duration timeout) { + return setTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS); + } + + @Override + public DefaultGrpcExporterBuilder setEndpoint(String endpoint) { + URI uri; + try { + uri = new URI(endpoint); + } catch (URISyntaxException e) { + throw new IllegalArgumentException("Invalid endpoint, must be a URL: " + endpoint, e); + } + + if (uri.getScheme() == null + || (!uri.getScheme().equals("http") && !uri.getScheme().equals("https"))) { + throw new IllegalArgumentException( + "Invalid endpoint, must start with http:// or https://: " + uri); + } + + this.endpoint = uri; + return this; + } + + @Override + public DefaultGrpcExporterBuilder setCompression(String compressionMethod) { + this.compressionEnabled = true; + return this; + } + + @Override + public DefaultGrpcExporterBuilder setTrustedCertificates(byte[] trustedCertificatesPem) { + this.trustedCertificatesPem = trustedCertificatesPem; + return this; + } + + @Override + public DefaultGrpcExporterBuilder addHeader(String key, String value) { + if (metadata == null) { + metadata = new Metadata(); + } + metadata.put(Metadata.Key.of(key, ASCII_STRING_MARSHALLER), value); + return this; + } + + @Override + public GrpcExporter build() { + ManagedChannel channel = this.channel; + if (channel == null) { + final ManagedChannelBuilder managedChannelBuilder = + ManagedChannelBuilder.forTarget(endpoint.getAuthority()); + + if (endpoint.getScheme().equals("https")) { + managedChannelBuilder.useTransportSecurity(); + } else { + managedChannelBuilder.usePlaintext(); + } + + if (metadata != null) { + managedChannelBuilder.intercept(MetadataUtils.newAttachHeadersInterceptor(metadata)); + } + + if (trustedCertificatesPem != null) { + try { + ManagedChannelUtil.setTrustedCertificatesPem( + managedChannelBuilder, trustedCertificatesPem); + } catch (SSLException e) { + throw new IllegalStateException( + "Could not set trusted certificates for gRPC TLS connection, are they valid " + + "X.509 in PEM format?", + e); + } + } + + channel = managedChannelBuilder.build(); + } + + Codec codec = compressionEnabled ? new Codec.Gzip() : Codec.Identity.NONE; + MarshalerServiceStub stub = + stubFactory.apply(channel).withCompression(codec.getMessageEncoding()); + return new DefaultGrpcExporter<>(type, channel, stub, timeoutNanos, compressionEnabled); + } +} diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/GrpcExporter.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/GrpcExporter.java new file mode 100644 index 00000000000..a27d913e700 --- /dev/null +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/GrpcExporter.java @@ -0,0 +1,43 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.otlp.internal.grpc; + +import io.grpc.ManagedChannel; +import io.opentelemetry.exporter.otlp.internal.Marshaler; +import io.opentelemetry.sdk.common.CompletableResultCode; +import java.net.URI; +import java.util.function.Function; +import java.util.function.Supplier; + +/** + * An exporter of a {@link io.opentelemetry.exporter.otlp.internal.Marshaler} using the gRPC wire + * format. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public interface GrpcExporter { + + /** Returns a new {@link GrpcExporterBuilder}. */ + static GrpcExporterBuilder builder( + String type, + long defaultTimeoutSecs, + URI defaultEndpoint, + Supplier>> stubFactory, + String grpcEndpointPath) { + return GrpcExporterUtil.exporterBuilder( + type, defaultTimeoutSecs, defaultEndpoint, stubFactory, grpcEndpointPath); + } + + /** + * Exports the {@code exportRequest} which is a request {@link Marshaler} for {@code numItems} + * items. + */ + CompletableResultCode export(T exportRequest, int numItems); + + /** Shuts the exporter down. */ + CompletableResultCode shutdown(); +} diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/GrpcExporterBuilder.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/GrpcExporterBuilder.java new file mode 100644 index 00000000000..4b8780c8cd7 --- /dev/null +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/GrpcExporterBuilder.java @@ -0,0 +1,30 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.otlp.internal.grpc; + +import io.grpc.ManagedChannel; +import io.opentelemetry.exporter.otlp.internal.Marshaler; +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +/** A builder for {@link GrpcExporter}. */ +public interface GrpcExporterBuilder { + GrpcExporterBuilder setChannel(ManagedChannel channel); + + GrpcExporterBuilder setTimeout(long timeout, TimeUnit unit); + + GrpcExporterBuilder setTimeout(Duration timeout); + + GrpcExporterBuilder setEndpoint(String endpoint); + + GrpcExporterBuilder setCompression(String compressionMethod); + + GrpcExporterBuilder setTrustedCertificates(byte[] trustedCertificatesPem); + + GrpcExporterBuilder addHeader(String key, String value); + + GrpcExporter build(); +} diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/GrpcExporterUtil.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/GrpcExporterUtil.java new file mode 100644 index 00000000000..e1b3151c1b3 --- /dev/null +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/GrpcExporterUtil.java @@ -0,0 +1,50 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.otlp.internal.grpc; + +import io.grpc.ManagedChannel; +import io.opentelemetry.exporter.otlp.internal.Marshaler; +import java.net.URI; +import java.util.function.Function; +import java.util.function.Supplier; + +final class GrpcExporterUtil { + + private static final boolean USE_OKHTTP; + + static { + boolean useOkhttp = false; + // Use the OkHttp exporter if ManagedChannel is not found and OkHttp is. + try { + Class.forName("io.grpc.ManagedChannel"); + } catch (ClassNotFoundException e) { + try { + Class.forName("okhttp3.OkHttpClient"); + useOkhttp = true; + } catch (ClassNotFoundException classNotFoundException) { + // Fall through + } + } + USE_OKHTTP = useOkhttp; + } + + static GrpcExporterBuilder exporterBuilder( + String type, + long defaultTimeoutSecs, + URI defaultEndpoint, + Supplier>> stubFactory, + String grpcEndpointPath) { + if (USE_OKHTTP) { + return new OkHttpGrpcExporterBuilder<>( + type, grpcEndpointPath, defaultTimeoutSecs, defaultEndpoint); + } else { + return new DefaultGrpcExporterBuilder<>( + type, stubFactory.get(), defaultTimeoutSecs, defaultEndpoint); + } + } + + private GrpcExporterUtil() {} +} diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/GrpcRequestBody.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/GrpcRequestBody.java new file mode 100644 index 00000000000..565a427c7ed --- /dev/null +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/GrpcRequestBody.java @@ -0,0 +1,81 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.otlp.internal.grpc; + +import io.opentelemetry.exporter.otlp.internal.Marshaler; +import java.io.IOException; +import javax.annotation.Nullable; +import okhttp3.MediaType; +import okhttp3.RequestBody; +import okio.Buffer; +import okio.BufferedSink; +import okio.GzipSink; +import okio.Okio; + +/** + * A {@link RequestBody} for reading from a {@link Marshaler} and writing in gRPC wire format. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +final class GrpcRequestBody extends RequestBody { + + private static final int HEADER_LENGTH = 5; + + private static final byte UNCOMPRESSED_FLAG = 0; + private static final byte COMPRESSED_FLAG = 1; + + private static final MediaType GRPC_MEDIA_TYPE = MediaType.parse("application/grpc"); + + private final Marshaler marshaler; + private final int messageSize; + private final int contentLength; + private final boolean compressed; + + /** Creates a new {@link GrpcRequestBody}. */ + GrpcRequestBody(Marshaler marshaler, boolean compressed) { + this.marshaler = marshaler; + this.compressed = compressed; + + messageSize = marshaler.getBinarySerializedSize(); + if (compressed) { + // Content length not known since we want to compress on the I/O thread. + contentLength = -1; + } else { + contentLength = HEADER_LENGTH + messageSize; + } + } + + @Nullable + @Override + public MediaType contentType() { + return GRPC_MEDIA_TYPE; + } + + @Override + public long contentLength() { + return contentLength; + } + + @Override + public void writeTo(BufferedSink sink) throws IOException { + if (!compressed) { + sink.writeByte(UNCOMPRESSED_FLAG); + sink.writeInt(messageSize); + marshaler.writeBinaryTo(sink.outputStream()); + } else { + try (Buffer compressedBody = new Buffer()) { + try (BufferedSink gzipSink = Okio.buffer(new GzipSink(compressedBody))) { + marshaler.writeBinaryTo(gzipSink.outputStream()); + } + sink.writeByte(COMPRESSED_FLAG); + int compressedBytes = (int) compressedBody.size(); + sink.writeInt(compressedBytes); + sink.write(compressedBody, compressedBytes); + } + } + } +} diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/MarshalerServiceStub.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/MarshalerServiceStub.java new file mode 100644 index 00000000000..02f3bac7ff8 --- /dev/null +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/MarshalerServiceStub.java @@ -0,0 +1,28 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.otlp.internal.grpc; + +import com.google.common.util.concurrent.ListenableFuture; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.stub.AbstractFutureStub; +import io.opentelemetry.exporter.otlp.internal.Marshaler; + +/** + * A gRPC stub that uses a {@link Marshaler}. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public abstract class MarshalerServiceStub< + T extends Marshaler, U, S extends MarshalerServiceStub> + extends AbstractFutureStub { + protected MarshalerServiceStub(Channel channel, CallOptions callOptions) { + super(channel, callOptions); + } + + public abstract ListenableFuture export(T request); +} diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/OkHttpGrpcExporter.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/OkHttpGrpcExporter.java new file mode 100644 index 00000000000..fa838263a26 --- /dev/null +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/OkHttpGrpcExporter.java @@ -0,0 +1,234 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +// Includes work from: + +/* + * Copyright 2014 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opentelemetry.exporter.otlp.internal.grpc; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.BoundLongCounter; +import io.opentelemetry.api.metrics.GlobalMeterProvider; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.exporter.otlp.internal.Marshaler; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.internal.ThrottlingLogger; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.annotation.Nullable; +import okhttp3.Call; +import okhttp3.Callback; +import okhttp3.Headers; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; + +/** + * A {@link GrpcExporter} which uses OkHttp instead of grpc-java. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public final class OkHttpGrpcExporter implements GrpcExporter { + + private static final String GRPC_STATUS = "grpc-status"; + private static final String GRPC_MESSAGE = "grpc-message"; + + private final ThrottlingLogger logger = + new ThrottlingLogger(Logger.getLogger(OkHttpGrpcExporter.class.getName())); + + private final String type; + private final OkHttpClient client; + private final String endpoint; + private final Headers headers; + private final boolean compressionEnabled; + + private final BoundLongCounter seen; + private final BoundLongCounter success; + private final BoundLongCounter failed; + + /** Creates a new {@link OkHttpGrpcExporter}. */ + OkHttpGrpcExporter( + String type, + OkHttpClient client, + String endpoint, + Headers headers, + boolean compressionEnabled) { + this.type = type; + this.client = client; + this.endpoint = endpoint; + this.headers = headers; + this.compressionEnabled = compressionEnabled; + + Meter meter = GlobalMeterProvider.get().get("io.opentelemetry.exporters.otlp-grpc-okhttp"); + Attributes attributes = Attributes.builder().put("type", type).build(); + seen = meter.counterBuilder("otlp.exporter.seen").build().bind(attributes); + LongCounter exported = meter.counterBuilder("otlp.exported.exported").build(); + success = exported.bind(attributes.toBuilder().put("success", true).build()); + failed = exported.bind(attributes.toBuilder().put("success", false).build()); + } + + @Override + public CompletableResultCode export(T exportRequest, int numItems) { + seen.add(numItems); + + Request.Builder requestBuilder = new Request.Builder().url(endpoint).headers(headers); + + RequestBody requestBody = new GrpcRequestBody(exportRequest, compressionEnabled); + requestBuilder.post(requestBody); + + CompletableResultCode result = new CompletableResultCode(); + + client + .newCall(requestBuilder.build()) + .enqueue( + new Callback() { + @Override + public void onFailure(Call call, IOException e) { + failed.add(numItems); + logger.log( + Level.SEVERE, + "Failed to export " + + type + + "s. The request could not be executed. Full error message: " + + e.getMessage()); + result.fail(); + } + + @Override + public void onResponse(Call call, Response response) { + // Response body is empty but must be consumed to access trailers. + try { + response.body().bytes(); + } catch (IOException e) { + logger.log( + Level.WARNING, + "Failed to export " + type + "s, could not consume server response.", + e); + failed.add(numItems); + result.fail(); + return; + } + + String status = grpcStatus(response); + if ("0".equals(status)) { + success.add(numItems); + result.succeed(); + return; + } + + failed.add(numItems); + + String codeMessage = + status != null + ? "gRPC status code " + status + : "HTTP status code " + response.code(); + String errorMessage = grpcMessage(response); + logger.log( + Level.WARNING, + "Failed to export " + + type + + "s. Server responded with " + + codeMessage + + ". Error message: " + + errorMessage); + result.fail(); + } + }); + + return result; + } + + @Nullable + private static String grpcStatus(Response response) { + // Status can either be in the headers or trailers depending on error + String grpcStatus = response.header(GRPC_STATUS); + if (grpcStatus == null) { + try { + grpcStatus = response.trailers().get(GRPC_STATUS); + } catch (IOException e) { + // Could not read a status, this generally means the HTTP status is the error. + return null; + } + } + return grpcStatus; + } + + private static String grpcMessage(Response response) { + String message = response.header(GRPC_MESSAGE); + if (message == null) { + try { + message = response.trailers().get(GRPC_MESSAGE); + } catch (IOException e) { + // Fall through + } + } + if (message != null) { + return unescape(message); + } + // Couldn't get message for some reason, use the HTTP status. + return response.message(); + } + + @Override + public CompletableResultCode shutdown() { + client.dispatcher().cancelAll(); + this.seen.unbind(); + this.success.unbind(); + this.failed.unbind(); + return CompletableResultCode.ofSuccess(); + } + + // From grpc-java + + /** Unescape the provided ascii to a unicode {@link String}. */ + private static String unescape(String value) { + for (int i = 0; i < value.length(); i++) { + final char c = value.charAt(i); + if (c < ' ' || c >= '~' || (c == '%' && i + 2 < value.length())) { + return doUnescape(value.getBytes(StandardCharsets.US_ASCII)); + } + } + return value; + } + + private static String doUnescape(byte[] value) { + final ByteBuffer buf = ByteBuffer.allocate(value.length); + for (int i = 0; i < value.length; ) { + if (value[i] == '%' && i + 2 < value.length) { + try { + buf.put((byte) Integer.parseInt(new String(value, i + 1, 2, StandardCharsets.UTF_8), 16)); + i += 3; + continue; + } catch (NumberFormatException e) { + // ignore, fall through, just push the bytes. + } + } + buf.put(value[i]); + i += 1; + } + return new String(buf.array(), 0, buf.position(), StandardCharsets.UTF_8); + } +} diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/OkHttpGrpcExporterBuilder.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/OkHttpGrpcExporterBuilder.java new file mode 100644 index 00000000000..93e57fa1017 --- /dev/null +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/OkHttpGrpcExporterBuilder.java @@ -0,0 +1,137 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.otlp.internal.grpc; + +import io.grpc.ManagedChannel; +import io.opentelemetry.exporter.otlp.internal.Marshaler; +import io.opentelemetry.exporter.otlp.internal.TlsUtil; +import java.net.URI; +import java.net.URISyntaxException; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import javax.net.ssl.SSLException; +import javax.net.ssl.X509TrustManager; +import okhttp3.Headers; +import okhttp3.OkHttpClient; +import okhttp3.Protocol; + +/** + * A builder for {@link OkHttpGrpcExporter}. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public final class OkHttpGrpcExporterBuilder + implements GrpcExporterBuilder { + + private final String type; + private final String grpcEndpointPath; + + private long timeoutNanos; + private URI endpoint; + private boolean compressionEnabled = false; + private final Headers.Builder headers = new Headers.Builder(); + @Nullable private byte[] trustedCertificatesPem; + + /** Creates a new {@link OkHttpGrpcExporterBuilder}. */ + OkHttpGrpcExporterBuilder( + String type, String grpcEndpointPath, long defaultTimeoutSecs, URI defaultEndpoint) { + this.type = type; + this.grpcEndpointPath = grpcEndpointPath; + timeoutNanos = TimeUnit.SECONDS.toNanos(defaultTimeoutSecs); + endpoint = defaultEndpoint; + } + + @Override + public OkHttpGrpcExporterBuilder setChannel(ManagedChannel channel) { + throw new UnsupportedOperationException("Only available on DefaultGrpcExporter"); + } + + @Override + public OkHttpGrpcExporterBuilder setTimeout(long timeout, TimeUnit unit) { + timeoutNanos = unit.toNanos(timeout); + return this; + } + + @Override + public OkHttpGrpcExporterBuilder setTimeout(Duration timeout) { + return setTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS); + } + + @Override + public OkHttpGrpcExporterBuilder setEndpoint(String endpoint) { + URI uri; + try { + uri = new URI(endpoint); + } catch (URISyntaxException e) { + throw new IllegalArgumentException("Invalid endpoint, must be a URL: " + endpoint, e); + } + + if (uri.getScheme() == null + || (!uri.getScheme().equals("http") && !uri.getScheme().equals("https"))) { + throw new IllegalArgumentException( + "Invalid endpoint, must start with http:// or https://: " + uri); + } + + this.endpoint = uri; + return this; + } + + @Override + public OkHttpGrpcExporterBuilder setCompression(String compressionMethod) { + this.compressionEnabled = true; + return this; + } + + @Override + public OkHttpGrpcExporterBuilder setTrustedCertificates(byte[] trustedCertificatesPem) { + this.trustedCertificatesPem = trustedCertificatesPem; + return this; + } + + @Override + public OkHttpGrpcExporterBuilder addHeader(String key, String value) { + headers.add(key, value); + return this; + } + + @Override + public GrpcExporter build() { + OkHttpClient.Builder clientBuilder = new OkHttpClient.Builder(); + + Headers.Builder headers = this.headers != null ? this.headers : new Headers.Builder(); + + clientBuilder.callTimeout(Duration.ofNanos(timeoutNanos)); + + if (trustedCertificatesPem != null) { + try { + X509TrustManager trustManager = TlsUtil.trustManager(trustedCertificatesPem); + clientBuilder.sslSocketFactory(TlsUtil.sslSocketFactory(trustManager), trustManager); + } catch (SSLException e) { + throw new IllegalStateException( + "Could not set trusted certificates, are they valid X.509 in PEM format?", e); + } + } + + String endpoint = this.endpoint.resolve(grpcEndpointPath).toString(); + if (endpoint.startsWith("http://")) { + clientBuilder.protocols(Collections.singletonList(Protocol.H2_PRIOR_KNOWLEDGE)); + } else { + clientBuilder.protocols(Arrays.asList(Protocol.HTTP_2, Protocol.HTTP_1_1)); + } + + headers.add("te", "trailers"); + if (compressionEnabled) { + headers.add("grpc-encoding", "gzip"); + } + + return new OkHttpGrpcExporter<>( + type, clientBuilder.build(), endpoint, headers.build(), compressionEnabled); + } +} diff --git a/exporters/otlp/logs/build.gradle.kts b/exporters/otlp/logs/build.gradle.kts index 0e8997db7cf..536c41b3218 100644 --- a/exporters/otlp/logs/build.gradle.kts +++ b/exporters/otlp/logs/build.gradle.kts @@ -14,6 +14,7 @@ testSets { create("testGrpcNetty") create("testGrpcNettyShaded") create("testGrpcOkhttp") + create("testOkHttpOnly") } dependencies { @@ -50,10 +51,24 @@ dependencies { add("testGrpcOkhttpImplementation", "com.linecorp.armeria:armeria-junit5") add("testGrpcOkhttpRuntimeOnly", "io.grpc:grpc-okhttp") add("testGrpcOkhttpRuntimeOnly", "org.bouncycastle:bcpkix-jdk15on") + + add("testOkHttpOnlyImplementation", "com.linecorp.armeria:armeria-grpc-protocol") + add("testOkHttpOnlyImplementation", "com.linecorp.armeria:armeria-junit5") + add("testOkHttpOnlyImplementation", "com.squareup.okhttp3:okhttp") + add("testOkHttpOnlyImplementation", "com.squareup.okhttp3:okhttp-tls") + add("testOkHttpOnlyRuntimeOnly", "org.bouncycastle:bcpkix-jdk15on") } tasks { named("check") { - dependsOn("testGrpcNetty", "testGrpcNettyShaded", "testGrpcOkhttp") + dependsOn("testGrpcNetty", "testGrpcNettyShaded", "testGrpcOkhttp", "testOkHttpOnly") + } +} + +configurations { + named("testOkHttpOnlyRuntimeClasspath") { + dependencies { + exclude("io.grpc") + } } } diff --git a/exporters/otlp/logs/src/main/java/io/opentelemetry/exporter/otlp/logs/MarshalerLogsServiceGrpc.java b/exporters/otlp/logs/src/main/java/io/opentelemetry/exporter/otlp/logs/MarshalerLogsServiceGrpc.java index 45ef09a679c..4ec1c68ff28 100644 --- a/exporters/otlp/logs/src/main/java/io/opentelemetry/exporter/otlp/logs/MarshalerLogsServiceGrpc.java +++ b/exporters/otlp/logs/src/main/java/io/opentelemetry/exporter/otlp/logs/MarshalerLogsServiceGrpc.java @@ -7,8 +7,13 @@ import static io.grpc.MethodDescriptor.generateFullMethodName; +import com.google.common.util.concurrent.ListenableFuture; +import io.grpc.CallOptions; +import io.grpc.Channel; import io.grpc.MethodDescriptor; +import io.grpc.stub.ClientCalls; import io.opentelemetry.exporter.otlp.internal.grpc.MarshalerInputStream; +import io.opentelemetry.exporter.otlp.internal.grpc.MarshalerServiceStub; import io.opentelemetry.exporter.otlp.internal.logs.LogsRequestMarshaler; import java.io.InputStream; @@ -52,25 +57,26 @@ public ExportLogsServiceResponse parse(InputStream stream) { .setResponseMarshaller(RESPONSE_MARSHALER) .build(); - static LogsServiceFutureStub newFutureStub(io.grpc.Channel channel) { + static LogsServiceFutureStub newFutureStub(Channel channel) { return LogsServiceFutureStub.newStub(LogsServiceFutureStub::new, channel); } static final class LogsServiceFutureStub - extends io.grpc.stub.AbstractFutureStub { - private LogsServiceFutureStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) { + extends MarshalerServiceStub< + LogsRequestMarshaler, ExportLogsServiceResponse, LogsServiceFutureStub> { + private LogsServiceFutureStub(Channel channel, CallOptions callOptions) { super(channel, callOptions); } @Override protected MarshalerLogsServiceGrpc.LogsServiceFutureStub build( - io.grpc.Channel channel, io.grpc.CallOptions callOptions) { + Channel channel, CallOptions callOptions) { return new MarshalerLogsServiceGrpc.LogsServiceFutureStub(channel, callOptions); } - com.google.common.util.concurrent.ListenableFuture export( - LogsRequestMarshaler request) { - return io.grpc.stub.ClientCalls.futureUnaryCall( + @Override + public ListenableFuture export(LogsRequestMarshaler request) { + return ClientCalls.futureUnaryCall( getChannel().newCall(getExportMethod, getCallOptions()), request); } } diff --git a/exporters/otlp/logs/src/main/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogExporter.java b/exporters/otlp/logs/src/main/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogExporter.java index 2ae7f5948b4..3151f47dc46 100644 --- a/exporters/otlp/logs/src/main/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogExporter.java +++ b/exporters/otlp/logs/src/main/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogExporter.java @@ -5,144 +5,29 @@ package io.opentelemetry.exporter.otlp.logs; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.MoreExecutors; -import io.grpc.Codec; -import io.grpc.ManagedChannel; -import io.grpc.Status; -import io.opentelemetry.api.common.AttributeKey; -import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.metrics.BoundLongCounter; -import io.opentelemetry.api.metrics.GlobalMeterProvider; -import io.opentelemetry.api.metrics.LongCounter; -import io.opentelemetry.api.metrics.Meter; -import io.opentelemetry.exporter.otlp.internal.grpc.ManagedChannelUtil; +import io.opentelemetry.exporter.otlp.internal.grpc.GrpcExporter; import io.opentelemetry.exporter.otlp.internal.logs.LogsRequestMarshaler; import io.opentelemetry.sdk.common.CompletableResultCode; -import io.opentelemetry.sdk.internal.ThrottlingLogger; import io.opentelemetry.sdk.logging.data.LogRecord; import io.opentelemetry.sdk.logging.export.LogExporter; import java.util.Collection; -import java.util.concurrent.TimeUnit; -import java.util.logging.Level; -import java.util.logging.Logger; -import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; /** Exports logs using OTLP via gRPC, using OpenTelemetry's protobuf model. */ @ThreadSafe public final class OtlpGrpcLogExporter implements LogExporter { - private static final AttributeKey EXPORTER_KEY = AttributeKey.stringKey("exporter"); - private static final AttributeKey SUCCESS_KEY = AttributeKey.stringKey("success"); - private static final String EXPORTER_NAME = OtlpGrpcLogExporter.class.getSimpleName(); - private static final Attributes EXPORTER_NAME_Attributes = - Attributes.of(EXPORTER_KEY, EXPORTER_NAME); - private static final Attributes EXPORT_SUCCESS_ATTRIBUTES = - Attributes.of(EXPORTER_KEY, EXPORTER_NAME, SUCCESS_KEY, "true"); - private static final Attributes EXPORT_FAILURE_ATTRIBUTES = - Attributes.of(EXPORTER_KEY, EXPORTER_NAME, SUCCESS_KEY, "false"); - private static final Logger internalLogger = - Logger.getLogger(OtlpGrpcLogExporter.class.getName()); - - private final ThrottlingLogger logger = new ThrottlingLogger(internalLogger); - - private final MarshalerLogsServiceGrpc.LogsServiceFutureStub logsService; - - private final ManagedChannel managedChannel; - private final long timeoutNanos; - private final BoundLongCounter logsSeen; - private final BoundLongCounter logsExportedSuccess; - private final BoundLongCounter logsExportedFailure; - - /** - * Creates a new OTLP gRPC Logs Reporter with the given name, using the given channel. - * - * @param channel the channel to use when communicating with the OpenTelemetry Collector. - * @param timeoutNanos max waiting time for the collector to process each log batch. When set to 0 - * or to a negative value, the exporter will wait indefinitely. - * @param compressionEnabled whether or not to enable gzip compression. - */ - OtlpGrpcLogExporter(ManagedChannel channel, long timeoutNanos, boolean compressionEnabled) { - // TODO: telemetry schema version. - Meter meter = GlobalMeterProvider.get().meterBuilder("io.opentelemetry.exporters.otlp").build(); - this.logsSeen = - meter.counterBuilder("logsSeenByExporter").build().bind(EXPORTER_NAME_Attributes); - LongCounter logsExportedCounter = meter.counterBuilder("logsExportedByExporter").build(); - this.logsExportedSuccess = logsExportedCounter.bind(EXPORT_SUCCESS_ATTRIBUTES); - this.logsExportedFailure = logsExportedCounter.bind(EXPORT_FAILURE_ATTRIBUTES); - this.managedChannel = channel; - this.timeoutNanos = timeoutNanos; - Codec codec = compressionEnabled ? new Codec.Gzip() : Codec.Identity.NONE; - this.logsService = - MarshalerLogsServiceGrpc.newFutureStub(channel).withCompression(codec.getMessageEncoding()); - } + private final GrpcExporter delegate; /** - * Submits all the given logs in a single batch to the OpenTelemetry collector. + * Returns a new {@link OtlpGrpcLogExporter} reading the configuration values from the environment + * and from system properties. System properties override values defined in the environment. If a + * configuration value is missing, it uses the default value. * - * @param logs the list of sampled logs to be exported. - * @return the result of the operation + * @return a new {@link OtlpGrpcLogExporter} instance. */ - @Override - public CompletableResultCode export(Collection logs) { - logsSeen.add(logs.size()); - LogsRequestMarshaler request = LogsRequestMarshaler.create(logs); - - final CompletableResultCode result = new CompletableResultCode(); - - MarshalerLogsServiceGrpc.LogsServiceFutureStub exporter; - if (timeoutNanos > 0) { - exporter = logsService.withDeadlineAfter(timeoutNanos, TimeUnit.NANOSECONDS); - } else { - exporter = logsService; - } - - Futures.addCallback( - exporter.export(request), - new FutureCallback() { - @Override - public void onSuccess(@Nullable ExportLogsServiceResponse response) { - logsExportedSuccess.add(logs.size()); - result.succeed(); - } - - @Override - public void onFailure(Throwable t) { - logsExportedFailure.add(logs.size()); - Status status = Status.fromThrowable(t); - switch (status.getCode()) { - case UNIMPLEMENTED: - logger.log( - Level.SEVERE, - "Failed to export logs. Server responded with UNIMPLEMENTED. " - + "This usually means that your collector is not configured with an otlp " - + "receiver in the \"pipelines\" section of the configuration. " - + "Full error message: " - + t.getMessage()); - break; - case UNAVAILABLE: - logger.log( - Level.SEVERE, - "Failed to export logs. Server is UNAVAILABLE. " - + "Make sure your collector is running and reachable from this network. " - + "Full error message:" - + t.getMessage()); - break; - default: - logger.log( - Level.WARNING, "Failed to export logs. Error message: " + t.getMessage()); - break; - } - if (logger.isLoggable(Level.FINEST)) { - logger.log(Level.FINEST, "Failed to export logs. Details follow: " + t); - } - result.fail(); - } - }, - MoreExecutors.directExecutor()); - return result; + public static OtlpGrpcLogExporter getDefault() { + return builder().build(); } /** @@ -154,15 +39,20 @@ public static OtlpGrpcLogExporterBuilder builder() { return new OtlpGrpcLogExporterBuilder(); } + OtlpGrpcLogExporter(GrpcExporter delegate) { + this.delegate = delegate; + } + /** - * Returns a new {@link OtlpGrpcLogExporter} reading the configuration values from the environment - * and from system properties. System properties override values defined in the environment. If a - * configuration value is missing, it uses the default value. + * Submits all the given logs in a single batch to the OpenTelemetry collector. * - * @return a new {@link OtlpGrpcLogExporter} instance. + * @param logs the list of sampled logs to be exported. + * @return the result of the operation */ - public static OtlpGrpcLogExporter getDefault() { - return builder().build(); + @Override + public CompletableResultCode export(Collection logs) { + LogsRequestMarshaler request = LogsRequestMarshaler.create(logs); + return delegate.export(request, logs.size()); } /** @@ -171,17 +61,6 @@ public static OtlpGrpcLogExporter getDefault() { */ @Override public CompletableResultCode shutdown() { - if (managedChannel.isTerminated()) { - return CompletableResultCode.ofSuccess(); - } - this.logsSeen.unbind(); - this.logsExportedSuccess.unbind(); - this.logsExportedFailure.unbind(); - return ManagedChannelUtil.shutdownChannel(managedChannel); - } - - // Visible for testing - long getTimeoutNanos() { - return timeoutNanos; + return delegate.shutdown(); } } diff --git a/exporters/otlp/logs/src/main/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogExporterBuilder.java b/exporters/otlp/logs/src/main/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogExporterBuilder.java index 15b59e6f17c..08b14bc6154 100644 --- a/exporters/otlp/logs/src/main/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogExporterBuilder.java +++ b/exporters/otlp/logs/src/main/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogExporterBuilder.java @@ -5,35 +5,40 @@ package io.opentelemetry.exporter.otlp.logs; -import static io.grpc.Metadata.ASCII_STRING_MARSHALLER; import static io.opentelemetry.api.internal.Utils.checkArgument; import static java.util.Objects.requireNonNull; import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; -import io.grpc.Metadata; -import io.grpc.stub.MetadataUtils; -import io.opentelemetry.exporter.otlp.internal.grpc.ManagedChannelUtil; +import io.opentelemetry.exporter.otlp.internal.grpc.GrpcExporter; +import io.opentelemetry.exporter.otlp.internal.grpc.GrpcExporterBuilder; +import io.opentelemetry.exporter.otlp.internal.logs.LogsRequestMarshaler; import java.net.URI; -import java.net.URISyntaxException; import java.time.Duration; import java.util.concurrent.TimeUnit; -import javax.annotation.Nullable; -import javax.net.ssl.SSLException; /** Builder for {@link OtlpGrpcLogExporter}. */ public final class OtlpGrpcLogExporterBuilder { + // Visible for testing + static final String GRPC_ENDPOINT_PATH = + "/opentelemetry.proto.collector.logs.v1.LogsService/Export"; + private static final String DEFAULT_ENDPOINT_URL = "http://localhost:4317"; private static final URI DEFAULT_ENDPOINT = URI.create(DEFAULT_ENDPOINT_URL); private static final long DEFAULT_TIMEOUT_SECS = 10; - @Nullable private ManagedChannel channel; - private long timeoutNanos = TimeUnit.SECONDS.toNanos(DEFAULT_TIMEOUT_SECS); - private URI endpoint = DEFAULT_ENDPOINT; - private boolean compressionEnabled = false; - @Nullable private Metadata metadata; - @Nullable private byte[] trustedCertificatesPem; + // Visible for testing + final GrpcExporterBuilder delegate; + + OtlpGrpcLogExporterBuilder() { + delegate = + GrpcExporter.builder( + "log", + DEFAULT_TIMEOUT_SECS, + DEFAULT_ENDPOINT, + () -> MarshalerLogsServiceGrpc::newFutureStub, + GRPC_ENDPOINT_PATH); + } /** * Sets the managed chanel to use when communicating with the backend. Takes precedence over @@ -43,7 +48,7 @@ public final class OtlpGrpcLogExporterBuilder { * @return this builder's instance */ public OtlpGrpcLogExporterBuilder setChannel(ManagedChannel channel) { - this.channel = channel; + delegate.setChannel(channel); return this; } @@ -54,7 +59,7 @@ public OtlpGrpcLogExporterBuilder setChannel(ManagedChannel channel) { public OtlpGrpcLogExporterBuilder setTimeout(long timeout, TimeUnit unit) { requireNonNull(unit, "unit"); checkArgument(timeout >= 0, "timeout must be non-negative"); - timeoutNanos = unit.toNanos(timeout); + delegate.setTimeout(timeout, unit); return this; } @@ -64,7 +69,8 @@ public OtlpGrpcLogExporterBuilder setTimeout(long timeout, TimeUnit unit) { */ public OtlpGrpcLogExporterBuilder setTimeout(Duration timeout) { requireNonNull(timeout, "timeout"); - return setTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS); + delegate.setTimeout(timeout); + return this; } /** @@ -73,21 +79,7 @@ public OtlpGrpcLogExporterBuilder setTimeout(Duration timeout) { */ public OtlpGrpcLogExporterBuilder setEndpoint(String endpoint) { requireNonNull(endpoint, "endpoint"); - - URI uri; - try { - uri = new URI(endpoint); - } catch (URISyntaxException e) { - throw new IllegalArgumentException("Invalid endpoint, must be a URL: " + endpoint, e); - } - - if (uri.getScheme() == null - || (!uri.getScheme().equals("http") && !uri.getScheme().equals("https"))) { - throw new IllegalArgumentException( - "Invalid endpoint, must start with http:// or https://: " + uri); - } - - this.endpoint = uri; + delegate.setEndpoint(endpoint); return this; } @@ -100,9 +92,7 @@ public OtlpGrpcLogExporterBuilder setCompression(String compressionMethod) { checkArgument( compressionMethod.equals("gzip") || compressionMethod.equals("none"), "Unsupported compression method. Supported compression methods include: gzip, none."); - if (compressionMethod.equals("gzip")) { - this.compressionEnabled = true; - } + delegate.setCompression(compressionMethod); return this; } @@ -112,23 +102,20 @@ public OtlpGrpcLogExporterBuilder setCompression(String compressionMethod) { * use the system default trusted certificates. */ public OtlpGrpcLogExporterBuilder setTrustedCertificates(byte[] trustedCertificatesPem) { - this.trustedCertificatesPem = trustedCertificatesPem; + delegate.setTrustedCertificates(trustedCertificatesPem); return this; } /** - * Add header to request. Optional. Applicable only if {@link OtlpGrpcLogExporterBuilder#endpoint} - * is set to build channel. + * Add header to request. Optional. Applicable only if {@link + * OtlpGrpcLogExporterBuilder#setChannel(ManagedChannel)} is not used to set channel. * * @param key header key * @param value header value * @return this builder's instance */ public OtlpGrpcLogExporterBuilder addHeader(String key, String value) { - if (metadata == null) { - metadata = new Metadata(); - } - metadata.put(Metadata.Key.of(key, ASCII_STRING_MARSHALLER), value); + delegate.addHeader(key, value); return this; } @@ -138,37 +125,6 @@ public OtlpGrpcLogExporterBuilder addHeader(String key, String value) { * @return a new exporter's instance */ public OtlpGrpcLogExporter build() { - ManagedChannel channel = this.channel; - if (channel == null) { - final ManagedChannelBuilder managedChannelBuilder = - ManagedChannelBuilder.forTarget(endpoint.getAuthority()); - - if (endpoint.getScheme().equals("https")) { - managedChannelBuilder.useTransportSecurity(); - } else { - managedChannelBuilder.usePlaintext(); - } - - if (metadata != null) { - managedChannelBuilder.intercept(MetadataUtils.newAttachHeadersInterceptor(metadata)); - } - - if (trustedCertificatesPem != null) { - try { - ManagedChannelUtil.setTrustedCertificatesPem( - managedChannelBuilder, trustedCertificatesPem); - } catch (SSLException e) { - throw new IllegalStateException( - "Could not set trusted certificates for gRPC TLS connection, are they valid " - + "X.509 in PEM format?", - e); - } - } - - channel = managedChannelBuilder.build(); - } - return new OtlpGrpcLogExporter(channel, timeoutNanos, compressionEnabled); + return new OtlpGrpcLogExporter(delegate.build()); } - - OtlpGrpcLogExporterBuilder() {} } diff --git a/exporters/otlp/logs/src/test/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogsExporterTest.java b/exporters/otlp/logs/src/test/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogsExporterTest.java index 21f2c988242..7f06e980976 100644 --- a/exporters/otlp/logs/src/test/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogsExporterTest.java +++ b/exporters/otlp/logs/src/test/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogsExporterTest.java @@ -22,6 +22,8 @@ import io.opentelemetry.api.trace.SpanId; import io.opentelemetry.api.trace.TraceFlags; import io.opentelemetry.api.trace.TraceId; +import io.opentelemetry.exporter.otlp.internal.grpc.DefaultGrpcExporter; +import io.opentelemetry.exporter.otlp.internal.grpc.DefaultGrpcExporterBuilder; import io.opentelemetry.exporter.otlp.internal.logs.ResourceLogsMarshaler; import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse; @@ -59,7 +61,7 @@ class OtlpGrpcLogsExporterTest { private final Closer closer = Closer.create(); @RegisterExtension - LogCapturer logs = LogCapturer.create().captureForType(OtlpGrpcLogExporter.class); + LogCapturer logs = LogCapturer.create().captureForType(DefaultGrpcExporter.class); @BeforeEach public void setup() throws IOException { @@ -346,6 +348,12 @@ void testExport_PermissionDenied() { } } + @Test + void usingGrpc() { + assertThat(OtlpGrpcLogExporter.builder().delegate) + .isInstanceOf(DefaultGrpcExporterBuilder.class); + } + private static LogRecord generateFakeLog() { return LogRecord.builder( Resource.create(Attributes.builder().put("testKey", "testValue").build()), diff --git a/exporters/otlp/logs/src/testGrpcNetty/java/io/opentelemetry/exporter/otlp/logs/ExportTest.java b/exporters/otlp/logs/src/testGrpcNetty/java/io/opentelemetry/exporter/otlp/logs/ExportTest.java index 43821cfd0ec..cfca7a6db1c 100644 --- a/exporters/otlp/logs/src/testGrpcNetty/java/io/opentelemetry/exporter/otlp/logs/ExportTest.java +++ b/exporters/otlp/logs/src/testGrpcNetty/java/io/opentelemetry/exporter/otlp/logs/ExportTest.java @@ -17,6 +17,7 @@ import io.opentelemetry.api.trace.SpanId; import io.opentelemetry.api.trace.TraceFlags; import io.opentelemetry.api.trace.TraceId; +import io.opentelemetry.exporter.otlp.internal.grpc.DefaultGrpcExporterBuilder; import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse; import io.opentelemetry.proto.collector.logs.v1.LogsServiceGrpc; @@ -134,4 +135,10 @@ void tlsBadCert() { .isInstanceOf(IllegalStateException.class) .hasMessageContaining("Could not set trusted certificates"); } + + @Test + void usingGrpc() { + assertThat(OtlpGrpcLogExporter.builder().delegate) + .isInstanceOf(DefaultGrpcExporterBuilder.class); + } } diff --git a/exporters/otlp/logs/src/testGrpcNettyShaded/java/io/opentelemetry/exporter/otlp/logs/ExportTest.java b/exporters/otlp/logs/src/testGrpcNettyShaded/java/io/opentelemetry/exporter/otlp/logs/ExportTest.java index 43821cfd0ec..cfca7a6db1c 100644 --- a/exporters/otlp/logs/src/testGrpcNettyShaded/java/io/opentelemetry/exporter/otlp/logs/ExportTest.java +++ b/exporters/otlp/logs/src/testGrpcNettyShaded/java/io/opentelemetry/exporter/otlp/logs/ExportTest.java @@ -17,6 +17,7 @@ import io.opentelemetry.api.trace.SpanId; import io.opentelemetry.api.trace.TraceFlags; import io.opentelemetry.api.trace.TraceId; +import io.opentelemetry.exporter.otlp.internal.grpc.DefaultGrpcExporterBuilder; import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse; import io.opentelemetry.proto.collector.logs.v1.LogsServiceGrpc; @@ -134,4 +135,10 @@ void tlsBadCert() { .isInstanceOf(IllegalStateException.class) .hasMessageContaining("Could not set trusted certificates"); } + + @Test + void usingGrpc() { + assertThat(OtlpGrpcLogExporter.builder().delegate) + .isInstanceOf(DefaultGrpcExporterBuilder.class); + } } diff --git a/exporters/otlp/logs/src/testGrpcOkhttp/java/io/opentelemetry/exporter/otlp/logs/ExportTest.java b/exporters/otlp/logs/src/testGrpcOkhttp/java/io/opentelemetry/exporter/otlp/logs/ExportTest.java index 43821cfd0ec..cfca7a6db1c 100644 --- a/exporters/otlp/logs/src/testGrpcOkhttp/java/io/opentelemetry/exporter/otlp/logs/ExportTest.java +++ b/exporters/otlp/logs/src/testGrpcOkhttp/java/io/opentelemetry/exporter/otlp/logs/ExportTest.java @@ -17,6 +17,7 @@ import io.opentelemetry.api.trace.SpanId; import io.opentelemetry.api.trace.TraceFlags; import io.opentelemetry.api.trace.TraceId; +import io.opentelemetry.exporter.otlp.internal.grpc.DefaultGrpcExporterBuilder; import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse; import io.opentelemetry.proto.collector.logs.v1.LogsServiceGrpc; @@ -134,4 +135,10 @@ void tlsBadCert() { .isInstanceOf(IllegalStateException.class) .hasMessageContaining("Could not set trusted certificates"); } + + @Test + void usingGrpc() { + assertThat(OtlpGrpcLogExporter.builder().delegate) + .isInstanceOf(DefaultGrpcExporterBuilder.class); + } } diff --git a/exporters/otlp/logs/src/testOkHttpOnly/java/io/opentelemetry/exporter/otlp/logs/OkHttpOnlyExportTest.java b/exporters/otlp/logs/src/testOkHttpOnly/java/io/opentelemetry/exporter/otlp/logs/OkHttpOnlyExportTest.java new file mode 100644 index 00000000000..b89e605314c --- /dev/null +++ b/exporters/otlp/logs/src/testOkHttpOnly/java/io/opentelemetry/exporter/otlp/logs/OkHttpOnlyExportTest.java @@ -0,0 +1,158 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.otlp.logs; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.linecorp.armeria.server.ServerBuilder; +import com.linecorp.armeria.server.ServiceRequestContext; +import com.linecorp.armeria.server.grpc.protocol.AbstractUnaryGrpcService; +import com.linecorp.armeria.testing.junit5.server.ServerExtension; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.SpanId; +import io.opentelemetry.api.trace.TraceFlags; +import io.opentelemetry.api.trace.TraceId; +import io.opentelemetry.exporter.otlp.internal.grpc.OkHttpGrpcExporterBuilder; +import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse; +import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; +import io.opentelemetry.sdk.logging.data.LogRecord; +import io.opentelemetry.sdk.resources.Resource; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; +import okhttp3.tls.HeldCertificate; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +class OkHttpOnlyExportTest { + + private static final List LOGS = + Collections.singletonList( + LogRecord.builder( + Resource.create(Attributes.builder().put("testKey", "testValue").build()), + InstrumentationLibraryInfo.create("instrumentation", "1")) + .setUnixTimeMillis(System.currentTimeMillis()) + .setTraceId(TraceId.getInvalid()) + .setSpanId(SpanId.getInvalid()) + .setFlags(TraceFlags.getDefault().asByte()) + .setSeverity(LogRecord.Severity.ERROR) + .setSeverityText("really severe") + .setName("log1") + .setBody("message") + .setAttributes(Attributes.builder().put("animal", "cat").build()) + .build()); + + private static final HeldCertificate HELD_CERTIFICATE; + + static { + try { + HELD_CERTIFICATE = + new HeldCertificate.Builder() + .commonName("localhost") + .addSubjectAlternativeName(InetAddress.getByName("localhost").getCanonicalHostName()) + .build(); + } catch (UnknownHostException e) { + throw new IllegalStateException("Error building certificate.", e); + } + } + + @RegisterExtension + @Order(2) + public static ServerExtension server = + new ServerExtension() { + @Override + protected void configure(ServerBuilder sb) { + sb.service( + OtlpGrpcLogExporterBuilder.GRPC_ENDPOINT_PATH, + new AbstractUnaryGrpcService() { + @Override + protected CompletionStage handleMessage( + ServiceRequestContext ctx, byte[] message) { + return CompletableFuture.completedFuture( + ExportLogsServiceResponse.getDefaultInstance().toByteArray()); + } + }); + sb.http(0); + sb.https(0); + sb.tls(HELD_CERTIFICATE.keyPair().getPrivate(), HELD_CERTIFICATE.certificate()); + } + }; + + // NB: Armeria does not support decompression without using the actual grpc-java (naturally + // this is the same for grpc-java as a test server). The failure does indicate compression, or at + // least some sort of data transformation was attempted. Separate integration tests using the + // OTel collector verify that this is indeed correct compression. + @Test + void gzipCompressionExport() { + OtlpGrpcLogExporter exporter = + OtlpGrpcLogExporter.builder() + .setEndpoint("http://localhost:" + server.httpPort()) + .setCompression("gzip") + .build(); + // See note on test method on why this checks isFalse. + assertThat(exporter.export(LOGS).join(10, TimeUnit.SECONDS).isSuccess()).isFalse(); + } + + @Test + void plainTextExport() { + OtlpGrpcLogExporter exporter = + OtlpGrpcLogExporter.builder().setEndpoint("http://localhost:" + server.httpPort()).build(); + assertThat(exporter.export(LOGS).join(10, TimeUnit.SECONDS).isSuccess()).isTrue(); + } + + @Test + void authorityWithAuth() { + OtlpGrpcLogExporter exporter = + OtlpGrpcLogExporter.builder() + .setEndpoint("http://foo:bar@localhost:" + server.httpPort()) + .build(); + assertThat(exporter.export(LOGS).join(10, TimeUnit.SECONDS).isSuccess()).isTrue(); + } + + @Test + void testTlsExport() { + OtlpGrpcLogExporter exporter = + OtlpGrpcLogExporter.builder() + .setEndpoint("https://localhost:" + server.httpsPort()) + .setTrustedCertificates( + HELD_CERTIFICATE.certificatePem().getBytes(StandardCharsets.UTF_8)) + .build(); + assertThat(exporter.export(LOGS).join(10, TimeUnit.SECONDS).isSuccess()).isTrue(); + } + + @Test + void testTlsExport_untrusted() { + OtlpGrpcLogExporter exporter = + OtlpGrpcLogExporter.builder() + .setEndpoint("https://localhost:" + server.httpsPort()) + .build(); + assertThat(exporter.export(LOGS).join(10, TimeUnit.SECONDS).isSuccess()).isFalse(); + } + + @Test + void tlsBadCert() { + assertThatThrownBy( + () -> + OtlpGrpcLogExporter.builder() + .setTrustedCertificates("foobar".getBytes(StandardCharsets.UTF_8)) + .build()) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("Could not set trusted certificates"); + } + + @Test + void usingOkhttp() { + assertThat(OtlpGrpcLogExporter.builder().delegate) + .isInstanceOf(OkHttpGrpcExporterBuilder.class); + } +} diff --git a/exporters/otlp/metrics/build.gradle.kts b/exporters/otlp/metrics/build.gradle.kts index d3e3c8dd4de..77bd7952738 100644 --- a/exporters/otlp/metrics/build.gradle.kts +++ b/exporters/otlp/metrics/build.gradle.kts @@ -13,6 +13,7 @@ testSets { create("testGrpcNetty") create("testGrpcNettyShaded") create("testGrpcOkhttp") + create("testOkHttpOnly") } dependencies { @@ -44,10 +45,24 @@ dependencies { add("testGrpcOkhttpImplementation", "com.linecorp.armeria:armeria-junit5") add("testGrpcOkhttpRuntimeOnly", "io.grpc:grpc-okhttp") add("testGrpcOkhttpRuntimeOnly", "org.bouncycastle:bcpkix-jdk15on") + + add("testOkHttpOnlyImplementation", "com.linecorp.armeria:armeria-grpc-protocol") + add("testOkHttpOnlyImplementation", "com.linecorp.armeria:armeria-junit5") + add("testOkHttpOnlyImplementation", "com.squareup.okhttp3:okhttp") + add("testOkHttpOnlyImplementation", "com.squareup.okhttp3:okhttp-tls") + add("testOkHttpOnlyRuntimeOnly", "org.bouncycastle:bcpkix-jdk15on") } tasks { - named("check") { - dependsOn("testGrpcNetty", "testGrpcNettyShaded", "testGrpcOkhttp") + check { + dependsOn("testGrpcNetty", "testGrpcNettyShaded", "testGrpcOkhttp", "testOkHttpOnly") + } +} + +configurations { + named("testOkHttpOnlyRuntimeClasspath") { + dependencies { + exclude("io.grpc") + } } } diff --git a/exporters/otlp/metrics/src/main/java/io/opentelemetry/exporter/otlp/metrics/MarshalerMetricsServiceGrpc.java b/exporters/otlp/metrics/src/main/java/io/opentelemetry/exporter/otlp/metrics/MarshalerMetricsServiceGrpc.java index a5a9bd24b4d..465fb751c1e 100644 --- a/exporters/otlp/metrics/src/main/java/io/opentelemetry/exporter/otlp/metrics/MarshalerMetricsServiceGrpc.java +++ b/exporters/otlp/metrics/src/main/java/io/opentelemetry/exporter/otlp/metrics/MarshalerMetricsServiceGrpc.java @@ -7,8 +7,13 @@ import static io.grpc.MethodDescriptor.generateFullMethodName; +import com.google.common.util.concurrent.ListenableFuture; +import io.grpc.CallOptions; +import io.grpc.Channel; import io.grpc.MethodDescriptor; +import io.grpc.stub.ClientCalls; import io.opentelemetry.exporter.otlp.internal.grpc.MarshalerInputStream; +import io.opentelemetry.exporter.otlp.internal.grpc.MarshalerServiceStub; import io.opentelemetry.exporter.otlp.internal.metrics.MetricsRequestMarshaler; import java.io.InputStream; @@ -54,26 +59,26 @@ public ExportMetricsServiceResponse parse(InputStream stream) { .setResponseMarshaller(RESPONSE_MARSHALER) .build(); - static MetricsServiceFutureStub newFutureStub(io.grpc.Channel channel) { + static MetricsServiceFutureStub newFutureStub(Channel channel) { return MetricsServiceFutureStub.newStub(MetricsServiceFutureStub::new, channel); } static final class MetricsServiceFutureStub - extends io.grpc.stub.AbstractFutureStub< - MarshalerMetricsServiceGrpc.MetricsServiceFutureStub> { - private MetricsServiceFutureStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) { + extends MarshalerServiceStub< + MetricsRequestMarshaler, ExportMetricsServiceResponse, MetricsServiceFutureStub> { + private MetricsServiceFutureStub(Channel channel, CallOptions callOptions) { super(channel, callOptions); } @Override protected MarshalerMetricsServiceGrpc.MetricsServiceFutureStub build( - io.grpc.Channel channel, io.grpc.CallOptions callOptions) { + Channel channel, CallOptions callOptions) { return new MarshalerMetricsServiceGrpc.MetricsServiceFutureStub(channel, callOptions); } - com.google.common.util.concurrent.ListenableFuture export( - MetricsRequestMarshaler request) { - return io.grpc.stub.ClientCalls.futureUnaryCall( + @Override + public ListenableFuture export(MetricsRequestMarshaler request) { + return ClientCalls.futureUnaryCall( getChannel().newCall(getExportMethod, getCallOptions()), request); } } diff --git a/exporters/otlp/metrics/src/main/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporter.java b/exporters/otlp/metrics/src/main/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporter.java index 6f6cc4314a6..5e3ce6f09f9 100644 --- a/exporters/otlp/metrics/src/main/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporter.java +++ b/exporters/otlp/metrics/src/main/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporter.java @@ -5,53 +5,42 @@ package io.opentelemetry.exporter.otlp.metrics; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.MoreExecutors; -import io.grpc.Codec; -import io.grpc.ManagedChannel; -import io.grpc.Status; -import io.opentelemetry.exporter.otlp.internal.grpc.ManagedChannelUtil; +import io.opentelemetry.exporter.otlp.internal.grpc.GrpcExporter; import io.opentelemetry.exporter.otlp.internal.metrics.MetricsRequestMarshaler; import io.opentelemetry.sdk.common.CompletableResultCode; -import io.opentelemetry.sdk.internal.ThrottlingLogger; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.export.MetricExporter; import java.util.Collection; -import java.util.concurrent.TimeUnit; -import java.util.logging.Level; -import java.util.logging.Logger; -import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; /** Exports metrics using OTLP via gRPC, using OpenTelemetry's protobuf model. */ @ThreadSafe public final class OtlpGrpcMetricExporter implements MetricExporter { - private static final Logger internalLogger = - Logger.getLogger(OtlpGrpcMetricExporter.class.getName()); + private final GrpcExporter delegate; - private final ThrottlingLogger logger = new ThrottlingLogger(internalLogger); - - private final MarshalerMetricsServiceGrpc.MetricsServiceFutureStub metricsService; - private final ManagedChannel managedChannel; - private final long timeoutNanos; + /** + * Returns a new {@link OtlpGrpcMetricExporter} reading the configuration values from the + * environment and from system properties. System properties override values defined in the + * environment. If a configuration value is missing, it uses the default value. + * + * @return a new {@link OtlpGrpcMetricExporter} instance. + */ + public static OtlpGrpcMetricExporter getDefault() { + return builder().build(); + } /** - * Creates a new OTLP gRPC Metric Reporter with the given name, using the given channel. + * Returns a new builder instance for this exporter. * - * @param channel the channel to use when communicating with the OpenTelemetry Collector. - * @param timeoutNanos max waiting time for the collector to process each metric batch. When set - * to 0 or to a negative value, the exporter will wait indefinitely. - * @param compressionEnabled whether or not to enable gzip compression. + * @return a new builder instance for this exporter. */ - OtlpGrpcMetricExporter(ManagedChannel channel, long timeoutNanos, boolean compressionEnabled) { - this.managedChannel = channel; - this.timeoutNanos = timeoutNanos; - Codec codec = compressionEnabled ? new Codec.Gzip() : Codec.Identity.NONE; - this.metricsService = - MarshalerMetricsServiceGrpc.newFutureStub(channel) - .withCompression(codec.getMessageEncoding()); + public static OtlpGrpcMetricExporterBuilder builder() { + return new OtlpGrpcMetricExporterBuilder(); + } + + OtlpGrpcMetricExporter(GrpcExporter delegate) { + this.delegate = delegate; } /** @@ -64,53 +53,7 @@ public final class OtlpGrpcMetricExporter implements MetricExporter { public CompletableResultCode export(Collection metrics) { MetricsRequestMarshaler request = MetricsRequestMarshaler.create(metrics); - final CompletableResultCode result = new CompletableResultCode(); - MarshalerMetricsServiceGrpc.MetricsServiceFutureStub exporter; - if (timeoutNanos > 0) { - exporter = metricsService.withDeadlineAfter(timeoutNanos, TimeUnit.NANOSECONDS); - } else { - exporter = metricsService; - } - - Futures.addCallback( - exporter.export(request), - new FutureCallback() { - @Override - public void onSuccess(@Nullable ExportMetricsServiceResponse response) { - result.succeed(); - } - - @Override - public void onFailure(Throwable t) { - Status status = Status.fromThrowable(t); - switch (status.getCode()) { - case UNIMPLEMENTED: - logger.log( - Level.SEVERE, - "Failed to export metrics. Server responded with UNIMPLEMENTED. " - + "This usually means that your collector is not configured with an otlp " - + "receiver in the \"pipelines\" section of the configuration. " - + "Full error message: " - + t.getMessage()); - break; - case UNAVAILABLE: - logger.log( - Level.SEVERE, - "Failed to export metrics. Server is UNAVAILABLE. " - + "Make sure your collector is running and reachable from this network." - + t.getMessage()); - break; - default: - logger.log( - Level.WARNING, "Failed to export metrics. Error message: " + t.getMessage()); - break; - } - logger.log(Level.FINEST, "Failed to export metrics. Details follow: " + t); - result.fail(); - } - }, - MoreExecutors.directExecutor()); - return result; + return delegate.export(request, metrics.size()); } /** @@ -123,35 +66,12 @@ public CompletableResultCode flush() { return CompletableResultCode.ofSuccess(); } - /** - * Returns a new builder instance for this exporter. - * - * @return a new builder instance for this exporter. - */ - public static OtlpGrpcMetricExporterBuilder builder() { - return new OtlpGrpcMetricExporterBuilder(); - } - - /** - * Returns a new {@link OtlpGrpcMetricExporter} reading the configuration values from the - * environment and from system properties. System properties override values defined in the - * environment. If a configuration value is missing, it uses the default value. - * - * @return a new {@link OtlpGrpcMetricExporter} instance. - */ - public static OtlpGrpcMetricExporter getDefault() { - return builder().build(); - } - /** * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately * cancelled. The channel is forcefully closed after a timeout. */ @Override public CompletableResultCode shutdown() { - if (managedChannel.isTerminated()) { - return CompletableResultCode.ofSuccess(); - } - return ManagedChannelUtil.shutdownChannel(managedChannel); + return delegate.shutdown(); } } diff --git a/exporters/otlp/metrics/src/main/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporterBuilder.java b/exporters/otlp/metrics/src/main/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporterBuilder.java index d86903c9a91..3573d0b4a41 100644 --- a/exporters/otlp/metrics/src/main/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporterBuilder.java +++ b/exporters/otlp/metrics/src/main/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporterBuilder.java @@ -5,35 +5,40 @@ package io.opentelemetry.exporter.otlp.metrics; -import static io.grpc.Metadata.ASCII_STRING_MARSHALLER; import static io.opentelemetry.api.internal.Utils.checkArgument; import static java.util.Objects.requireNonNull; import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; -import io.grpc.Metadata; -import io.grpc.stub.MetadataUtils; -import io.opentelemetry.exporter.otlp.internal.grpc.ManagedChannelUtil; +import io.opentelemetry.exporter.otlp.internal.grpc.GrpcExporter; +import io.opentelemetry.exporter.otlp.internal.grpc.GrpcExporterBuilder; +import io.opentelemetry.exporter.otlp.internal.metrics.MetricsRequestMarshaler; import java.net.URI; -import java.net.URISyntaxException; import java.time.Duration; import java.util.concurrent.TimeUnit; -import javax.annotation.Nullable; -import javax.net.ssl.SSLException; /** Builder utility for this exporter. */ public final class OtlpGrpcMetricExporterBuilder { + // Visible for testing + static final String GRPC_ENDPOINT_PATH = + "/opentelemetry.proto.collector.metrics.v1.MetricsService/Export"; + private static final String DEFAULT_ENDPOINT_URL = "http://localhost:4317"; private static final URI DEFAULT_ENDPOINT = URI.create(DEFAULT_ENDPOINT_URL); private static final long DEFAULT_TIMEOUT_SECS = 10; - @Nullable private ManagedChannel channel; - private long timeoutNanos = TimeUnit.SECONDS.toNanos(DEFAULT_TIMEOUT_SECS); - private URI endpoint = DEFAULT_ENDPOINT; - private boolean compressionEnabled = false; - @Nullable private Metadata metadata; - @Nullable private byte[] trustedCertificatesPem; + // Visible for testing + final GrpcExporterBuilder delegate; + + OtlpGrpcMetricExporterBuilder() { + delegate = + GrpcExporter.builder( + "metric", + DEFAULT_TIMEOUT_SECS, + DEFAULT_ENDPOINT, + () -> MarshalerMetricsServiceGrpc::newFutureStub, + GRPC_ENDPOINT_PATH); + } /** * Sets the managed chanel to use when communicating with the backend. Takes precedence over @@ -43,7 +48,7 @@ public final class OtlpGrpcMetricExporterBuilder { * @return this builder's instance */ public OtlpGrpcMetricExporterBuilder setChannel(ManagedChannel channel) { - this.channel = channel; + delegate.setChannel(channel); return this; } @@ -54,7 +59,7 @@ public OtlpGrpcMetricExporterBuilder setChannel(ManagedChannel channel) { public OtlpGrpcMetricExporterBuilder setTimeout(long timeout, TimeUnit unit) { requireNonNull(unit, "unit"); checkArgument(timeout >= 0, "timeout must be non-negative"); - timeoutNanos = unit.toNanos(timeout); + delegate.setTimeout(timeout, unit); return this; } @@ -64,7 +69,8 @@ public OtlpGrpcMetricExporterBuilder setTimeout(long timeout, TimeUnit unit) { */ public OtlpGrpcMetricExporterBuilder setTimeout(Duration timeout) { requireNonNull(timeout, "timeout"); - return setTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS); + delegate.setTimeout(timeout); + return this; } /** @@ -73,21 +79,7 @@ public OtlpGrpcMetricExporterBuilder setTimeout(Duration timeout) { */ public OtlpGrpcMetricExporterBuilder setEndpoint(String endpoint) { requireNonNull(endpoint, "endpoint"); - - URI uri; - try { - uri = new URI(endpoint); - } catch (URISyntaxException e) { - throw new IllegalArgumentException("Invalid endpoint, must be a URL: " + endpoint, e); - } - - if (uri.getScheme() == null - || (!uri.getScheme().equals("http") && !uri.getScheme().equals("https"))) { - throw new IllegalArgumentException( - "Invalid endpoint, must start with http:// or https://: " + uri); - } - - this.endpoint = uri; + delegate.setEndpoint(endpoint); return this; } @@ -100,9 +92,7 @@ public OtlpGrpcMetricExporterBuilder setCompression(String compressionMethod) { checkArgument( compressionMethod.equals("gzip") || compressionMethod.equals("none"), "Unsupported compression method. Supported compression methods include: gzip, none."); - if (compressionMethod.equals("gzip")) { - this.compressionEnabled = true; - } + delegate.setCompression(compressionMethod); return this; } @@ -112,23 +102,20 @@ public OtlpGrpcMetricExporterBuilder setCompression(String compressionMethod) { * use the system default trusted certificates. */ public OtlpGrpcMetricExporterBuilder setTrustedCertificates(byte[] trustedCertificatesPem) { - this.trustedCertificatesPem = trustedCertificatesPem; + delegate.setTrustedCertificates(trustedCertificatesPem); return this; } /** * Add header to request. Optional. Applicable only if {@link - * OtlpGrpcMetricExporterBuilder#endpoint} is set to build channel. + * OtlpGrpcMetricExporterBuilder#setChannel(ManagedChannel)} is not used to set channel. * * @param key header key * @param value header value * @return this builder's instance */ public OtlpGrpcMetricExporterBuilder addHeader(String key, String value) { - if (metadata == null) { - metadata = new Metadata(); - } - metadata.put(Metadata.Key.of(key, ASCII_STRING_MARSHALLER), value); + delegate.addHeader(key, value); return this; } @@ -138,37 +125,6 @@ public OtlpGrpcMetricExporterBuilder addHeader(String key, String value) { * @return a new exporter's instance */ public OtlpGrpcMetricExporter build() { - ManagedChannel channel = this.channel; - if (channel == null) { - final ManagedChannelBuilder managedChannelBuilder = - ManagedChannelBuilder.forTarget(endpoint.getAuthority()); - - if (endpoint.getScheme().equals("https")) { - managedChannelBuilder.useTransportSecurity(); - } else { - managedChannelBuilder.usePlaintext(); - } - - if (metadata != null) { - managedChannelBuilder.intercept(MetadataUtils.newAttachHeadersInterceptor(metadata)); - } - - if (trustedCertificatesPem != null) { - try { - ManagedChannelUtil.setTrustedCertificatesPem( - managedChannelBuilder, trustedCertificatesPem); - } catch (SSLException e) { - throw new IllegalStateException( - "Could not set trusted certificates for gRPC TLS connection, are they valid " - + "X.509 in PEM format?", - e); - } - } - - channel = managedChannelBuilder.build(); - } - return new OtlpGrpcMetricExporter(channel, timeoutNanos, compressionEnabled); + return new OtlpGrpcMetricExporter(delegate.build()); } - - OtlpGrpcMetricExporterBuilder() {} } diff --git a/exporters/otlp/metrics/src/test/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporterTest.java b/exporters/otlp/metrics/src/test/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporterTest.java index 41c97c71da9..8bdcfce4940 100644 --- a/exporters/otlp/metrics/src/test/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporterTest.java +++ b/exporters/otlp/metrics/src/test/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporterTest.java @@ -20,6 +20,8 @@ import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.stub.StreamObserver; import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.exporter.otlp.internal.grpc.DefaultGrpcExporter; +import io.opentelemetry.exporter.otlp.internal.grpc.DefaultGrpcExporterBuilder; import io.opentelemetry.exporter.otlp.internal.metrics.ResourceMetricsMarshaler; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse; @@ -62,7 +64,7 @@ class OtlpGrpcMetricExporterTest { private final Closer closer = Closer.create(); @RegisterExtension - LogCapturer logs = LogCapturer.create().captureForType(OtlpGrpcMetricExporter.class); + LogCapturer logs = LogCapturer.create().captureForType(DefaultGrpcExporter.class); @BeforeEach public void setup() throws IOException { @@ -353,6 +355,12 @@ void testExport_flush() { } } + @Test + void usingGrpc() { + assertThat(OtlpGrpcMetricExporter.builder().delegate) + .isInstanceOf(DefaultGrpcExporterBuilder.class); + } + private static MetricData generateFakeMetric() { long startNs = TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis()); long endNs = startNs + TimeUnit.MILLISECONDS.toNanos(900); diff --git a/exporters/otlp/metrics/src/testGrpcNetty/java/io/opentelemetry/exporter/otlp/metrics/ExportTest.java b/exporters/otlp/metrics/src/testGrpcNetty/java/io/opentelemetry/exporter/otlp/metrics/ExportTest.java index efe5df22885..6b438a5dc8e 100644 --- a/exporters/otlp/metrics/src/testGrpcNetty/java/io/opentelemetry/exporter/otlp/metrics/ExportTest.java +++ b/exporters/otlp/metrics/src/testGrpcNetty/java/io/opentelemetry/exporter/otlp/metrics/ExportTest.java @@ -15,6 +15,7 @@ import com.linecorp.armeria.testing.junit5.server.ServerExtension; import io.grpc.stub.StreamObserver; import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.exporter.otlp.internal.grpc.DefaultGrpcExporterBuilder; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse; import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc; @@ -141,4 +142,10 @@ void tlsBadCert() { .isInstanceOf(IllegalStateException.class) .hasMessageContaining("Could not set trusted certificates"); } + + @Test + void usingGrpc() { + assertThat(OtlpGrpcMetricExporter.builder().delegate) + .isInstanceOf(DefaultGrpcExporterBuilder.class); + } } diff --git a/exporters/otlp/metrics/src/testGrpcNettyShaded/java/io/opentelemetry/exporter/otlp/metrics/ExportTest.java b/exporters/otlp/metrics/src/testGrpcNettyShaded/java/io/opentelemetry/exporter/otlp/metrics/ExportTest.java index efe5df22885..6b438a5dc8e 100644 --- a/exporters/otlp/metrics/src/testGrpcNettyShaded/java/io/opentelemetry/exporter/otlp/metrics/ExportTest.java +++ b/exporters/otlp/metrics/src/testGrpcNettyShaded/java/io/opentelemetry/exporter/otlp/metrics/ExportTest.java @@ -15,6 +15,7 @@ import com.linecorp.armeria.testing.junit5.server.ServerExtension; import io.grpc.stub.StreamObserver; import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.exporter.otlp.internal.grpc.DefaultGrpcExporterBuilder; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse; import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc; @@ -141,4 +142,10 @@ void tlsBadCert() { .isInstanceOf(IllegalStateException.class) .hasMessageContaining("Could not set trusted certificates"); } + + @Test + void usingGrpc() { + assertThat(OtlpGrpcMetricExporter.builder().delegate) + .isInstanceOf(DefaultGrpcExporterBuilder.class); + } } diff --git a/exporters/otlp/metrics/src/testGrpcOkhttp/java/io/opentelemetry/exporter/otlp/metrics/ExportTest.java b/exporters/otlp/metrics/src/testGrpcOkhttp/java/io/opentelemetry/exporter/otlp/metrics/ExportTest.java index efe5df22885..6b438a5dc8e 100644 --- a/exporters/otlp/metrics/src/testGrpcOkhttp/java/io/opentelemetry/exporter/otlp/metrics/ExportTest.java +++ b/exporters/otlp/metrics/src/testGrpcOkhttp/java/io/opentelemetry/exporter/otlp/metrics/ExportTest.java @@ -15,6 +15,7 @@ import com.linecorp.armeria.testing.junit5.server.ServerExtension; import io.grpc.stub.StreamObserver; import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.exporter.otlp.internal.grpc.DefaultGrpcExporterBuilder; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse; import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc; @@ -141,4 +142,10 @@ void tlsBadCert() { .isInstanceOf(IllegalStateException.class) .hasMessageContaining("Could not set trusted certificates"); } + + @Test + void usingGrpc() { + assertThat(OtlpGrpcMetricExporter.builder().delegate) + .isInstanceOf(DefaultGrpcExporterBuilder.class); + } } diff --git a/exporters/otlp/metrics/src/testOkHttpOnly/java/io/opentelemetry/exporter/otlp/metrics/OkHttpOnlyExportTest.java b/exporters/otlp/metrics/src/testOkHttpOnly/java/io/opentelemetry/exporter/otlp/metrics/OkHttpOnlyExportTest.java new file mode 100644 index 00000000000..f1e3f372f17 --- /dev/null +++ b/exporters/otlp/metrics/src/testOkHttpOnly/java/io/opentelemetry/exporter/otlp/metrics/OkHttpOnlyExportTest.java @@ -0,0 +1,164 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.otlp.metrics; + +import static io.opentelemetry.api.common.AttributeKey.stringKey; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.linecorp.armeria.server.ServerBuilder; +import com.linecorp.armeria.server.ServiceRequestContext; +import com.linecorp.armeria.server.grpc.protocol.AbstractUnaryGrpcService; +import com.linecorp.armeria.testing.junit5.server.ServerExtension; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.exporter.otlp.internal.grpc.OkHttpGrpcExporterBuilder; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse; +import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.data.LongPointData; +import io.opentelemetry.sdk.metrics.data.LongSumData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.resources.Resource; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; +import okhttp3.tls.HeldCertificate; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +class OkHttpOnlyExportTest { + + private static final long START_NS = TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis()); + private static final List METRICS = + Collections.singletonList( + MetricData.createLongSum( + Resource.empty(), + InstrumentationLibraryInfo.empty(), + "name", + "description", + "1", + LongSumData.create( + /* isMonotonic= */ true, + AggregationTemporality.CUMULATIVE, + Collections.singletonList( + LongPointData.create( + START_NS, + START_NS + TimeUnit.MILLISECONDS.toNanos(900), + Attributes.of(stringKey("k"), "v"), + 5))))); + + private static final HeldCertificate HELD_CERTIFICATE; + + static { + try { + HELD_CERTIFICATE = + new HeldCertificate.Builder() + .commonName("localhost") + .addSubjectAlternativeName(InetAddress.getByName("localhost").getCanonicalHostName()) + .build(); + } catch (UnknownHostException e) { + throw new IllegalStateException("Error building certificate.", e); + } + } + + @RegisterExtension + @Order(2) + public static ServerExtension server = + new ServerExtension() { + @Override + protected void configure(ServerBuilder sb) { + sb.service( + OtlpGrpcMetricExporterBuilder.GRPC_ENDPOINT_PATH, + new AbstractUnaryGrpcService() { + @Override + protected CompletionStage handleMessage( + ServiceRequestContext ctx, byte[] message) { + return CompletableFuture.completedFuture( + ExportMetricsServiceResponse.getDefaultInstance().toByteArray()); + } + }); + sb.http(0); + sb.https(0); + sb.tls(HELD_CERTIFICATE.keyPair().getPrivate(), HELD_CERTIFICATE.certificate()); + } + }; + + // NB: Armeria does not support decompression without using the actual grpc-java (naturally + // this is the same for grpc-java as a test server). The failure does indicate compression, or at + // least some sort of data transformation was attempted. Separate integration tests using the + // OTel collector verify that this is indeed correct compression. + @Test + void gzipCompressionExportButFails() { + OtlpGrpcMetricExporter exporter = + OtlpGrpcMetricExporter.builder() + .setEndpoint("http://localhost:" + server.httpPort()) + .setCompression("gzip") + .build(); + // See note on test method on why this checks isFalse. + assertThat(exporter.export(METRICS).join(10, TimeUnit.SECONDS).isSuccess()).isFalse(); + } + + @Test + void plainTextExport() { + OtlpGrpcMetricExporter exporter = + OtlpGrpcMetricExporter.builder() + .setEndpoint("http://localhost:" + server.httpPort()) + .build(); + assertThat(exporter.export(METRICS).join(10, TimeUnit.SECONDS).isSuccess()).isTrue(); + } + + @Test + void authorityWithAuth() { + OtlpGrpcMetricExporter exporter = + OtlpGrpcMetricExporter.builder() + .setEndpoint("http://foo:bar@localhost:" + server.httpPort()) + .build(); + assertThat(exporter.export(METRICS).join(10, TimeUnit.SECONDS).isSuccess()).isTrue(); + } + + @Test + void testTlsExport() { + OtlpGrpcMetricExporter exporter = + OtlpGrpcMetricExporter.builder() + .setEndpoint("https://localhost:" + server.httpsPort()) + .setTrustedCertificates( + HELD_CERTIFICATE.certificatePem().getBytes(StandardCharsets.UTF_8)) + .build(); + assertThat(exporter.export(METRICS).join(10, TimeUnit.SECONDS).isSuccess()).isTrue(); + } + + @Test + void testTlsExport_untrusted() { + OtlpGrpcMetricExporter exporter = + OtlpGrpcMetricExporter.builder() + .setEndpoint("https://localhost:" + server.httpsPort()) + .build(); + assertThat(exporter.export(METRICS).join(10, TimeUnit.SECONDS).isSuccess()).isFalse(); + } + + @Test + void tlsBadCert() { + assertThatThrownBy( + () -> + OtlpGrpcMetricExporter.builder() + .setTrustedCertificates("foobar".getBytes(StandardCharsets.UTF_8)) + .build()) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("Could not set trusted certificates"); + } + + @Test + void usingOkhttp() { + assertThat(OtlpGrpcMetricExporter.builder().delegate) + .isInstanceOf(OkHttpGrpcExporterBuilder.class); + } +} diff --git a/exporters/otlp/trace/build.gradle.kts b/exporters/otlp/trace/build.gradle.kts index 4a834952327..84e32c73b82 100644 --- a/exporters/otlp/trace/build.gradle.kts +++ b/exporters/otlp/trace/build.gradle.kts @@ -14,6 +14,7 @@ testSets { create("testGrpcNetty") create("testGrpcNettyShaded") create("testGrpcOkhttp") + create("testOkhttpOnly") // Mainly to conveniently profile through IDEA. Don't add to check task, it's for // manual invocation. @@ -31,6 +32,8 @@ dependencies { compileOnly("io.grpc:grpc-netty-shaded") compileOnly("io.grpc:grpc-okhttp") + compileOnly("com.squareup.okhttp3:okhttp") + api("io.grpc:grpc-stub") implementation("io.grpc:grpc-api") @@ -57,13 +60,27 @@ dependencies { add("testGrpcOkhttpRuntimeOnly", "io.grpc:grpc-okhttp") add("testGrpcOkhttpRuntimeOnly", "org.bouncycastle:bcpkix-jdk15on") + add("testOkhttpOnlyImplementation", "com.linecorp.armeria:armeria-grpc-protocol") + add("testOkhttpOnlyImplementation", "com.linecorp.armeria:armeria-junit5") + add("testOkhttpOnlyImplementation", "com.squareup.okhttp3:okhttp") + add("testOkhttpOnlyImplementation", "com.squareup.okhttp3:okhttp-tls") + add("testOkhttpOnlyRuntimeOnly", "org.bouncycastle:bcpkix-jdk15on") + add("testSpanPipeline", project(":proto")) add("testSpanPipeline", "io.grpc:grpc-protobuf") add("testSpanPipeline", "io.grpc:grpc-testing") } tasks { - named("check") { - dependsOn("testGrpcNetty", "testGrpcNettyShaded", "testGrpcOkhttp") + check { + dependsOn("testGrpcNetty", "testGrpcNettyShaded", "testGrpcOkhttp", "testOkhttpOnly") + } +} + +configurations { + named("testOkhttpOnlyRuntimeClasspath") { + dependencies { + exclude("io.grpc") + } } } diff --git a/exporters/otlp/trace/src/main/java/io/opentelemetry/exporter/otlp/trace/MarshalerTraceServiceGrpc.java b/exporters/otlp/trace/src/main/java/io/opentelemetry/exporter/otlp/trace/MarshalerTraceServiceGrpc.java index 66d34e8039b..eed1af6714b 100644 --- a/exporters/otlp/trace/src/main/java/io/opentelemetry/exporter/otlp/trace/MarshalerTraceServiceGrpc.java +++ b/exporters/otlp/trace/src/main/java/io/opentelemetry/exporter/otlp/trace/MarshalerTraceServiceGrpc.java @@ -9,6 +9,7 @@ import io.grpc.MethodDescriptor; import io.opentelemetry.exporter.otlp.internal.grpc.MarshalerInputStream; +import io.opentelemetry.exporter.otlp.internal.grpc.MarshalerServiceStub; import io.opentelemetry.exporter.otlp.internal.traces.TraceRequestMarshaler; import java.io.InputStream; @@ -57,7 +58,8 @@ static TraceServiceFutureStub newFutureStub(io.grpc.Channel channel) { } static final class TraceServiceFutureStub - extends io.grpc.stub.AbstractFutureStub { + extends MarshalerServiceStub< + TraceRequestMarshaler, ExportTraceServiceResponse, TraceServiceFutureStub> { private TraceServiceFutureStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) { super(channel, callOptions); } @@ -68,7 +70,8 @@ protected MarshalerTraceServiceGrpc.TraceServiceFutureStub build( return new MarshalerTraceServiceGrpc.TraceServiceFutureStub(channel, callOptions); } - com.google.common.util.concurrent.ListenableFuture export( + @Override + public com.google.common.util.concurrent.ListenableFuture export( TraceRequestMarshaler request) { return io.grpc.stub.ClientCalls.futureUnaryCall( getChannel().newCall(getExportMethod, getCallOptions()), request); diff --git a/exporters/otlp/trace/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporter.java b/exporters/otlp/trace/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporter.java index 722d1b92d0b..04f990788d9 100644 --- a/exporters/otlp/trace/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporter.java +++ b/exporters/otlp/trace/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporter.java @@ -5,79 +5,42 @@ package io.opentelemetry.exporter.otlp.trace; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.MoreExecutors; -import io.grpc.Codec; -import io.grpc.ManagedChannel; -import io.grpc.Status; -import io.opentelemetry.api.common.AttributeKey; -import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.metrics.BoundLongCounter; -import io.opentelemetry.api.metrics.GlobalMeterProvider; -import io.opentelemetry.api.metrics.LongCounter; -import io.opentelemetry.api.metrics.Meter; -import io.opentelemetry.exporter.otlp.internal.grpc.ManagedChannelUtil; +import io.opentelemetry.exporter.otlp.internal.grpc.GrpcExporter; import io.opentelemetry.exporter.otlp.internal.traces.TraceRequestMarshaler; import io.opentelemetry.sdk.common.CompletableResultCode; -import io.opentelemetry.sdk.internal.ThrottlingLogger; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SpanExporter; import java.util.Collection; -import java.util.concurrent.TimeUnit; -import java.util.logging.Level; -import java.util.logging.Logger; -import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; /** Exports spans using OTLP via gRPC, using OpenTelemetry's protobuf model. */ @ThreadSafe public final class OtlpGrpcSpanExporter implements SpanExporter { - private static final AttributeKey EXPORTER_KEY = AttributeKey.stringKey("exporter"); - private static final AttributeKey SUCCESS_KEY = AttributeKey.stringKey("success"); - private static final String EXPORTER_NAME = OtlpGrpcSpanExporter.class.getSimpleName(); - private static final Attributes EXPORTER_NAME_Attributes = - Attributes.of(EXPORTER_KEY, EXPORTER_NAME); - private static final Attributes EXPORT_SUCCESS_ATTRIBUTES = - Attributes.of(EXPORTER_KEY, EXPORTER_NAME, SUCCESS_KEY, "true"); - private static final Attributes EXPORT_FAILURE_ATTRIBUTES = - Attributes.of(EXPORTER_KEY, EXPORTER_NAME, SUCCESS_KEY, "false"); - private static final Logger internalLogger = - Logger.getLogger(OtlpGrpcSpanExporter.class.getName()); + private final GrpcExporter delegate; - private final ThrottlingLogger logger = new ThrottlingLogger(internalLogger); - - private final MarshalerTraceServiceGrpc.TraceServiceFutureStub traceService; - - private final ManagedChannel managedChannel; - private final long timeoutNanos; - private final BoundLongCounter spansSeen; - private final BoundLongCounter spansExportedSuccess; - private final BoundLongCounter spansExportedFailure; + /** + * Returns a new {@link OtlpGrpcSpanExporter} reading the configuration values from the + * environment and from system properties. System properties override values defined in the + * environment. If a configuration value is missing, it uses the default value. + * + * @return a new {@link OtlpGrpcSpanExporter} instance. + */ + public static OtlpGrpcSpanExporter getDefault() { + return builder().build(); + } /** - * Creates a new OTLP gRPC Span Reporter with the given name, using the given channel. + * Returns a new builder instance for this exporter. * - * @param channel the channel to use when communicating with the OpenTelemetry Collector. - * @param timeoutNanos max waiting time for the collector to process each span batch. When set to - * 0 or to a negative value, the exporter will wait indefinitely. - * @param compressionEnabled whether or not to enable gzip compression. + * @return a new builder instance for this exporter. */ - OtlpGrpcSpanExporter(ManagedChannel channel, long timeoutNanos, boolean compressionEnabled) { - // TODO: telemetry schema version. - Meter meter = GlobalMeterProvider.get().meterBuilder("io.opentelemetry.exporters.otlp").build(); - this.spansSeen = - meter.counterBuilder("spansSeenByExporter").build().bind(EXPORTER_NAME_Attributes); - LongCounter spansExportedCounter = meter.counterBuilder("spansExportedByExporter").build(); - this.spansExportedSuccess = spansExportedCounter.bind(EXPORT_SUCCESS_ATTRIBUTES); - this.spansExportedFailure = spansExportedCounter.bind(EXPORT_FAILURE_ATTRIBUTES); - this.managedChannel = channel; - this.timeoutNanos = timeoutNanos; - Codec codec = compressionEnabled ? new Codec.Gzip() : Codec.Identity.NONE; - this.traceService = - MarshalerTraceServiceGrpc.newFutureStub(channel) - .withCompression(codec.getMessageEncoding()); + public static OtlpGrpcSpanExporterBuilder builder() { + return new OtlpGrpcSpanExporterBuilder(); + } + + OtlpGrpcSpanExporter(GrpcExporter delegate) { + this.delegate = delegate; } /** @@ -88,62 +51,9 @@ public final class OtlpGrpcSpanExporter implements SpanExporter { */ @Override public CompletableResultCode export(Collection spans) { - spansSeen.add(spans.size()); TraceRequestMarshaler request = TraceRequestMarshaler.create(spans); - final CompletableResultCode result = new CompletableResultCode(); - - MarshalerTraceServiceGrpc.TraceServiceFutureStub exporter; - if (timeoutNanos > 0) { - exporter = traceService.withDeadlineAfter(timeoutNanos, TimeUnit.NANOSECONDS); - } else { - exporter = traceService; - } - - Futures.addCallback( - exporter.export(request), - new FutureCallback() { - @Override - public void onSuccess(@Nullable ExportTraceServiceResponse response) { - spansExportedSuccess.add(spans.size()); - result.succeed(); - } - - @Override - public void onFailure(Throwable t) { - spansExportedFailure.add(spans.size()); - Status status = Status.fromThrowable(t); - switch (status.getCode()) { - case UNIMPLEMENTED: - logger.log( - Level.SEVERE, - "Failed to export spans. Server responded with UNIMPLEMENTED. " - + "This usually means that your collector is not configured with an otlp " - + "receiver in the \"pipelines\" section of the configuration. " - + "Full error message: " - + t.getMessage()); - break; - case UNAVAILABLE: - logger.log( - Level.SEVERE, - "Failed to export spans. Server is UNAVAILABLE. " - + "Make sure your collector is running and reachable from this network. " - + "Full error message:" - + t.getMessage()); - break; - default: - logger.log( - Level.WARNING, "Failed to export spans. Error message: " + t.getMessage()); - break; - } - if (logger.isLoggable(Level.FINEST)) { - logger.log(Level.FINEST, "Failed to export spans. Details follow: " + t); - } - result.fail(); - } - }, - MoreExecutors.directExecutor()); - return result; + return delegate.export(request, spans.size()); } /** @@ -156,43 +66,12 @@ public CompletableResultCode flush() { return CompletableResultCode.ofSuccess(); } - /** - * Returns a new builder instance for this exporter. - * - * @return a new builder instance for this exporter. - */ - public static OtlpGrpcSpanExporterBuilder builder() { - return new OtlpGrpcSpanExporterBuilder(); - } - - /** - * Returns a new {@link OtlpGrpcSpanExporter} reading the configuration values from the - * environment and from system properties. System properties override values defined in the - * environment. If a configuration value is missing, it uses the default value. - * - * @return a new {@link OtlpGrpcSpanExporter} instance. - */ - public static OtlpGrpcSpanExporter getDefault() { - return builder().build(); - } - /** * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately * cancelled. */ @Override public CompletableResultCode shutdown() { - if (managedChannel.isTerminated()) { - return CompletableResultCode.ofSuccess(); - } - this.spansSeen.unbind(); - this.spansExportedSuccess.unbind(); - this.spansExportedFailure.unbind(); - return ManagedChannelUtil.shutdownChannel(managedChannel); - } - - // Visible for testing - long getTimeoutNanos() { - return timeoutNanos; + return delegate.shutdown(); } } diff --git a/exporters/otlp/trace/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporterBuilder.java b/exporters/otlp/trace/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporterBuilder.java index 6a0f3c7deb0..942b9daf35d 100644 --- a/exporters/otlp/trace/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporterBuilder.java +++ b/exporters/otlp/trace/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporterBuilder.java @@ -5,35 +5,40 @@ package io.opentelemetry.exporter.otlp.trace; -import static io.grpc.Metadata.ASCII_STRING_MARSHALLER; import static io.opentelemetry.api.internal.Utils.checkArgument; import static java.util.Objects.requireNonNull; import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; -import io.grpc.Metadata; -import io.grpc.stub.MetadataUtils; -import io.opentelemetry.exporter.otlp.internal.grpc.ManagedChannelUtil; +import io.opentelemetry.exporter.otlp.internal.grpc.GrpcExporter; +import io.opentelemetry.exporter.otlp.internal.grpc.GrpcExporterBuilder; +import io.opentelemetry.exporter.otlp.internal.traces.TraceRequestMarshaler; import java.net.URI; -import java.net.URISyntaxException; import java.time.Duration; import java.util.concurrent.TimeUnit; -import javax.annotation.Nullable; -import javax.net.ssl.SSLException; /** Builder utility for this exporter. */ public final class OtlpGrpcSpanExporterBuilder { + // Visible for testing + static final String GRPC_ENDPOINT_PATH = + "/opentelemetry.proto.collector.trace.v1.TraceService/Export"; + private static final String DEFAULT_ENDPOINT_URL = "http://localhost:4317"; private static final URI DEFAULT_ENDPOINT = URI.create(DEFAULT_ENDPOINT_URL); private static final long DEFAULT_TIMEOUT_SECS = 10; - @Nullable private ManagedChannel channel; - private long timeoutNanos = TimeUnit.SECONDS.toNanos(DEFAULT_TIMEOUT_SECS); - private URI endpoint = DEFAULT_ENDPOINT; - private boolean compressionEnabled = false; - @Nullable private Metadata metadata; - @Nullable private byte[] trustedCertificatesPem; + // Visible for testing + final GrpcExporterBuilder delegate; + + OtlpGrpcSpanExporterBuilder() { + delegate = + GrpcExporter.builder( + "span", + DEFAULT_TIMEOUT_SECS, + DEFAULT_ENDPOINT, + () -> MarshalerTraceServiceGrpc::newFutureStub, + GRPC_ENDPOINT_PATH); + } /** * Sets the managed chanel to use when communicating with the backend. Takes precedence over @@ -43,7 +48,7 @@ public final class OtlpGrpcSpanExporterBuilder { * @return this builder's instance */ public OtlpGrpcSpanExporterBuilder setChannel(ManagedChannel channel) { - this.channel = channel; + delegate.setChannel(channel); return this; } @@ -54,7 +59,7 @@ public OtlpGrpcSpanExporterBuilder setChannel(ManagedChannel channel) { public OtlpGrpcSpanExporterBuilder setTimeout(long timeout, TimeUnit unit) { requireNonNull(unit, "unit"); checkArgument(timeout >= 0, "timeout must be non-negative"); - timeoutNanos = unit.toNanos(timeout); + delegate.setTimeout(timeout, unit); return this; } @@ -64,7 +69,8 @@ public OtlpGrpcSpanExporterBuilder setTimeout(long timeout, TimeUnit unit) { */ public OtlpGrpcSpanExporterBuilder setTimeout(Duration timeout) { requireNonNull(timeout, "timeout"); - return setTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS); + delegate.setTimeout(timeout); + return this; } /** @@ -73,21 +79,7 @@ public OtlpGrpcSpanExporterBuilder setTimeout(Duration timeout) { */ public OtlpGrpcSpanExporterBuilder setEndpoint(String endpoint) { requireNonNull(endpoint, "endpoint"); - - URI uri; - try { - uri = new URI(endpoint); - } catch (URISyntaxException e) { - throw new IllegalArgumentException("Invalid endpoint, must be a URL: " + endpoint, e); - } - - if (uri.getScheme() == null - || (!uri.getScheme().equals("http") && !uri.getScheme().equals("https"))) { - throw new IllegalArgumentException( - "Invalid endpoint, must start with http:// or https://: " + uri); - } - - this.endpoint = uri; + delegate.setEndpoint(endpoint); return this; } @@ -100,9 +92,7 @@ public OtlpGrpcSpanExporterBuilder setCompression(String compressionMethod) { checkArgument( compressionMethod.equals("gzip") || compressionMethod.equals("none"), "Unsupported compression method. Supported compression methods include: gzip, none."); - if (compressionMethod.equals("gzip")) { - this.compressionEnabled = true; - } + delegate.setCompression(compressionMethod); return this; } @@ -112,23 +102,20 @@ public OtlpGrpcSpanExporterBuilder setCompression(String compressionMethod) { * use the system default trusted certificates. */ public OtlpGrpcSpanExporterBuilder setTrustedCertificates(byte[] trustedCertificatesPem) { - this.trustedCertificatesPem = trustedCertificatesPem; + delegate.setTrustedCertificates(trustedCertificatesPem); return this; } /** * Add header to request. Optional. Applicable only if {@link - * OtlpGrpcSpanExporterBuilder#endpoint} is set to build channel. + * OtlpGrpcSpanExporterBuilder#setChannel(ManagedChannel)} is not called. * * @param key header key * @param value header value * @return this builder's instance */ public OtlpGrpcSpanExporterBuilder addHeader(String key, String value) { - if (metadata == null) { - metadata = new Metadata(); - } - metadata.put(Metadata.Key.of(key, ASCII_STRING_MARSHALLER), value); + delegate.addHeader(key, value); return this; } @@ -138,37 +125,6 @@ public OtlpGrpcSpanExporterBuilder addHeader(String key, String value) { * @return a new exporter's instance */ public OtlpGrpcSpanExporter build() { - ManagedChannel channel = this.channel; - if (channel == null) { - final ManagedChannelBuilder managedChannelBuilder = - ManagedChannelBuilder.forTarget(endpoint.getAuthority()); - - if (endpoint.getScheme().equals("https")) { - managedChannelBuilder.useTransportSecurity(); - } else { - managedChannelBuilder.usePlaintext(); - } - - if (metadata != null) { - managedChannelBuilder.intercept(MetadataUtils.newAttachHeadersInterceptor(metadata)); - } - - if (trustedCertificatesPem != null) { - try { - ManagedChannelUtil.setTrustedCertificatesPem( - managedChannelBuilder, trustedCertificatesPem); - } catch (SSLException e) { - throw new IllegalStateException( - "Could not set trusted certificates for gRPC TLS connection, are they valid " - + "X.509 in PEM format?", - e); - } - } - - channel = managedChannelBuilder.build(); - } - return new OtlpGrpcSpanExporter(channel, timeoutNanos, compressionEnabled); + return new OtlpGrpcSpanExporter(delegate.build()); } - - OtlpGrpcSpanExporterBuilder() {} } diff --git a/exporters/otlp/trace/src/test/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporterTest.java b/exporters/otlp/trace/src/test/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporterTest.java index 301b1664ae8..88c07aa75b5 100644 --- a/exporters/otlp/trace/src/test/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporterTest.java +++ b/exporters/otlp/trace/src/test/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporterTest.java @@ -23,6 +23,8 @@ import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.api.trace.TraceFlags; import io.opentelemetry.api.trace.TraceState; +import io.opentelemetry.exporter.otlp.internal.grpc.DefaultGrpcExporter; +import io.opentelemetry.exporter.otlp.internal.grpc.DefaultGrpcExporterBuilder; import io.opentelemetry.exporter.otlp.internal.traces.ResourceSpansMarshaler; import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse; @@ -64,7 +66,7 @@ class OtlpGrpcSpanExporterTest { private final Closer closer = Closer.create(); @RegisterExtension - LogCapturer logs = LogCapturer.create().captureForType(OtlpGrpcSpanExporter.class); + LogCapturer logs = LogCapturer.create().captureForType(DefaultGrpcExporter.class); @BeforeEach public void setup() throws IOException { @@ -351,6 +353,12 @@ void testExport_PermissionDenied() { } } + @Test + void usingGrpc() { + assertThat(OtlpGrpcSpanExporter.builder().delegate) + .isInstanceOf(DefaultGrpcExporterBuilder.class); + } + private static SpanData generateFakeSpan() { long duration = TimeUnit.MILLISECONDS.toNanos(900); long startNs = TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis()); diff --git a/exporters/otlp/trace/src/testGrpcNetty/java/io/opentelemetry/exporter/otlp/trace/ExportTest.java b/exporters/otlp/trace/src/testGrpcNetty/java/io/opentelemetry/exporter/otlp/trace/ExportTest.java index 265c59aed42..879f99ad4c8 100644 --- a/exporters/otlp/trace/src/testGrpcNetty/java/io/opentelemetry/exporter/otlp/trace/ExportTest.java +++ b/exporters/otlp/trace/src/testGrpcNetty/java/io/opentelemetry/exporter/otlp/trace/ExportTest.java @@ -14,6 +14,7 @@ import com.linecorp.armeria.testing.junit5.server.ServerExtension; import io.grpc.stub.StreamObserver; import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.exporter.otlp.internal.grpc.DefaultGrpcExporterBuilder; import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse; import io.opentelemetry.proto.collector.trace.v1.TraceServiceGrpc; @@ -126,4 +127,10 @@ void tlsBadCert() { .isInstanceOf(IllegalStateException.class) .hasMessageContaining("Could not set trusted certificates"); } + + @Test + void usingGrpc() { + assertThat(OtlpGrpcSpanExporter.builder().delegate) + .isInstanceOf(DefaultGrpcExporterBuilder.class); + } } diff --git a/exporters/otlp/trace/src/testGrpcNettyShaded/java/io/opentelemetry/exporter/otlp/trace/ExportTest.java b/exporters/otlp/trace/src/testGrpcNettyShaded/java/io/opentelemetry/exporter/otlp/trace/ExportTest.java index 265c59aed42..879f99ad4c8 100644 --- a/exporters/otlp/trace/src/testGrpcNettyShaded/java/io/opentelemetry/exporter/otlp/trace/ExportTest.java +++ b/exporters/otlp/trace/src/testGrpcNettyShaded/java/io/opentelemetry/exporter/otlp/trace/ExportTest.java @@ -14,6 +14,7 @@ import com.linecorp.armeria.testing.junit5.server.ServerExtension; import io.grpc.stub.StreamObserver; import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.exporter.otlp.internal.grpc.DefaultGrpcExporterBuilder; import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse; import io.opentelemetry.proto.collector.trace.v1.TraceServiceGrpc; @@ -126,4 +127,10 @@ void tlsBadCert() { .isInstanceOf(IllegalStateException.class) .hasMessageContaining("Could not set trusted certificates"); } + + @Test + void usingGrpc() { + assertThat(OtlpGrpcSpanExporter.builder().delegate) + .isInstanceOf(DefaultGrpcExporterBuilder.class); + } } diff --git a/exporters/otlp/trace/src/testGrpcOkhttp/java/io/opentelemetry/exporter/otlp/trace/ExportTest.java b/exporters/otlp/trace/src/testGrpcOkhttp/java/io/opentelemetry/exporter/otlp/trace/ExportTest.java index 265c59aed42..879f99ad4c8 100644 --- a/exporters/otlp/trace/src/testGrpcOkhttp/java/io/opentelemetry/exporter/otlp/trace/ExportTest.java +++ b/exporters/otlp/trace/src/testGrpcOkhttp/java/io/opentelemetry/exporter/otlp/trace/ExportTest.java @@ -14,6 +14,7 @@ import com.linecorp.armeria.testing.junit5.server.ServerExtension; import io.grpc.stub.StreamObserver; import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.exporter.otlp.internal.grpc.DefaultGrpcExporterBuilder; import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse; import io.opentelemetry.proto.collector.trace.v1.TraceServiceGrpc; @@ -126,4 +127,10 @@ void tlsBadCert() { .isInstanceOf(IllegalStateException.class) .hasMessageContaining("Could not set trusted certificates"); } + + @Test + void usingGrpc() { + assertThat(OtlpGrpcSpanExporter.builder().delegate) + .isInstanceOf(DefaultGrpcExporterBuilder.class); + } } diff --git a/exporters/otlp/trace/src/testOkhttpOnly/java/io/opentelemetry/exporter/otlp/trace/OkHttpOnlyExportTest.java b/exporters/otlp/trace/src/testOkhttpOnly/java/io/opentelemetry/exporter/otlp/trace/OkHttpOnlyExportTest.java new file mode 100644 index 00000000000..c47e0b402d2 --- /dev/null +++ b/exporters/otlp/trace/src/testOkhttpOnly/java/io/opentelemetry/exporter/otlp/trace/OkHttpOnlyExportTest.java @@ -0,0 +1,151 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.otlp.trace; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.linecorp.armeria.server.ServerBuilder; +import com.linecorp.armeria.server.ServiceRequestContext; +import com.linecorp.armeria.server.grpc.protocol.AbstractUnaryGrpcService; +import com.linecorp.armeria.testing.junit5.server.ServerExtension; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.exporter.otlp.internal.grpc.OkHttpGrpcExporterBuilder; +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse; +import io.opentelemetry.sdk.testing.trace.TestSpanData; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.data.StatusData; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; +import okhttp3.tls.HeldCertificate; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +class OkHttpOnlyExportTest { + + private static final List SPANS = + Collections.singletonList( + TestSpanData.builder() + .setName("name") + .setKind(SpanKind.CLIENT) + .setStartEpochNanos(1) + .setEndEpochNanos(2) + .setStatus(StatusData.ok()) + .setHasEnded(true) + .build()); + + private static final HeldCertificate HELD_CERTIFICATE; + + static { + try { + HELD_CERTIFICATE = + new HeldCertificate.Builder() + .commonName("localhost") + .addSubjectAlternativeName(InetAddress.getByName("localhost").getCanonicalHostName()) + .build(); + } catch (UnknownHostException e) { + throw new IllegalStateException("Error building certificate.", e); + } + } + + @RegisterExtension + @Order(2) + public static ServerExtension server = + new ServerExtension() { + @Override + protected void configure(ServerBuilder sb) { + sb.service( + OtlpGrpcSpanExporterBuilder.GRPC_ENDPOINT_PATH, + new AbstractUnaryGrpcService() { + @Override + protected CompletionStage handleMessage( + ServiceRequestContext ctx, byte[] message) { + return CompletableFuture.completedFuture( + ExportTraceServiceResponse.getDefaultInstance().toByteArray()); + } + }); + sb.http(0); + sb.https(0); + sb.tls(HELD_CERTIFICATE.keyPair().getPrivate(), HELD_CERTIFICATE.certificate()); + } + }; + + // NB: Armeria does not support decompression without using the actual grpc-java (naturally + // this is the same for grpc-java as a test server). The failure does indicate compression, or at + // least some sort of data transformation was attempted. Separate integration tests using the + // OTel collector verify that this is indeed correct compression. + @Test + void gzipCompressionExportAttemptedButFails() { + OtlpGrpcSpanExporter exporter = + OtlpGrpcSpanExporter.builder() + .setEndpoint("http://localhost:" + server.httpPort()) + .setCompression("gzip") + .build(); + + // See note on test method on why this checks isFalse. + assertThat(exporter.export(SPANS).join(10, TimeUnit.SECONDS).isSuccess()).isFalse(); + } + + @Test + void plainTextExport() { + OtlpGrpcSpanExporter exporter = + OtlpGrpcSpanExporter.builder().setEndpoint("http://localhost:" + server.httpPort()).build(); + assertThat(exporter.export(SPANS).join(10, TimeUnit.SECONDS).isSuccess()).isTrue(); + } + + @Test + void authorityWithAuth() { + OtlpGrpcSpanExporter exporter = + OtlpGrpcSpanExporter.builder() + .setEndpoint("http://foo:bar@localhost:" + server.httpPort()) + .build(); + assertThat(exporter.export(SPANS).join(10, TimeUnit.SECONDS).isSuccess()).isTrue(); + } + + @Test + void testTlsExport() throws Exception { + OtlpGrpcSpanExporter exporter = + OtlpGrpcSpanExporter.builder() + .setEndpoint("https://localhost:" + server.httpsPort()) + .setTrustedCertificates( + HELD_CERTIFICATE.certificatePem().getBytes(StandardCharsets.UTF_8)) + .build(); + assertThat(exporter.export(SPANS).join(10, TimeUnit.SECONDS).isSuccess()).isTrue(); + } + + @Test + void testTlsExport_untrusted() { + OtlpGrpcSpanExporter exporter = + OtlpGrpcSpanExporter.builder() + .setEndpoint("https://localhost:" + server.httpsPort()) + .build(); + assertThat(exporter.export(SPANS).join(10, TimeUnit.SECONDS).isSuccess()).isFalse(); + } + + @Test + void tlsBadCert() { + assertThatThrownBy( + () -> + OtlpGrpcSpanExporter.builder() + .setTrustedCertificates("foobar".getBytes(StandardCharsets.UTF_8)) + .build()) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("Could not set trusted certificates"); + } + + @Test + void usingOkhttp() { + assertThat(OtlpGrpcSpanExporter.builder().delegate) + .isInstanceOf(OkHttpGrpcExporterBuilder.class); + } +} diff --git a/integration-tests/build.gradle.kts b/integration-tests/build.gradle.kts index df03f57a632..a23bc956bcf 100644 --- a/integration-tests/build.gradle.kts +++ b/integration-tests/build.gradle.kts @@ -1,33 +1,73 @@ plugins { id("otel.java-conventions") + + id("org.unbroken-dome.test-sets") } description = "OpenTelemetry Integration Tests" otelJava.moduleName.set("io.opentelemetry.integration.tests") +testSets { + create("testJaeger") + + libraries { + create("testOtlpCommon") + } + + create("testOtlp") { + imports("testOtlpCommon") + } + + create("testOtlpNoGrpcJava") { + imports("testOtlpCommon") + } +} + dependencies { testImplementation(project(":sdk:all")) testImplementation(project(":sdk:testing")) testImplementation(project(":extensions:trace-propagators")) - testImplementation(project(":exporters:jaeger")) - testImplementation(project(":exporters:otlp:trace")) - testImplementation(project(":exporters:otlp:metrics")) - testImplementation(project(":exporters:otlp:logs")) - testImplementation(project(":exporters:otlp-http:trace")) - testImplementation(project(":exporters:otlp-http:metrics")) - testImplementation(project(":exporters:otlp-http:logs")) - testImplementation(project(":semconv")) - testImplementation(project(":proto")) - - testImplementation("io.grpc:grpc-protobuf") - testImplementation("com.google.protobuf:protobuf-java") - testImplementation("io.grpc:grpc-netty-shaded") - testImplementation("org.junit.jupiter:junit-jupiter-params") - testImplementation("com.fasterxml.jackson.core:jackson-databind") - testImplementation("com.linecorp.armeria:armeria-grpc") + testImplementation("com.google.protobuf:protobuf-java") testImplementation("com.linecorp.armeria:armeria-junit5") - testImplementation("org.testcontainers:junit-jupiter") + testImplementation("com.fasterxml.jackson.core:jackson-databind") testImplementation("com.squareup.okhttp3:okhttp") + testImplementation("org.junit.jupiter:junit-jupiter-params") + testImplementation("org.testcontainers:junit-jupiter") testImplementation("org.slf4j:slf4j-simple") + + add("testOtlpCommonImplementation", project(":exporters:otlp:trace")) + add("testOtlpCommonImplementation", project(":exporters:otlp:metrics")) + add("testOtlpCommonImplementation", project(":exporters:otlp:logs")) + add("testOtlpCommonImplementation", project(":exporters:otlp-http:logs")) + add("testOtlpCommonImplementation", project(":exporters:otlp-http:metrics")) + add("testOtlpCommonImplementation", project(":exporters:otlp-http:trace")) + add("testOtlpCommonImplementation", project(":semconv")) + add("testOtlpCommonImplementation", project(":proto")) + add("testOtlpCommonImplementation", "com.linecorp.armeria:armeria-grpc-protocol") + add("testOtlpCommonImplementation", "com.linecorp.armeria:armeria-junit5") + add("testOtlpCommonImplementation", "org.assertj:assertj-core") + add("testOtlpCommonImplementation", "org.awaitility:awaitility") + add("testOtlpCommonImplementation", "org.junit.jupiter:junit-jupiter-params") + add("testOtlpCommonImplementation", "org.testcontainers:junit-jupiter") + + add("testOtlpRuntimeOnly", "io.grpc:grpc-netty-shaded") + + add("testJaegerImplementation", project(":exporters:jaeger")) + add("testJaegerImplementation", project(":semconv")) + add("testJaegerRuntimeOnly", "io.grpc:grpc-netty-shaded") +} + +tasks { + check { + dependsOn("testJaeger", "testOtlp", "testOtlpNoGrpcJava") + } +} + +configurations { + named("testOtlpNoGrpcJavaRuntimeClasspath") { + dependencies { + exclude("io.grpc") + } + } } diff --git a/integration-tests/src/test/java/io/opentelemetry/JaegerExporterIntegrationTest.java b/integration-tests/src/testJaeger/java/io/opentelemetry/integrationtest/JaegerExporterIntegrationTest.java similarity index 99% rename from integration-tests/src/test/java/io/opentelemetry/JaegerExporterIntegrationTest.java rename to integration-tests/src/testJaeger/java/io/opentelemetry/integrationtest/JaegerExporterIntegrationTest.java index 75af5f12655..90fd1b14e90 100644 --- a/integration-tests/src/test/java/io/opentelemetry/JaegerExporterIntegrationTest.java +++ b/integration-tests/src/testJaeger/java/io/opentelemetry/integrationtest/JaegerExporterIntegrationTest.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry; +package io.opentelemetry.integrationtest; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; diff --git a/integration-tests/src/testOtlp/java/io/opentelemetry/integrationtest/GrpcJavaOtlpIntegrationTest.java b/integration-tests/src/testOtlp/java/io/opentelemetry/integrationtest/GrpcJavaOtlpIntegrationTest.java new file mode 100644 index 00000000000..250c85700ec --- /dev/null +++ b/integration-tests/src/testOtlp/java/io/opentelemetry/integrationtest/GrpcJavaOtlpIntegrationTest.java @@ -0,0 +1,18 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.integrationtest; + +import static org.assertj.core.api.Assertions.assertThatCode; + +import org.junit.jupiter.api.Test; + +class GrpcJavaOtlpIntegrationTest extends OtlpExporterIntegrationTest { + + @Test + void noGrpcFound() { + assertThatCode(() -> Class.forName("io.grpc.ManagedChannel")).doesNotThrowAnyException(); + } +} diff --git a/integration-tests/src/test/java/io/opentelemetry/OtlpExporterIntegrationTest.java b/integration-tests/src/testOtlpCommon/java/io/opentelemetry/integrationtest/OtlpExporterIntegrationTest.java similarity index 83% rename from integration-tests/src/test/java/io/opentelemetry/OtlpExporterIntegrationTest.java rename to integration-tests/src/testOtlpCommon/java/io/opentelemetry/integrationtest/OtlpExporterIntegrationTest.java index 4becd699a8b..b36883c1fc7 100644 --- a/integration-tests/src/test/java/io/opentelemetry/OtlpExporterIntegrationTest.java +++ b/integration-tests/src/testOtlpCommon/java/io/opentelemetry/integrationtest/OtlpExporterIntegrationTest.java @@ -3,15 +3,17 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry; +package io.opentelemetry.integrationtest; +import static java.util.concurrent.CompletableFuture.completedFuture; import static org.assertj.core.api.Assertions.assertThat; import static org.testcontainers.Testcontainers.exposeHostPorts; +import com.google.protobuf.InvalidProtocolBufferException; import com.linecorp.armeria.server.ServerBuilder; -import com.linecorp.armeria.server.grpc.GrpcService; +import com.linecorp.armeria.server.ServiceRequestContext; +import com.linecorp.armeria.server.grpc.protocol.AbstractUnaryGrpcService; import com.linecorp.armeria.testing.junit5.server.ServerExtension; -import io.grpc.stub.StreamObserver; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.metrics.LongCounter; @@ -30,13 +32,10 @@ import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter; import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse; -import io.opentelemetry.proto.collector.logs.v1.LogsServiceGrpc; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse; -import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc; import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse; -import io.opentelemetry.proto.collector.trace.v1.TraceServiceGrpc; import io.opentelemetry.proto.common.v1.AnyValue; import io.opentelemetry.proto.common.v1.KeyValue; import io.opentelemetry.proto.logs.v1.InstrumentationLibraryLogs; @@ -64,11 +63,13 @@ import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; import io.opentelemetry.sdk.trace.export.SpanExporter; import io.opentelemetry.semconv.resource.attributes.ResourceAttributes; +import java.io.UncheckedIOException; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -77,7 +78,8 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.testcontainers.containers.BindMode; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.wait.strategy.Wait; @@ -91,7 +93,7 @@ * the data. */ @Testcontainers(disabledWithoutDocker = true) -class OtlpExporterIntegrationTest { +abstract class OtlpExporterIntegrationTest { private static final String COLLECTOR_IMAGE = "ghcr.io/open-telemetry/opentelemetry-java/otel-collector"; @@ -149,8 +151,9 @@ void beforeEach() { @AfterEach void afterEach() {} - @Test - void testOtlpGrpcTraceExport() { + @ParameterizedTest + @ValueSource(strings = {"gzip", "none"}) + void testOtlpGrpcTraceExport(String compression) { SpanExporter otlpGrpcTraceExporter = OtlpGrpcSpanExporter.builder() .setEndpoint( @@ -158,14 +161,15 @@ void testOtlpGrpcTraceExport() { + collector.getHost() + ":" + collector.getMappedPort(COLLECTOR_OTLP_GRPC_PORT)) - .setCompression("gzip") + .setCompression(compression) .build(); testTraceExport(otlpGrpcTraceExporter); } - @Test - void testOtlpHttpTraceExport() { + @ParameterizedTest + @ValueSource(strings = {"gzip", "none"}) + void testOtlpHttpTraceExport(String compression) { SpanExporter otlpGrpcTraceExporter = OtlpHttpSpanExporter.builder() .setEndpoint( @@ -174,7 +178,7 @@ void testOtlpHttpTraceExport() { + ":" + collector.getMappedPort(COLLECTOR_OTLP_HTTP_PORT) + "/v1/traces") - .setCompression("gzip") + .setCompression(compression) .build(); testTraceExport(otlpGrpcTraceExporter); @@ -245,8 +249,9 @@ private static void testTraceExport(SpanExporter spanExporter) { assertThat(link.getSpanId().toByteArray()).isEqualTo(linkContext.getSpanIdBytes()); } - @Test - void testOtlpGrpcMetricExport() { + @ParameterizedTest + @ValueSource(strings = {"gzip", "none"}) + void testOtlpGrpcMetricExport(String compression) { MetricExporter otlpGrpcMetricExporter = OtlpGrpcMetricExporter.builder() .setEndpoint( @@ -254,14 +259,15 @@ void testOtlpGrpcMetricExport() { + collector.getHost() + ":" + collector.getMappedPort(COLLECTOR_OTLP_GRPC_PORT)) - .setCompression("gzip") + .setCompression(compression) .build(); testMetricExport(otlpGrpcMetricExporter); } - @Test - void testOtlpHttpMetricExport() { + @ParameterizedTest + @ValueSource(strings = {"gzip", "none"}) + void testOtlpHttpMetricExport(String compression) { MetricExporter otlpGrpcMetricExporter = OtlpHttpMetricExporter.builder() .setEndpoint( @@ -270,7 +276,7 @@ void testOtlpHttpMetricExport() { + ":" + collector.getMappedPort(COLLECTOR_OTLP_HTTP_PORT) + "/v1/metrics") - .setCompression("gzip") + .setCompression(compression) .build(); testMetricExport(otlpGrpcMetricExporter); @@ -331,8 +337,9 @@ private static void testMetricExport(MetricExporter metricExporter) { .build())); } - @Test - void testOtlpGrpcLogExport() { + @ParameterizedTest + @ValueSource(strings = {"gzip", "none"}) + void testOtlpGrpcLogExport(String compression) { LogExporter otlpGrpcLogExporter = OtlpGrpcLogExporter.builder() .setEndpoint( @@ -340,14 +347,15 @@ void testOtlpGrpcLogExport() { + collector.getHost() + ":" + collector.getMappedPort(COLLECTOR_OTLP_GRPC_PORT)) - .setCompression("gzip") + .setCompression(compression) .build(); testLogExporter(otlpGrpcLogExporter); } - @Test - void testOtlpHttpLogExport() { + @ParameterizedTest + @ValueSource(strings = {"gzip", "none"}) + void testOtlpHttpLogExport(String compression) { LogExporter otlpHttpLogExporter = OtlpHttpLogExporter.builder() .setEndpoint( @@ -356,7 +364,7 @@ void testOtlpHttpLogExport() { + ":" + collector.getMappedPort(COLLECTOR_OTLP_HTTP_PORT) + "/v1/logs") - .setCompression("gzip") + .setCompression(compression) .build(); testLogExporter(otlpHttpLogExporter); @@ -438,41 +446,48 @@ private void reset() { @Override protected void configure(ServerBuilder sb) { sb.service( - GrpcService.builder() - .addService( - new TraceServiceGrpc.TraceServiceImplBase() { - @Override - public void export( - ExportTraceServiceRequest request, - StreamObserver responseObserver) { - traceRequests.add(request); - responseObserver.onNext(ExportTraceServiceResponse.getDefaultInstance()); - responseObserver.onCompleted(); - } - }) - .addService( - new MetricsServiceGrpc.MetricsServiceImplBase() { - @Override - public void export( - ExportMetricsServiceRequest request, - StreamObserver responseObserver) { - metricRequests.add(request); - responseObserver.onNext(ExportMetricsServiceResponse.getDefaultInstance()); - responseObserver.onCompleted(); - } - }) - .addService( - new LogsServiceGrpc.LogsServiceImplBase() { - @Override - public void export( - ExportLogsServiceRequest request, - StreamObserver responseObserver) { - logRequests.add(request); - responseObserver.onNext(ExportLogsServiceResponse.getDefaultInstance()); - responseObserver.onCompleted(); - } - }) - .build()); + "/opentelemetry.proto.collector.trace.v1.TraceService/Export", + new AbstractUnaryGrpcService() { + @Override + protected CompletionStage handleMessage( + ServiceRequestContext ctx, byte[] message) { + try { + traceRequests.add(ExportTraceServiceRequest.parseFrom(message)); + } catch (InvalidProtocolBufferException e) { + throw new UncheckedIOException(e); + } + return completedFuture(ExportTraceServiceResponse.getDefaultInstance().toByteArray()); + } + }); + sb.service( + "/opentelemetry.proto.collector.metrics.v1.MetricsService/Export", + new AbstractUnaryGrpcService() { + @Override + protected CompletionStage handleMessage( + ServiceRequestContext ctx, byte[] message) { + try { + metricRequests.add(ExportMetricsServiceRequest.parseFrom(message)); + } catch (InvalidProtocolBufferException e) { + throw new UncheckedIOException(e); + } + return completedFuture( + ExportMetricsServiceResponse.getDefaultInstance().toByteArray()); + } + }); + sb.service( + "/opentelemetry.proto.collector.logs.v1.LogsService/Export", + new AbstractUnaryGrpcService() { + @Override + protected CompletionStage handleMessage( + ServiceRequestContext ctx, byte[] message) { + try { + logRequests.add(ExportLogsServiceRequest.parseFrom(message)); + } catch (InvalidProtocolBufferException e) { + throw new UncheckedIOException(e); + } + return completedFuture(ExportLogsServiceResponse.getDefaultInstance().toByteArray()); + } + }); sb.http(0); } } diff --git a/integration-tests/src/test/resources/otel-config.yaml b/integration-tests/src/testOtlpCommon/resources/otel-config.yaml similarity index 100% rename from integration-tests/src/test/resources/otel-config.yaml rename to integration-tests/src/testOtlpCommon/resources/otel-config.yaml diff --git a/integration-tests/src/testOtlpNoGrpcJava/java/io/opentelemetry/integrationtest/NoGrpcJavaOtlpIntegrationTest.java b/integration-tests/src/testOtlpNoGrpcJava/java/io/opentelemetry/integrationtest/NoGrpcJavaOtlpIntegrationTest.java new file mode 100644 index 00000000000..266ef8ed793 --- /dev/null +++ b/integration-tests/src/testOtlpNoGrpcJava/java/io/opentelemetry/integrationtest/NoGrpcJavaOtlpIntegrationTest.java @@ -0,0 +1,19 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.integrationtest; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import org.junit.jupiter.api.Test; + +class NoGrpcJavaOtlpIntegrationTest extends OtlpExporterIntegrationTest { + + @Test + void noGrpcFound() { + assertThatThrownBy(() -> Class.forName("io.grpc.ManagedChannel")) + .isInstanceOf(ClassNotFoundException.class); + } +} diff --git a/sdk-extensions/autoconfigure/src/testFullConfig/java/io/opentelemetry/sdk/autoconfigure/SpanExporterConfigurationTest.java b/sdk-extensions/autoconfigure/src/testFullConfig/java/io/opentelemetry/sdk/autoconfigure/SpanExporterConfigurationTest.java index 423fdad9b39..957c6e1e0cf 100644 --- a/sdk-extensions/autoconfigure/src/testFullConfig/java/io/opentelemetry/sdk/autoconfigure/SpanExporterConfigurationTest.java +++ b/sdk-extensions/autoconfigure/src/testFullConfig/java/io/opentelemetry/sdk/autoconfigure/SpanExporterConfigurationTest.java @@ -45,7 +45,7 @@ void configureOtlpTimeout() { OtlpGrpcSpanExporter.class, otlp -> assertThat(otlp) - .extracting("timeoutNanos") + .extracting("delegate.timeoutNanos") .isEqualTo(TimeUnit.MILLISECONDS.toNanos(10L))); } finally { exporter.shutdown(); diff --git a/sdk-extensions/autoconfigure/src/testOtlpGrpc/java/io/opentelemetry/sdk/autoconfigure/OtlpGrpcConfigTest.java b/sdk-extensions/autoconfigure/src/testOtlpGrpc/java/io/opentelemetry/sdk/autoconfigure/OtlpGrpcConfigTest.java index 12d6fe85cb8..1d0dd2c641c 100644 --- a/sdk-extensions/autoconfigure/src/testOtlpGrpc/java/io/opentelemetry/sdk/autoconfigure/OtlpGrpcConfigTest.java +++ b/sdk-extensions/autoconfigure/src/testOtlpGrpc/java/io/opentelemetry/sdk/autoconfigure/OtlpGrpcConfigTest.java @@ -140,7 +140,9 @@ void configureExportersGeneral() { MetricExporter metricExporter = MetricExporterConfiguration.configureOtlpMetrics(properties, SdkMeterProvider.builder()); - assertThat(spanExporter).extracting("timeoutNanos").isEqualTo(TimeUnit.SECONDS.toNanos(15)); + assertThat(spanExporter) + .extracting("delegate.timeoutNanos") + .isEqualTo(TimeUnit.SECONDS.toNanos(15)); assertThat( spanExporter .export(Lists.newArrayList(generateFakeSpan())) @@ -156,7 +158,9 @@ void configureExportersGeneral() { && headers.contains("header-key", "header-value") && headers.contains("grpc-encoding", "gzip")); - assertThat(metricExporter).extracting("timeoutNanos").isEqualTo(TimeUnit.SECONDS.toNanos(15)); + assertThat(metricExporter) + .extracting("delegate.timeoutNanos") + .isEqualTo(TimeUnit.SECONDS.toNanos(15)); assertThat( metricExporter .export(Lists.newArrayList(generateFakeMetric())) @@ -193,7 +197,9 @@ void configureSpanExporter() { SpanExporterConfiguration.configureExporter( "otlp", DefaultConfigProperties.createForTest(props), Collections.emptyMap()); - assertThat(spanExporter).extracting("timeoutNanos").isEqualTo(TimeUnit.SECONDS.toNanos(15)); + assertThat(spanExporter) + .extracting("delegate.timeoutNanos") + .isEqualTo(TimeUnit.SECONDS.toNanos(15)); assertThat( spanExporter .export(Lists.newArrayList(generateFakeSpan())) @@ -229,7 +235,9 @@ public void configureMetricExporter() { MetricExporterConfiguration.configureOtlpMetrics( DefaultConfigProperties.createForTest(props), SdkMeterProvider.builder()); - assertThat(metricExporter).extracting("timeoutNanos").isEqualTo(TimeUnit.SECONDS.toNanos(15)); + assertThat(metricExporter) + .extracting("delegate.timeoutNanos") + .isEqualTo(TimeUnit.SECONDS.toNanos(15)); assertThat( metricExporter .export(Lists.newArrayList(generateFakeMetric()))