Skip to content

Commit

Permalink
[otlp] Grpc Status check and retry (open-telemetry#6000)
Browse files Browse the repository at this point in the history
Co-authored-by: Mikel Blanchard <[email protected]>
  • Loading branch information
rajkumar-rangaraj and CodeBlanch authored Nov 26, 2024
1 parent f9a0b4c commit 88d2ad6
Show file tree
Hide file tree
Showing 16 changed files with 348 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@ namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClie
/// <typeparam name="TRequest">Type of export request.</typeparam>
internal abstract class BaseOtlpGrpcExportClient<TRequest> : IExportClient<TRequest>
{
protected static readonly ExportClientGrpcResponse SuccessExportResponse = new ExportClientGrpcResponse(success: true, deadlineUtc: default, exception: null);
protected static readonly ExportClientGrpcResponse SuccessExportResponse
= new(
success: true,
deadlineUtc: default,
exception: null,
status: null,
grpcStatusDetailsHeader: null);

protected BaseOtlpGrpcExportClient(OtlpExporterOptions options)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,25 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient.Grpc;

namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;

internal sealed class ExportClientGrpcResponse : ExportClientResponse
{
public ExportClientGrpcResponse(
bool success,
DateTime deadlineUtc,
Exception? exception)
Exception? exception,
Status? status,
string? grpcStatusDetailsHeader)
: base(success, deadlineUtc, exception)
{
this.Status = status;
this.GrpcStatusDetailsHeader = grpcStatusDetailsHeader;
}

public Status? Status { get; }

public string? GrpcStatusDetailsHeader { get; }
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using System.Diagnostics.CodeAnalysis;

namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;

internal abstract class ExportClientResponse
Expand All @@ -14,7 +12,6 @@ protected ExportClientResponse(bool success, DateTime deadlineUtc, Exception? ex
this.DeadlineUtc = deadlineUtc;
}

[MemberNotNullWhen(false, nameof(Exception))]
public bool Success { get; }

public Exception? Exception { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using System.Diagnostics.CodeAnalysis;
#if NET462
using System.Net.Http;
#endif
Expand All @@ -27,38 +26,30 @@ internal static class GrpcProtocolHelpers
{
internal const string StatusTrailer = "grpc-status";
internal const string MessageTrailer = "grpc-message";
internal const string CancelledDetail = "No grpc-status found on response.";

public static Status? GetResponseStatus(HttpHeaders trailingHeaders, HttpResponseMessage httpResponse)
public static Status GetResponseStatus(HttpResponseMessage httpResponse, HttpHeaders trailingHeaders)
{
Status? status;
try
{
var result = trailingHeaders.Any() ? TryGetStatusCore(trailingHeaders, out status) : TryGetStatusCore(httpResponse.Headers, out status);

if (!result)
{
status = new Status(StatusCode.Cancelled, CancelledDetail);
}
return trailingHeaders.Any()
? GetStatusCore(trailingHeaders)
: GetStatusCore(httpResponse.Headers);
}
catch (Exception ex)
{
// Handle error from parsing badly formed status
status = new Status(StatusCode.Cancelled, ex.Message, ex);
return new Status(StatusCode.Internal, ex.Message, ex);
}

return status;
}

public static bool TryGetStatusCore(HttpHeaders headers, [NotNullWhen(true)] out Status? status)
public static Status GetStatusCore(HttpHeaders headers)
{
var grpcStatus = GetHeaderValue(headers, StatusTrailer);

// grpc-status is a required trailer
if (grpcStatus == null)
{
status = null;
return false;
return Status.NoReply;
}

int statusValue;
Expand All @@ -79,8 +70,7 @@ public static bool TryGetStatusCore(HttpHeaders headers, [NotNullWhen(true)] out
grpcMessage = Uri.UnescapeDataString(grpcMessage);
}

status = new Status((StatusCode)statusValue, grpcMessage ?? string.Empty);
return true;
return new Status((StatusCode)statusValue, grpcMessage ?? string.Empty);
}

public static string? GetHeaderValue(HttpHeaders? headers, string name, bool first = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClie
[DebuggerDisplay("{DebuggerToString(),nq}")]
internal struct Status
{
public const string NoReplyDetailMessage = "No grpc-status found on response.";

/// <summary>
/// Default result of a successful RPC. StatusCode=OK, empty details message.
/// </summary>
Expand All @@ -35,6 +37,11 @@ internal struct Status
/// </summary>
public static readonly Status DefaultCancelled = new Status(StatusCode.Cancelled, string.Empty);

/// <summary>
/// Default result of a cancelled RPC with no grpc-status found on response.
/// </summary>
public static readonly Status NoReply = new Status(StatusCode.Internal, NoReplyDetailMessage);

/// <summary>
/// Initializes a new instance of the <see cref="Status"/> struct.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public override ExportClientResponse SendExportRequest(OtlpCollector.ExportLogsS
{
OpenTelemetryProtocolExporterEventSource.Log.FailedToReachCollector(this.Endpoint, ex);

return new ExportClientGrpcResponse(success: false, deadlineUtc: deadlineUtc, exception: ex);
return new ExportClientGrpcResponse(success: false, deadlineUtc, ex, null, null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public override ExportClientResponse SendExportRequest(OtlpCollector.ExportMetri
{
OpenTelemetryProtocolExporterEventSource.Log.FailedToReachCollector(this.Endpoint, ex);

return new ExportClientGrpcResponse(success: false, deadlineUtc: deadlineUtc, exception: ex);
return new ExportClientGrpcResponse(false, deadlineUtc, ex, null, null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public override ExportClientResponse SendExportRequest(OtlpCollector.ExportTrace
{
OpenTelemetryProtocolExporterEventSource.Log.FailedToReachCollector(this.Endpoint, ex);

return new ExportClientGrpcResponse(success: false, deadlineUtc: deadlineUtc, exception: ex);
return new ExportClientGrpcResponse(false, deadlineUtc, ex, null, null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,54 @@ public static bool ShouldHandleHttpRequestException(Exception? exception)

public static bool TryGetGrpcRetryResult(ExportClientGrpcResponse response, int retryDelayMilliseconds, out RetryResult retryResult)
{
retryResult = default;

if (response.Exception is RpcException rpcException)
{
return TryGetRetryResult(rpcException.StatusCode, IsGrpcStatusCodeRetryable, response.DeadlineUtc, rpcException.Trailers, TryGetGrpcRetryDelay, retryDelayMilliseconds, out retryResult);
}
else if (response.Status != null)
{
var nextRetryDelayMilliseconds = retryDelayMilliseconds;

if (IsDeadlineExceeded(response.DeadlineUtc))
{
return false;
}

var throttleDelay = Grpc.GrpcStatusDeserializer.TryGetGrpcRetryDelay(response.GrpcStatusDetailsHeader);
var retryable = IsGrpcStatusCodeRetryable(response.Status.Value.StatusCode, throttleDelay.HasValue);

if (!retryable)
{
return false;
}

var delayDuration = throttleDelay ?? TimeSpan.FromMilliseconds(GetRandomNumber(0, nextRetryDelayMilliseconds));

if (IsDeadlineExceeded(response.DeadlineUtc + delayDuration))
{
return false;
}

if (throttleDelay.HasValue)
{
try
{
// TODO: Consider making nextRetryDelayMilliseconds a double to avoid the need for convert/overflow handling
nextRetryDelayMilliseconds = Convert.ToInt32(throttleDelay.Value.TotalMilliseconds);
}
catch (OverflowException)
{
nextRetryDelayMilliseconds = MaxBackoffMilliseconds;
}
}

nextRetryDelayMilliseconds = CalculateNextRetryDelay(nextRetryDelayMilliseconds);
retryResult = new RetryResult(throttleDelay.HasValue, delayDuration, nextRetryDelayMilliseconds);
return true;
}

retryResult = default;
return false;
}

Expand Down Expand Up @@ -216,6 +258,24 @@ private static bool IsGrpcStatusCodeRetryable(StatusCode statusCode, bool hasRet
}
}

private static bool IsGrpcStatusCodeRetryable(Grpc.StatusCode statusCode, bool hasRetryDelay)
{
switch (statusCode)
{
case Grpc.StatusCode.Cancelled:
case Grpc.StatusCode.DeadlineExceeded:
case Grpc.StatusCode.Aborted:
case Grpc.StatusCode.OutOfRange:
case Grpc.StatusCode.Unavailable:
case Grpc.StatusCode.DataLoss:
return true;
case Grpc.StatusCode.ResourceExhausted:
return hasRetryDelay;
default:
return false;
}
}

private static bool IsHttpStatusCodeRetryable(HttpStatusCode statusCode, bool hasRetryDelay)
{
switch (statusCode)
Expand Down
Loading

0 comments on commit 88d2ad6

Please sign in to comment.