Skip to content

Commit

Permalink
Merge branch 'main' into server.port-always
Browse files Browse the repository at this point in the history
  • Loading branch information
vishweshbankwar authored Mar 12, 2024
2 parents 3986442 + ff6725f commit 3da6a7f
Show file tree
Hide file tree
Showing 5 changed files with 322 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ internal static class OtlpRetry

public static bool TryGetHttpRetryResult(ExportClientHttpResponse response, int retryDelayInMilliSeconds, out RetryResult retryResult)
{
retryResult = default;
if (response.StatusCode.HasValue)
{
return TryGetRetryResult(response.StatusCode.Value, IsHttpStatusCodeRetryable, response.DeadlineUtc, response.Headers, TryGetHttpRetryDelay, retryDelayInMilliSeconds, out retryResult);
Expand All @@ -73,6 +72,7 @@ public static bool TryGetHttpRetryResult(ExportClientHttpResponse response, int
}
}

retryResult = default;
return false;
}
}
Expand All @@ -83,9 +83,15 @@ public static bool ShouldHandleHttpRequestException(Exception? exception)
return true;
}

public static bool TryGetGrpcRetryResult(StatusCode statusCode, DateTime? deadline, Metadata trailers, int retryDelayMilliseconds, out RetryResult retryResult)
public static bool TryGetGrpcRetryResult(ExportClientGrpcResponse response, int retryDelayMilliseconds, out RetryResult retryResult)
{
return TryGetRetryResult(statusCode, IsGrpcStatusCodeRetryable, deadline, trailers, TryGetGrpcRetryDelay, retryDelayMilliseconds, out retryResult);
if (response.Exception is RpcException rpcException)
{
return TryGetRetryResult(rpcException.StatusCode, IsGrpcStatusCodeRetryable, response.DeadlineUtc, rpcException.Trailers, TryGetGrpcRetryDelay, retryDelayMilliseconds, out retryResult);
}

retryResult = default;
return false;
}

private static bool TryGetRetryResult<TStatusCode, TCarrier>(TStatusCode statusCode, Func<TStatusCode, bool, bool> isRetryable, DateTime? deadline, TCarrier carrier, Func<TStatusCode, TCarrier, TimeSpan?> throttleGetter, int nextRetryDelayMilliseconds, out RetryResult retryResult)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#nullable enable

using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;

namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission;

internal sealed class OtlpExporterRetryTransmissionHandler<TRequest> : OtlpExporterTransmissionHandler<TRequest>
{
internal OtlpExporterRetryTransmissionHandler(IExportClient<TRequest> exportClient, double timeoutMilliseconds)
: base(exportClient, timeoutMilliseconds)
{
}

protected override bool OnSubmitRequestFailure(TRequest request, ExportClientResponse response)
{
var nextRetryDelayMilliseconds = OtlpRetry.InitialBackoffMilliseconds;
while (RetryHelper.ShouldRetryRequest(request, response, nextRetryDelayMilliseconds, out var retryResult))
{
// Note: This delay cannot exceed the configured timeout period for otlp exporter.
// If the backend responds with `RetryAfter` duration that would result in exceeding the configured timeout period
// we would fail fast and drop the data.
Thread.Sleep(retryResult.RetryDelay);

if (this.TryRetryRequest(request, response.DeadlineUtc, out response))
{
return true;
}

nextRetryDelayMilliseconds = retryResult.NextRetryDelayMilliseconds;
}

return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#nullable enable

using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;

namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission;

internal static class RetryHelper
{
internal static bool ShouldRetryRequest<TRequest>(TRequest request, ExportClientResponse response, int retryDelayMilliseconds, out OtlpRetry.RetryResult retryResult)
{
if (response is ExportClientGrpcResponse grpcResponse)
{
if (OtlpRetry.TryGetGrpcRetryResult(grpcResponse, retryDelayMilliseconds, out retryResult))
{
return true;
}
}
else if (response is ExportClientHttpResponse httpResponse)
{
if (OtlpRetry.TryGetHttpRetryResult(httpResponse, retryDelayMilliseconds, out retryResult))
{
return true;
}
}

retryResult = default;
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,16 @@

#if !NETFRAMEWORK
using System.Diagnostics;
using System.Net;
using Grpc.Core;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission;
using OpenTelemetry.Metrics;
using OpenTelemetry.Proto.Collector.Trace.V1;
using OpenTelemetry.Tests;
Expand All @@ -18,21 +23,28 @@ namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests;

public sealed class MockCollectorIntegrationTests
{
private static int gRPCPort = 4317;
private static int httpPort = 5051;

[Fact]
public async Task TestRecoveryAfterFailedExport()
{
var testGrpcPort = Interlocked.Increment(ref gRPCPort);
var testHttpPort = Interlocked.Increment(ref httpPort);

using var host = await new HostBuilder()
.ConfigureWebHostDefaults(webBuilder => webBuilder
.ConfigureKestrel(options =>
{
options.ListenLocalhost(5050, listenOptions => listenOptions.Protocols = Microsoft.AspNetCore.Server.Kestrel.Core.HttpProtocols.Http1);
options.ListenLocalhost(4317, listenOptions => listenOptions.Protocols = Microsoft.AspNetCore.Server.Kestrel.Core.HttpProtocols.Http2);
options.ListenLocalhost(testHttpPort, listenOptions => listenOptions.Protocols = Microsoft.AspNetCore.Server.Kestrel.Core.HttpProtocols.Http1);
options.ListenLocalhost(testGrpcPort, listenOptions => listenOptions.Protocols = Microsoft.AspNetCore.Server.Kestrel.Core.HttpProtocols.Http2);
})
.ConfigureServices(services =>
{
services.AddSingleton(new MockCollectorState());
services.AddGrpc();
})
.ConfigureLogging(loggingBuilder => loggingBuilder.ClearProviders())
.Configure(app =>
{
app.UseRouting();
Expand All @@ -52,13 +64,13 @@ public async Task TestRecoveryAfterFailedExport()
}))
.StartAsync();

var httpClient = new HttpClient() { BaseAddress = new Uri("http://localhost:5050") };
using var httpClient = new HttpClient() { BaseAddress = new Uri($"http://localhost:{testHttpPort}") };

var codes = new[] { Grpc.Core.StatusCode.Unimplemented, Grpc.Core.StatusCode.OK };
await httpClient.GetAsync($"/MockCollector/SetResponseCodes/{string.Join(",", codes.Select(x => (int)x))}");

var exportResults = new List<ExportResult>();
var otlpExporter = new OtlpTraceExporter(new OtlpExporterOptions() { Endpoint = new Uri("http://localhost:4317") });
var otlpExporter = new OtlpTraceExporter(new OtlpExporterOptions() { Endpoint = new Uri($"http://localhost:{testGrpcPort}") });
var delegatingExporter = new DelegatingExporter<Activity>
{
OnExportFunc = (batch) =>
Expand Down Expand Up @@ -91,6 +103,203 @@ public async Task TestRecoveryAfterFailedExport()
await host.StopAsync();
}

// For `Grpc.Core.StatusCode.DeadlineExceeded`
// See https://github.com/open-telemetry/opentelemetry-dotnet/issues/5436.
[Theory]
[InlineData(true, ExportResult.Success, Grpc.Core.StatusCode.Unavailable)]
[InlineData(true, ExportResult.Success, Grpc.Core.StatusCode.Cancelled)]
[InlineData(true, ExportResult.Success, Grpc.Core.StatusCode.Aborted)]
[InlineData(true, ExportResult.Success, Grpc.Core.StatusCode.OutOfRange)]
[InlineData(true, ExportResult.Success, Grpc.Core.StatusCode.DataLoss)]
[InlineData(true, ExportResult.Failure, Grpc.Core.StatusCode.Internal)]
[InlineData(true, ExportResult.Failure, Grpc.Core.StatusCode.InvalidArgument)]
[InlineData(true, ExportResult.Failure, Grpc.Core.StatusCode.Unimplemented)]
[InlineData(true, ExportResult.Failure, Grpc.Core.StatusCode.FailedPrecondition)]
[InlineData(true, ExportResult.Failure, Grpc.Core.StatusCode.PermissionDenied)]
[InlineData(true, ExportResult.Failure, Grpc.Core.StatusCode.Unauthenticated)]
[InlineData(true, ExportResult.Success, Grpc.Core.StatusCode.DeadlineExceeded)]
[InlineData(false, ExportResult.Failure, Grpc.Core.StatusCode.Unavailable)]
[InlineData(false, ExportResult.Failure, Grpc.Core.StatusCode.Cancelled)]
[InlineData(false, ExportResult.Failure, Grpc.Core.StatusCode.Aborted)]
[InlineData(false, ExportResult.Failure, Grpc.Core.StatusCode.OutOfRange)]
[InlineData(false, ExportResult.Failure, Grpc.Core.StatusCode.DataLoss)]
[InlineData(false, ExportResult.Failure, Grpc.Core.StatusCode.Internal)]
[InlineData(false, ExportResult.Failure, Grpc.Core.StatusCode.InvalidArgument)]
[InlineData(false, ExportResult.Failure, Grpc.Core.StatusCode.FailedPrecondition)]
[InlineData(false, ExportResult.Failure, Grpc.Core.StatusCode.DeadlineExceeded)]
public async Task GrpcRetryTests(bool useRetryTransmissionHandler, ExportResult expectedResult, Grpc.Core.StatusCode initialStatusCode)
{
var testGrpcPort = Interlocked.Increment(ref gRPCPort);
var testHttpPort = Interlocked.Increment(ref httpPort);

using var host = await new HostBuilder()
.ConfigureWebHostDefaults(webBuilder => webBuilder
.ConfigureKestrel(options =>
{
options.ListenLocalhost(testHttpPort, listenOptions => listenOptions.Protocols = Microsoft.AspNetCore.Server.Kestrel.Core.HttpProtocols.Http1);
options.ListenLocalhost(testGrpcPort, listenOptions => listenOptions.Protocols = Microsoft.AspNetCore.Server.Kestrel.Core.HttpProtocols.Http2);
})
.ConfigureServices(services =>
{
services.AddSingleton(new MockCollectorState());
services.AddGrpc();
})
.ConfigureLogging(loggingBuilder => loggingBuilder.ClearProviders())
.Configure(app =>
{
app.UseRouting();

app.UseEndpoints(endpoints =>
{
endpoints.MapGet(
"/MockCollector/SetResponseCodes/{responseCodesCsv}",
(MockCollectorState collectorState, string responseCodesCsv) =>
{
var codes = responseCodesCsv.Split(",").Select(x => int.Parse(x)).ToArray();
collectorState.SetStatusCodes(codes);
});

endpoints.MapGrpcService<MockTraceService>();
});
}))
.StartAsync();

using var httpClient = new HttpClient() { BaseAddress = new Uri($"http://localhost:{testHttpPort}") };

// First reply with failure and then Ok
var codes = new[] { initialStatusCode, Grpc.Core.StatusCode.OK };
await httpClient.GetAsync($"/MockCollector/SetResponseCodes/{string.Join(",", codes.Select(x => (int)x))}");

var endpoint = new Uri($"http://localhost:{testGrpcPort}");

var exporterOptions = new OtlpExporterOptions() { Endpoint = endpoint, TimeoutMilliseconds = 20000 };

var exportClient = new OtlpGrpcTraceExportClient(exporterOptions);

OtlpExporterTransmissionHandler<ExportTraceServiceRequest> transmissionHandler;

// TODO: update this to configure via experimental environment variable.
if (useRetryTransmissionHandler)
{
transmissionHandler = new OtlpExporterRetryTransmissionHandler<ExportTraceServiceRequest>(exportClient, exporterOptions.TimeoutMilliseconds);
}
else
{
transmissionHandler = new OtlpExporterTransmissionHandler<ExportTraceServiceRequest>(exportClient, exporterOptions.TimeoutMilliseconds);
}

var otlpExporter = new OtlpTraceExporter(exporterOptions, new(), transmissionHandler);

var activitySourceName = "otel.grpc.retry.test";
using var source = new ActivitySource(activitySourceName);

using var tracerProvider = Sdk.CreateTracerProviderBuilder()
.AddSource(activitySourceName)
.Build();

using var activity = source.StartActivity("GrpcRetryTest");
activity.Stop();
using var batch = new Batch<Activity>([activity], 1);

var exportResult = otlpExporter.Export(batch);

Assert.Equal(expectedResult, exportResult);

await host.StopAsync();
}

[Theory]
[InlineData(true, ExportResult.Success, HttpStatusCode.ServiceUnavailable)]
[InlineData(true, ExportResult.Success, HttpStatusCode.BadGateway)]
[InlineData(true, ExportResult.Success, HttpStatusCode.GatewayTimeout)]
[InlineData(true, ExportResult.Failure, HttpStatusCode.BadRequest)]
[InlineData(true, ExportResult.Success, HttpStatusCode.TooManyRequests)]
[InlineData(false, ExportResult.Failure, HttpStatusCode.ServiceUnavailable)]
[InlineData(false, ExportResult.Failure, HttpStatusCode.BadGateway)]
[InlineData(false, ExportResult.Failure, HttpStatusCode.GatewayTimeout)]
[InlineData(false, ExportResult.Failure, HttpStatusCode.TooManyRequests)]
[InlineData(false, ExportResult.Failure, HttpStatusCode.BadRequest)]
public async Task HttpRetryTests(bool useRetryTransmissionHandler, ExportResult expectedResult, HttpStatusCode initialHttpStatusCode)
{
var testHttpPort = Interlocked.Increment(ref httpPort);

using var host = await new HostBuilder()
.ConfigureWebHostDefaults(webBuilder => webBuilder
.ConfigureKestrel(options =>
{
options.ListenLocalhost(testHttpPort, listenOptions => listenOptions.Protocols = Microsoft.AspNetCore.Server.Kestrel.Core.HttpProtocols.Http1);
})
.ConfigureServices(services =>
{
services.AddSingleton(new MockCollectorHttpState());
})
.ConfigureLogging(loggingBuilder => loggingBuilder.ClearProviders())
.Configure(app =>
{
app.UseRouting();

app.UseEndpoints(endpoints =>
{
endpoints.MapGet(
"/MockCollector/SetResponseCodes/{responseCodesCsv}",
(MockCollectorHttpState collectorState, string responseCodesCsv) =>
{
var codes = responseCodesCsv.Split(",").Select(x => int.Parse(x)).ToArray();
collectorState.SetStatusCodes(codes);
});

endpoints.MapPost("/v1/traces", async ctx =>
{
var state = ctx.RequestServices.GetRequiredService<MockCollectorHttpState>();
ctx.Response.StatusCode = (int)state.NextStatus();

await ctx.Response.WriteAsync("Request Received.");
});
});
}))
.StartAsync();

using var httpClient = new HttpClient() { BaseAddress = new Uri($"http://localhost:{testHttpPort}") };

var codes = new[] { initialHttpStatusCode, HttpStatusCode.OK };
await httpClient.GetAsync($"/MockCollector/SetResponseCodes/{string.Join(",", codes.Select(x => (int)x))}");

var endpoint = new Uri($"http://localhost:{testHttpPort}/v1/traces");

var exporterOptions = new OtlpExporterOptions() { Endpoint = endpoint, TimeoutMilliseconds = 20000 };

var exportClient = new OtlpHttpTraceExportClient(exporterOptions, new HttpClient());

OtlpExporterTransmissionHandler<ExportTraceServiceRequest> transmissionHandler;

// TODO: update this to configure via experimental environment variable.
if (useRetryTransmissionHandler)
{
transmissionHandler = new OtlpExporterRetryTransmissionHandler<ExportTraceServiceRequest>(exportClient, exporterOptions.TimeoutMilliseconds);
}
else
{
transmissionHandler = new OtlpExporterTransmissionHandler<ExportTraceServiceRequest>(exportClient, exporterOptions.TimeoutMilliseconds);
}

var otlpExporter = new OtlpTraceExporter(exporterOptions, new(), transmissionHandler);

var activitySourceName = "otel.http.retry.test";
using var source = new ActivitySource(activitySourceName);

using var tracerProvider = Sdk.CreateTracerProviderBuilder()
.AddSource(activitySourceName)
.Build();

using var activity = source.StartActivity("HttpRetryTest");
activity.Stop();
using var batch = new Batch<Activity>([activity], 1);

var exportResult = otlpExporter.Export(batch);

Assert.Equal(expectedResult, exportResult);
}

private class MockCollectorState
{
private Grpc.Core.StatusCode[] statusCodes = { };
Expand All @@ -110,6 +319,25 @@ public Grpc.Core.StatusCode NextStatus()
}
}

private class MockCollectorHttpState
{
private HttpStatusCode[] statusCodes = { };
private int statusCodeIndex = 0;

public void SetStatusCodes(int[] statusCodes)
{
this.statusCodeIndex = 0;
this.statusCodes = statusCodes.Select(x => (HttpStatusCode)x).ToArray();
}

public HttpStatusCode NextStatus()
{
return this.statusCodeIndex < this.statusCodes.Length
? this.statusCodes[this.statusCodeIndex++]
: HttpStatusCode.OK;
}
}

private class MockTraceService : TraceService.TraceServiceBase
{
private readonly MockCollectorState state;
Expand Down
Loading

0 comments on commit 3da6a7f

Please sign in to comment.