Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Retries to gRPC version of OTLP Exporter #3883

Closed
wants to merge 15 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ Released 2022-Nov-07
scopes.
([3843](https://github.com/open-telemetry/opentelemetry-dotnet/pull/3843))

* Add ability to retry failed exports to the gRPC version of the exporter.
([#3883](https://github.com/open-telemetry/opentelemetry-dotnet/pull/3883))

## 1.4.0-beta.2

Released 2022-Oct-17
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;
#if NETSTANDARD2_1 || NET6_0_OR_GREATER
using Grpc.Net.Client;
using Grpc.Net.Client.Configuration;
#endif
using LogOtlpCollector = OpenTelemetry.Proto.Collector.Logs.V1;
using MetricsOtlpCollector = OpenTelemetry.Proto.Collector.Metrics.V1;
Expand All @@ -30,6 +31,11 @@ namespace OpenTelemetry.Exporter
{
internal static class OtlpExporterOptionsExtensions
{
private const int RetryMaxAttempts = 5;
private const double RetryBackoffMultiplier = 1.5;
private static readonly TimeSpan RetryInitialBackoff = TimeSpan.FromSeconds(1);
private static readonly TimeSpan RetryMaxBackoff = TimeSpan.FromSeconds(5);

#if NETSTANDARD2_1 || NET6_0_OR_GREATER
public static GrpcChannel CreateChannel(this OtlpExporterOptions options)
#else
Expand All @@ -42,7 +48,38 @@ public static Channel CreateChannel(this OtlpExporterOptions options)
}

#if NETSTANDARD2_1 || NET6_0_OR_GREATER
return GrpcChannel.ForAddress(options.Endpoint);
var retryPolicy = new RetryPolicy
{
MaxAttempts = RetryMaxAttempts,
InitialBackoff = RetryInitialBackoff,
MaxBackoff = RetryMaxBackoff,
BackoffMultiplier = RetryBackoffMultiplier,

// See table showing gRPC codes and if they are retryable:
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/otlp.md#failures
RetryableStatusCodes =
{
StatusCode.Cancelled,
StatusCode.DeadlineExceeded,
StatusCode.ResourceExhausted, // TODO: Investigate this one.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My initial assumption is that the gRPC client handles this. Can probably validate the behavior conforms with the specs requirements with the mock server.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StatusCode.Aborted,
StatusCode.OutOfRange,
StatusCode.Unavailable,
StatusCode.DataLoss,
},
};

var methodConfig = new MethodConfig
{
Names = { MethodName.Default },
RetryPolicy = retryPolicy,
};

var serviceConfig = new ServiceConfig { MethodConfigs = { methodConfig } };

var channelOptions = new GrpcChannelOptions { ServiceConfig = serviceConfig };

return GrpcChannel.ForAddress(options.Endpoint, channelOptions);
#else
ChannelCredentials channelCredentials;
if (options.Endpoint.Scheme == Uri.UriSchemeHttps)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public async Task TestRecoveryAfterFailedExport()

var httpClient = new HttpClient() { BaseAddress = new System.Uri("http://localhost:5050") };

var codes = new[] { Grpc.Core.StatusCode.Unimplemented, Grpc.Core.StatusCode.OK };
var codes = new[] { Grpc.Core.StatusCode.Unimplemented };
await httpClient.GetAsync($"/MockCollector/SetResponseCodes/{string.Join(",", codes.Select(x => (int)x))}");

var exportResults = new List<ExportResult>();
Expand Down Expand Up @@ -108,6 +108,96 @@ public async Task TestRecoveryAfterFailedExport()
await host.StopAsync().ConfigureAwait(false);
}

[Theory]
[InlineData(ExportResult.Success, new Grpc.Core.StatusCode[] { })]
[InlineData(ExportResult.Success, new[] { Grpc.Core.StatusCode.Cancelled })]
[InlineData(ExportResult.Success, new[] { Grpc.Core.StatusCode.DeadlineExceeded })]
[InlineData(ExportResult.Success, new[] { Grpc.Core.StatusCode.ResourceExhausted })]
[InlineData(ExportResult.Success, new[] { Grpc.Core.StatusCode.Aborted })]
[InlineData(ExportResult.Success, new[] { Grpc.Core.StatusCode.OutOfRange })]
[InlineData(ExportResult.Success, new[] { Grpc.Core.StatusCode.Unavailable })]
[InlineData(ExportResult.Success, new[] { Grpc.Core.StatusCode.DataLoss })]

[InlineData(ExportResult.Failure, new[] { Grpc.Core.StatusCode.Cancelled, Grpc.Core.StatusCode.Cancelled, Grpc.Core.StatusCode.Cancelled, Grpc.Core.StatusCode.Cancelled, Grpc.Core.StatusCode.Cancelled })]

[InlineData(ExportResult.Failure, new[] { Grpc.Core.StatusCode.Unknown })]
[InlineData(ExportResult.Failure, new[] { Grpc.Core.StatusCode.InvalidArgument })]
[InlineData(ExportResult.Failure, new[] { Grpc.Core.StatusCode.NotFound })]
[InlineData(ExportResult.Failure, new[] { Grpc.Core.StatusCode.AlreadyExists })]
[InlineData(ExportResult.Failure, new[] { Grpc.Core.StatusCode.PermissionDenied })]
[InlineData(ExportResult.Failure, new[] { Grpc.Core.StatusCode.FailedPrecondition })]
[InlineData(ExportResult.Failure, new[] { Grpc.Core.StatusCode.Unimplemented })]
[InlineData(ExportResult.Failure, new[] { Grpc.Core.StatusCode.Internal })]
[InlineData(ExportResult.Failure, new[] { Grpc.Core.StatusCode.Unauthenticated })]

public async Task TestOtlpExporterRetry(ExportResult result, Grpc.Core.StatusCode[] codes)
{
using var host = await new HostBuilder()
.ConfigureWebHostDefaults(webBuilder => webBuilder
.ConfigureKestrel(options =>
{
options.ListenLocalhost(5050, listenOptions => listenOptions.Protocols = Microsoft.AspNetCore.Server.Kestrel.Core.HttpProtocols.Http1);
options.ListenLocalhost(4317, listenOptions => listenOptions.Protocols = Microsoft.AspNetCore.Server.Kestrel.Core.HttpProtocols.Http2);
})
.ConfigureServices(services =>
{
services.AddSingleton(new MockCollectorState());
services.AddGrpc();
})
.Configure(app =>
{
app.UseRouting();

app.UseEndpoints(endpoints =>
{
endpoints.MapGet(
"/MockCollector/SetResponseCodes/{responseCodesCsv}",
(MockCollectorState collectorState, string responseCodesCsv) =>
{
var codes = responseCodesCsv.Split(",").Select(x => int.Parse(x)).ToArray();
collectorState.SetStatusCodes(codes);
});

endpoints.MapGrpcService<MockTraceService>();
});
}))
.StartAsync();

var httpClient = new HttpClient() { BaseAddress = new System.Uri("http://localhost:5050") };
await httpClient.GetAsync($"/MockCollector/SetResponseCodes/{string.Join(",", codes.Select(x => (int)x))}");

var exportResults = new List<ExportResult>();
var otlpExporter = new OtlpTraceExporter(new OtlpExporterOptions()
{
Endpoint = new System.Uri("http://localhost:4317"),
});
var delegatingExporter = new DelegatingExporter<Activity>
{
OnExportFunc = (batch) =>
{
var result = otlpExporter.Export(batch);
exportResults.Add(result);
return result;
},
};

var activitySourceName = "otel.mock.collector.test";

using var tracerProvider = Sdk.CreateTracerProviderBuilder()
.AddSource(activitySourceName)
.AddProcessor(new SimpleActivityExportProcessor(delegatingExporter))
.Build();

using var source = new ActivitySource(activitySourceName);

source.StartActivity().Stop();

Assert.Single(exportResults);
Assert.Equal(result, exportResults[0]);

await host.StopAsync().ConfigureAwait(false);
}

private class MockCollectorState
{
private Grpc.Core.StatusCode[] statusCodes = { };
Expand Down