From abeefbc824ec69887bde248033204c0559da2d23 Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Thu, 30 Sep 2021 14:22:22 +0900 Subject: [PATCH] Allow OTLP HTTP exporter to also export in gRPC format. --- dependencyManagement/build.gradle.kts | 4 +- .../http/metrics/OtlpHttpMetricExporter.java | 2 +- .../otlp/http/trace/OtlpHttpSpanExporter.java | 91 ++++++++++++++++--- .../trace/OtlpHttpSpanExporterBuilder.java | 33 +++++-- .../otlp/internal/okhttp/GrpcRequestBody.java | 80 ++++++++++++++++ .../{ => okhttp}/ProtoRequestBody.java | 3 +- .../OtlpExporterIntegrationTest.java | 16 ++++ 7 files changed, 205 insertions(+), 24 deletions(-) create mode 100644 exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/okhttp/GrpcRequestBody.java rename exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/{ => okhttp}/ProtoRequestBody.java (90%) diff --git a/dependencyManagement/build.gradle.kts b/dependencyManagement/build.gradle.kts index f9544ccf8c0..90ce587bf37 100644 --- a/dependencyManagement/build.gradle.kts +++ b/dependencyManagement/build.gradle.kts @@ -75,8 +75,8 @@ val DEPENDENCIES = listOf( // using old version of okhttp to avoid pulling in kotlin stdlib // not using (old) okhttp bom because that is pulling in old guava version // and overriding the guava bom - "com.squareup.okhttp3:okhttp:3.12.13", - "com.squareup.okhttp3:okhttp-tls:3.12.13", + "com.squareup.okhttp3:okhttp:3.14.9", + "com.squareup.okhttp3:okhttp-tls:3.14.9", "com.sun.net.httpserver:http:20070405", "com.tngtech.archunit:archunit-junit5:0.21.0", "com.uber.nullaway:nullaway:0.9.2", diff --git a/exporters/otlp-http/metrics/src/main/java/io/opentelemetry/exporter/otlp/http/metrics/OtlpHttpMetricExporter.java b/exporters/otlp-http/metrics/src/main/java/io/opentelemetry/exporter/otlp/http/metrics/OtlpHttpMetricExporter.java index 9a4b49acc23..9c11593e286 100644 --- a/exporters/otlp-http/metrics/src/main/java/io/opentelemetry/exporter/otlp/http/metrics/OtlpHttpMetricExporter.java +++ b/exporters/otlp-http/metrics/src/main/java/io/opentelemetry/exporter/otlp/http/metrics/OtlpHttpMetricExporter.java @@ -5,9 +5,9 @@ package io.opentelemetry.exporter.otlp.http.metrics; -import io.opentelemetry.exporter.otlp.internal.ProtoRequestBody; import io.opentelemetry.exporter.otlp.internal.grpc.GrpcStatusUtil; import io.opentelemetry.exporter.otlp.internal.metrics.MetricsRequestMarshaler; +import io.opentelemetry.exporter.otlp.internal.okhttp.ProtoRequestBody; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.internal.ThrottlingLogger; import io.opentelemetry.sdk.metrics.data.MetricData; diff --git a/exporters/otlp-http/trace/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporter.java b/exporters/otlp-http/trace/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporter.java index fc589e0a7e4..45ba4c42954 100644 --- a/exporters/otlp-http/trace/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporter.java +++ b/exporters/otlp-http/trace/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporter.java @@ -10,8 +10,9 @@ import io.opentelemetry.api.metrics.GlobalMeterProvider; import io.opentelemetry.api.metrics.LongCounter; import io.opentelemetry.api.metrics.Meter; -import io.opentelemetry.exporter.otlp.internal.ProtoRequestBody; import io.opentelemetry.exporter.otlp.internal.grpc.GrpcStatusUtil; +import io.opentelemetry.exporter.otlp.internal.okhttp.GrpcRequestBody; +import io.opentelemetry.exporter.otlp.internal.okhttp.ProtoRequestBody; import io.opentelemetry.exporter.otlp.internal.traces.TraceRequestMarshaler; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.internal.ThrottlingLogger; @@ -21,7 +22,6 @@ import java.util.Collection; import java.util.logging.Level; import java.util.logging.Logger; -import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import okhttp3.Call; import okhttp3.Callback; @@ -40,6 +40,9 @@ @ThreadSafe public final class OtlpHttpSpanExporter implements SpanExporter { + private static final String GRPC_STATUS = "grpc-status"; + private static final String GRPC_MESSAGE = "grpc-message"; + private static final String EXPORTER_NAME = OtlpHttpSpanExporter.class.getSimpleName(); private static final Attributes EXPORTER_NAME_LABELS = Attributes.builder().put("exporter", EXPORTER_NAME).build(); @@ -59,11 +62,17 @@ public final class OtlpHttpSpanExporter implements SpanExporter { private final OkHttpClient client; private final String endpoint; - @Nullable private final Headers headers; + private final Headers headers; private final boolean compressionEnabled; + private final boolean useGrpc; OtlpHttpSpanExporter( - OkHttpClient client, String endpoint, @Nullable Headers headers, boolean compressionEnabled) { + OkHttpClient client, + String endpoint, + Headers headers, + boolean compressionEnabled, + boolean useGrpc) { + this.useGrpc = useGrpc; Meter meter = GlobalMeterProvider.get().get("io.opentelemetry.exporters.otlp-http"); this.spansSeen = meter.counterBuilder("spansSeenByExporter").build().bind(EXPORTER_NAME_LABELS); LongCounter spansExportedCounter = meter.counterBuilder("spansExportedByExporter").build(); @@ -89,17 +98,19 @@ public CompletableResultCode export(Collection spans) { TraceRequestMarshaler exportRequest = TraceRequestMarshaler.create(spans); Request.Builder requestBuilder = new Request.Builder().url(endpoint); - if (headers != null) { - requestBuilder.headers(headers); - } - RequestBody requestBody = new ProtoRequestBody(exportRequest); - if (compressionEnabled) { - requestBuilder.addHeader("Content-Encoding", "gzip"); - requestBuilder.post(gzipRequestBody(requestBody)); + requestBuilder.headers(headers); + RequestBody requestBody; + if (!useGrpc) { + requestBody = new ProtoRequestBody(exportRequest); + if (compressionEnabled) { + requestBody = gzipRequestBody(requestBody); + } } else { - requestBuilder.post(requestBody); + requestBody = new GrpcRequestBody(exportRequest, compressionEnabled); } + requestBuilder.post(requestBody); + CompletableResultCode result = new CompletableResultCode(); client @@ -118,7 +129,21 @@ public void onFailure(Call call, IOException e) { @Override public void onResponse(Call call, Response response) { - if (response.isSuccessful()) { + if (useGrpc) { + // Response body is empty but must be consumed to access trailers. + try { + response.body().bytes(); + } catch (IOException e) { + logger.log( + Level.WARNING, + "Failed to export spans, could not consume server response.", + e); + result.fail(); + return; + } + } + + if (isSuccessful(response)) { spansExportedSuccess.add(spans.size()); result.succeed(); return; @@ -142,6 +167,28 @@ public void onResponse(Call call, Response response) { return result; } + private boolean isSuccessful(Response response) { + if (!response.isSuccessful()) { + return false; + } + + if (!useGrpc) { + return true; + } + + // Status can either be in the headers or trailers depending on error + String grpcStatus = response.header(GRPC_STATUS); + if (grpcStatus == null) { + try { + grpcStatus = response.trailers().get(GRPC_STATUS); + } catch (IOException e) { + // Could not read a status so assume the request failed. + return false; + } + } + return "0".equals(grpcStatus); + } + private static RequestBody gzipRequestBody(RequestBody requestBody) { return new RequestBody() { @Override @@ -163,7 +210,23 @@ public void writeTo(BufferedSink bufferedSink) throws IOException { }; } - private static String extractErrorStatus(Response response) { + private String extractErrorStatus(Response response) { + if (useGrpc) { + String message = response.header(GRPC_MESSAGE); + if (message == null) { + try { + message = response.trailers().get(GRPC_MESSAGE); + } catch (IOException e) { + // Fall through + } + } + if (message != null) { + return message; + } + // Couldn't get message for some reason, shouldn't happen in practice. + return ""; + } + ResponseBody responseBody = response.body(); if (responseBody == null) { return "Response body missing, HTTP status message: " + response.message(); diff --git a/exporters/otlp-http/trace/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterBuilder.java b/exporters/otlp-http/trace/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterBuilder.java index 9a619b05ada..a99acdb9b90 100644 --- a/exporters/otlp-http/trace/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterBuilder.java +++ b/exporters/otlp-http/trace/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterBuilder.java @@ -12,23 +12,28 @@ import java.net.URI; import java.net.URISyntaxException; import java.time.Duration; +import java.util.Collections; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import javax.net.ssl.SSLException; import javax.net.ssl.X509TrustManager; import okhttp3.Headers; import okhttp3.OkHttpClient; +import okhttp3.Protocol; /** Builder utility for {@link OtlpHttpSpanExporter}. */ public final class OtlpHttpSpanExporterBuilder { + private static final String GRPC_ENDPOINT_PATH = + "/opentelemetry.proto.collector.trace.v1.TraceService/Export"; + private static final long DEFAULT_TIMEOUT_SECS = 10; private static final String DEFAULT_ENDPOINT = "http://localhost:4317/v1/traces"; private long timeoutNanos = TimeUnit.SECONDS.toNanos(DEFAULT_TIMEOUT_SECS); private String endpoint = DEFAULT_ENDPOINT; private boolean compressionEnabled = false; - @Nullable private Headers.Builder headersBuilder; + private final Headers.Builder headersBuilder = new Headers.Builder(); @Nullable private byte[] trustedCertificatesPem; /** @@ -90,9 +95,6 @@ public OtlpHttpSpanExporterBuilder setCompression(String compressionMethod) { /** Add header to requests. */ public OtlpHttpSpanExporterBuilder addHeader(String key, String value) { - if (headersBuilder == null) { - headersBuilder = new Headers.Builder(); - } headersBuilder.add(key, value); return this; } @@ -127,9 +129,28 @@ public OtlpHttpSpanExporter build() { } } - Headers headers = headersBuilder == null ? null : headersBuilder.build(); + String endpoint = this.endpoint; + + boolean useGrpc = endpoint.endsWith(GRPC_ENDPOINT_PATH); + if (useGrpc) { + if (endpoint.startsWith("http://")) { + clientBuilder.protocols(Collections.singletonList(Protocol.H2_PRIOR_KNOWLEDGE)); + } else { + clientBuilder.protocols(Collections.singletonList(Protocol.HTTP_2)); + } + + headersBuilder.add("te", "trailers"); + if (compressionEnabled) { + headersBuilder.add("grpc-encoding", "gzip"); + } + } else if (compressionEnabled) { + headersBuilder.add("Content-Encoding", "gzip"); + } + + Headers headers = headersBuilder.build(); - return new OtlpHttpSpanExporter(clientBuilder.build(), endpoint, headers, compressionEnabled); + return new OtlpHttpSpanExporter( + clientBuilder.build(), endpoint, headers, compressionEnabled, useGrpc); } OtlpHttpSpanExporterBuilder() {} diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/okhttp/GrpcRequestBody.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/okhttp/GrpcRequestBody.java new file mode 100644 index 00000000000..2ade5d6d2e4 --- /dev/null +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/okhttp/GrpcRequestBody.java @@ -0,0 +1,80 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.otlp.internal.okhttp; + +import io.opentelemetry.exporter.otlp.internal.Marshaler; +import java.io.IOException; +import javax.annotation.Nullable; +import okhttp3.MediaType; +import okhttp3.RequestBody; +import okio.Buffer; +import okio.BufferedSink; +import okio.GzipSink; +import okio.Okio; + +/** + * A {@link RequestBody} for reading from a {@link Marshaler} and writing in gRPC wire format. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public final class GrpcRequestBody extends RequestBody { + + private static final int HEADER_LENGTH = 5; + + private static final byte UNCOMPRESSED_FLAG = 0; + private static final byte COMPRESSED_FLAG = 1; + + private static final MediaType GRPC_MEDIA_TYPE = MediaType.parse("application/grpc"); + + private final Marshaler marshaler; + private final int messageSize; + private final int contentLength; + private final boolean compressed; + + /** Creates a new {@link GrpcRequestBody}. */ + public GrpcRequestBody(Marshaler marshaler, boolean compressed) { + this.marshaler = marshaler; + this.compressed = compressed; + + messageSize = marshaler.getBinarySerializedSize(); + if (compressed) { + // Content length not known since we want to compress on the I/O thread. + contentLength = -1; + } else { + contentLength = HEADER_LENGTH + messageSize; + } + } + + @Nullable + @Override + public MediaType contentType() { + return GRPC_MEDIA_TYPE; + } + + @Override + public long contentLength() { + return contentLength; + } + + @Override + public void writeTo(BufferedSink sink) throws IOException { + if (!compressed) { + sink.writeByte(UNCOMPRESSED_FLAG); + sink.writeInt(messageSize); + marshaler.writeBinaryTo(sink.outputStream()); + } else { + Buffer compressedBody = new Buffer(); + try (BufferedSink gzipSink = Okio.buffer(new GzipSink(compressedBody))) { + marshaler.writeBinaryTo(gzipSink.outputStream()); + } + sink.writeByte(COMPRESSED_FLAG); + int compressedBytes = (int) compressedBody.size(); + sink.writeInt(compressedBytes); + sink.write(compressedBody, compressedBytes); + } + } +} diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/ProtoRequestBody.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/okhttp/ProtoRequestBody.java similarity index 90% rename from exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/ProtoRequestBody.java rename to exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/okhttp/ProtoRequestBody.java index c3289def1b1..35fabb02c4e 100644 --- a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/ProtoRequestBody.java +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/otlp/internal/okhttp/ProtoRequestBody.java @@ -3,8 +3,9 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.exporter.otlp.internal; +package io.opentelemetry.exporter.otlp.internal.okhttp; +import io.opentelemetry.exporter.otlp.internal.Marshaler; import java.io.IOException; import okhttp3.MediaType; import okhttp3.RequestBody; diff --git a/integration-tests/src/test/java/io/opentelemetry/OtlpExporterIntegrationTest.java b/integration-tests/src/test/java/io/opentelemetry/OtlpExporterIntegrationTest.java index adbce3ba69a..8e2d017407f 100644 --- a/integration-tests/src/test/java/io/opentelemetry/OtlpExporterIntegrationTest.java +++ b/integration-tests/src/test/java/io/opentelemetry/OtlpExporterIntegrationTest.java @@ -150,6 +150,22 @@ void testOtlpGrpcTraceExport() { testTraceExport(otlpGrpcTraceExporter); } + @Test + void testOtlpGrpcViaHttpTraceExport() { + SpanExporter otlpGrpcHttpTraceExporter = + OtlpHttpSpanExporter.builder() + .setEndpoint( + "http://" + + collector.getHost() + + ":" + + collector.getMappedPort(COLLECTOR_OTLP_GRPC_PORT) + + "/opentelemetry.proto.collector.trace.v1.TraceService/Export") + .setCompression("gzip") + .build(); + + testTraceExport(otlpGrpcHttpTraceExporter); + } + @Test void testOtlpHttpTraceExport() { SpanExporter otlpGrpcTraceExporter =