Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[otlp] Add Retry Handler #5433

Merged
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
09d71ca
add retry handler with tests
vishweshbankwar Mar 9, 2024
c749edf
Merge branch 'main' into vibankwa/add-retryhandler-with-tests
vishweshbankwar Mar 11, 2024
5b28c49
Merge branch 'main' into vibankwa/add-retryhandler-with-tests
vishweshbankwar Mar 11, 2024
403544c
add code comments
vishweshbankwar Mar 11, 2024
051bb0d
Update src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementatio…
vishweshbankwar Mar 11, 2024
e9013ee
refactor
vishweshbankwar Mar 11, 2024
2d87734
Merge branch 'vibankwa/add-retryhandler-with-tests' of https://github…
vishweshbankwar Mar 11, 2024
9e82b49
add issue comment
vishweshbankwar Mar 11, 2024
ad42732
Update src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementatio…
vishweshbankwar Mar 11, 2024
36ade27
fix build
vishweshbankwar Mar 11, 2024
fa53f57
Merge branch 'vibankwa/add-retryhandler-with-tests' of https://github…
vishweshbankwar Mar 11, 2024
3e4ed94
Merge branch 'main' into vibankwa/add-retryhandler-with-tests
vishweshbankwar Mar 11, 2024
a40aa16
refactor
vishweshbankwar Mar 11, 2024
af1a7c3
Merge branch 'vibankwa/add-retryhandler-with-tests' of https://github…
vishweshbankwar Mar 11, 2024
c4ed362
feedback
vishweshbankwar Mar 12, 2024
c704473
Update test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/MockCo…
vishweshbankwar Mar 12, 2024
ef6a1ec
update test
vishweshbankwar Mar 12, 2024
2d756ae
Merge branch 'main' into vibankwa/add-retryhandler-with-tests
utpilla Mar 12, 2024
deeb6dd
clear logging providers
vishweshbankwar Mar 12, 2024
572f529
Merge branch 'vibankwa/add-retryhandler-with-tests' of https://github…
vishweshbankwar Mar 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ internal static class OtlpRetry

public static bool TryGetHttpRetryResult(ExportClientHttpResponse response, int retryDelayInMilliSeconds, out RetryResult retryResult)
{
retryResult = default;
if (response.StatusCode.HasValue)
{
return TryGetRetryResult(response.StatusCode.Value, IsHttpStatusCodeRetryable, response.DeadlineUtc, response.Headers, TryGetHttpRetryDelay, retryDelayInMilliSeconds, out retryResult);
Expand All @@ -73,6 +72,7 @@ public static bool TryGetHttpRetryResult(ExportClientHttpResponse response, int
}
}

retryResult = default;
return false;
}
}
Expand All @@ -83,9 +83,15 @@ public static bool ShouldHandleHttpRequestException(Exception? exception)
return true;
}

public static bool TryGetGrpcRetryResult(StatusCode statusCode, DateTime? deadline, Metadata trailers, int retryDelayMilliseconds, out RetryResult retryResult)
public static bool TryGetGrpcRetryResult(ExportClientGrpcResponse response, int retryDelayMilliseconds, out RetryResult retryResult)
{
return TryGetRetryResult(statusCode, IsGrpcStatusCodeRetryable, deadline, trailers, TryGetGrpcRetryDelay, retryDelayMilliseconds, out retryResult);
if (response.Exception is RpcException rpcException)
{
return TryGetRetryResult(rpcException.StatusCode, IsGrpcStatusCodeRetryable, response.DeadlineUtc, rpcException.Trailers, TryGetGrpcRetryDelay, retryDelayMilliseconds, out retryResult);
}

retryResult = default;
return false;
}

private static bool TryGetRetryResult<TStatusCode, TCarrier>(TStatusCode statusCode, Func<TStatusCode, bool, bool> isRetryable, DateTime? deadline, TCarrier carrier, Func<TStatusCode, TCarrier, TimeSpan?> throttleGetter, int nextRetryDelayMilliseconds, out RetryResult retryResult)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#nullable enable

using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;

namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission;

internal sealed class OtlpExporterRetryTransmissionHandler<TRequest> : OtlpExporterTransmissionHandler<TRequest>
{
internal OtlpExporterRetryTransmissionHandler(IExportClient<TRequest> exportClient, double timeoutMilliseconds)
: base(exportClient, timeoutMilliseconds)
{
}

protected override bool OnSubmitRequestFailure(TRequest request, ExportClientResponse response)
{
var nextRetryDelayMilliseconds = OtlpRetry.InitialBackoffMilliseconds;
while (RetryHelper.ShouldRetryRequest(request, response, nextRetryDelayMilliseconds, out var retryResult))
{
// Note: This delay cannot exceed the configured timeout period for otlp exporter.
// If the backend responds with `RetryAfter` duration that would result in exceeding the configured timeout period
// we would fail fast and drop the data.
Thread.Sleep(retryResult.RetryDelay);

if (this.TryRetryRequest(request, response.DeadlineUtc, out response))
{
return true;
}

nextRetryDelayMilliseconds = retryResult.NextRetryDelayMilliseconds;
}

return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#nullable enable

using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;
vishweshbankwar marked this conversation as resolved.
Show resolved Hide resolved

namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission;

internal static class RetryHelper
{
internal static bool ShouldRetryRequest<TRequest>(TRequest request, ExportClientResponse response, int retryDelayMilliseconds, out OtlpRetry.RetryResult retryResult)
{
if (response is ExportClientGrpcResponse grpcResponse)
{
if (OtlpRetry.TryGetGrpcRetryResult(grpcResponse, retryDelayMilliseconds, out retryResult))
{
return true;
}
}
else if (response is ExportClientHttpResponse httpResponse)
{
if (OtlpRetry.TryGetHttpRetryResult(httpResponse, retryDelayMilliseconds, out retryResult))
{
return true;
}
}

retryResult = default;
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@

#if !NETFRAMEWORK
using System.Diagnostics;
using System.Net;
using Grpc.Core;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission;
using OpenTelemetry.Metrics;
using OpenTelemetry.Proto.Collector.Trace.V1;
using OpenTelemetry.Tests;
Expand All @@ -18,6 +22,9 @@ namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests;

public sealed class MockCollectorIntegrationTests
{
private static int gRPCPort = 4317;
utpilla marked this conversation as resolved.
Show resolved Hide resolved
private static int httpPort = 5051;

[Fact]
public async Task TestRecoveryAfterFailedExport()
{
Expand Down Expand Up @@ -91,6 +98,201 @@ public async Task TestRecoveryAfterFailedExport()
await host.StopAsync();
}

// For `Grpc.Core.StatusCode.DeadlineExceeded`
// See https://github.com/open-telemetry/opentelemetry-dotnet/issues/5436.
[Theory]
[InlineData(true, ExportResult.Success, Grpc.Core.StatusCode.Unavailable)]
utpilla marked this conversation as resolved.
Show resolved Hide resolved
[InlineData(true, ExportResult.Success, Grpc.Core.StatusCode.Cancelled)]
[InlineData(true, ExportResult.Success, Grpc.Core.StatusCode.Aborted)]
[InlineData(true, ExportResult.Success, Grpc.Core.StatusCode.OutOfRange)]
[InlineData(true, ExportResult.Success, Grpc.Core.StatusCode.DataLoss)]
[InlineData(true, ExportResult.Failure, Grpc.Core.StatusCode.Internal)]
[InlineData(true, ExportResult.Failure, Grpc.Core.StatusCode.InvalidArgument)]
[InlineData(true, ExportResult.Failure, Grpc.Core.StatusCode.Unimplemented)]
vishweshbankwar marked this conversation as resolved.
Show resolved Hide resolved
[InlineData(true, ExportResult.Failure, Grpc.Core.StatusCode.FailedPrecondition)]
[InlineData(true, ExportResult.Failure, Grpc.Core.StatusCode.PermissionDenied)]
[InlineData(true, ExportResult.Failure, Grpc.Core.StatusCode.Unauthenticated)]
[InlineData(true, ExportResult.Success, Grpc.Core.StatusCode.DeadlineExceeded)]
utpilla marked this conversation as resolved.
Show resolved Hide resolved
[InlineData(false, ExportResult.Failure, Grpc.Core.StatusCode.Unavailable)]
[InlineData(false, ExportResult.Failure, Grpc.Core.StatusCode.Cancelled)]
[InlineData(false, ExportResult.Failure, Grpc.Core.StatusCode.Aborted)]
[InlineData(false, ExportResult.Failure, Grpc.Core.StatusCode.OutOfRange)]
[InlineData(false, ExportResult.Failure, Grpc.Core.StatusCode.DataLoss)]
[InlineData(false, ExportResult.Failure, Grpc.Core.StatusCode.Internal)]
[InlineData(false, ExportResult.Failure, Grpc.Core.StatusCode.InvalidArgument)]
[InlineData(false, ExportResult.Failure, Grpc.Core.StatusCode.FailedPrecondition)]
[InlineData(false, ExportResult.Failure, Grpc.Core.StatusCode.DeadlineExceeded)]
public async Task GrpcRetryTests(bool useRetryTransmissionHandler, ExportResult expectedResult, Grpc.Core.StatusCode initialStatusCode)
{
var testGrpcPort = Interlocked.Increment(ref gRPCPort);
var testHttpPort = Interlocked.Increment(ref httpPort);

using var host = await new HostBuilder()
.ConfigureWebHostDefaults(webBuilder => webBuilder
.ConfigureKestrel(options =>
vishweshbankwar marked this conversation as resolved.
Show resolved Hide resolved
{
options.ListenLocalhost(testHttpPort, listenOptions => listenOptions.Protocols = Microsoft.AspNetCore.Server.Kestrel.Core.HttpProtocols.Http1);
options.ListenLocalhost(testGrpcPort, listenOptions => listenOptions.Protocols = Microsoft.AspNetCore.Server.Kestrel.Core.HttpProtocols.Http2);
})
.ConfigureServices(services =>
{
services.AddSingleton(new MockCollectorState());
services.AddGrpc();
})
.Configure(app =>
{
app.UseRouting();

app.UseEndpoints(endpoints =>
{
endpoints.MapGet(
"/MockCollector/SetResponseCodes/{responseCodesCsv}",
(MockCollectorState collectorState, string responseCodesCsv) =>
{
var codes = responseCodesCsv.Split(",").Select(x => int.Parse(x)).ToArray();
collectorState.SetStatusCodes(codes);
});

endpoints.MapGrpcService<MockTraceService>();
});
}))
.StartAsync();

using var httpClient = new HttpClient() { BaseAddress = new Uri($"http://localhost:{testHttpPort}") };

// First reply with failure and then Ok
var codes = new[] { initialStatusCode, Grpc.Core.StatusCode.OK };
await httpClient.GetAsync($"/MockCollector/SetResponseCodes/{string.Join(",", codes.Select(x => (int)x))}");

var endpoint = new Uri($"http://localhost:{testGrpcPort}");

var exporterOptions = new OtlpExporterOptions() { Endpoint = endpoint, TimeoutMilliseconds = 20000 };

var exportClient = new OtlpGrpcTraceExportClient(exporterOptions);

OtlpExporterTransmissionHandler<ExportTraceServiceRequest> transmissionHandler;

// TODO: update this to configure via experimental environment variable.
if (useRetryTransmissionHandler)
{
transmissionHandler = new OtlpExporterRetryTransmissionHandler<ExportTraceServiceRequest>(exportClient, exporterOptions.TimeoutMilliseconds);
}
else
{
transmissionHandler = new OtlpExporterTransmissionHandler<ExportTraceServiceRequest>(exportClient, exporterOptions.TimeoutMilliseconds);
}

var otlpExporter = new OtlpTraceExporter(exporterOptions, new(), transmissionHandler);

var activitySourceName = "otel.grpc.retry.test";
using var source = new ActivitySource(activitySourceName);

using var tracerProvider = Sdk.CreateTracerProviderBuilder()
.AddSource(activitySourceName)
.Build();

var activity = source.StartActivity("GrpcRetryTest");
vishweshbankwar marked this conversation as resolved.
Show resolved Hide resolved
activity.Stop();
var batch = new Batch<Activity>([activity], 1);
vishweshbankwar marked this conversation as resolved.
Show resolved Hide resolved

var exportResult = otlpExporter.Export(batch);

Assert.Equal(expectedResult, exportResult);

await host.StopAsync();
}

[Theory]
[InlineData(true, ExportResult.Success, HttpStatusCode.ServiceUnavailable)]
[InlineData(true, ExportResult.Success, HttpStatusCode.BadGateway)]
[InlineData(true, ExportResult.Success, HttpStatusCode.GatewayTimeout)]
[InlineData(true, ExportResult.Failure, HttpStatusCode.BadRequest)]
[InlineData(false, ExportResult.Failure, HttpStatusCode.ServiceUnavailable)]
[InlineData(false, ExportResult.Failure, HttpStatusCode.BadGateway)]
[InlineData(false, ExportResult.Failure, HttpStatusCode.GatewayTimeout)]
[InlineData(true, ExportResult.Success, HttpStatusCode.TooManyRequests)]
[InlineData(false, ExportResult.Failure, HttpStatusCode.TooManyRequests)]
[InlineData(false, ExportResult.Failure, HttpStatusCode.BadRequest)]
public async Task HttpRetryTests(bool useRetryTransmissionHandler, ExportResult expectedResult, HttpStatusCode initialHttpStatusCode)
{
var testHttpPort = Interlocked.Increment(ref httpPort);

using var host = await new HostBuilder()
.ConfigureWebHostDefaults(webBuilder => webBuilder
.ConfigureKestrel(options =>
{
options.ListenLocalhost(testHttpPort, listenOptions => listenOptions.Protocols = Microsoft.AspNetCore.Server.Kestrel.Core.HttpProtocols.Http1);
})
.ConfigureServices(services =>
{
services.AddSingleton(new MockCollectorHttpState());
})
.Configure(app =>
{
app.UseRouting();

app.UseEndpoints(endpoints =>
{
endpoints.MapGet(
"/MockCollector/SetResponseCodes/{responseCodesCsv}",
(MockCollectorHttpState collectorState, string responseCodesCsv) =>
{
var codes = responseCodesCsv.Split(",").Select(x => int.Parse(x)).ToArray();
collectorState.SetStatusCodes(codes);
});

endpoints.MapPost("/v1/traces", async ctx =>
{
var state = ctx.RequestServices.GetRequiredService<MockCollectorHttpState>();
ctx.Response.StatusCode = (int)state.NextStatus();

await ctx.Response.WriteAsync("Request Received.");
});
});
}))
.StartAsync();

var httpClient = new HttpClient() { BaseAddress = new Uri($"http://localhost:{testHttpPort}") };
vishweshbankwar marked this conversation as resolved.
Show resolved Hide resolved

var codes = new[] { initialHttpStatusCode, HttpStatusCode.OK };
await httpClient.GetAsync($"/MockCollector/SetResponseCodes/{string.Join(",", codes.Select(x => (int)x))}");

var endpoint = new Uri($"http://localhost:{testHttpPort}/v1/traces");

var exporterOptions = new OtlpExporterOptions() { Endpoint = endpoint, TimeoutMilliseconds = 20000 };

var exportClient = new OtlpHttpTraceExportClient(exporterOptions, new HttpClient());

OtlpExporterTransmissionHandler<ExportTraceServiceRequest> transmissionHandler;

// TODO: update this to configure via experimental environment variable.
if (useRetryTransmissionHandler)
{
transmissionHandler = new OtlpExporterRetryTransmissionHandler<ExportTraceServiceRequest>(exportClient, exporterOptions.TimeoutMilliseconds);
}
else
{
transmissionHandler = new OtlpExporterTransmissionHandler<ExportTraceServiceRequest>(exportClient, exporterOptions.TimeoutMilliseconds);
}

var otlpExporter = new OtlpTraceExporter(exporterOptions, new(), transmissionHandler);

var activitySourceName = "otel.http.retry.test";
using var source = new ActivitySource(activitySourceName);

using var tracerProvider = Sdk.CreateTracerProviderBuilder()
.AddSource(activitySourceName)
.Build();

var activity = source.StartActivity("HttpRetryTest");
activity.Stop();
var batch = new Batch<Activity>([activity], 1);

var exportResult = otlpExporter.Export(batch);

Assert.Equal(expectedResult, exportResult);
}

private class MockCollectorState
{
private Grpc.Core.StatusCode[] statusCodes = { };
Expand All @@ -110,6 +312,25 @@ public Grpc.Core.StatusCode NextStatus()
}
}

private class MockCollectorHttpState
{
private HttpStatusCode[] statusCodes = { };
private int statusCodeIndex = 0;

public void SetStatusCodes(int[] statusCodes)
{
this.statusCodeIndex = 0;
this.statusCodes = statusCodes.Select(x => (HttpStatusCode)x).ToArray();
}

public HttpStatusCode NextStatus()
{
return this.statusCodeIndex < this.statusCodes.Length
? this.statusCodes[this.statusCodeIndex++]
: HttpStatusCode.OK;
}
}

private class MockTraceService : TraceService.TraceServiceBase
{
private readonly MockCollectorState state;
Expand Down
Loading