Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OTLP retry #3636

Closed
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions exporters/otlp/common/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ dependencies {
compileOnly("io.grpc:grpc-netty-shaded")
compileOnly("io.grpc:grpc-okhttp")
compileOnly("io.grpc:grpc-stub")
compileOnly("net.jodah:failsafe:2.4.3")

annotationProcessor("com.google.auto.value:auto-value")

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry;

import io.opentelemetry.exporter.otlp.internal.ExponentialBackoffConfig;
import java.time.Duration;

@SuppressWarnings("InterfaceWithOnlyStatics")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

heh. Let's not make it an interface, but a utility class. :)

public interface RetryConfig {

/**
* Create a retry config representing an exponential backoff policy.
*
* @param maxAttempts the max number of retries
* @param initialBackoff the wait period before attempting the first retry
* @param maxBackoff the maximum wait period before retry attempts
* @param backoffMultiplier the multiplication factor used to compute the delay between successive
* failures
* @param jitter the duration to randomly vary retry delays by
*/
static RetryConfig exponentialBackoff(
long maxAttempts,
Duration initialBackoff,
Duration maxBackoff,
double backoffMultiplier,
Duration jitter) {
return ExponentialBackoffConfig.create(
maxAttempts, initialBackoff, maxBackoff, backoffMultiplier, jitter);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.otlp.internal;

import com.google.auto.value.AutoValue;
import io.opentelemetry.RetryConfig;
import java.time.Duration;
import javax.annotation.concurrent.Immutable;

/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
* any time.
*/
@AutoValue
@Immutable
public abstract class ExponentialBackoffConfig implements RetryConfig {

public static ExponentialBackoffConfig create(
long maxRetries,
Duration initialBackoff,
Duration maxBackoff,
double backoffMultiplier,
Duration jitter) {
return new AutoValue_ExponentialBackoffConfig(
maxRetries, initialBackoff, maxBackoff, backoffMultiplier, jitter);
}

/** The maximum number of retries. */
abstract long getMaxRetries();

/** The wait period before attempting the first retry. */
abstract Duration getInitialBackoff();

/** The maximum wait period before retry attempts. */
abstract Duration getMaxBackoff();

/** The multiplication factor used to compute the delay between successive failures. */
abstract double getBackoffMultiplier();

/** The duration to randomly vary retry delays by. */
abstract Duration getJitter();

ExponentialBackoffConfig() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.otlp.internal;

import static io.grpc.Status.Code.ABORTED;
import static io.grpc.Status.Code.CANCELLED;
import static io.grpc.Status.Code.DATA_LOSS;
import static io.grpc.Status.Code.DEADLINE_EXCEEDED;
import static io.grpc.Status.Code.OUT_OF_RANGE;
import static io.grpc.Status.Code.RESOURCE_EXHAUSTED;
import static io.grpc.Status.Code.UNAVAILABLE;

import io.grpc.Status;
import io.opentelemetry.RetryConfig;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import net.jodah.failsafe.RetryPolicy;

/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
* any time.
*/
public final class RetryUtil {

private static final Set<Status.Code> RETRYABLE_CODES =
Collections.unmodifiableSet(
new HashSet<>(
Arrays.asList(
CANCELLED,
DEADLINE_EXCEEDED,
RESOURCE_EXHAUSTED,
ABORTED,
OUT_OF_RANGE,
UNAVAILABLE,
DATA_LOSS)));

public static Set<Status.Code> retryableStatusCodes() {
return RETRYABLE_CODES;
}

public static boolean hasRetryableStatusCode(Throwable throwable) {
return RETRYABLE_CODES.contains(Status.fromThrowable(throwable).getCode());
}

/** Convert the retry config to a retry policy. */
public static <T> RetryPolicy<T> toRetryPolicy(RetryConfig retryConfig) {
ExponentialBackoffConfig exponentialBackoffConfig = (ExponentialBackoffConfig) retryConfig;
return new RetryPolicy<T>()
.withBackoff(
exponentialBackoffConfig.getInitialBackoff().toMillis(),
exponentialBackoffConfig.getMaxBackoff().toMillis(),
ChronoUnit.MILLIS,
exponentialBackoffConfig.getBackoffMultiplier())
.withMaxRetries((int) exponentialBackoffConfig.getMaxRetries())
.withJitter(exponentialBackoffConfig.getJitter());
}

private RetryUtil() {}
}
2 changes: 2 additions & 0 deletions exporters/otlp/trace/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ dependencies {
api("io.grpc:grpc-stub")
implementation("io.grpc:grpc-api")

implementation("net.jodah:failsafe:2.4.3")

testImplementation(project(":proto"))
testImplementation(project(":sdk:testing"))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

package io.opentelemetry.exporter.otlp.trace;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.Codec;
import io.grpc.ManagedChannel;
Expand All @@ -17,6 +15,7 @@
import io.opentelemetry.api.metrics.GlobalMeterProvider;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.exporter.otlp.internal.RetryUtil;
import io.opentelemetry.exporter.otlp.internal.grpc.ManagedChannelUtil;
import io.opentelemetry.exporter.otlp.internal.traces.TraceRequestMarshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
Expand All @@ -27,8 +26,10 @@
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.FailsafeExecutor;
import net.jodah.failsafe.RetryPolicy;

/** Exports spans using OTLP via gRPC, using OpenTelemetry's protobuf model. */
@ThreadSafe
Expand All @@ -52,6 +53,7 @@ public final class OtlpGrpcSpanExporter implements SpanExporter {

private final ManagedChannel managedChannel;
private final long timeoutNanos;
private final FailsafeExecutor<ExportTraceServiceResponse> executor;
private final BoundLongCounter spansSeen;
private final BoundLongCounter spansExportedSuccess;
private final BoundLongCounter spansExportedFailure;
Expand All @@ -64,7 +66,11 @@ public final class OtlpGrpcSpanExporter implements SpanExporter {
* 0 or to a negative value, the exporter will wait indefinitely.
* @param compressionEnabled whether or not to enable gzip compression.
*/
OtlpGrpcSpanExporter(ManagedChannel channel, long timeoutNanos, boolean compressionEnabled) {
OtlpGrpcSpanExporter(
ManagedChannel channel,
long timeoutNanos,
boolean compressionEnabled,
RetryPolicy<ExportTraceServiceResponse> retryPolicy) {
// TODO: telemetry schema version.
Meter meter = GlobalMeterProvider.get().meterBuilder("io.opentelemetry.exporters.otlp").build();
this.spansSeen =
Expand All @@ -78,6 +84,10 @@ public final class OtlpGrpcSpanExporter implements SpanExporter {
this.traceService =
MarshalerTraceServiceGrpc.newFutureStub(channel)
.withCompression(codec.getMessageEncoding());

executor =
Failsafe.with(retryPolicy.abortOn(t -> !RetryUtil.hasRetryableStatusCode(t)))
.with(MoreExecutors.directExecutor());
}

/**
Expand All @@ -87,62 +97,76 @@ public final class OtlpGrpcSpanExporter implements SpanExporter {
* @return the result of the operation
*/
@Override
@SuppressWarnings("FutureReturnValueIgnored")
public CompletableResultCode export(Collection<SpanData> spans) {
spansSeen.add(spans.size());
TraceRequestMarshaler request = TraceRequestMarshaler.create(spans);

final CompletableResultCode result = new CompletableResultCode();

MarshalerTraceServiceGrpc.TraceServiceFutureStub exporter;
if (timeoutNanos > 0) {
exporter = traceService.withDeadlineAfter(timeoutNanos, TimeUnit.NANOSECONDS);
} else {
exporter = traceService;
}
MarshalerTraceServiceGrpc.TraceServiceFutureStub exporter =
timeoutNanos > 0
? traceService.withDeadlineAfter(timeoutNanos, TimeUnit.NANOSECONDS)
: traceService;

Futures.addCallback(
exporter.export(request),
new FutureCallback<ExportTraceServiceResponse>() {
@Override
public void onSuccess(@Nullable ExportTraceServiceResponse response) {
spansExportedSuccess.add(spans.size());
result.succeed();
}

@Override
public void onFailure(Throwable t) {
spansExportedFailure.add(spans.size());
Status status = Status.fromThrowable(t);
switch (status.getCode()) {
case UNIMPLEMENTED:
executor
.getAsync(
(executionContext) -> {
Throwable t = executionContext.getLastFailure();
if (t != null) {
logger.log(
Level.SEVERE,
"Failed to export spans. Server responded with UNIMPLEMENTED. "
+ "This usually means that your collector is not configured with an otlp "
+ "receiver in the \"pipelines\" section of the configuration. "
+ "Full error message: "
Level.WARNING,
"Retrying span export (attempt "
+ executionContext.getAttemptCount()
+ 1
+ "). Last attempt error message: "
+ t.getMessage());
break;
case UNAVAILABLE:
logger.log(
Level.SEVERE,
"Failed to export spans. Server is UNAVAILABLE. "
+ "Make sure your collector is running and reachable from this network. "
+ "Full error message:"
+ t.getMessage());
break;
default:
logger.log(
Level.WARNING, "Failed to export spans. Error message: " + t.getMessage());
break;
}
if (logger.isLoggable(Level.FINEST)) {
logger.log(Level.FINEST, "Failed to export spans. Details follow: " + t);
}
result.fail();
}
},
MoreExecutors.directExecutor());
}

return exporter.export(request).get();
})
.whenComplete(
(response, throwable) -> {
if (throwable == null) {
spansExportedSuccess.add(spans.size());
result.succeed();
return;
}

Throwable t = throwable;
if (throwable.getCause() != null) {
t = throwable.getCause();
}
spansExportedFailure.add(spans.size());
Status status = Status.fromThrowable(t);
switch (status.getCode()) {
case UNIMPLEMENTED:
logger.log(
Level.SEVERE,
"Failed to export spans. Server responded with UNIMPLEMENTED. "
+ "This usually means that your collector is not configured with an otlp "
+ "receiver in the \"pipelines\" section of the configuration. "
+ "Full error message: "
+ t.getMessage());
break;
case UNAVAILABLE:
logger.log(
Level.SEVERE,
"Failed to export spans. Server is UNAVAILABLE. "
+ "Make sure your collector is running and reachable from this network. "
+ "Full error message:"
+ t.getMessage());
break;
default:
logger.log(
Level.WARNING, "Failed to export spans. Error message: " + t.getMessage());
break;
}
if (logger.isLoggable(Level.FINEST)) {
logger.log(Level.FINEST, "Failed to export spans. Details follow: " + t);
}
result.fail();
});
return result;
}

Expand Down
Loading