Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OTLP retry #3636

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
Comparing source compatibility of against
No changes.
*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporterBuilder (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporterBuilder setRetryPolicy(io.opentelemetry.exporter.otlp.internal.retry.RetryPolicy)
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
Comparing source compatibility of against
No changes.
*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporterBuilder (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporterBuilder setRetryPolicy(io.opentelemetry.exporter.otlp.internal.retry.RetryPolicy)
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,31 @@

package io.opentelemetry.exporter.otlp.http.metrics;

import static io.opentelemetry.exporter.otlp.internal.http.OkHttpUtil.gzipRequestBody;
import static io.opentelemetry.exporter.otlp.internal.http.OkHttpUtil.toListenableFuture;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import io.opentelemetry.exporter.otlp.internal.ProtoRequestBody;
import io.opentelemetry.exporter.otlp.internal.grpc.GrpcStatusUtil;
import io.opentelemetry.exporter.otlp.internal.http.HttpStatusException;
import io.opentelemetry.exporter.otlp.internal.metrics.MetricsRequestMarshaler;
import io.opentelemetry.exporter.otlp.internal.retry.RetryExecutor;
import io.opentelemetry.exporter.otlp.internal.retry.RetryPolicy;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.internal.ThrottlingLogger;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import java.io.IOException;
import java.util.Collection;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.Headers;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.ResponseBody;
import okio.BufferedSink;
import okio.GzipSink;
import okio.Okio;

/** Exports metrics using OTLP via HTTP, using OpenTelemetry's protobuf model. */
@ThreadSafe
Expand All @@ -44,13 +44,23 @@ public final class OtlpHttpMetricExporter implements MetricExporter {
private final String endpoint;
@Nullable private final Headers headers;
private final boolean compressionEnabled;
private final RetryExecutor retryExecutor;

OtlpHttpMetricExporter(
OkHttpClient client, String endpoint, @Nullable Headers headers, boolean compressionEnabled) {
OkHttpClient client,
String endpoint,
@Nullable Headers headers,
boolean compressionEnabled,
RetryPolicy retryPolicy) {
this.client = client;
this.endpoint = endpoint;
this.headers = headers;
this.compressionEnabled = compressionEnabled;
this.retryExecutor =
new RetryExecutor(
OtlpHttpMetricExporter.class.getSimpleName(),
retryPolicy,
t -> !(t instanceof HttpStatusException));
}

/**
Expand All @@ -77,76 +87,40 @@ public CompletableResultCode export(Collection<MetricData> metrics) {

CompletableResultCode result = new CompletableResultCode();

client
.newCall(requestBuilder.build())
.enqueue(
new Callback() {
@Override
public void onFailure(Call call, IOException e) {
logger.log(
Level.SEVERE,
"Failed to export metrics. The request could not be executed. Full error message: "
+ e.getMessage());
result.fail();
}

@Override
public void onResponse(Call call, Response response) {
if (response.isSuccessful()) {
result.succeed();
return;
}

int code = response.code();

String status = extractErrorStatus(response);

Futures.addCallback(
retryExecutor.submit(
retryContext -> {
int attemptCount = retryContext.getAttemptCount();
if (attemptCount > 0) {
Throwable lastAttemptFailure = retryContext.getLastAttemptFailure();
String message =
lastAttemptFailure == null ? "No error" : lastAttemptFailure.getMessage();
logger.log(
Level.WARNING,
"Failed to export metrics. Server responded with HTTP status code "
+ code
+ ". Error message: "
+ status);
result.fail();
"Retrying metric export (try "
+ (attemptCount + 1)
+ "). Last attempt error message: "
+ message);
}
});
return toListenableFuture(client.newCall(requestBuilder.build()));
}),
new FutureCallback<Response>() {
@Override
public void onSuccess(@Nullable Response response) {
result.succeed();
}

@Override
public void onFailure(Throwable t) {
logger.log(Level.WARNING, "Failed to export metrics. " + t.getMessage());
result.fail();
}
},
MoreExecutors.directExecutor());

return result;
}

/**
 * Wraps a request body so that its content passes through a gzip sink while being written.
 *
 * @param requestBody the body whose content should be compressed
 * @return a body that reports the delegate's content type and a content length of {@code -1}
 */
private static RequestBody gzipRequestBody(RequestBody requestBody) {
  return new RequestBody() {
    @Override
    public long contentLength() {
      return -1;
    }

    @Override
    public MediaType contentType() {
      return requestBody.contentType();
    }

    @Override
    public void writeTo(BufferedSink sink) throws IOException {
      // The delegate writes into a buffered gzip wrapper around the target sink; the wrapper is
      // closed once the delegate has finished writing.
      GzipSink compressor = new GzipSink(sink);
      BufferedSink compressed = Okio.buffer(compressor);
      requestBody.writeTo(compressed);
      compressed.close();
    }
  };
}

/**
 * Builds a human-readable error description for a response.
 *
 * <p>When a body is present its bytes are handed to {@link GrpcStatusUtil#getStatusMessage}. If
 * the body is absent, or reading it raises an {@link IOException}, a fallback text containing
 * the HTTP status message is returned instead.
 *
 * @param response the response to describe
 * @return the extracted status message or one of the fallback texts
 */
private static String extractErrorStatus(Response response) {
  ResponseBody body = response.body();
  if (body != null) {
    try {
      byte[] payload = body.bytes();
      return GrpcStatusUtil.getStatusMessage(payload);
    } catch (IOException e) {
      return "Unable to parse response body, HTTP status message: " + response.message();
    }
  }
  return "Response body missing, HTTP status message: " + response.message();
}

/**
* The OTLP exporter does not batch metrics, so this method will immediately return with success.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static io.opentelemetry.api.internal.Utils.checkArgument;
import static java.util.Objects.requireNonNull;

import io.opentelemetry.exporter.otlp.internal.retry.RetryPolicy;
import java.io.ByteArrayInputStream;
import java.net.URI;
import java.net.URISyntaxException;
Expand All @@ -32,6 +33,7 @@ public final class OtlpHttpMetricExporterBuilder {
private boolean compressionEnabled = false;
@Nullable private Headers.Builder headersBuilder;
@Nullable private byte[] trustedCertificatesPem;
private RetryPolicy retryPolicy = RetryPolicy.noRetry();

/**
* Sets the maximum time to wait for the collector to process an exported batch of metrics. If
Expand Down Expand Up @@ -109,6 +111,13 @@ public OtlpHttpMetricExporterBuilder setTrustedCertificates(byte[] trustedCertif
return this;
}

/**
 * Sets the retry policy.
 *
 * @param retryPolicy the policy to use; must not be {@code null}
 * @return this builder
 * @throws NullPointerException if {@code retryPolicy} is {@code null}
 */
public OtlpHttpMetricExporterBuilder setRetryPolicy(RetryPolicy retryPolicy) {
  // requireNonNull hands back its argument, so the check and the assignment share one statement.
  this.retryPolicy = requireNonNull(retryPolicy, "retryPolicy");
  return this;
}

/**
* Constructs a new instance of the exporter based on the builder's values.
*
Expand All @@ -133,7 +142,8 @@ public OtlpHttpMetricExporter build() {

Headers headers = headersBuilder == null ? null : headersBuilder.build();

return new OtlpHttpMetricExporter(clientBuilder.build(), endpoint, headers, compressionEnabled);
return new OtlpHttpMetricExporter(
clientBuilder.build(), endpoint, headers, compressionEnabled, retryPolicy);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.github.netmikey.logunit.api.LogCapturer;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.exporter.otlp.internal.metrics.ResourceMetricsMarshaler;
import io.opentelemetry.exporter.otlp.internal.retry.RetryPolicy;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
Expand All @@ -38,6 +39,7 @@
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -207,6 +209,24 @@ void testServerErrorParseError() {
assertThat(log.getLevel()).isEqualTo(Level.WARN);
}

@Test
void testRetryPolicy() {
  RetryPolicy backoff =
      RetryPolicy.exponentialBackoff(4, Duration.ofMillis(100), Duration.ofSeconds(1), 2);
  OtlpHttpMetricExporter exporter =
      builder.setTimeout(Duration.ofSeconds(10)).setRetryPolicy(backoff).build();

  // Queue an internal-server-error response followed by a success response; the export is
  // expected to report success overall.
  Status serverError = Status.newBuilder().setMessage("Server error!").build();
  server.enqueue(buildResponse(HttpStatus.INTERNAL_SERVER_ERROR, serverError));
  server.enqueue(successResponse());

  exportAndAssertResult(exporter, /* expectedResult= */ true);
}

private static ExportMetricsServiceRequest exportAndAssertResult(
OtlpHttpMetricExporter otlpHttpMetricExporter, boolean expectedResult) {
List<MetricData> metrics = Collections.singletonList(generateFakeMetric());
Expand Down
Loading