diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/DefaultGrpcExporter.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/DefaultGrpcExporter.java new file mode 100644 index 00000000000..47ad16d19e3 --- /dev/null +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/DefaultGrpcExporter.java @@ -0,0 +1,142 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.otlp.internal.grpc; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; +import io.grpc.Codec; +import io.grpc.ManagedChannel; +import io.grpc.Status; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.BoundLongCounter; +import io.opentelemetry.api.metrics.GlobalMeterProvider; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.exporter.otlp.internal.Marshaler; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.internal.ThrottlingLogger; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * A {@link GrpcExporter} which uses the standard grpc-java library. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public final class DefaultGrpcExporter implements GrpcExporter { + + private static final Logger internalLogger = + Logger.getLogger(DefaultGrpcExporter.class.getName()); + + private final ThrottlingLogger logger = new ThrottlingLogger(internalLogger); + + private final String type; + private final ManagedChannel managedChannel; + private final MarshalerServiceStub stub; + private final long timeoutNanos; + + private final BoundLongCounter seen; + private final BoundLongCounter success; + private final BoundLongCounter failed; + + /** Creates a new {@link DefaultGrpcExporter}. */ + public DefaultGrpcExporter( + String type, + ManagedChannel channel, + MarshalerServiceStub stub, + long timeoutNanos, + boolean compressionEnabled) { + this.type = type; + Meter meter = GlobalMeterProvider.get().get("io.opentelemetry.exporters.otlp-grpc"); + Attributes attributes = Attributes.builder().put("type", type).build(); + seen = meter.counterBuilder("otlp.exporter.seen").build().bind(attributes); + LongCounter exported = meter.counterBuilder("otlp.exported.exported").build(); + success = exported.bind(attributes.toBuilder().put("success", true).build()); + failed = exported.bind(attributes.toBuilder().put("success", false).build()); + + this.managedChannel = channel; + this.timeoutNanos = timeoutNanos; + Codec codec = compressionEnabled ? new Codec.Gzip() : Codec.Identity.NONE; + this.stub = stub.withCompression(codec.getMessageEncoding()); + } + + @Override + public CompletableResultCode export(T exportRequest, int numItems) { + seen.add(numItems); + + CompletableResultCode result = new CompletableResultCode(); + + MarshalerServiceStub stub = this.stub; + if (timeoutNanos > 0) { + stub = stub.withDeadlineAfter(timeoutNanos, TimeUnit.NANOSECONDS); + } + Futures.addCallback( + stub.export(exportRequest), + new FutureCallback() { + @Override + public void onSuccess(@Nullable Object unused) { + success.add(numItems); + result.succeed(); + } + + @Override + public void onFailure(Throwable t) { + failed.add(numItems); + Status status = Status.fromThrowable(t); + switch (status.getCode()) { + case UNIMPLEMENTED: + logger.log( + Level.SEVERE, + "Failed to export " + + type + + "s. Server responded with UNIMPLEMENTED. " + + "This usually means that your collector is not configured with an otlp " + + "receiver in the \"pipelines\" section of the configuration. " + + "Full error message: " + + t.getMessage()); + break; + case UNAVAILABLE: + logger.log( + Level.SEVERE, + "Failed to export " + + type + + "s. Server is UNAVAILABLE. " + + "Make sure your collector is running and reachable from this network. " + + "Full error message:" + + t.getMessage()); + break; + default: + logger.log( + Level.WARNING, + "Failed to export " + type + "s. Error message: " + t.getMessage()); + break; + } + if (logger.isLoggable(Level.FINEST)) { + logger.log(Level.FINEST, "Failed to export " + type + "s. Details follow: " + t); + } + result.fail(); + } + }, + MoreExecutors.directExecutor()); + + return result; + } + + @Override + public CompletableResultCode shutdown() { + if (managedChannel.isTerminated()) { + return CompletableResultCode.ofSuccess(); + } + seen.unbind(); + success.unbind(); + failed.unbind(); + return ManagedChannelUtil.shutdownChannel(managedChannel); + } +} diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/DefaultGrpcExporterBuilder.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/DefaultGrpcExporterBuilder.java new file mode 100644 index 00000000000..c4e44a7b0b3 --- /dev/null +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/DefaultGrpcExporterBuilder.java @@ -0,0 +1,160 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.otlp.internal.grpc; + +import static io.grpc.Metadata.ASCII_STRING_MARSHALLER; +import static io.opentelemetry.api.internal.Utils.checkArgument; +import static java.util.Objects.requireNonNull; + +import io.grpc.Codec; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Metadata; +import io.grpc.stub.MetadataUtils; +import io.opentelemetry.exporter.otlp.internal.Marshaler; +import java.net.URI; +import java.net.URISyntaxException; +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import javax.annotation.Nullable; +import javax.net.ssl.SSLException; + +/** + * A builder for {@link DefaultGrpcExporter}. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public final class DefaultGrpcExporterBuilder + implements GrpcExporterBuilder { + + private final String type; + private final Function> stubFactory; + + @Nullable private ManagedChannel channel; + private long timeoutNanos; + private URI endpoint; + private boolean compressionEnabled = false; + @Nullable private Metadata metadata; + @Nullable private byte[] trustedCertificatesPem; + + /** Creates a new {@link DefaultGrpcExporterBuilder}. */ + public DefaultGrpcExporterBuilder( + String type, + Function> stubFactory, + long defaultTimeoutSecs, + URI defaultEndpoint) { + this.type = type; + this.stubFactory = stubFactory; + timeoutNanos = TimeUnit.SECONDS.toNanos(defaultTimeoutSecs); + endpoint = defaultEndpoint; + } + + @Override + public DefaultGrpcExporterBuilder setChannel(ManagedChannel channel) { + this.channel = channel; + return this; + } + + @Override + public DefaultGrpcExporterBuilder setTimeout(long timeout, TimeUnit unit) { + requireNonNull(unit, "unit"); + checkArgument(timeout >= 0, "timeout must be non-negative"); + timeoutNanos = unit.toNanos(timeout); + return this; + } + + @Override + public DefaultGrpcExporterBuilder setTimeout(Duration timeout) { + requireNonNull(timeout, "timeout"); + return setTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS); + } + + @Override + public DefaultGrpcExporterBuilder setEndpoint(String endpoint) { + requireNonNull(endpoint, "endpoint"); + + URI uri; + try { + uri = new URI(endpoint); + } catch (URISyntaxException e) { + throw new IllegalArgumentException("Invalid endpoint, must be a URL: " + endpoint, e); + } + + if (uri.getScheme() == null + || (!uri.getScheme().equals("http") && !uri.getScheme().equals("https"))) { + throw new IllegalArgumentException( + "Invalid endpoint, must start with http:// or https://: " + uri); + } + + this.endpoint = uri; + return this; + } + + @Override + public DefaultGrpcExporterBuilder setCompression(String compressionMethod) { + requireNonNull(compressionMethod, "compressionMethod"); + checkArgument( + compressionMethod.equals("gzip"), + "Unsupported compression method. Supported compression methods include: gzip."); + this.compressionEnabled = true; + return this; + } + + @Override + public DefaultGrpcExporterBuilder setTrustedCertificates(byte[] trustedCertificatesPem) { + this.trustedCertificatesPem = trustedCertificatesPem; + return this; + } + + @Override + public DefaultGrpcExporterBuilder addHeader(String key, String value) { + if (metadata == null) { + metadata = new Metadata(); + } + metadata.put(Metadata.Key.of(key, ASCII_STRING_MARSHALLER), value); + return this; + } + + @Override + public GrpcExporter build() { + ManagedChannel channel = this.channel; + if (channel == null) { + final ManagedChannelBuilder managedChannelBuilder = + ManagedChannelBuilder.forTarget(endpoint.getAuthority()); + + if (endpoint.getScheme().equals("https")) { + managedChannelBuilder.useTransportSecurity(); + } else { + managedChannelBuilder.usePlaintext(); + } + + if (metadata != null) { + managedChannelBuilder.intercept(MetadataUtils.newAttachHeadersInterceptor(metadata)); + } + + if (trustedCertificatesPem != null) { + try { + ManagedChannelUtil.setTrustedCertificatesPem( + managedChannelBuilder, trustedCertificatesPem); + } catch (SSLException e) { + throw new IllegalStateException( + "Could not set trusted certificates for gRPC TLS connection, are they valid " + + "X.509 in PEM format?", + e); + } + } + + channel = managedChannelBuilder.build(); + } + + Codec codec = compressionEnabled ? new Codec.Gzip() : Codec.Identity.NONE; + MarshalerServiceStub stub = + stubFactory.apply(channel).withCompression(codec.getMessageEncoding()); + return new DefaultGrpcExporter<>(type, channel, stub, timeoutNanos, compressionEnabled); + } +} diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/GrpcExporter.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/GrpcExporter.java new file mode 100644 index 00000000000..4e1a1a09d8e --- /dev/null +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/GrpcExporter.java @@ -0,0 +1,28 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.otlp.internal.grpc; + +import io.opentelemetry.exporter.otlp.internal.Marshaler; +import io.opentelemetry.sdk.common.CompletableResultCode; + +/** + * An exporter of a {@link io.opentelemetry.exporter.otlp.internal.Marshaler} using the gRPC wire + * format. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public interface GrpcExporter { + + /** + * Exports the {@code exportRequest} which is a request {@link Marshaler} for {@code numItems} + * items. + */ + CompletableResultCode export(T exportRequest, int numItems); + + /** Shuts the exporter down. */ + CompletableResultCode shutdown(); +} diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/GrpcExporterBuilder.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/GrpcExporterBuilder.java new file mode 100644 index 00000000000..76dae665e0d --- /dev/null +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/GrpcExporterBuilder.java @@ -0,0 +1,29 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.otlp.internal.grpc; + +import io.grpc.ManagedChannel; +import io.opentelemetry.exporter.otlp.internal.Marshaler; +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +public interface GrpcExporterBuilder { + GrpcExporterBuilder setChannel(ManagedChannel channel); + + GrpcExporterBuilder setTimeout(long timeout, TimeUnit unit); + + GrpcExporterBuilder setTimeout(Duration timeout); + + GrpcExporterBuilder setEndpoint(String endpoint); + + GrpcExporterBuilder setCompression(String compressionMethod); + + GrpcExporterBuilder setTrustedCertificates(byte[] trustedCertificatesPem); + + GrpcExporterBuilder addHeader(String key, String value); + + GrpcExporter build(); +} diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/MarshalerServiceStub.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/MarshalerServiceStub.java new file mode 100644 index 00000000000..ab525f987a4 --- /dev/null +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/grpc/MarshalerServiceStub.java @@ -0,0 +1,19 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.otlp.internal.grpc; + +import com.google.common.util.concurrent.ListenableFuture; +import io.grpc.CallOptions; +import io.grpc.Channel; + +public abstract class MarshalerServiceStub> + extends io.grpc.stub.AbstractFutureStub { + protected MarshalerServiceStub(Channel channel, CallOptions callOptions) { + super(channel, callOptions); + } + + public abstract ListenableFuture export(T request); +} diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/okhttp/OkHttpExporterBuilder.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/okhttp/OkHttpExporterBuilder.java new file mode 100644 index 00000000000..5375f62e76b --- /dev/null +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/okhttp/OkHttpExporterBuilder.java @@ -0,0 +1,150 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.otlp.internal.okhttp; + +import static io.opentelemetry.api.internal.Utils.checkArgument; +import static java.util.Objects.requireNonNull; + +import io.grpc.ManagedChannel; +import io.opentelemetry.exporter.otlp.internal.Marshaler; +import io.opentelemetry.exporter.otlp.internal.TlsUtil; +import io.opentelemetry.exporter.otlp.internal.grpc.GrpcExporter; +import io.opentelemetry.exporter.otlp.internal.grpc.GrpcExporterBuilder; +import java.net.URI; +import java.net.URISyntaxException; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import javax.net.ssl.SSLException; +import javax.net.ssl.X509TrustManager; +import okhttp3.Headers; +import okhttp3.OkHttpClient; +import okhttp3.Protocol; + +/** + * A builder for {@link OkHttpGrpcExporter}. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public final class OkHttpExporterBuilder implements GrpcExporterBuilder { + + private final String type; + private final String grpcEndpointPath; + + private long timeoutNanos; + private URI endpoint; + private boolean compressionEnabled = false; + private final Headers.Builder headers = new Headers.Builder(); + @Nullable private byte[] trustedCertificatesPem; + + /** Creates a new {@link OkHttpExporterBuilder}. */ + public OkHttpExporterBuilder( + String type, String grpcEndpointPath, long defaultTimeoutSecs, URI defaultEndpoint) { + this.type = type; + this.grpcEndpointPath = grpcEndpointPath; + timeoutNanos = TimeUnit.SECONDS.toNanos(defaultTimeoutSecs); + endpoint = defaultEndpoint; + } + + @Override + public OkHttpExporterBuilder setChannel(ManagedChannel channel) { + throw new UnsupportedOperationException("Only available on DefaultGrpcExporter"); + } + + @Override + public OkHttpExporterBuilder setTimeout(long timeout, TimeUnit unit) { + requireNonNull(unit, "unit"); + checkArgument(timeout >= 0, "timeout must be non-negative"); + timeoutNanos = unit.toNanos(timeout); + return this; + } + + @Override + public OkHttpExporterBuilder setTimeout(Duration timeout) { + requireNonNull(timeout, "timeout"); + return setTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS); + } + + @Override + public OkHttpExporterBuilder setEndpoint(String endpoint) { + requireNonNull(endpoint, "endpoint"); + + URI uri; + try { + uri = new URI(endpoint); + } catch (URISyntaxException e) { + throw new IllegalArgumentException("Invalid endpoint, must be a URL: " + endpoint, e); + } + + if (uri.getScheme() == null + || (!uri.getScheme().equals("http") && !uri.getScheme().equals("https"))) { + throw new IllegalArgumentException( + "Invalid endpoint, must start with http:// or https://: " + uri); + } + + this.endpoint = uri; + return this; + } + + @Override + public OkHttpExporterBuilder setCompression(String compressionMethod) { + requireNonNull(compressionMethod, "compressionMethod"); + checkArgument( + compressionMethod.equals("gzip"), + "Unsupported compression method. Supported compression methods include: gzip."); + this.compressionEnabled = true; + return this; + } + + @Override + public OkHttpExporterBuilder setTrustedCertificates(byte[] trustedCertificatesPem) { + this.trustedCertificatesPem = trustedCertificatesPem; + return this; + } + + @Override + public OkHttpExporterBuilder addHeader(String key, String value) { + headers.add(key, value); + return this; + } + + @Override + public GrpcExporter build() { + OkHttpClient.Builder clientBuilder = new OkHttpClient.Builder(); + + Headers.Builder headers = this.headers != null ? this.headers : new Headers.Builder(); + + clientBuilder.callTimeout(Duration.ofNanos(timeoutNanos)); + + if (trustedCertificatesPem != null) { + try { + X509TrustManager trustManager = TlsUtil.trustManager(trustedCertificatesPem); + clientBuilder.sslSocketFactory(TlsUtil.sslSocketFactory(trustManager), trustManager); + } catch (SSLException e) { + throw new IllegalStateException( + "Could not set trusted certificates, are they valid X.509 in PEM format?", e); + } + } + + String endpoint = this.endpoint.resolve(grpcEndpointPath).toString(); + if (endpoint.startsWith("http://")) { + clientBuilder.protocols(Collections.singletonList(Protocol.H2_PRIOR_KNOWLEDGE)); + } else { + clientBuilder.protocols(Arrays.asList(Protocol.HTTP_2, Protocol.HTTP_1_1)); + } + + headers.add("te", "trailers"); + if (compressionEnabled) { + headers.add("grpc-encoding", "gzip"); + } + + return new OkHttpGrpcExporter<>( + type, clientBuilder.build(), endpoint, headers.build(), compressionEnabled); + } +} diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/okhttp/OkHttpGrpcExporter.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/okhttp/OkHttpGrpcExporter.java new file mode 100644 index 00000000000..2167029a030 --- /dev/null +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/okhttp/OkHttpGrpcExporter.java @@ -0,0 +1,233 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +// Includes work from: + +/* + * Copyright 2014 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opentelemetry.exporter.otlp.internal.okhttp; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.BoundLongCounter; +import io.opentelemetry.api.metrics.GlobalMeterProvider; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.exporter.otlp.internal.Marshaler; +import io.opentelemetry.exporter.otlp.internal.grpc.GrpcExporter; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.internal.ThrottlingLogger; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.annotation.Nullable; +import okhttp3.Call; +import okhttp3.Callback; +import okhttp3.Headers; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; + +/** + * A {@link GrpcExporter} which uses OkHttp instead of grpc-java. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public final class OkHttpGrpcExporter implements GrpcExporter { + + private static final String GRPC_STATUS = "grpc-status"; + private static final String GRPC_MESSAGE = "grpc-message"; + + private final ThrottlingLogger logger = + new ThrottlingLogger(Logger.getLogger(OkHttpGrpcExporter.class.getName())); + + private final String type; + private final OkHttpClient client; + private final String endpoint; + private final Headers headers; + private final boolean compressionEnabled; + + private final BoundLongCounter seen; + private final BoundLongCounter success; + private final BoundLongCounter failed; + + /** Creates a new {@link OkHttpGrpcExporter}. */ + public OkHttpGrpcExporter( + String type, + OkHttpClient client, + String endpoint, + Headers headers, + boolean compressionEnabled) { + this.type = type; + this.client = client; + this.endpoint = endpoint; + this.headers = headers; + this.compressionEnabled = compressionEnabled; + + Meter meter = GlobalMeterProvider.get().get("io.opentelemetry.exporters.otlp-grpc-okhttp"); + Attributes attributes = Attributes.builder().put("type", type).build(); + seen = meter.counterBuilder("otlp.exporter.seen").build().bind(attributes); + LongCounter exported = meter.counterBuilder("otlp.exported.exported").build(); + success = exported.bind(attributes.toBuilder().put("success", true).build()); + failed = exported.bind(attributes.toBuilder().put("success", false).build()); + } + + @Override + public CompletableResultCode export(T exportRequest, int numItems) { + seen.add(numItems); + + Request.Builder requestBuilder = new Request.Builder().url(endpoint).headers(headers); + + RequestBody requestBody = new GrpcRequestBody(exportRequest, compressionEnabled); + requestBuilder.post(requestBody); + + CompletableResultCode result = new CompletableResultCode(); + + client + .newCall(requestBuilder.build()) + .enqueue( + new Callback() { + @Override + public void onFailure(Call call, IOException e) { + failed.add(numItems); + logger.log( + Level.SEVERE, + "Failed to export " + + type + + "s. The request could not be executed. Full error message: " + + e.getMessage()); + result.fail(); + } + + @Override + public void onResponse(Call call, Response response) { + // Response body is empty but must be consumed to access trailers. + try { + response.body().bytes(); + } catch (IOException e) { + logger.log( + Level.WARNING, + "Failed to export " + type + "s, could not consume server response.", + e); + failed.add(numItems); + result.fail(); + return; + } + + String status = grpcStatus(response); + if ("0".equals(status)) { + success.add(numItems); + result.succeed(); + return; + } + + failed.add(numItems); + + String codeMessage = + status != null + ? "gRPC status code " + status + : "HTTP status code " + response.code(); + String errorMessage = grpcMessage(response); + logger.log( + Level.WARNING, + "Failed to export spans. Server responded with " + + codeMessage + + ". Error message: " + + errorMessage); + result.fail(); + } + }); + + return result; + } + + @Nullable + private static String grpcStatus(Response response) { + // Status can either be in the headers or trailers depending on error + String grpcStatus = response.header(GRPC_STATUS); + if (grpcStatus == null) { + try { + grpcStatus = response.trailers().get(GRPC_STATUS); + } catch (IOException e) { + // Could not read a status, this generally means the HTTP status is the error. + return null; + } + } + return grpcStatus; + } + + private static String grpcMessage(Response response) { + String message = response.header(GRPC_MESSAGE); + if (message == null) { + try { + message = response.trailers().get(GRPC_MESSAGE); + } catch (IOException e) { + // Fall through + } + } + if (message != null) { + return unescape(message); + } + // Couldn't get message for some reason, use the HTTP status. + return response.message(); + } + + @Override + public CompletableResultCode shutdown() { + client.dispatcher().cancelAll(); + this.seen.unbind(); + this.success.unbind(); + this.failed.unbind(); + return CompletableResultCode.ofSuccess(); + } + + // From grpc-java + + /** Unescape the provided ascii to a unicode {@link String}. */ + private static String unescape(String value) { + for (int i = 0; i < value.length(); i++) { + final char c = value.charAt(i); + if (c < ' ' || c >= '~' || (c == '%' && i + 2 < value.length())) { + return doUnescape(value.getBytes(StandardCharsets.US_ASCII)); + } + } + return value; + } + + private static String doUnescape(byte[] value) { + final ByteBuffer buf = ByteBuffer.allocate(value.length); + for (int i = 0; i < value.length; ) { + if (value[i] == '%' && i + 2 < value.length) { + try { + buf.put((byte) Integer.parseInt(new String(value, i + 1, 2, StandardCharsets.UTF_8), 16)); + i += 3; + continue; + } catch (NumberFormatException e) { + // ignore, fall through, just push the bytes. + } + } + buf.put(value[i]); + i += 1; + } + return new String(buf.array(), 0, buf.position(), StandardCharsets.UTF_8); + } +} diff --git a/exporters/otlp/trace/build.gradle.kts b/exporters/otlp/trace/build.gradle.kts index 4a834952327..c545e2e9266 100644 --- a/exporters/otlp/trace/build.gradle.kts +++ b/exporters/otlp/trace/build.gradle.kts @@ -14,6 +14,7 @@ testSets { create("testGrpcNetty") create("testGrpcNettyShaded") create("testGrpcOkhttp") + create("testOkhttpOnly") // Mainly to conveniently profile through IDEA. Don't add to check task, it's for // manual invocation. @@ -31,6 +32,8 @@ dependencies { compileOnly("io.grpc:grpc-netty-shaded") compileOnly("io.grpc:grpc-okhttp") + compileOnly("com.squareup.okhttp3:okhttp") + api("io.grpc:grpc-stub") implementation("io.grpc:grpc-api") @@ -57,6 +60,12 @@ dependencies { add("testGrpcOkhttpRuntimeOnly", "io.grpc:grpc-okhttp") add("testGrpcOkhttpRuntimeOnly", "org.bouncycastle:bcpkix-jdk15on") + add("testOkhttpOnlyImplementation", "com.linecorp.armeria:armeria-grpc-protocol") + add("testOkhttpOnlyImplementation", "com.linecorp.armeria:armeria-junit5") + add("testOkhttpOnlyImplementation", "com.squareup.okhttp3:okhttp") + add("testOkhttpOnlyImplementation", "com.squareup.okhttp3:okhttp-tls") + add("testOkhttpOnlyRuntimeOnly", "org.bouncycastle:bcpkix-jdk15on") + add("testSpanPipeline", project(":proto")) add("testSpanPipeline", "io.grpc:grpc-protobuf") add("testSpanPipeline", "io.grpc:grpc-testing") @@ -64,6 +73,14 @@ dependencies { tasks { named("check") { - dependsOn("testGrpcNetty", "testGrpcNettyShaded", "testGrpcOkhttp") + dependsOn("testGrpcNetty", "testGrpcNettyShaded", "testGrpcOkhttp", "testOkhttpOnly") + } +} + +configurations { + named("testOkhttpOnlyRuntimeClasspath") { + dependencies { + exclude("io.grpc") + } } } diff --git a/exporters/otlp/trace/src/main/java/io/opentelemetry/exporter/otlp/trace/MarshalerTraceServiceGrpc.java b/exporters/otlp/trace/src/main/java/io/opentelemetry/exporter/otlp/trace/MarshalerTraceServiceGrpc.java index 66d34e8039b..eed1af6714b 100644 --- a/exporters/otlp/trace/src/main/java/io/opentelemetry/exporter/otlp/trace/MarshalerTraceServiceGrpc.java +++ b/exporters/otlp/trace/src/main/java/io/opentelemetry/exporter/otlp/trace/MarshalerTraceServiceGrpc.java @@ -9,6 +9,7 @@ import io.grpc.MethodDescriptor; import io.opentelemetry.exporter.otlp.internal.grpc.MarshalerInputStream; +import io.opentelemetry.exporter.otlp.internal.grpc.MarshalerServiceStub; import io.opentelemetry.exporter.otlp.internal.traces.TraceRequestMarshaler; import java.io.InputStream; @@ -57,7 +58,8 @@ static TraceServiceFutureStub newFutureStub(io.grpc.Channel channel) { } static final class TraceServiceFutureStub - extends io.grpc.stub.AbstractFutureStub { + extends MarshalerServiceStub< + TraceRequestMarshaler, ExportTraceServiceResponse, TraceServiceFutureStub> { private TraceServiceFutureStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) { super(channel, callOptions); } @@ -68,7 +70,8 @@ protected MarshalerTraceServiceGrpc.TraceServiceFutureStub build( return new MarshalerTraceServiceGrpc.TraceServiceFutureStub(channel, callOptions); } - com.google.common.util.concurrent.ListenableFuture export( + @Override + public com.google.common.util.concurrent.ListenableFuture export( TraceRequestMarshaler request) { return io.grpc.stub.ClientCalls.futureUnaryCall( getChannel().newCall(getExportMethod, getCallOptions()), request); diff --git a/exporters/otlp/trace/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporter.java b/exporters/otlp/trace/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporter.java index 722d1b92d0b..a7896689e74 100644 --- a/exporters/otlp/trace/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporter.java +++ b/exporters/otlp/trace/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporter.java @@ -5,79 +5,42 @@ package io.opentelemetry.exporter.otlp.trace; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.MoreExecutors; -import io.grpc.Codec; -import io.grpc.ManagedChannel; -import io.grpc.Status; -import io.opentelemetry.api.common.AttributeKey; -import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.metrics.BoundLongCounter; -import io.opentelemetry.api.metrics.GlobalMeterProvider; -import io.opentelemetry.api.metrics.LongCounter; -import io.opentelemetry.api.metrics.Meter; -import io.opentelemetry.exporter.otlp.internal.grpc.ManagedChannelUtil; +import io.opentelemetry.exporter.otlp.internal.grpc.GrpcExporter; import io.opentelemetry.exporter.otlp.internal.traces.TraceRequestMarshaler; import io.opentelemetry.sdk.common.CompletableResultCode; -import io.opentelemetry.sdk.internal.ThrottlingLogger; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SpanExporter; import java.util.Collection; -import java.util.concurrent.TimeUnit; -import java.util.logging.Level; -import java.util.logging.Logger; -import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; /** Exports spans using OTLP via gRPC, using OpenTelemetry's protobuf model. */ @ThreadSafe public final class OtlpGrpcSpanExporter implements SpanExporter { - private static final AttributeKey EXPORTER_KEY = AttributeKey.stringKey("exporter"); - private static final AttributeKey SUCCESS_KEY = AttributeKey.stringKey("success"); - private static final String EXPORTER_NAME = OtlpGrpcSpanExporter.class.getSimpleName(); - private static final Attributes EXPORTER_NAME_Attributes = - Attributes.of(EXPORTER_KEY, EXPORTER_NAME); - private static final Attributes EXPORT_SUCCESS_ATTRIBUTES = - Attributes.of(EXPORTER_KEY, EXPORTER_NAME, SUCCESS_KEY, "true"); - private static final Attributes EXPORT_FAILURE_ATTRIBUTES = - Attributes.of(EXPORTER_KEY, EXPORTER_NAME, SUCCESS_KEY, "false"); - private static final Logger internalLogger = - Logger.getLogger(OtlpGrpcSpanExporter.class.getName()); + private final GrpcExporter exporter; - private final ThrottlingLogger logger = new ThrottlingLogger(internalLogger); - - private final MarshalerTraceServiceGrpc.TraceServiceFutureStub traceService; - - private final ManagedChannel managedChannel; - private final long timeoutNanos; - private final BoundLongCounter spansSeen; - private final BoundLongCounter spansExportedSuccess; - private final BoundLongCounter spansExportedFailure; + /** + * Returns a new {@link OtlpGrpcSpanExporter} reading the configuration values from the + * environment and from system properties. System properties override values defined in the + * environment. If a configuration value is missing, it uses the default value. + * + * @return a new {@link OtlpGrpcSpanExporter} instance. + */ + public static OtlpGrpcSpanExporter getDefault() { + return builder().build(); + } /** - * Creates a new OTLP gRPC Span Reporter with the given name, using the given channel. + * Returns a new builder instance for this exporter. * - * @param channel the channel to use when communicating with the OpenTelemetry Collector. - * @param timeoutNanos max waiting time for the collector to process each span batch. When set to - * 0 or to a negative value, the exporter will wait indefinitely. - * @param compressionEnabled whether or not to enable gzip compression. + * @return a new builder instance for this exporter. */ - OtlpGrpcSpanExporter(ManagedChannel channel, long timeoutNanos, boolean compressionEnabled) { - // TODO: telemetry schema version. - Meter meter = GlobalMeterProvider.get().meterBuilder("io.opentelemetry.exporters.otlp").build(); - this.spansSeen = - meter.counterBuilder("spansSeenByExporter").build().bind(EXPORTER_NAME_Attributes); - LongCounter spansExportedCounter = meter.counterBuilder("spansExportedByExporter").build(); - this.spansExportedSuccess = spansExportedCounter.bind(EXPORT_SUCCESS_ATTRIBUTES); - this.spansExportedFailure = spansExportedCounter.bind(EXPORT_FAILURE_ATTRIBUTES); - this.managedChannel = channel; - this.timeoutNanos = timeoutNanos; - Codec codec = compressionEnabled ? new Codec.Gzip() : Codec.Identity.NONE; - this.traceService = - MarshalerTraceServiceGrpc.newFutureStub(channel) - .withCompression(codec.getMessageEncoding()); + public static OtlpGrpcSpanExporterBuilder builder() { + return new OtlpGrpcSpanExporterBuilder(); + } + + OtlpGrpcSpanExporter(GrpcExporter exporter) { + this.exporter = exporter; } /** @@ -88,62 +51,9 @@ public final class OtlpGrpcSpanExporter implements SpanExporter { */ @Override public CompletableResultCode export(Collection spans) { - spansSeen.add(spans.size()); TraceRequestMarshaler request = TraceRequestMarshaler.create(spans); - final CompletableResultCode result = new CompletableResultCode(); - - MarshalerTraceServiceGrpc.TraceServiceFutureStub exporter; - if (timeoutNanos > 0) { - exporter = traceService.withDeadlineAfter(timeoutNanos, TimeUnit.NANOSECONDS); - } else { - exporter = traceService; - } - - Futures.addCallback( - exporter.export(request), - new FutureCallback() { - @Override - public void onSuccess(@Nullable ExportTraceServiceResponse response) { - spansExportedSuccess.add(spans.size()); - result.succeed(); - } - - @Override - public void onFailure(Throwable t) { - spansExportedFailure.add(spans.size()); - Status status = Status.fromThrowable(t); - switch (status.getCode()) { - case UNIMPLEMENTED: - logger.log( - Level.SEVERE, - "Failed to export spans. Server responded with UNIMPLEMENTED. " - + "This usually means that your collector is not configured with an otlp " - + "receiver in the \"pipelines\" section of the configuration. " - + "Full error message: " - + t.getMessage()); - break; - case UNAVAILABLE: - logger.log( - Level.SEVERE, - "Failed to export spans. Server is UNAVAILABLE. " - + "Make sure your collector is running and reachable from this network. " - + "Full error message:" - + t.getMessage()); - break; - default: - logger.log( - Level.WARNING, "Failed to export spans. Error message: " + t.getMessage()); - break; - } - if (logger.isLoggable(Level.FINEST)) { - logger.log(Level.FINEST, "Failed to export spans. Details follow: " + t); - } - result.fail(); - } - }, - MoreExecutors.directExecutor()); - return result; + return exporter.export(request, spans.size()); } /** @@ -156,43 +66,12 @@ public CompletableResultCode flush() { return CompletableResultCode.ofSuccess(); } - /** - * Returns a new builder instance for this exporter. - * - * @return a new builder instance for this exporter. - */ - public static OtlpGrpcSpanExporterBuilder builder() { - return new OtlpGrpcSpanExporterBuilder(); - } - - /** - * Returns a new {@link OtlpGrpcSpanExporter} reading the configuration values from the - * environment and from system properties. System properties override values defined in the - * environment. If a configuration value is missing, it uses the default value. - * - * @return a new {@link OtlpGrpcSpanExporter} instance. - */ - public static OtlpGrpcSpanExporter getDefault() { - return builder().build(); - } - /** * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately * cancelled. */ @Override public CompletableResultCode shutdown() { - if (managedChannel.isTerminated()) { - return CompletableResultCode.ofSuccess(); - } - this.spansSeen.unbind(); - this.spansExportedSuccess.unbind(); - this.spansExportedFailure.unbind(); - return ManagedChannelUtil.shutdownChannel(managedChannel); - } - - // Visible for testing - long getTimeoutNanos() { - return timeoutNanos; + return exporter.shutdown(); } } diff --git a/exporters/otlp/trace/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporterBuilder.java b/exporters/otlp/trace/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporterBuilder.java index 57b1be86b1a..ca69893ca96 100644 --- a/exporters/otlp/trace/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporterBuilder.java +++ b/exporters/otlp/trace/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporterBuilder.java @@ -5,35 +5,64 @@ package io.opentelemetry.exporter.otlp.trace; -import static io.grpc.Metadata.ASCII_STRING_MARSHALLER; import static io.opentelemetry.api.internal.Utils.checkArgument; import static java.util.Objects.requireNonNull; import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; -import io.grpc.Metadata; -import io.grpc.stub.MetadataUtils; -import io.opentelemetry.exporter.otlp.internal.grpc.ManagedChannelUtil; +import io.opentelemetry.exporter.otlp.internal.grpc.DefaultGrpcExporterBuilder; +import io.opentelemetry.exporter.otlp.internal.grpc.GrpcExporterBuilder; +import io.opentelemetry.exporter.otlp.internal.okhttp.OkHttpExporterBuilder; +import io.opentelemetry.exporter.otlp.internal.traces.TraceRequestMarshaler; import java.net.URI; -import java.net.URISyntaxException; import java.time.Duration; import java.util.concurrent.TimeUnit; -import javax.annotation.Nullable; -import javax.net.ssl.SSLException; /** Builder utility for this exporter. */ public final class OtlpGrpcSpanExporterBuilder { + // Visible for testing + static final String GRPC_ENDPOINT_PATH = + "/opentelemetry.proto.collector.trace.v1.TraceService/Export"; + private static final String DEFAULT_ENDPOINT_URL = "http://localhost:4317"; private static final URI DEFAULT_ENDPOINT = URI.create(DEFAULT_ENDPOINT_URL); private static final long DEFAULT_TIMEOUT_SECS = 10; - @Nullable private ManagedChannel channel; - private long timeoutNanos = TimeUnit.SECONDS.toNanos(DEFAULT_TIMEOUT_SECS); - private URI endpoint = DEFAULT_ENDPOINT; - private boolean compressionEnabled = false; - @Nullable private Metadata metadata; - @Nullable private byte[] trustedCertificatesPem; + // Visible for testing + static final boolean USE_OKHTTP; + + static { + boolean useOkhttp = false; + // Use the OkHttp exporter if ManagedChannel is not found and OkHttp is. + try { + Class.forName("io.grpc.ManagedChannel"); + } catch (ClassNotFoundException e) { + try { + Class.forName("okhttp3.OkHttpClient"); + useOkhttp = true; + } catch (ClassNotFoundException classNotFoundException) { + // Fall through + } + } + USE_OKHTTP = useOkhttp; + } + + private final GrpcExporterBuilder delegate; + + OtlpGrpcSpanExporterBuilder() { + if (USE_OKHTTP) { + delegate = + new OkHttpExporterBuilder<>( + "span", GRPC_ENDPOINT_PATH, DEFAULT_TIMEOUT_SECS, DEFAULT_ENDPOINT); + } else { + delegate = + new DefaultGrpcExporterBuilder<>( + "span", + MarshalerTraceServiceGrpc::newFutureStub, + DEFAULT_TIMEOUT_SECS, + DEFAULT_ENDPOINT); + } + } /** * Sets the managed chanel to use when communicating with the backend. Takes precedence over @@ -43,7 +72,7 @@ public final class OtlpGrpcSpanExporterBuilder { * @return this builder's instance */ public OtlpGrpcSpanExporterBuilder setChannel(ManagedChannel channel) { - this.channel = channel; + delegate.setChannel(channel); return this; } @@ -54,7 +83,7 @@ public OtlpGrpcSpanExporterBuilder setChannel(ManagedChannel channel) { public OtlpGrpcSpanExporterBuilder setTimeout(long timeout, TimeUnit unit) { requireNonNull(unit, "unit"); checkArgument(timeout >= 0, "timeout must be non-negative"); - timeoutNanos = unit.toNanos(timeout); + delegate.setTimeout(timeout, unit); return this; } @@ -64,7 +93,8 @@ public OtlpGrpcSpanExporterBuilder setTimeout(long timeout, TimeUnit unit) { */ public OtlpGrpcSpanExporterBuilder setTimeout(Duration timeout) { requireNonNull(timeout, "timeout"); - return setTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS); + delegate.setTimeout(timeout); + return this; } /** @@ -73,21 +103,7 @@ public OtlpGrpcSpanExporterBuilder setTimeout(Duration timeout) { */ public OtlpGrpcSpanExporterBuilder setEndpoint(String endpoint) { requireNonNull(endpoint, "endpoint"); - - URI uri; - try { - uri = new URI(endpoint); - } catch (URISyntaxException e) { - throw new IllegalArgumentException("Invalid endpoint, must be a URL: " + endpoint, e); - } - - if (uri.getScheme() == null - || (!uri.getScheme().equals("http") && !uri.getScheme().equals("https"))) { - throw new IllegalArgumentException( - "Invalid endpoint, must start with http:// or https://: " + uri); - } - - this.endpoint = uri; + delegate.setEndpoint(endpoint); return this; } @@ -100,7 +116,7 @@ public OtlpGrpcSpanExporterBuilder setCompression(String compressionMethod) { checkArgument( compressionMethod.equals("gzip"), "Unsupported compression method. Supported compression methods include: gzip."); - this.compressionEnabled = true; + delegate.setCompression(compressionMethod); return this; } @@ -110,23 +126,20 @@ public OtlpGrpcSpanExporterBuilder setCompression(String compressionMethod) { * use the system default trusted certificates. */ public OtlpGrpcSpanExporterBuilder setTrustedCertificates(byte[] trustedCertificatesPem) { - this.trustedCertificatesPem = trustedCertificatesPem; + delegate.setTrustedCertificates(trustedCertificatesPem); return this; } /** * Add header to request. Optional. Applicable only if {@link - * OtlpGrpcSpanExporterBuilder#endpoint} is set to build channel. + * OtlpGrpcSpanExporterBuilder#setChannel(ManagedChannel)} is not called. * * @param key header key * @param value header value * @return this builder's instance */ public OtlpGrpcSpanExporterBuilder addHeader(String key, String value) { - if (metadata == null) { - metadata = new Metadata(); - } - metadata.put(Metadata.Key.of(key, ASCII_STRING_MARSHALLER), value); + delegate.addHeader(key, value); return this; } @@ -136,37 +149,6 @@ public OtlpGrpcSpanExporterBuilder addHeader(String key, String value) { * @return a new exporter's instance */ public OtlpGrpcSpanExporter build() { - ManagedChannel channel = this.channel; - if (channel == null) { - final ManagedChannelBuilder managedChannelBuilder = - ManagedChannelBuilder.forTarget(endpoint.getAuthority()); - - if (endpoint.getScheme().equals("https")) { - managedChannelBuilder.useTransportSecurity(); - } else { - managedChannelBuilder.usePlaintext(); - } - - if (metadata != null) { - managedChannelBuilder.intercept(MetadataUtils.newAttachHeadersInterceptor(metadata)); - } - - if (trustedCertificatesPem != null) { - try { - ManagedChannelUtil.setTrustedCertificatesPem( - managedChannelBuilder, trustedCertificatesPem); - } catch (SSLException e) { - throw new IllegalStateException( - "Could not set trusted certificates for gRPC TLS connection, are they valid " - + "X.509 in PEM format?", - e); - } - } - - channel = managedChannelBuilder.build(); - } - return new OtlpGrpcSpanExporter(channel, timeoutNanos, compressionEnabled); + return new OtlpGrpcSpanExporter(delegate.build()); } - - OtlpGrpcSpanExporterBuilder() {} } diff --git a/exporters/otlp/trace/src/test/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporterTest.java b/exporters/otlp/trace/src/test/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporterTest.java index e9a94b9d185..988895b5088 100644 --- a/exporters/otlp/trace/src/test/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporterTest.java +++ b/exporters/otlp/trace/src/test/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporterTest.java @@ -22,6 +22,7 @@ import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.api.trace.TraceFlags; import io.opentelemetry.api.trace.TraceState; +import io.opentelemetry.exporter.otlp.internal.grpc.DefaultGrpcExporter; import io.opentelemetry.exporter.otlp.internal.traces.ResourceSpansMarshaler; import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse; @@ -62,7 +63,7 @@ class OtlpGrpcSpanExporterTest { private final Closer closer = Closer.create(); @RegisterExtension - LogCapturer logs = LogCapturer.create().captureForType(OtlpGrpcSpanExporter.class); + LogCapturer logs = LogCapturer.create().captureForType(DefaultGrpcExporter.class); @BeforeEach public void setup() throws IOException { @@ -311,6 +312,11 @@ void testExport_PermissionDenied() { } } + @Test + void usingGrpc() { + assertThat(OtlpGrpcSpanExporterBuilder.USE_OKHTTP).isFalse(); + } + private static SpanData generateFakeSpan() { long duration = TimeUnit.MILLISECONDS.toNanos(900); long startNs = TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis()); diff --git a/exporters/otlp/trace/src/testGrpcNetty/java/io/opentelemetry/exporter/otlp/trace/ExportTest.java b/exporters/otlp/trace/src/testGrpcNetty/java/io/opentelemetry/exporter/otlp/trace/ExportTest.java index 265c59aed42..a5c5bd6f5fd 100644 --- a/exporters/otlp/trace/src/testGrpcNetty/java/io/opentelemetry/exporter/otlp/trace/ExportTest.java +++ b/exporters/otlp/trace/src/testGrpcNetty/java/io/opentelemetry/exporter/otlp/trace/ExportTest.java @@ -126,4 +126,9 @@ void tlsBadCert() { .isInstanceOf(IllegalStateException.class) .hasMessageContaining("Could not set trusted certificates"); } + + @Test + void usingGrpc() { + assertThat(OtlpGrpcSpanExporterBuilder.USE_OKHTTP).isFalse(); + } } diff --git a/exporters/otlp/trace/src/testGrpcNettyShaded/java/io/opentelemetry/exporter/otlp/trace/ExportTest.java b/exporters/otlp/trace/src/testGrpcNettyShaded/java/io/opentelemetry/exporter/otlp/trace/ExportTest.java index 265c59aed42..a5c5bd6f5fd 100644 --- a/exporters/otlp/trace/src/testGrpcNettyShaded/java/io/opentelemetry/exporter/otlp/trace/ExportTest.java +++ b/exporters/otlp/trace/src/testGrpcNettyShaded/java/io/opentelemetry/exporter/otlp/trace/ExportTest.java @@ -126,4 +126,9 @@ void tlsBadCert() { .isInstanceOf(IllegalStateException.class) .hasMessageContaining("Could not set trusted certificates"); } + + @Test + void usingGrpc() { + assertThat(OtlpGrpcSpanExporterBuilder.USE_OKHTTP).isFalse(); + } } diff --git a/exporters/otlp/trace/src/testGrpcOkhttp/java/io/opentelemetry/exporter/otlp/trace/ExportTest.java b/exporters/otlp/trace/src/testGrpcOkhttp/java/io/opentelemetry/exporter/otlp/trace/ExportTest.java index 265c59aed42..a5c5bd6f5fd 100644 --- a/exporters/otlp/trace/src/testGrpcOkhttp/java/io/opentelemetry/exporter/otlp/trace/ExportTest.java +++ b/exporters/otlp/trace/src/testGrpcOkhttp/java/io/opentelemetry/exporter/otlp/trace/ExportTest.java @@ -126,4 +126,9 @@ void tlsBadCert() { .isInstanceOf(IllegalStateException.class) .hasMessageContaining("Could not set trusted certificates"); } + + @Test + void usingGrpc() { + assertThat(OtlpGrpcSpanExporterBuilder.USE_OKHTTP).isFalse(); + } } diff --git a/exporters/otlp/trace/src/testOkhttpOnly/java/io/opentelemetry/exporter/otlp/trace/OkHttpOnlyExportTest.java b/exporters/otlp/trace/src/testOkhttpOnly/java/io/opentelemetry/exporter/otlp/trace/OkHttpOnlyExportTest.java new file mode 100644 index 00000000000..d597677e63e --- /dev/null +++ b/exporters/otlp/trace/src/testOkhttpOnly/java/io/opentelemetry/exporter/otlp/trace/OkHttpOnlyExportTest.java @@ -0,0 +1,149 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.otlp.trace; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.linecorp.armeria.server.ServerBuilder; +import com.linecorp.armeria.server.ServiceRequestContext; +import com.linecorp.armeria.server.grpc.protocol.AbstractUnaryGrpcService; +import com.linecorp.armeria.testing.junit5.server.ServerExtension; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse; +import io.opentelemetry.sdk.testing.trace.TestSpanData; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.data.StatusData; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; +import okhttp3.tls.HeldCertificate; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +class OkHttpOnlyExportTest { + + private static final List SPANS = + Collections.singletonList( + TestSpanData.builder() + .setName("name") + .setKind(SpanKind.CLIENT) + .setStartEpochNanos(1) + .setEndEpochNanos(2) + .setStatus(StatusData.ok()) + .setHasEnded(true) + .build()); + + private static final HeldCertificate HELD_CERTIFICATE; + + static { + try { + HELD_CERTIFICATE = + new HeldCertificate.Builder() + .commonName("localhost") + .addSubjectAlternativeName(InetAddress.getByName("localhost").getCanonicalHostName()) + .build(); + } catch (UnknownHostException e) { + throw new IllegalStateException("Error building certificate.", e); + } + } + + @RegisterExtension + @Order(2) + public static ServerExtension server = + new ServerExtension() { + @Override + protected void configure(ServerBuilder sb) { + sb.service( + OtlpGrpcSpanExporterBuilder.GRPC_ENDPOINT_PATH, + new AbstractUnaryGrpcService() { + @Override + protected CompletionStage handleMessage( + ServiceRequestContext ctx, byte[] message) { + return CompletableFuture.completedFuture( + ExportTraceServiceResponse.getDefaultInstance().toByteArray()); + } + }); + sb.http(0); + sb.https(0); + sb.tls(HELD_CERTIFICATE.keyPair().getPrivate(), HELD_CERTIFICATE.certificate()); + } + }; + + // NB: Armeria does not support decompression without using the actual grpc-java (naturally + // this is the same for grpc-java as a test server). The failure does indicate compression, or at + // least some sort of data transformation was attempted. Separate integration tests using the + // OTel collector verify that this is indeed correct compression. + @Test + void gzipCompressionExportAttemptedButFails() { + OtlpGrpcSpanExporter exporter = + OtlpGrpcSpanExporter.builder() + .setEndpoint("http://localhost:" + server.httpPort()) + .setCompression("gzip") + .build(); + + // See note on test method on why this checks isFalse. + assertThat(exporter.export(SPANS).join(10, TimeUnit.SECONDS).isSuccess()).isFalse(); + } + + @Test + void plainTextExport() { + OtlpGrpcSpanExporter exporter = + OtlpGrpcSpanExporter.builder().setEndpoint("http://localhost:" + server.httpPort()).build(); + assertThat(exporter.export(SPANS).join(10, TimeUnit.SECONDS).isSuccess()).isTrue(); + } + + @Test + void authorityWithAuth() { + OtlpGrpcSpanExporter exporter = + OtlpGrpcSpanExporter.builder() + .setEndpoint("http://foo:bar@localhost:" + server.httpPort()) + .build(); + assertThat(exporter.export(SPANS).join(10, TimeUnit.SECONDS).isSuccess()).isTrue(); + } + + @Test + void testTlsExport() throws Exception { + OtlpGrpcSpanExporter exporter = + OtlpGrpcSpanExporter.builder() + .setEndpoint("https://localhost:" + server.httpsPort()) + .setTrustedCertificates( + HELD_CERTIFICATE.certificatePem().getBytes(StandardCharsets.UTF_8)) + .build(); + assertThat(exporter.export(SPANS).join(10, TimeUnit.SECONDS).isSuccess()).isTrue(); + } + + @Test + void testTlsExport_untrusted() { + OtlpGrpcSpanExporter exporter = + OtlpGrpcSpanExporter.builder() + .setEndpoint("https://localhost:" + server.httpsPort()) + .build(); + assertThat(exporter.export(SPANS).join(10, TimeUnit.SECONDS).isSuccess()).isFalse(); + } + + @Test + void tlsBadCert() { + assertThatThrownBy( + () -> + OtlpGrpcSpanExporter.builder() + .setTrustedCertificates("foobar".getBytes(StandardCharsets.UTF_8)) + .build()) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("Could not set trusted certificates"); + } + + @Test + void usingOkhttp() { + assertThat(OtlpGrpcSpanExporterBuilder.USE_OKHTTP).isTrue(); + } +} diff --git a/sdk-extensions/autoconfigure/src/testFullConfig/java/io/opentelemetry/sdk/autoconfigure/SpanExporterConfigurationTest.java b/sdk-extensions/autoconfigure/src/testFullConfig/java/io/opentelemetry/sdk/autoconfigure/SpanExporterConfigurationTest.java index 423fdad9b39..2da58653955 100644 --- a/sdk-extensions/autoconfigure/src/testFullConfig/java/io/opentelemetry/sdk/autoconfigure/SpanExporterConfigurationTest.java +++ b/sdk-extensions/autoconfigure/src/testFullConfig/java/io/opentelemetry/sdk/autoconfigure/SpanExporterConfigurationTest.java @@ -45,7 +45,7 @@ void configureOtlpTimeout() { OtlpGrpcSpanExporter.class, otlp -> assertThat(otlp) - .extracting("timeoutNanos") + .extracting("exporter.timeoutNanos") .isEqualTo(TimeUnit.MILLISECONDS.toNanos(10L))); } finally { exporter.shutdown(); diff --git a/sdk-extensions/autoconfigure/src/testOtlpGrpc/java/io/opentelemetry/sdk/autoconfigure/OtlpGrpcConfigTest.java b/sdk-extensions/autoconfigure/src/testOtlpGrpc/java/io/opentelemetry/sdk/autoconfigure/OtlpGrpcConfigTest.java index f748aa28a44..fe0388963c2 100644 --- a/sdk-extensions/autoconfigure/src/testOtlpGrpc/java/io/opentelemetry/sdk/autoconfigure/OtlpGrpcConfigTest.java +++ b/sdk-extensions/autoconfigure/src/testOtlpGrpc/java/io/opentelemetry/sdk/autoconfigure/OtlpGrpcConfigTest.java @@ -144,7 +144,9 @@ void configureExportersGeneral() { MetricExporterConfiguration.configureOtlpMetrics( properties, SdkMeterProvider.builder().build()); - assertThat(spanExporter).extracting("timeoutNanos").isEqualTo(TimeUnit.SECONDS.toNanos(15)); + assertThat(spanExporter) + .extracting("exporter.timeoutNanos") + .isEqualTo(TimeUnit.SECONDS.toNanos(15)); assertThat( spanExporter .export(Lists.newArrayList(generateFakeSpan())) @@ -197,7 +199,9 @@ void configureSpanExporter() { SpanExporterConfiguration.configureExporter( "otlp", DefaultConfigProperties.createForTest(props), Collections.emptyMap()); - assertThat(spanExporter).extracting("timeoutNanos").isEqualTo(TimeUnit.SECONDS.toNanos(15)); + assertThat(spanExporter) + .extracting("exporter.timeoutNanos") + .isEqualTo(TimeUnit.SECONDS.toNanos(15)); assertThat( spanExporter .export(Lists.newArrayList(generateFakeSpan()))