diff --git a/dependencyManagement/build.gradle.kts b/dependencyManagement/build.gradle.kts index 73a8123633b..d7c5a40160a 100644 --- a/dependencyManagement/build.gradle.kts +++ b/dependencyManagement/build.gradle.kts @@ -19,7 +19,8 @@ val DEPENDENCY_BOMS = listOf( "com.google.protobuf:protobuf-bom:3.17.2", "com.fasterxml.jackson:jackson-bom:2.12.3", "org.junit:junit-bom:5.7.2", - "io.zipkin.reporter2:zipkin-reporter-bom:2.16.3" + "io.zipkin.reporter2:zipkin-reporter-bom:2.16.3", + "com.squareup.okhttp3:okhttp-bom:4.9.0" ) val DEPENDENCY_SETS = listOf( diff --git a/exporters/otlp-http/build.gradle.kts b/exporters/otlp-http/build.gradle.kts new file mode 100644 index 00000000000..d676b58d0ef --- /dev/null +++ b/exporters/otlp-http/build.gradle.kts @@ -0,0 +1,8 @@ +subprojects { + val proj = this + plugins.withId("java") { + configure { + archivesBaseName = "opentelemetry-exporter-otlp-http-${proj.name}" + } + } +} diff --git a/exporters/otlp-http/trace/README.md b/exporters/otlp-http/trace/README.md new file mode 100644 index 00000000000..3bf391a9666 --- /dev/null +++ b/exporters/otlp-http/trace/README.md @@ -0,0 +1,8 @@ +# OpenTelemetry - OTLP Trace Exporter - HTTP + +[![Javadocs][javadoc-image]][javadoc-url] + +This is the OpenTelemetry exporter, sending span data to OpenTelemetry collector via HTTP without gRPC. + +[javadoc-image]: https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-exporters-otlp.svg +[javadoc-url]: https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-exporters-otlp \ No newline at end of file diff --git a/exporters/otlp-http/trace/build.gradle.kts b/exporters/otlp-http/trace/build.gradle.kts new file mode 100644 index 00000000000..b286884a2b1 --- /dev/null +++ b/exporters/otlp-http/trace/build.gradle.kts @@ -0,0 +1,24 @@ +plugins { + id("otel.java-conventions") + // TODO: uncomment once ready to publish + // id("otel.publish-conventions") + + id("otel.animalsniffer-conventions") +} + +description = "OpenTelemetry Protocol HTTP Trace Exporter" +otelJava.moduleName.set("io.opentelemetry.exporter.otlp.http.trace") + +dependencies { + api(project(":sdk:trace")) + + implementation(project(":exporters:otlp:common")) + + implementation("com.squareup.okhttp3:okhttp") + implementation("com.squareup.okhttp3:okhttp-tls") + implementation("com.squareup.okio:okio") + + testImplementation(project(":sdk:testing")) + + testImplementation("com.linecorp.armeria:armeria-junit5") +} diff --git a/exporters/otlp-http/trace/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporter.java b/exporters/otlp-http/trace/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporter.java new file mode 100644 index 00000000000..b21b1833244 --- /dev/null +++ b/exporters/otlp-http/trace/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporter.java @@ -0,0 +1,225 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.otlp.http.trace; + +import com.google.rpc.Code; +import com.google.rpc.Status; +import io.opentelemetry.api.metrics.BoundLongCounter; +import io.opentelemetry.api.metrics.GlobalMeterProvider; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.common.Labels; +import io.opentelemetry.exporter.otlp.internal.SpanAdapter; +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.internal.ThrottlingLogger; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import java.io.IOException; +import java.util.Collection; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.annotation.concurrent.ThreadSafe; +import okhttp3.Call; +import okhttp3.Callback; +import okhttp3.Headers; +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; +import okhttp3.ResponseBody; +import okio.BufferedSink; +import okio.GzipSink; +import okio.Okio; + +/** Exports spans using OTLP via HTTP, using OpenTelemetry's protobuf model. */ +@ThreadSafe +public final class OtlpHttpSpanExporter implements SpanExporter { + + private static final String EXPORTER_NAME = OtlpHttpSpanExporter.class.getSimpleName(); + private static final Labels EXPORTER_NAME_LABELS = Labels.of("exporter", EXPORTER_NAME); + private static final Labels EXPORT_SUCCESS_LABELS = + Labels.of("exporter", EXPORTER_NAME, "success", "true"); + private static final Labels EXPORT_FAILURE_LABELS = + Labels.of("exporter", EXPORTER_NAME, "success", "false"); + + private static final MediaType PROTOBUF_MEDIA_TYPE = MediaType.parse("application/x-protobuf"); + + private final ThrottlingLogger logger = + new ThrottlingLogger(Logger.getLogger(OtlpHttpSpanExporter.class.getName())); + + private final BoundLongCounter spansSeen; + private final BoundLongCounter spansExportedSuccess; + private final BoundLongCounter spansExportedFailure; + + private final OkHttpClient client; + private final String endpoint; + private final Headers headers; + private final boolean isCompressionEnabled; + + OtlpHttpSpanExporter( + OkHttpClient client, String endpoint, Headers headers, boolean isCompressionEnabled) { + Meter meter = GlobalMeterProvider.getMeter("io.opentelemetry.exporters.otlp-http"); + this.spansSeen = + meter.longCounterBuilder("spansSeenByExporter").build().bind(EXPORTER_NAME_LABELS); + LongCounter spansExportedCounter = meter.longCounterBuilder("spansExportedByExporter").build(); + this.spansExportedSuccess = spansExportedCounter.bind(EXPORT_SUCCESS_LABELS); + this.spansExportedFailure = spansExportedCounter.bind(EXPORT_FAILURE_LABELS); + + this.client = client; + this.endpoint = endpoint; + this.headers = headers; + this.isCompressionEnabled = isCompressionEnabled; + } + + /** + * Submits all the given spans in a single batch to the OpenTelemetry collector. + * + * @param spans the list of sampled Spans to be exported. + * @return the result of the operation + */ + @Override + public CompletableResultCode export(Collection spans) { + spansSeen.add(spans.size()); + ExportTraceServiceRequest exportTraceServiceRequest = + ExportTraceServiceRequest.newBuilder() + .addAllResourceSpans(SpanAdapter.toProtoResourceSpans(spans)) + .build(); + + Request.Builder requestBuilder = new Request.Builder().url(endpoint); + if (headers != null) { + requestBuilder.headers(headers); + } + RequestBody requestBody = + RequestBody.create(exportTraceServiceRequest.toByteArray(), PROTOBUF_MEDIA_TYPE); + if (isCompressionEnabled) { + requestBuilder.addHeader("Content-Encoding", "gzip"); + requestBuilder.post(gzipRequestBody(requestBody)); + } else { + requestBuilder.post(requestBody); + } + + CompletableResultCode result = new CompletableResultCode(); + + client + .newCall(requestBuilder.build()) + .enqueue( + new Callback() { + @Override + public void onFailure(Call call, IOException e) { + spansExportedFailure.add(spans.size()); + result.fail(); + logger.log( + Level.SEVERE, + "Failed to export spans. The request could not be executed. Full error message: " + + e.getMessage()); + } + + @Override + public void onResponse(Call call, Response response) { + if (response.isSuccessful()) { + spansExportedSuccess.add(spans.size()); + result.succeed(); + return; + } + + spansExportedFailure.add(spans.size()); + int code = response.code(); + + Status status = extractErrorStatus(response); + + logger.log( + Level.WARNING, + "Failed to export spans. Server responded with HTTP status code " + + code + + ". Error message: " + + status.getMessage()); + result.fail(); + } + }); + + return result; + } + + private static RequestBody gzipRequestBody(RequestBody requestBody) { + return new RequestBody() { + @Override + public MediaType contentType() { + return requestBody.contentType(); + } + + @Override + public long contentLength() { + return -1; + } + + @Override + public void writeTo(BufferedSink bufferedSink) throws IOException { + BufferedSink gzipSink = Okio.buffer(new GzipSink(bufferedSink)); + requestBody.writeTo(gzipSink); + gzipSink.close(); + } + }; + } + + private static Status extractErrorStatus(Response response) { + ResponseBody responseBody = response.body(); + if (responseBody == null) { + return Status.newBuilder() + .setMessage("Response body missing, HTTP status message: " + response.message()) + .setCode(Code.UNKNOWN.getNumber()) + .build(); + } + try { + return Status.parseFrom(responseBody.bytes()); + } catch (IOException e) { + return Status.newBuilder() + .setMessage("Unable to parse response body, HTTP status message: " + response.message()) + .setCode(Code.UNKNOWN.getNumber()) + .build(); + } + } + + /** + * The OTLP exporter does not batch spans, so this method will immediately return with success. + * + * @return always Success + */ + @Override + public CompletableResultCode flush() { + return CompletableResultCode.ofSuccess(); + } + + /** + * Returns a new builder instance for this exporter. + * + * @return a new builder instance for this exporter. + */ + public static OtlpHttpSpanExporterBuilder builder() { + return new OtlpHttpSpanExporterBuilder(); + } + + /** + * Returns a new {@link OtlpHttpSpanExporter} using the default values. + * + * @return a new {@link OtlpHttpSpanExporter} instance. + */ + public static OtlpHttpSpanExporter getDefault() { + return builder().build(); + } + + /** Shutdown the exporter. */ + @Override + public CompletableResultCode shutdown() { + final CompletableResultCode result = CompletableResultCode.ofSuccess(); + client.dispatcher().cancelAll(); + this.spansSeen.unbind(); + this.spansExportedSuccess.unbind(); + this.spansExportedFailure.unbind(); + return result; + } +} diff --git a/exporters/otlp-http/trace/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterBuilder.java b/exporters/otlp-http/trace/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterBuilder.java new file mode 100644 index 00000000000..9985cbb2b29 --- /dev/null +++ b/exporters/otlp-http/trace/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterBuilder.java @@ -0,0 +1,160 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.otlp.http.trace; + +import static io.opentelemetry.api.internal.Utils.checkArgument; +import static java.util.Objects.requireNonNull; + +import com.google.common.base.Preconditions; +import java.io.ByteArrayInputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.security.cert.CertificateException; +import java.security.cert.CertificateFactory; +import java.security.cert.X509Certificate; +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import okhttp3.Headers; +import okhttp3.OkHttpClient; +import okhttp3.tls.HandshakeCertificates; + +/** Builder utility for {@link OtlpHttpSpanExporter}. */ +public final class OtlpHttpSpanExporterBuilder { + + private static final long DEFAULT_TIMEOUT_SECS = 10; + private static final String DEFAULT_ENDPOINT = "http://localhost:4317/v1/traces"; + + private long timeoutNanos = TimeUnit.SECONDS.toNanos(DEFAULT_TIMEOUT_SECS); + private String endpoint = DEFAULT_ENDPOINT; + private boolean isCompressionEnabled = false; + @Nullable private Headers.Builder headersBuilder; + @Nullable private byte[] trustedCertificatesPem; + + /** + * Sets the maximum time to wait for the collector to process an exported batch of spans. If + * unset, defaults to {@value DEFAULT_TIMEOUT_SECS}s. + */ + public OtlpHttpSpanExporterBuilder setTimeout(long timeout, TimeUnit unit) { + requireNonNull(unit, "unit"); + checkArgument(timeout >= 0, "timeout must be non-negative"); + timeoutNanos = unit.toNanos(timeout); + return this; + } + + /** + * Sets the maximum time to wait for the collector to process an exported batch of spans. If + * unset, defaults to {@value DEFAULT_TIMEOUT_SECS}s. + */ + public OtlpHttpSpanExporterBuilder setTimeout(Duration timeout) { + requireNonNull(timeout, "timeout"); + return setTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS); + } + + /** + * Sets the OTLP endpoint to connect to. If unset, defaults to {@value DEFAULT_ENDPOINT}. The + * endpoint must start with either http:// or https://, and include the full HTTP path. + */ + public OtlpHttpSpanExporterBuilder setEndpoint(String endpoint) { + requireNonNull(endpoint, "endpoint"); + + URI uri; + try { + uri = new URI(endpoint); + } catch (URISyntaxException e) { + throw new IllegalArgumentException("Invalid endpoint, must be a URL: " + endpoint, e); + } + + if (uri.getScheme() == null + || (!uri.getScheme().equals("http") && !uri.getScheme().equals("https"))) { + throw new IllegalArgumentException( + "Invalid endpoint, must start with http:// or https://: " + uri); + } + + this.endpoint = endpoint; + return this; + } + + /** + * Sets the method used to compress payloads. If unset, compression is disabled. Currently the + * only supported compression method is "gzip". + */ + public OtlpHttpSpanExporterBuilder setCompression(String compressionMethod) { + requireNonNull(compressionMethod, "compressionMethod"); + Preconditions.checkArgument( + compressionMethod.equals("gzip"), + "Unsupported compression method. Supported compression methods include: gzip."); + this.isCompressionEnabled = true; + return this; + } + + /** Add header to requests. */ + public OtlpHttpSpanExporterBuilder addHeader(String key, String value) { + if (headersBuilder == null) { + headersBuilder = new Headers.Builder(); + } + headersBuilder.add(key, value); + return this; + } + + /** + * Sets the certificate chain to use for verifying servers when TLS is enabled. The {@code byte[]} + * should contain an X.509 certificate collection in PEM format. If not set, TLS connections will + * use the system default trusted certificates. + */ + public OtlpHttpSpanExporterBuilder setTrustedCertificates(byte[] trustedCertificatesPem) { + this.trustedCertificatesPem = trustedCertificatesPem; + return this; + } + + /** + * Constructs a new instance of the exporter based on the builder's values. + * + * @return a new exporter's instance + */ + public OtlpHttpSpanExporter build() { + OkHttpClient.Builder clientBuilder = + new OkHttpClient.Builder().callTimeout(Duration.ofNanos(timeoutNanos)); + + if (trustedCertificatesPem != null) { + try { + HandshakeCertificates handshakeCertificates = + toHandshakeCertificates(trustedCertificatesPem); + clientBuilder.sslSocketFactory( + handshakeCertificates.sslSocketFactory(), handshakeCertificates.trustManager()); + } catch (CertificateException e) { + throw new IllegalStateException( + "Could not set trusted certificate for OTLP HTTP connection, are they valid X.509 in PEM format?", + e); + } + } + + Headers headers = headersBuilder == null ? null : headersBuilder.build(); + + return new OtlpHttpSpanExporter(clientBuilder.build(), endpoint, headers, isCompressionEnabled); + } + + /** + * Extract X.509 certificates from the bytes. + * + * @param trustedCertificatesPem bytes containing an X.509 certificate collection in PEM format. + * @return a HandshakeCertificates with the certificates + * @throws CertificateException if an error occurs extracting certificates + */ + private static HandshakeCertificates toHandshakeCertificates(byte[] trustedCertificatesPem) + throws CertificateException { + ByteArrayInputStream is = new ByteArrayInputStream(trustedCertificatesPem); + CertificateFactory factory = CertificateFactory.getInstance("X.509"); + HandshakeCertificates.Builder certBuilder = new HandshakeCertificates.Builder(); + while (is.available() > 0) { + X509Certificate cert = (X509Certificate) factory.generateCertificate(is); + certBuilder.addTrustedCertificate(cert); + } + return certBuilder.build(); + } + + OtlpHttpSpanExporterBuilder() {} +} diff --git a/exporters/otlp-http/trace/src/main/java/io/opentelemetry/exporter/otlp/http/trace/package-info.java b/exporters/otlp-http/trace/src/main/java/io/opentelemetry/exporter/otlp/http/trace/package-info.java new file mode 100644 index 00000000000..06b040d8ac9 --- /dev/null +++ b/exporters/otlp-http/trace/src/main/java/io/opentelemetry/exporter/otlp/http/trace/package-info.java @@ -0,0 +1,10 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +/** OpenTelemetry exporter which sends span data to OpenTelemetry collector via OTLP HTTP. */ +@ParametersAreNonnullByDefault +package io.opentelemetry.exporter.otlp.http.trace; + +import javax.annotation.ParametersAreNonnullByDefault; diff --git a/exporters/otlp-http/trace/src/test/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterTest.java b/exporters/otlp-http/trace/src/test/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterTest.java new file mode 100644 index 00000000000..6fb7a7cb5cc --- /dev/null +++ b/exporters/otlp-http/trace/src/test/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterTest.java @@ -0,0 +1,248 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.otlp.http.trace; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import com.google.rpc.Status; +import com.linecorp.armeria.common.AggregatedHttpRequest; +import com.linecorp.armeria.common.HttpMethod; +import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.common.MediaType; +import com.linecorp.armeria.server.ServerBuilder; +import com.linecorp.armeria.testing.junit5.server.mock.MockWebServerExtension; +import io.github.netmikey.logunit.api.LogCapturer; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.TraceFlags; +import io.opentelemetry.api.trace.TraceState; +import io.opentelemetry.exporter.otlp.internal.SpanAdapter; +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; +import io.opentelemetry.sdk.testing.trace.TestSpanData; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.data.StatusData; +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import okhttp3.tls.HeldCertificate; +import okio.Buffer; +import okio.GzipSource; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.event.Level; +import org.slf4j.event.LoggingEvent; + +class OtlpHttpSpanExporterTest { + + private static final MediaType APPLICATION_PROTOBUF = + MediaType.create("application", "x-protobuf"); + private static final HeldCertificate HELD_CERTIFICATE; + + static { + try { + HELD_CERTIFICATE = + new HeldCertificate.Builder() + .commonName("localhost") + .addSubjectAlternativeName(InetAddress.getByName("localhost").getCanonicalHostName()) + .build(); + } catch (UnknownHostException e) { + throw new IllegalStateException("Error building certificate.", e); + } + } + + @RegisterExtension + static MockWebServerExtension server = + new MockWebServerExtension() { + @Override + protected void configureServer(ServerBuilder sb) { + sb.tls(HELD_CERTIFICATE.keyPair().getPrivate(), HELD_CERTIFICATE.certificate()); + } + }; + + @RegisterExtension + LogCapturer logs = LogCapturer.create().captureForType(OtlpHttpSpanExporter.class); + + private OtlpHttpSpanExporterBuilder builder; + + @BeforeEach + void setup() { + builder = + OtlpHttpSpanExporter.builder() + .setEndpoint("https://localhost:" + server.httpsPort() + "/v1/traces") + .addHeader("foo", "bar") + .setTrustedCertificates( + HELD_CERTIFICATE.certificatePem().getBytes(StandardCharsets.UTF_8)); + } + + @Test + @SuppressWarnings("PreferJavaTimeOverload") + void invalidConfig() { + assertThatThrownBy(() -> OtlpHttpSpanExporter.builder().setTimeout(-1, TimeUnit.MILLISECONDS)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("timeout must be non-negative"); + assertThatThrownBy(() -> OtlpHttpSpanExporter.builder().setTimeout(1, null)) + .isInstanceOf(NullPointerException.class) + .hasMessage("unit"); + assertThatThrownBy(() -> OtlpHttpSpanExporter.builder().setTimeout(null)) + .isInstanceOf(NullPointerException.class) + .hasMessage("timeout"); + + assertThatThrownBy(() -> OtlpHttpSpanExporter.builder().setEndpoint(null)) + .isInstanceOf(NullPointerException.class) + .hasMessage("endpoint"); + assertThatThrownBy(() -> OtlpHttpSpanExporter.builder().setEndpoint("😺://localhost")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid endpoint, must be a URL: 😺://localhost"); + assertThatThrownBy(() -> OtlpHttpSpanExporter.builder().setEndpoint("localhost")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid endpoint, must start with http:// or https://: localhost"); + assertThatThrownBy(() -> OtlpHttpSpanExporter.builder().setEndpoint("gopher://localhost")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid endpoint, must start with http:// or https://: gopher://localhost"); + + assertThatThrownBy(() -> OtlpHttpSpanExporter.builder().setCompression(null)) + .isInstanceOf(NullPointerException.class) + .hasMessage("compressionMethod"); + assertThatThrownBy(() -> OtlpHttpSpanExporter.builder().setCompression("foo")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Unsupported compression method. Supported compression methods include: gzip."); + } + + @Test + void testExportUncompressed() { + server.enqueue(successResponse()); + OtlpHttpSpanExporter exporter = builder.build(); + + ExportTraceServiceRequest payload = exportAndAssertResult(exporter, /* expectedResult= */ true); + AggregatedHttpRequest request = server.takeRequest().request(); + assertRequestCommon(request); + assertThat(parseRequestBody(request.content().array())).isEqualTo(payload); + } + + @Test + void testExportGzipCompressed() { + server.enqueue(successResponse()); + OtlpHttpSpanExporter exporter = builder.setCompression("gzip").build(); + + ExportTraceServiceRequest payload = exportAndAssertResult(exporter, /* expectedResult= */ true); + AggregatedHttpRequest request = server.takeRequest().request(); + assertRequestCommon(request); + assertThat(request.headers().get("Content-Encoding")).isEqualTo("gzip"); + assertThat(parseRequestBody(gzipDecompress(request.content().array()))).isEqualTo(payload); + } + + private static void assertRequestCommon(AggregatedHttpRequest request) { + assertThat(request.method()).isEqualTo(HttpMethod.POST); + assertThat(request.path()).isEqualTo("/v1/traces"); + assertThat(request.headers().get("foo")).isEqualTo("bar"); + assertThat(request.headers().get("Content-Type")).isEqualTo(APPLICATION_PROTOBUF.toString()); + } + + private static ExportTraceServiceRequest parseRequestBody(byte[] bytes) { + try { + return ExportTraceServiceRequest.parseFrom(bytes); + } catch (InvalidProtocolBufferException e) { + throw new IllegalStateException("Unable to parse Protobuf request body.", e); + } + } + + private static byte[] gzipDecompress(byte[] bytes) { + try { + Buffer result = new Buffer(); + GzipSource source = new GzipSource(new Buffer().write(bytes)); + while (source.read(result, Integer.MAX_VALUE) != -1) {} + return result.readByteArray(); + } catch (IOException e) { + throw new IllegalStateException("Unable to decompress payload.", e); + } + } + + @Test + void testServerError() { + server.enqueue( + buildResponse( + HttpStatus.INTERNAL_SERVER_ERROR, + Status.newBuilder().setMessage("Server error!").build())); + OtlpHttpSpanExporter exporter = builder.build(); + + exportAndAssertResult(exporter, /* expectedResult= */ false); + LoggingEvent log = + logs.assertContains( + "Failed to export spans. Server responded with HTTP status code 500. Error message: Server error!"); + assertThat(log.getLevel()).isEqualTo(Level.WARN); + } + + @Test + void testServerErrorParseError() { + server.enqueue( + HttpResponse.of(HttpStatus.INTERNAL_SERVER_ERROR, APPLICATION_PROTOBUF, "Server error!")); + OtlpHttpSpanExporter exporter = builder.build(); + + exportAndAssertResult(exporter, /* expectedResult= */ false); + LoggingEvent log = + logs.assertContains( + "Failed to export spans. Server responded with HTTP status code 500. Error message: Unable to parse response body, HTTP status message:"); + assertThat(log.getLevel()).isEqualTo(Level.WARN); + } + + private static ExportTraceServiceRequest exportAndAssertResult( + OtlpHttpSpanExporter otlpHttpSpanExporter, boolean expectedResult) { + List spans = Collections.singletonList(generateFakeSpan()); + CompletableResultCode resultCode = otlpHttpSpanExporter.export(spans); + resultCode.join(10, TimeUnit.SECONDS); + assertThat(resultCode.isSuccess()).isEqualTo(expectedResult); + return ExportTraceServiceRequest.newBuilder() + .addAllResourceSpans(SpanAdapter.toProtoResourceSpans(spans)) + .build(); + } + + private static HttpResponse successResponse() { + ExportTraceServiceResponse exportTraceServiceResponse = + ExportTraceServiceResponse.newBuilder().build(); + return buildResponse(HttpStatus.OK, exportTraceServiceResponse); + } + + private static HttpResponse buildResponse(HttpStatus httpStatus, T message) { + return HttpResponse.of(httpStatus, APPLICATION_PROTOBUF, message.toByteArray()); + } + + private static SpanData generateFakeSpan() { + long duration = TimeUnit.MILLISECONDS.toNanos(900); + long startNs = TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis()); + long endNs = startNs + duration; + return TestSpanData.builder() + .setHasEnded(true) + .setSpanContext( + SpanContext.create( + "00000000000000000000000000abc123", + "0000000000def456", + TraceFlags.getDefault(), + TraceState.getDefault())) + .setName("GET /api/endpoint") + .setStartEpochNanos(startNs) + .setEndEpochNanos(endNs) + .setStatus(StatusData.ok()) + .setKind(SpanKind.SERVER) + .setLinks(Collections.emptyList()) + .setTotalRecordedLinks(0) + .setTotalRecordedEvents(0) + .setInstrumentationLibraryInfo( + InstrumentationLibraryInfo.create("testLib", "1.0", "http://url")) + .build(); + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 7c22a242eff..09f29c9de29 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -46,6 +46,7 @@ include(":exporters:otlp:all") include(":exporters:otlp:common") include(":exporters:otlp:metrics") include(":exporters:otlp:trace") +include(":exporters:otlp-http:trace") include(":exporters:prometheus") include(":exporters:zipkin") include(":integration-tests")