Skip to content

Commit

Permalink
Autoconfigure experimental OTLP retry (#3791)
Browse files Browse the repository at this point in the history
* Add retry policy class and wire into autoconfigure

* Wire up otlp retry for DefaultGrpcExporter

* Add documentation to autoconfigure readme

* Add more tests

* Move delegate accessor to :exporters:otlp:common

* Use jackson for json serialization

* PR feedback
  • Loading branch information
jack-berg authored Nov 9, 2021
1 parent c858e18 commit a1a45d2
Show file tree
Hide file tree
Showing 29 changed files with 713 additions and 152 deletions.
3 changes: 2 additions & 1 deletion exporters/otlp/common/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@ dependencies {
testImplementation(project(":sdk:logs"))
testImplementation(project(":sdk:testing"))

testImplementation("com.fasterxml.jackson.core:jackson-core")
testImplementation("com.fasterxml.jackson.core:jackson-databind")
testImplementation("com.google.protobuf:protobuf-java-util")
testImplementation("io.opentelemetry.proto:opentelemetry-proto")
testImplementation("org.skyscreamer:jsonassert")

testImplementation("com.google.api.grpc:proto-google-common-protos")
testImplementation("io.grpc:grpc-testing")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.otlp.internal;

import com.google.auto.value.AutoValue;
import java.time.Duration;

/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
* any time.
*/
@AutoValue
public abstract class RetryPolicy {

private static final RetryPolicy DEFAULT = new RetryPolicyBuilder().build();

RetryPolicy() {}

/** Return the default {@link RetryPolicy}. */
public static RetryPolicy getDefault() {
return DEFAULT;
}

/** Returns a new {@link RetryPolicyBuilder} to construct a {@link RetryPolicy}. */
public static RetryPolicyBuilder builder() {
return new RetryPolicyBuilder();
}

/** Returns the max number of attempts, including the original request. */
public abstract int getMaxAttempts();

/** Returns the initial backoff. */
public abstract Duration getInitialBackoff();

/** Returns the max backoff. */
public abstract Duration getMaxBackoff();

/** Returns the backoff multiplier. */
public abstract double getBackoffMultiplier();

static RetryPolicy create(
int maxAttempts, Duration initialBackoff, Duration maxBackoff, double backoffMultiplier) {
return new AutoValue_RetryPolicy(maxAttempts, initialBackoff, maxBackoff, backoffMultiplier);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.otlp.internal;

import static io.opentelemetry.api.internal.Utils.checkArgument;
import static java.util.Objects.requireNonNull;

import java.time.Duration;

/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
* any time.
*/
public final class RetryPolicyBuilder {

private static final int DEFAULT_MAX_ATTEMPTS = 5;
private static final Duration DEFAULT_INITIAL_BACKOFF = Duration.ofSeconds(1);
private static final Duration DEFAULT_MAX_BACKOFF = Duration.ofSeconds(5);
private static final double DEFAULT_BACKOFF_MULTIPLIER = 1.5;

private int maxAttempts = DEFAULT_MAX_ATTEMPTS;
private Duration initialBackoff = DEFAULT_INITIAL_BACKOFF;
private Duration maxBackoff = DEFAULT_MAX_BACKOFF;
private double backoffMultiplier = DEFAULT_BACKOFF_MULTIPLIER;

RetryPolicyBuilder() {}

/**
* Set the maximum number of attempts, including the original request. Must be greater than 1 and
* less than 6.
*/
public RetryPolicyBuilder setMaxAttempts(int maxAttempts) {
checkArgument(
maxAttempts > 1 && maxAttempts < 6, "maxAttempts must be greater than 1 and less than 6");
this.maxAttempts = maxAttempts;
return this;
}

/** Set the initial backoff. Must be greater than 0. */
public RetryPolicyBuilder setInitialBackoff(Duration initialBackoff) {
requireNonNull(initialBackoff, "initialBackoff");
checkArgument(initialBackoff.toNanos() > 0, "initialBackoff must be greater than 0");
this.initialBackoff = initialBackoff;
return this;
}

/** Set the maximum backoff. Must be greater than 0. */
public RetryPolicyBuilder setMaxBackoff(Duration maxBackoff) {
requireNonNull(maxBackoff, "maxBackoff");
checkArgument(maxBackoff.toNanos() > 0, "maxBackoff must be greater than 0");
this.maxBackoff = maxBackoff;
return this;
}

/** Set the backoff multiplier. Must be greater than 0.0. */
public RetryPolicyBuilder setBackoffMultiplier(double backoffMultiplier) {
checkArgument(backoffMultiplier > 0, "backoffMultiplier must be greater than 0");
this.backoffMultiplier = backoffMultiplier;
return this;
}

/** Build and return a {@link RetryPolicy} with the values of this builder. */
public RetryPolicy build() {
return RetryPolicy.create(maxAttempts, initialBackoff, maxBackoff, backoffMultiplier);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@
package io.opentelemetry.exporter.otlp.internal.grpc;

import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;
import static io.opentelemetry.exporter.otlp.internal.grpc.ManagedChannelUtil.toServiceConfig;

import io.grpc.Codec;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.stub.MetadataUtils;
import io.opentelemetry.exporter.otlp.internal.Marshaler;
import io.opentelemetry.exporter.otlp.internal.RetryPolicy;
import java.lang.reflect.Field;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
Expand All @@ -32,23 +35,27 @@ public final class DefaultGrpcExporterBuilder<T extends Marshaler>

private final String type;
private final Function<ManagedChannel, MarshalerServiceStub<T, ?, ?>> stubFactory;
private final String grpcServiceName;

@Nullable private ManagedChannel channel;
private long timeoutNanos;
private URI endpoint;
private boolean compressionEnabled = false;
@Nullable private Metadata metadata;
@Nullable private byte[] trustedCertificatesPem;
@Nullable private RetryPolicy retryPolicy;

/** Creates a new {@link DefaultGrpcExporterBuilder}. */
// Visible for testing
public DefaultGrpcExporterBuilder(
String type,
Function<ManagedChannel, MarshalerServiceStub<T, ?, ?>> stubFactory,
long defaultTimeoutSecs,
URI defaultEndpoint) {
URI defaultEndpoint,
String grpcServiceName) {
this.type = type;
this.stubFactory = stubFactory;
this.grpcServiceName = grpcServiceName;
timeoutNanos = TimeUnit.SECONDS.toNanos(defaultTimeoutSecs);
endpoint = defaultEndpoint;
}
Expand Down Expand Up @@ -110,6 +117,12 @@ public DefaultGrpcExporterBuilder<T> addHeader(String key, String value) {
return this;
}

@Override
public GrpcExporterBuilder<T> addRetryPolicy(RetryPolicy retryPolicy) {
this.retryPolicy = retryPolicy;
return this;
}

@Override
public GrpcExporter<T> build() {
ManagedChannel channel = this.channel;
Expand Down Expand Up @@ -139,6 +152,10 @@ public GrpcExporter<T> build() {
}
}

if (retryPolicy != null) {
managedChannelBuilder.defaultServiceConfig(toServiceConfig(grpcServiceName, retryPolicy));
}

channel = managedChannelBuilder.build();
}

Expand All @@ -147,4 +164,25 @@ public GrpcExporter<T> build() {
stubFactory.apply(channel).withCompression(codec.getMessageEncoding());
return new DefaultGrpcExporter<>(type, channel, stub, timeoutNanos, compressionEnabled);
}

/**
* Reflectively access a {@link DefaultGrpcExporterBuilder} instance in field called "delegate" of
* the instance.
*
* @throws IllegalArgumentException if the instance does not contain a field called "delegate" of
* type {@link DefaultGrpcExporterBuilder}
*/
public static <T> DefaultGrpcExporterBuilder<?> getDelegateBuilder(Class<T> type, T instance) {
try {
Field field = type.getDeclaredField("delegate");
field.setAccessible(true);
Object value = field.get(instance);
if (!(value instanceof DefaultGrpcExporterBuilder)) {
throw new IllegalArgumentException("delegate field is not type DefaultGrpcExporterBuilder");
}
return (DefaultGrpcExporterBuilder<?>) value;
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new IllegalArgumentException("Unable to access delegate reflectively.", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ static <T extends Marshaler> GrpcExporterBuilder<T> builder(
long defaultTimeoutSecs,
URI defaultEndpoint,
Supplier<Function<ManagedChannel, MarshalerServiceStub<T, ?, ?>>> stubFactory,
String grpcServiceName,
String grpcEndpointPath) {
return GrpcExporterUtil.exporterBuilder(
type, defaultTimeoutSecs, defaultEndpoint, stubFactory, grpcEndpointPath);
type, defaultTimeoutSecs, defaultEndpoint, stubFactory, grpcServiceName, grpcEndpointPath);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import io.grpc.ManagedChannel;
import io.opentelemetry.exporter.otlp.internal.Marshaler;
import io.opentelemetry.exporter.otlp.internal.RetryPolicy;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

Expand All @@ -26,5 +27,7 @@ public interface GrpcExporterBuilder<T extends Marshaler> {

GrpcExporterBuilder<T> addHeader(String key, String value);

GrpcExporterBuilder<T> addRetryPolicy(RetryPolicy retryPolicy);

GrpcExporter<T> build();
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,14 @@ static <T extends Marshaler> GrpcExporterBuilder<T> exporterBuilder(
long defaultTimeoutSecs,
URI defaultEndpoint,
Supplier<Function<ManagedChannel, MarshalerServiceStub<T, ?, ?>>> stubFactory,
String grpcServiceName,
String grpcEndpointPath) {
if (USE_OKHTTP) {
return new OkHttpGrpcExporterBuilder<>(
type, grpcEndpointPath, defaultTimeoutSecs, defaultEndpoint);
} else {
return new DefaultGrpcExporterBuilder<>(
type, stubFactory.get(), defaultTimeoutSecs, defaultEndpoint);
type, stubFactory.get(), defaultTimeoutSecs, defaultEndpoint, grpcServiceName);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,21 @@
package io.opentelemetry.exporter.otlp.internal.grpc;

import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status.Code;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyChannelBuilder;
import io.opentelemetry.exporter.otlp.internal.RetryPolicy;
import io.opentelemetry.exporter.otlp.internal.TlsUtil;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand All @@ -29,6 +37,16 @@ public final class ManagedChannelUtil {

private static final Logger logger = Logger.getLogger(ManagedChannelUtil.class.getName());

private static final List<Code> RETRYABLE_STATUS_CODES =
Arrays.asList(
Code.CANCELLED,
Code.DEADLINE_EXCEEDED,
Code.RESOURCE_EXHAUSTED,
Code.ABORTED,
Code.OUT_OF_RANGE,
Code.UNAVAILABLE,
Code.DATA_LOSS);

/**
* Configure the channel builder to trust the certificates. The {@code byte[]} should contain an
* X.509 certificate collection in PEM format.
Expand Down Expand Up @@ -70,6 +88,34 @@ public static void setTrustedCertificatesPem(
}
}

/**
* Convert the {@link RetryPolicy} into a gRPC service config for the {@code serviceName}. The
* resulting map can be passed to {@link ManagedChannelBuilder#defaultServiceConfig(Map)}.
*/
public static Map<String, ?> toServiceConfig(String serviceName, RetryPolicy retryPolicy) {
List<Double> retryableStatusCodes =
RETRYABLE_STATUS_CODES.stream().map(Code::value).map(i -> (double) i).collect(toList());

Map<String, Object> retryConfig = new HashMap<>();
retryConfig.put("retryableStatusCodes", retryableStatusCodes);
retryConfig.put("maxAttempts", (double) retryPolicy.getMaxAttempts());
retryConfig.put("initialBackoff", retryPolicy.getInitialBackoff().toMillis() / 1000.0 + "s");
retryConfig.put("maxBackoff", retryPolicy.getMaxBackoff().toMillis() / 1000.0 + "s");
retryConfig.put("backoffMultiplier", retryPolicy.getBackoffMultiplier());

Map<String, Object> methodConfig = new HashMap<>();
methodConfig.put(
"name", Collections.singletonList(Collections.singletonMap("service", serviceName)));
methodConfig.put("retryPolicy", retryConfig);

return Collections.singletonMap("methodConfig", Collections.singletonList(methodConfig));
}

/** Return the list of gRPC status codes that are retryable in OTLP. */
public static List<Code> retryableStatusCodes() {
return RETRYABLE_STATUS_CODES;
}

/** Shutdown the gRPC channel. */
public static CompletableResultCode shutdownChannel(ManagedChannel managedChannel) {
final CompletableResultCode result = new CompletableResultCode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import io.grpc.ManagedChannel;
import io.opentelemetry.exporter.otlp.internal.Marshaler;
import io.opentelemetry.exporter.otlp.internal.RetryPolicy;
import io.opentelemetry.exporter.otlp.internal.TlsUtil;
import java.net.URI;
import java.net.URISyntaxException;
Expand Down Expand Up @@ -102,6 +103,11 @@ public OkHttpGrpcExporterBuilder<T> addHeader(String key, String value) {
return this;
}

@Override
public GrpcExporterBuilder<T> addRetryPolicy(RetryPolicy retryPolicy) {
throw new UnsupportedOperationException("Only available on DefaultGrpcExporter");
}

@Override
public GrpcExporter<T> build() {
OkHttpClient.Builder clientBuilder = new OkHttpClient.Builder();
Expand Down
Loading

0 comments on commit a1a45d2

Please sign in to comment.