Skip to content

Commit

Permalink
Allow OTLP HTTP exporter to also export in gRPC format.
Browse files Browse the repository at this point in the history
  • Loading branch information
Anuraag Agrawal committed Sep 30, 2021
1 parent 694ac3f commit 08fda86
Show file tree
Hide file tree
Showing 7 changed files with 206 additions and 24 deletions.
4 changes: 2 additions & 2 deletions dependencyManagement/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ val DEPENDENCIES = listOf(
// using old version of okhttp to avoid pulling in kotlin stdlib
// not using (old) okhttp bom because that is pulling in old guava version
// and overriding the guava bom
"com.squareup.okhttp3:okhttp:3.12.13",
"com.squareup.okhttp3:okhttp-tls:3.12.13",
"com.squareup.okhttp3:okhttp:3.14.9",
"com.squareup.okhttp3:okhttp-tls:3.14.9",
"com.sun.net.httpserver:http:20070405",
"com.tngtech.archunit:archunit-junit5:0.21.0",
"com.uber.nullaway:nullaway:0.9.2",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

package io.opentelemetry.exporter.otlp.http.metrics;

import io.opentelemetry.exporter.otlp.internal.ProtoRequestBody;
import io.opentelemetry.exporter.otlp.internal.grpc.GrpcStatusUtil;
import io.opentelemetry.exporter.otlp.internal.metrics.MetricsRequestMarshaler;
import io.opentelemetry.exporter.otlp.internal.okhttp.ProtoRequestBody;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.internal.ThrottlingLogger;
import io.opentelemetry.sdk.metrics.data.MetricData;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@
import io.opentelemetry.api.metrics.GlobalMeterProvider;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.exporter.otlp.internal.ProtoRequestBody;
import io.opentelemetry.exporter.otlp.internal.grpc.GrpcStatusUtil;
import io.opentelemetry.exporter.otlp.internal.okhttp.GrpcRequestBody;
import io.opentelemetry.exporter.otlp.internal.okhttp.ProtoRequestBody;
import io.opentelemetry.exporter.otlp.internal.traces.TraceRequestMarshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.internal.ThrottlingLogger;
Expand All @@ -21,7 +22,6 @@
import java.util.Collection;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import okhttp3.Call;
import okhttp3.Callback;
Expand All @@ -40,6 +40,9 @@
@ThreadSafe
public final class OtlpHttpSpanExporter implements SpanExporter {

private static final String GRPC_STATUS = "grpc-status";
private static final String GRPC_MESSAGE = "grpc-message";

private static final String EXPORTER_NAME = OtlpHttpSpanExporter.class.getSimpleName();
private static final Attributes EXPORTER_NAME_LABELS =
Attributes.builder().put("exporter", EXPORTER_NAME).build();
Expand All @@ -59,11 +62,17 @@ public final class OtlpHttpSpanExporter implements SpanExporter {

private final OkHttpClient client;
private final String endpoint;
@Nullable private final Headers headers;
private final Headers headers;
private final boolean compressionEnabled;
private final boolean useGrpc;

OtlpHttpSpanExporter(
OkHttpClient client, String endpoint, @Nullable Headers headers, boolean compressionEnabled) {
OkHttpClient client,
String endpoint,
Headers headers,
boolean compressionEnabled,
boolean useGrpc) {
this.useGrpc = useGrpc;
Meter meter = GlobalMeterProvider.get().get("io.opentelemetry.exporters.otlp-http");
this.spansSeen = meter.counterBuilder("spansSeenByExporter").build().bind(EXPORTER_NAME_LABELS);
LongCounter spansExportedCounter = meter.counterBuilder("spansExportedByExporter").build();
Expand All @@ -89,17 +98,19 @@ public CompletableResultCode export(Collection<SpanData> spans) {
TraceRequestMarshaler exportRequest = TraceRequestMarshaler.create(spans);

Request.Builder requestBuilder = new Request.Builder().url(endpoint);
if (headers != null) {
requestBuilder.headers(headers);
}
RequestBody requestBody = new ProtoRequestBody(exportRequest);
if (compressionEnabled) {
requestBuilder.addHeader("Content-Encoding", "gzip");
requestBuilder.post(gzipRequestBody(requestBody));
requestBuilder.headers(headers);
RequestBody requestBody;
if (!useGrpc) {
requestBody = new ProtoRequestBody(exportRequest);
if (compressionEnabled) {
requestBody = gzipRequestBody(requestBody);
}
} else {
requestBuilder.post(requestBody);
requestBody = new GrpcRequestBody(exportRequest, compressionEnabled);
}

requestBuilder.post(requestBody);

CompletableResultCode result = new CompletableResultCode();

client
Expand All @@ -118,7 +129,21 @@ public void onFailure(Call call, IOException e) {

@Override
public void onResponse(Call call, Response response) {
if (response.isSuccessful()) {
if (useGrpc) {
// Response body is empty but must be consumed to access trailers.
try {
response.body().bytes();
} catch (IOException e) {
logger.log(
Level.WARNING,
"Failed to export spans, could not consume server response.",
e);
result.fail();
return;
}
}

if (isSuccessful(response)) {
spansExportedSuccess.add(spans.size());
result.succeed();
return;
Expand All @@ -142,6 +167,28 @@ public void onResponse(Call call, Response response) {
return result;
}

private boolean isSuccessful(Response response) {
if (!response.isSuccessful()) {
return false;
}

if (!useGrpc) {
return true;
}

// Status can either be in the headers or trailers depending on error
String grpcStatus = response.header(GRPC_STATUS);
if (grpcStatus == null) {
try {
grpcStatus = response.trailers().get(GRPC_STATUS);
} catch (IOException e) {
// Could not read a status so assume the request failed.
return false;
}
}
return "0".equals(grpcStatus);
}

private static RequestBody gzipRequestBody(RequestBody requestBody) {
return new RequestBody() {
@Override
Expand All @@ -163,7 +210,23 @@ public void writeTo(BufferedSink bufferedSink) throws IOException {
};
}

private static String extractErrorStatus(Response response) {
private String extractErrorStatus(Response response) {
if (useGrpc) {
String message = response.header(GRPC_MESSAGE);
if (message == null) {
try {
message = response.trailers().get(GRPC_MESSAGE);
} catch (IOException e) {
// Fall through
}
}
if (message != null) {
return message;
}
// Couldn't get message for some reason, shouldn't happen in practice.
return "";
}

ResponseBody responseBody = response.body();
if (responseBody == null) {
return "Response body missing, HTTP status message: " + response.message();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,28 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.net.ssl.SSLException;
import javax.net.ssl.X509TrustManager;
import okhttp3.Headers;
import okhttp3.OkHttpClient;
import okhttp3.Protocol;

/** Builder utility for {@link OtlpHttpSpanExporter}. */
public final class OtlpHttpSpanExporterBuilder {

private static final String GRPC_ENDPOINT_PATH =
"/opentelemetry.proto.collector.trace.v1.TraceService/Export";

private static final long DEFAULT_TIMEOUT_SECS = 10;
private static final String DEFAULT_ENDPOINT = "http://localhost:4317/v1/traces";

private long timeoutNanos = TimeUnit.SECONDS.toNanos(DEFAULT_TIMEOUT_SECS);
private String endpoint = DEFAULT_ENDPOINT;
private boolean compressionEnabled = false;
@Nullable private Headers.Builder headersBuilder;
private final Headers.Builder headersBuilder = new Headers.Builder();
@Nullable private byte[] trustedCertificatesPem;

/**
Expand Down Expand Up @@ -90,9 +95,6 @@ public OtlpHttpSpanExporterBuilder setCompression(String compressionMethod) {

/** Add header to requests. */
public OtlpHttpSpanExporterBuilder addHeader(String key, String value) {
if (headersBuilder == null) {
headersBuilder = new Headers.Builder();
}
headersBuilder.add(key, value);
return this;
}
Expand Down Expand Up @@ -127,9 +129,28 @@ public OtlpHttpSpanExporter build() {
}
}

Headers headers = headersBuilder == null ? null : headersBuilder.build();
String endpoint = this.endpoint;

boolean useGrpc = endpoint.endsWith(GRPC_ENDPOINT_PATH);
if (useGrpc) {
if (endpoint.startsWith("http://")) {
clientBuilder.protocols(Collections.singletonList(Protocol.H2_PRIOR_KNOWLEDGE));
} else {
clientBuilder.protocols(Collections.singletonList(Protocol.HTTP_2));
}

headersBuilder.add("te", "trailers");
if (compressionEnabled) {
headersBuilder.add("grpc-encoding", "gzip");
}
} else if (compressionEnabled) {
headersBuilder.add("Content-Encoding", "gzip");
}

Headers headers = headersBuilder.build();

return new OtlpHttpSpanExporter(clientBuilder.build(), endpoint, headers, compressionEnabled);
return new OtlpHttpSpanExporter(
clientBuilder.build(), endpoint, headers, compressionEnabled, useGrpc);
}

OtlpHttpSpanExporterBuilder() {}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.otlp.internal.okhttp;

import io.opentelemetry.exporter.otlp.internal.Marshaler;
import java.io.IOException;
import javax.annotation.Nullable;
import okhttp3.MediaType;
import okhttp3.RequestBody;
import okio.Buffer;
import okio.BufferedSink;
import okio.GzipSink;
import okio.Okio;

/**
* A {@link RequestBody} for reading from a {@link Marshaler} and writing in gRPC wire format.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public final class GrpcRequestBody extends RequestBody {

private static final int HEADER_LENGTH = 5;

private static final byte UNCOMPRESSED_FLAG = 0;
private static final byte COMPRESSED_FLAG = 1;

private static final MediaType GRPC_MEDIA_TYPE = MediaType.parse("application/grpc");

private final Marshaler marshaler;
private final int messageSize;
private final int contentLength;
private final boolean compressed;

/** Creates a new {@link GrpcRequestBody}. */
public GrpcRequestBody(Marshaler marshaler, boolean compressed) {
this.marshaler = marshaler;
this.compressed = compressed;

messageSize = marshaler.getBinarySerializedSize();
if (compressed) {
// Content length not known since we want to compress on the I/O thread.
contentLength = -1;
} else {
contentLength = HEADER_LENGTH + messageSize;
}
}

@Nullable
@Override
public MediaType contentType() {
return GRPC_MEDIA_TYPE;
}

@Override
public long contentLength() {
return contentLength;
}

@Override
public void writeTo(BufferedSink sink) throws IOException {
if (!compressed) {
sink.writeByte(UNCOMPRESSED_FLAG);
sink.writeInt(messageSize);
marshaler.writeBinaryTo(sink.outputStream());
} else {
try (Buffer compressedBody = new Buffer()) {
try (BufferedSink gzipSink = Okio.buffer(new GzipSink(compressedBody))) {
marshaler.writeBinaryTo(gzipSink.outputStream());
}
sink.writeByte(COMPRESSED_FLAG);
int compressedBytes = (int) compressedBody.size();
sink.writeInt(compressedBytes);
sink.write(compressedBody, compressedBytes);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.otlp.internal;
package io.opentelemetry.exporter.otlp.internal.okhttp;

import io.opentelemetry.exporter.otlp.internal.Marshaler;
import java.io.IOException;
import okhttp3.MediaType;
import okhttp3.RequestBody;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,22 @@ void testOtlpGrpcTraceExport() {
testTraceExport(otlpGrpcTraceExporter);
}

@Test
void testOtlpGrpcViaHttpTraceExport() {
SpanExporter otlpGrpcHttpTraceExporter =
OtlpHttpSpanExporter.builder()
.setEndpoint(
"http://"
+ collector.getHost()
+ ":"
+ collector.getMappedPort(COLLECTOR_OTLP_GRPC_PORT)
+ "/opentelemetry.proto.collector.trace.v1.TraceService/Export")
.setCompression("gzip")
.build();

testTraceExport(otlpGrpcHttpTraceExporter);
}

@Test
void testOtlpHttpTraceExport() {
SpanExporter otlpGrpcTraceExporter =
Expand Down

0 comments on commit 08fda86

Please sign in to comment.