From 1da02973283654190c59d566df283c2072c58238 Mon Sep 17 00:00:00 2001
From: Alan West <3676547+alanwest@users.noreply.github.com>
Date: Tue, 29 Aug 2023 17:34:59 -0700
Subject: [PATCH] Implement OTLP retry mechanism (#4616)

---
 .../Implementation/ExportClient/OtlpRetry.cs  | 251 ++++++++++++++++++
 .../google/rpc/error_details.proto            |  37 +++
 .../Implementation/google/rpc/status.proto    |  42 +++
 .../OtlpRetryTests.cs                         | 217 +++++++++++++++
 4 files changed, 547 insertions(+)
 create mode 100644 src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpRetry.cs
 create mode 100644 src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/google/rpc/error_details.proto
 create mode 100644 src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/google/rpc/status.proto
 create mode 100644 test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/OtlpRetryTests.cs

diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpRetry.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpRetry.cs
new file mode 100644
index 00000000000..e65a1204925
--- /dev/null
+++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpRetry.cs
@@ -0,0 +1,251 @@
+// <copyright file="OtlpRetry.cs" company="OpenTelemetry Authors">
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// </copyright>
+
+using System.Diagnostics;
+using System.Net;
+using System.Net.Http.Headers;
+using Google.Rpc;
+using Grpc.Core;
+using Status = Google.Rpc.Status;
+
+namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;
+
+/// <summary>
+/// Implementation of the OTLP retry policy used by both OTLP/gRPC and OTLP/HTTP.
+///
+/// OTLP/gRPC
+/// https://github.com/open-telemetry/opentelemetry-proto/blob/main/docs/specification.md#failures
+///
+/// OTLP/HTTP
+/// https://github.com/open-telemetry/opentelemetry-proto/blob/main/docs/specification.md#failures-1
+///
+/// The specification requires that retries use an exponential backoff strategy,
+/// but it does not provide specifics for the implementation. As such, this
+/// implementation is inspired by the retry strategy provided by
+/// Grpc.Net.Client, which implements the gRPC retry specification.
+///
+/// Grpc.Net.Client retry implementation
+/// https://github.com/grpc/grpc-dotnet/blob/83d12ea1cb628156c990243bc98699829b88738b/src/Grpc.Net.Client/Internal/Retry/RetryCall.cs#L94
+///
+/// gRPC retry specification
+/// https://github.com/grpc/proposal/blob/master/A6-client-retries.md
+///
+/// The gRPC retry specification outlines the configurable parameters used in its
+/// exponential backoff strategy: initial backoff, max backoff, backoff
+/// multiplier, and max retry attempts. The OTLP specification does not declare
+/// a similar set of parameters, so this implementation uses fixed settings.
+/// Furthermore, since the OTLP spec does not specify a max number of attempts,
+/// this implementation retries until the deadline is reached.
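+///
+/// As a concrete illustration of the fixed settings below (initial backoff
+/// 1000 ms, backoff multiplier 1.5, max backoff 5000 ms): the base delay
+/// grows 1000, 1500, 2250, 3375, then caps at 5000 ms, and each attempt
+/// sleeps for a random duration in [0, base) unless the server supplied a
+/// throttling delay.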
+///
+/// The throttling mechanism for OTLP differs from the throttling mechanism
+/// described in the gRPC retry specification. See:
+/// https://github.com/open-telemetry/opentelemetry-proto/blob/main/docs/specification.md#otlpgrpc-throttling.
+/// </summary>
+internal static class OtlpRetry
+{
+    public const string GrpcStatusDetailsHeader = "grpc-status-details-bin";
+    public const int InitialBackoffMilliseconds = 1000;
+    private const int MaxBackoffMilliseconds = 5000;
+    private const double BackoffMultiplier = 1.5;
+
+#if !NET6_0_OR_GREATER
+    private static readonly Random Random = new Random();
+#endif
+
+    public static bool TryGetHttpRetryResult(HttpStatusCode statusCode, DateTime? deadline, HttpResponseHeaders responseHeaders, int retryDelayMilliseconds, out RetryResult retryResult)
+    {
+        return OtlpRetry.TryGetRetryResult(statusCode, IsHttpStatusCodeRetryable, deadline, responseHeaders, TryGetHttpRetryDelay, retryDelayMilliseconds, out retryResult);
+    }
+
+    public static bool TryGetGrpcRetryResult(StatusCode statusCode, DateTime? deadline, Metadata trailers, int retryDelayMilliseconds, out RetryResult retryResult)
+    {
+        return OtlpRetry.TryGetRetryResult(statusCode, IsGrpcStatusCodeRetryable, deadline, trailers, TryGetGrpcRetryDelay, retryDelayMilliseconds, out retryResult);
+    }
+
+    private static bool TryGetRetryResult<TStatusCode, TCarrier>(TStatusCode statusCode, Func<TStatusCode, bool, bool> isRetryable, DateTime? deadline, TCarrier carrier, Func<TStatusCode, TCarrier, TimeSpan?> throttleGetter, int nextRetryDelayMilliseconds, out RetryResult retryResult)
+    {
+        retryResult = default;
+
+        // TODO: Consider introducing a fixed max number of retries (e.g. max 5 retries).
+        // The spec does not specify a max number of retries, but it may be bad to not cap the number of attempts.
+        // Without a max number of retry attempts, retries would continue until the deadline.
+        // Maybe this is ok? However, it may lead to an unexpected behavioral change. For example:
+        //    1) When using a batch processor, a longer delay due to repeated
+        //       retries up to the deadline may lead to a higher chance that the queue will be exhausted.
+        //    2) When using the simple processor, a longer delay due to repeated
+        //       retries up to the deadline will lead to a prolonged blocking call.
+        // if (attemptCount >= MaxAttempts)
+        // {
+        //     return false
+        // }
+
+        if (IsDeadlineExceeded(deadline))
+        {
+            return false;
+        }
+
+        var throttleDelay = throttleGetter(statusCode, carrier);
+        var retryable = isRetryable(statusCode, throttleDelay.HasValue);
+        if (!retryable)
+        {
+            return false;
+        }
+
+        var delayDuration = throttleDelay.HasValue
+            ? throttleDelay.Value
+            : TimeSpan.FromMilliseconds(GetRandomNumber(0, nextRetryDelayMilliseconds));
+
+        if (deadline.HasValue && IsDeadlineExceeded(deadline + delayDuration))
+        {
+            return false;
+        }
+
+        if (throttleDelay.HasValue)
+        {
+            try
+            {
+                // TODO: Consider making nextRetryDelayMilliseconds a double to avoid the need for convert/overflow handling
+                nextRetryDelayMilliseconds = Convert.ToInt32(throttleDelay.Value.TotalMilliseconds);
+            }
+            catch (OverflowException)
+            {
+                nextRetryDelayMilliseconds = MaxBackoffMilliseconds;
+            }
+        }
+
+        nextRetryDelayMilliseconds = CalculateNextRetryDelay(nextRetryDelayMilliseconds);
+        retryResult = new RetryResult(throttleDelay.HasValue, delayDuration, nextRetryDelayMilliseconds);
+        return true;
+    }
+
+    private static bool IsDeadlineExceeded(DateTime? deadline)
+    {
+        // This implementation is internal, and it is guaranteed that deadline is UTC.
+        return deadline.HasValue && deadline <= DateTime.UtcNow;
+    }
+
+    private static int CalculateNextRetryDelay(int nextRetryDelayMilliseconds)
+    {
+        var nextMilliseconds = nextRetryDelayMilliseconds * BackoffMultiplier;
+        nextMilliseconds = Math.Min(nextMilliseconds, MaxBackoffMilliseconds);
+        return Convert.ToInt32(nextMilliseconds);
+    }
+
+    private static TimeSpan? TryGetGrpcRetryDelay(StatusCode statusCode, Metadata trailers)
+    {
+        Debug.Assert(trailers != null, "trailers was null");
+
+        if (statusCode != StatusCode.ResourceExhausted && statusCode != StatusCode.Unavailable)
+        {
+            return null;
+        }
+
+        var statusDetails = trailers.Get(GrpcStatusDetailsHeader);
+        if (statusDetails != null && statusDetails.IsBinary)
+        {
+            var status = Status.Parser.ParseFrom(statusDetails.ValueBytes);
+            foreach (var item in status.Details)
+            {
+                var success = item.TryUnpack<RetryInfo>(out var retryInfo);
+                if (success)
+                {
+                    return retryInfo.RetryDelay.ToTimeSpan();
+                }
+            }
+        }
+
+        return null;
+    }
+
+    private static TimeSpan? TryGetHttpRetryDelay(HttpStatusCode statusCode, HttpResponseHeaders headers)
+    {
+        Debug.Assert(headers != null, "headers was null");
+
+#if NETSTANDARD2_1_OR_GREATER || NET6_0_OR_GREATER
+        return statusCode == HttpStatusCode.TooManyRequests || statusCode == HttpStatusCode.ServiceUnavailable
+#else
+        return statusCode == (HttpStatusCode)429 || statusCode == HttpStatusCode.ServiceUnavailable
+#endif
+            ? headers.RetryAfter?.Delta
+            : null;
+    }
+
+    private static bool IsGrpcStatusCodeRetryable(StatusCode statusCode, bool hasRetryDelay)
+    {
+        switch (statusCode)
+        {
+            case StatusCode.Cancelled:
+            case StatusCode.DeadlineExceeded:
+            case StatusCode.Aborted:
+            case StatusCode.OutOfRange:
+            case StatusCode.Unavailable:
+            case StatusCode.DataLoss:
+                return true;
+            case StatusCode.ResourceExhausted:
+                return hasRetryDelay;
+            default:
+                return false;
+        }
+    }
+
+#pragma warning disable SA1313 // Parameter should begin with lower-case letter
+    private static bool IsHttpStatusCodeRetryable(HttpStatusCode statusCode, bool _)
+#pragma warning restore SA1313 // Parameter should begin with lower-case letter
+    {
+        switch (statusCode)
+        {
+#if NETSTANDARD2_1_OR_GREATER || NET6_0_OR_GREATER
+            case HttpStatusCode.TooManyRequests:
+#else
+            case (HttpStatusCode)429:
+#endif
+            case HttpStatusCode.BadGateway:
+            case HttpStatusCode.ServiceUnavailable:
+            case HttpStatusCode.GatewayTimeout:
+                return true;
+            default:
+                return false;
+        }
+    }
+
+    private static int GetRandomNumber(int min, int max)
+    {
+#if NET6_0_OR_GREATER
+        return Random.Shared.Next(min, max);
+#else
+        // TODO: Implement this better to minimize lock contention.
+        // Consider pulling in Random.Shared implementation.
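+        // Fallback for target frameworks without Random.Shared: System.Random
+        // instances are not thread-safe, so access is serialized with a lock.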
+        lock (Random)
+        {
+            return Random.Next(min, max);
+        }
+#endif
+    }
+
+    public readonly struct RetryResult
+    {
+        public readonly bool Throttled;
+        public readonly TimeSpan RetryDelay;
+        public readonly int NextRetryDelayMilliseconds;
+
+        public RetryResult(bool throttled, TimeSpan retryDelay, int nextRetryDelayMilliseconds)
+        {
+            this.Throttled = throttled;
+            this.RetryDelay = retryDelay;
+            this.NextRetryDelayMilliseconds = nextRetryDelayMilliseconds;
+        }
+    }
+}
diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/google/rpc/error_details.proto b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/google/rpc/error_details.proto
new file mode 100644
index 00000000000..0d5461a7287
--- /dev/null
+++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/google/rpc/error_details.proto
@@ -0,0 +1,37 @@
+// Copyright 2022 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package google.rpc;
+
+import "google/protobuf/duration.proto";
+
+// Describes when the clients can retry a failed request. Clients could ignore
+// the recommendation here or retry when this information is missing from error
+// responses.
+//
+// It's always recommended that clients should use exponential backoff when
+// retrying.
+//
+// Clients should wait until `retry_delay` amount of time has passed since
+// receiving the error response before retrying. If retrying requests also
+// fail, clients should use an exponential backoff scheme to gradually increase
+// the delay between retries based on `retry_delay`, until either a maximum
+// number of retries have been reached or a maximum retry delay cap has been
+// reached.
+message RetryInfo {
+  // Clients should wait at least this long between retrying the same request.
+  google.protobuf.Duration retry_delay = 1;
+}
diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/google/rpc/status.proto b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/google/rpc/status.proto
new file mode 100644
index 00000000000..40c5bf318c2
--- /dev/null
+++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/google/rpc/status.proto
@@ -0,0 +1,42 @@
+// Copyright 2022 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
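+//
+// Note: OtlpRetry parses this Status message from the "grpc-status-details-bin"
+// response trailer in order to discover a server-supplied RetryInfo throttling
+// delay.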
+
+syntax = "proto3";
+
+package google.rpc;
+
+import "google/protobuf/any.proto";
+
+// The `Status` type defines a logical error model that is suitable for
+// different programming environments, including REST APIs and RPC APIs. It is
+// used by [gRPC](https://github.com/grpc). Each `Status` message contains
+// three pieces of data: error code, error message, and error details.
+//
+// You can find out more about this error model and how to work with it in the
+// [API Design Guide](https://cloud.google.com/apis/design/errors).
+message Status {
+  // The status code, which should be an enum value of
+  // [google.rpc.Code][google.rpc.Code].
+  int32 code = 1;
+
+  // A developer-facing error message, which should be in English. Any
+  // user-facing error message should be localized and sent in the
+  // [google.rpc.Status.details][google.rpc.Status.details] field, or localized
+  // by the client.
+  string message = 2;
+
+  // A list of messages that carry the error details. There is a common set of
+  // message types for APIs to use.
+  repeated google.protobuf.Any details = 3;
+}
diff --git a/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/OtlpRetryTests.cs b/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/OtlpRetryTests.cs
new file mode 100644
index 00000000000..a5909d763d9
--- /dev/null
+++ b/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/OtlpRetryTests.cs
@@ -0,0 +1,217 @@
+// <copyright file="OtlpRetryTests.cs" company="OpenTelemetry Authors">
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// </copyright>
+
+using Google.Protobuf;
+using Google.Protobuf.WellKnownTypes;
+using Grpc.Core;
+using Xunit;
+
+namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient.Tests
+{
+    public class OtlpRetryTests
+    {
+        public static IEnumerable<object[]> GrpcRetryTestData => GrpcRetryTestCase.GetTestCases();
+
+        [Theory]
+        [MemberData(nameof(GrpcRetryTestData))]
+        public void TryGetGrpcRetryResultTest(GrpcRetryTestCase testCase)
+        {
+            var attempts = 0;
+            var nextRetryDelayMilliseconds = OtlpRetry.InitialBackoffMilliseconds;
+
+            foreach (var retryAttempt in testCase.RetryAttempts)
+            {
+                ++attempts;
+                var statusCode = retryAttempt.RpcException.StatusCode;
+                var deadline = retryAttempt.CallOptions.Deadline;
+                var trailers = retryAttempt.RpcException.Trailers;
+                var success = OtlpRetry.TryGetGrpcRetryResult(statusCode, deadline, trailers, nextRetryDelayMilliseconds, out var retryResult);
+
+                Assert.Equal(retryAttempt.ExpectedSuccess, success);
+
+                if (!success)
+                {
+                    Assert.Equal(testCase.ExpectedRetryAttempts, attempts);
+                    break;
+                }
+
+                if (retryResult.Throttled)
+                {
+                    Assert.Equal(retryAttempt.ThrottleDelay, retryResult.RetryDelay);
+                }
+                else
+                {
+                    Assert.True(retryResult.RetryDelay >= TimeSpan.Zero);
+                    Assert.True(retryResult.RetryDelay < TimeSpan.FromMilliseconds(nextRetryDelayMilliseconds));
+                }
+
+                Assert.Equal(retryAttempt.ExpectedNextRetryDelayMilliseconds, retryResult.NextRetryDelayMilliseconds);
+
+                nextRetryDelayMilliseconds = retryResult.NextRetryDelayMilliseconds;
+            }
+
+            Assert.Equal(testCase.ExpectedRetryAttempts, attempts);
+        }
+
+        public class GrpcRetryTestCase
+        {
+            public int ExpectedRetryAttempts;
+            public GrpcRetryAttempt[] RetryAttempts;
+
+            private string testRunnerName;
+
+            private GrpcRetryTestCase(string testRunnerName, GrpcRetryAttempt[] retryAttempts, int expectedRetryAttempts = 1)
+            {
+                this.ExpectedRetryAttempts = expectedRetryAttempts;
+                this.RetryAttempts = retryAttempts;
+                this.testRunnerName = testRunnerName;
+            }
+
+            public static IEnumerable<object[]> GetTestCases()
+            {
+                yield return new[] { new GrpcRetryTestCase("Cancelled", new GrpcRetryAttempt[] { new(StatusCode.Cancelled) }) };
+                yield return new[] { new GrpcRetryTestCase("DeadlineExceeded", new GrpcRetryAttempt[] { new(StatusCode.DeadlineExceeded) }) };
+                yield return new[] { new GrpcRetryTestCase("Aborted", new GrpcRetryAttempt[] { new(StatusCode.Aborted) }) };
+                yield return new[] { new GrpcRetryTestCase("OutOfRange", new GrpcRetryAttempt[] { new(StatusCode.OutOfRange) }) };
+                yield return new[] { new GrpcRetryTestCase("DataLoss", new GrpcRetryAttempt[] { new(StatusCode.DataLoss) }) };
+                yield return new[] { new GrpcRetryTestCase("Unavailable", new GrpcRetryAttempt[] { new(StatusCode.Unavailable) }) };
+
+                yield return new[] { new GrpcRetryTestCase("OK", new GrpcRetryAttempt[] { new(StatusCode.OK, expectedSuccess: false) }) };
+                yield return new[] { new GrpcRetryTestCase("PermissionDenied", new GrpcRetryAttempt[] { new(StatusCode.PermissionDenied, expectedSuccess: false) }) };
+                yield return new[] { new GrpcRetryTestCase("Unknown", new GrpcRetryAttempt[] { new(StatusCode.Unknown, expectedSuccess: false) }) };
+
+                yield return new[] { new GrpcRetryTestCase("ResourceExhausted w/o RetryInfo", new GrpcRetryAttempt[] { new(StatusCode.ResourceExhausted, expectedSuccess: false) }) };
+                yield return new[] { new GrpcRetryTestCase("ResourceExhausted w/ RetryInfo", new GrpcRetryAttempt[] { new(StatusCode.ResourceExhausted, throttleDelay: new Duration { Seconds = 2 }, expectedNextRetryDelayMilliseconds: 3000) }) };
+
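+                // For the RetryInfo cases above and below: the 2 s throttle delay
+                // becomes the new base delay, so the next computed delay is
+                // 2000 * 1.5 = 3000 ms (see CalculateNextRetryDelay).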
+                yield return new[] { new GrpcRetryTestCase("Unavailable w/ RetryInfo", new GrpcRetryAttempt[] { new(StatusCode.Unavailable, throttleDelay: Duration.FromTimeSpan(TimeSpan.FromMilliseconds(2000)), expectedNextRetryDelayMilliseconds: 3000) }) };
+
+                yield return new[] { new GrpcRetryTestCase("Expired deadline", new GrpcRetryAttempt[] { new(StatusCode.Unavailable, deadlineExceeded: true, expectedSuccess: false) }) };
+
+                yield return new[]
+                {
+                    new GrpcRetryTestCase(
+                        "Exponential backoff",
+                        new GrpcRetryAttempt[]
+                        {
+                            new(StatusCode.Unavailable, expectedNextRetryDelayMilliseconds: 1500),
+                            new(StatusCode.Unavailable, expectedNextRetryDelayMilliseconds: 2250),
+                            new(StatusCode.Unavailable, expectedNextRetryDelayMilliseconds: 3375),
+                            new(StatusCode.Unavailable, expectedNextRetryDelayMilliseconds: 5000),
+                            new(StatusCode.Unavailable, expectedNextRetryDelayMilliseconds: 5000),
+                        },
+                        expectedRetryAttempts: 5),
+                };
+
+                yield return new[]
+                {
+                    new GrpcRetryTestCase(
+                        "Retry until non-retryable status code encountered",
+                        new GrpcRetryAttempt[]
+                        {
+                            new(StatusCode.Unavailable, expectedNextRetryDelayMilliseconds: 1500),
+                            new(StatusCode.Unavailable, expectedNextRetryDelayMilliseconds: 2250),
+                            new(StatusCode.Unavailable, expectedNextRetryDelayMilliseconds: 3375),
+                            new(StatusCode.PermissionDenied, expectedSuccess: false),
+                            new(StatusCode.Unavailable, expectedNextRetryDelayMilliseconds: 5000),
+                        },
+                        expectedRetryAttempts: 4),
+                };
+
+                // Test throttling affects exponential backoff.
+                yield return new[]
+                {
+                    new GrpcRetryTestCase(
+                        "Exponential backoff after throttling",
+                        new GrpcRetryAttempt[]
+                        {
+                            new(StatusCode.Unavailable, expectedNextRetryDelayMilliseconds: 1500),
+                            new(StatusCode.Unavailable, expectedNextRetryDelayMilliseconds: 2250),
+                            new(StatusCode.Unavailable, throttleDelay: Duration.FromTimeSpan(TimeSpan.FromMilliseconds(500)), expectedNextRetryDelayMilliseconds: 750),
+                            new(StatusCode.Unavailable, expectedNextRetryDelayMilliseconds: 1125),
+                            new(StatusCode.Unavailable, expectedNextRetryDelayMilliseconds: 1688),
+                            new(StatusCode.Unavailable, expectedNextRetryDelayMilliseconds: 2532),
+                            new(StatusCode.Unavailable, expectedNextRetryDelayMilliseconds: 3798),
+                            new(StatusCode.Unavailable, expectedNextRetryDelayMilliseconds: 5000),
+                            new(StatusCode.Unavailable, expectedNextRetryDelayMilliseconds: 5000),
+                        },
+                        expectedRetryAttempts: 9),
+                };
+
+                yield return new[]
+                {
+                    new GrpcRetryTestCase(
+                        "Ridiculous throttling delay",
+                        new GrpcRetryAttempt[]
+                        {
+                            new(StatusCode.Unavailable, throttleDelay: Duration.FromTimeSpan(TimeSpan.FromDays(3000000)), expectedNextRetryDelayMilliseconds: 5000),
+                        }),
+                };
+            }
+
+            public override string ToString()
+            {
+                return this.testRunnerName;
+            }
+
+            private static Metadata GenerateTrailers(Duration throttleDelay)
+            {
+                var metadata = new Metadata();
+
+                var retryInfo = new Google.Rpc.RetryInfo();
+                retryInfo.RetryDelay = throttleDelay;
+
+                var status = new Google.Rpc.Status();
+                status.Details.Add(Any.Pack(retryInfo));
+
+                var stream = new MemoryStream();
+                status.WriteTo(stream);
+
+                metadata.Add(OtlpRetry.GrpcStatusDetailsHeader, stream.ToArray());
+                return metadata;
+            }
+
+            public struct GrpcRetryAttempt
+            {
+                public RpcException RpcException;
+                public CallOptions CallOptions;
+                public TimeSpan? ThrottleDelay;
+                public int? ExpectedNextRetryDelayMilliseconds;
+                public bool ExpectedSuccess;
+
+                public GrpcRetryAttempt(
+                    StatusCode statusCode,
+                    bool deadlineExceeded = false,
+                    Duration throttleDelay = null,
+                    int expectedNextRetryDelayMilliseconds = 1500,
+                    bool expectedSuccess = true)
+                {
+                    var status = new Status(statusCode, "Error");
+                    this.RpcException = throttleDelay != null
+                        ? new RpcException(status, GenerateTrailers(throttleDelay))
+                        : new RpcException(status);
+
+                    this.CallOptions = deadlineExceeded ? new CallOptions(deadline: DateTime.UtcNow.AddSeconds(-1)) : default;
+
+                    this.ThrottleDelay = throttleDelay != null ? throttleDelay.ToTimeSpan() : null;
+
+                    this.ExpectedNextRetryDelayMilliseconds = expectedNextRetryDelayMilliseconds;
+
+                    this.ExpectedSuccess = expectedSuccess;
+                }
+            }
+        }
+    }
+}
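
Note: a minimal sketch (not part of the patch) of how an export client could
drive this retry policy. The request object, the SendExportRequest helper, and
the 10-second deadline are illustrative assumptions, not APIs introduced by
this change.

    var deadline = DateTime.UtcNow.AddMilliseconds(10000);
    var nextRetryDelayMilliseconds = OtlpRetry.InitialBackoffMilliseconds;

    while (true)
    {
        try
        {
            SendExportRequest(request); // hypothetical gRPC export call
            return true;
        }
        catch (RpcException ex)
        {
            // Ask the policy whether this failure is retryable, honoring any
            // server-supplied throttling delay and the overall deadline.
            if (!OtlpRetry.TryGetGrpcRetryResult(ex.StatusCode, deadline, ex.Trailers, nextRetryDelayMilliseconds, out var retryResult))
            {
                return false; // non-retryable status code or deadline exceeded
            }

            // Sleep for the throttled or jittered backoff delay, then carry
            // the next base delay into the following attempt.
            Thread.Sleep(retryResult.RetryDelay);
            nextRetryDelayMilliseconds = retryResult.NextRetryDelayMilliseconds;
        }
    }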