
[Internal] Per Partition Automatic Failover: Fixes Metadata Requests Retry Policy #4205

Merged
Changes from 8 commits
Commits (20)
eb7b0a0 Code changes to retry on next preferred region for metadata reads on … (kundadebdatta, Nov 9, 2023)
ec125de Code changes to add retry for PK Ranges call. (kundadebdatta, Dec 6, 2023)
483cc45 Code changes to mark endpoint unavailable for read when cosmos except… (kundadebdatta, Dec 7, 2023)
cc4657f Code changes to fix unit tests. Added global endpoint manager in Pk R… (kundadebdatta, Dec 8, 2023)
dbee389 Code changes to fix unit tests. (kundadebdatta, Dec 8, 2023)
505ee41 Code changes to fix build break. (kundadebdatta, Dec 8, 2023)
77bc01d Minor code clean-up. (kundadebdatta, Dec 8, 2023)
c26bbb9 Code changes to capture metadata location endpoint within on before s… (kundadebdatta, Dec 19, 2023)
416cb6e Code changes to address review comments. (kundadebdatta, Dec 19, 2023)
ba31430 Code changes to fix build failure. (kundadebdatta, Dec 19, 2023)
dab70d0 Code changes to refactor metadata timeout policy. (kundadebdatta, Dec 20, 2023)
2e4cfc7 Code changes to add retry for request timeout. Fix emulator tests. (kundadebdatta, Dec 20, 2023)
697f9be Code changes to add metadata retry policy unit tests. (kundadebdatta, Dec 21, 2023)
bcb2222 Code changes to add more tests. (kundadebdatta, Dec 21, 2023)
621bd64 Merge branch 'master' into users/kundadebdatta/4181_retry_metadata_re… (kundadebdatta, Dec 21, 2023)
0204173 Code changes to refactor metadata retry policy logic to increment loc… (kundadebdatta, Dec 22, 2023)
6724c77 Merge branch 'master' into users/kundadebdatta/4181_retry_metadata_re… (kundadebdatta, Dec 22, 2023)
b507ed2 Code changes to address review comments. (kundadebdatta, Dec 22, 2023)
2f427e3 Code changes to address review comments. (kundadebdatta, Dec 29, 2023)
a20af65 Code changes to add separate condition for pk range requests. (kundadebdatta, Dec 29, 2023)
11 changes: 11 additions & 0 deletions Microsoft.Azure.Cosmos/src/ClientRetryPolicy.cs
@@ -98,6 +98,17 @@ public async Task<ShouldRetryResult> ShouldRetryAsync(
}
}

if (exception is CosmosException cosmosException)
{
ShouldRetryResult shouldRetryResult = await this.ShouldRetryInternalAsync(
cosmosException.StatusCode,
cosmosException.Headers.SubStatusCode);
if (shouldRetryResult != null)
{
return shouldRetryResult;
}
}

return await this.throttlingRetry.ShouldRetryAsync(exception, cancellationToken);
}

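The new branch above routes CosmosException failures through the same status-code and sub-status-code handling that DocumentClientException failures already go through, before falling back to the throttling retry. A minimal sketch of the intended effect, assuming internal access as in the SDK's own tests (clientRetryPolicy is an already-constructed ClientRetryPolicy; the CosmosException constructor shown is the public one):

    // A 503 with sub-status PartitionKeyRangeGone should now be evaluated by
    // ShouldRetryInternalAsync (cross-region retry) instead of going straight
    // to the throttling retry policy.
    CosmosException cosmosException = new CosmosException(
        message: "Partition key range metadata read failed.",
        statusCode: HttpStatusCode.ServiceUnavailable,
        subStatusCode: (int)SubStatusCodes.PartitionKeyRangeGone,
        activityId: Guid.NewGuid().ToString(),
        requestCharge: 0);

    ShouldRetryResult result = await clientRetryPolicy.ShouldRetryAsync(cosmosException, CancellationToken.None);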
4 changes: 2 additions & 2 deletions Microsoft.Azure.Cosmos/src/DocumentClient.cs
@@ -662,7 +662,7 @@ private async Task OpenPrivateAsync(CancellationToken cancellationToken)
tokenProvider: this,
retryPolicy: this.retryPolicy,
telemetryToServiceHelper: this.telemetryToServiceHelper);
this.partitionKeyRangeCache = new PartitionKeyRangeCache(this, this.GatewayStoreModel, this.collectionCache);
this.partitionKeyRangeCache = new PartitionKeyRangeCache(this, this.GatewayStoreModel, this.collectionCache, this.GlobalEndpointManager);

DefaultTrace.TraceWarning("{0} occurred while OpenAsync. Exception Message: {1}", ex.ToString(), ex.Message);
}
@@ -1033,7 +1033,7 @@ private async Task<bool> GetInitializationTaskAsync(IStoreClientFactory storeCli
tokenProvider: this,
retryPolicy: this.retryPolicy,
telemetryToServiceHelper: this.telemetryToServiceHelper);
this.partitionKeyRangeCache = new PartitionKeyRangeCache(this, this.GatewayStoreModel, this.collectionCache);
this.partitionKeyRangeCache = new PartitionKeyRangeCache(this, this.GatewayStoreModel, this.collectionCache, this.GlobalEndpointManager);
this.ResetSessionTokenRetryPolicy = new ResetSessionTokenRetryPolicyFactory(this.sessionContainer, this.collectionCache, this.retryPolicy);

gatewayStoreModel.SetCaches(this.partitionKeyRangeCache, this.collectionCache);
@@ -395,7 +395,9 @@ private async Task<HttpResponseMessage> SendHttpHelperAsync(
headers: new Headers()
{
ActivityId = System.Diagnostics.Trace.CorrelationManager.ActivityId.ToString(),
SubStatusCode = SubStatusCodes.TransportGenerated503
SubStatusCode = resourceType == ResourceType.PartitionKeyRange
? SubStatusCodes.PartitionKeyRangeGone
: SubStatusCodes.TransportGenerated503
},
trace: trace,
innerException: e);
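The hunk above changes which sub-status code is attached to the 503 that the gateway store client synthesizes when the underlying HTTP call fails: PartitionKeyRange reads are tagged with PartitionKeyRangeGone so the new metadata retry policy can recognize them, while every other resource type keeps TransportGenerated503. A minimal sketch of that selection in isolation (the helper name is hypothetical):

    // Hypothetical helper mirroring the conditional added above.
    static SubStatusCodes GetTransportFailureSubStatus(ResourceType resourceType)
    {
        return resourceType == ResourceType.PartitionKeyRange
            ? SubStatusCodes.PartitionKeyRangeGone   // lets MetadataRequestThrottleRetryPolicy fail over to another region
            : SubStatusCodes.TransportGenerated503;  // existing behavior for all other resource types
    }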
@@ -30,7 +30,7 @@ public static HttpTimeoutPolicy GetTimeoutPolicy(
}

//Partition Key Requests
if (documentServiceRequest.ResourceType == ResourceType.PartitionKeyRange)
if (documentServiceRequest.ResourceType == ResourceType.Address)
{
return HttpTimeoutPolicyControlPlaneRetriableHotPath.Instance;
}
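With this change only Address (replica address) lookups keep the retriable control-plane hot-path timeout policy; PartitionKeyRange requests no longer match this branch and fall through to a later one, presumably the default policy whose timeouts are shortened in the next hunk. A simplified sketch under that assumption (the real method has additional branches for other request types):

    // Simplified sketch; assumes HttpTimeoutPolicyDefault.Instance is the policy
    // PartitionKeyRange reads end up with after this change.
    HttpTimeoutPolicy timeoutPolicy = documentServiceRequest.ResourceType == ResourceType.Address
        ? HttpTimeoutPolicyControlPlaneRetriableHotPath.Instance
        : HttpTimeoutPolicyDefault.Instance;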
@@ -22,9 +22,9 @@ private HttpTimeoutPolicyDefault(bool shouldThrow503OnTimeout)

private readonly IReadOnlyList<(TimeSpan requestTimeout, TimeSpan delayForNextRequest)> TimeoutsAndDelays = new List<(TimeSpan requestTimeout, TimeSpan delayForNextRequest)>()
{
(TimeSpan.FromSeconds(65), TimeSpan.Zero),
(TimeSpan.FromSeconds(65), TimeSpan.FromSeconds(1)),
(TimeSpan.FromSeconds(65), TimeSpan.Zero),
(TimeSpan.FromSeconds(3), TimeSpan.Zero),
(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(1)),
(TimeSpan.FromSeconds(10), TimeSpan.Zero),
};

public override string TimeoutPolicyName => HttpTimeoutPolicyDefault.Name;
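The default policy's per-attempt timeouts drop from three 65-second attempts to 3, 5, and 10 seconds, with a one-second pause before the second attempt, so a slow metadata endpoint is abandoned quickly and the retry policy can move on. A rough sketch of how such a (requestTimeout, delayForNextRequest) sequence drives attempts (SendAsync is an illustrative transport call, not an SDK API):

    List<(TimeSpan requestTimeout, TimeSpan delayForNextRequest)> timeoutsAndDelays = new()
    {
        (TimeSpan.FromSeconds(3), TimeSpan.Zero),
        (TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(1)),
        (TimeSpan.FromSeconds(10), TimeSpan.Zero),
    };

    foreach ((TimeSpan requestTimeout, TimeSpan delayForNextRequest) in timeoutsAndDelays)
    {
        using CancellationTokenSource cts = new CancellationTokenSource(requestTimeout); // bound this attempt
        try
        {
            HttpResponseMessage response = await SendAsync(cts.Token); // illustrative call
            break; // success, stop retrying
        }
        catch (OperationCanceledException)
        {
            await Task.Delay(delayForNextRequest); // back off before the next, longer attempt
        }
    }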
128 changes: 128 additions & 0 deletions Microsoft.Azure.Cosmos/src/MetadataRequestThrottleRetryPolicy.cs
@@ -0,0 +1,128 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Documents;

/// <summary>
/// Metadata request throttle retry policy is a combination of the endpoint change retry and throttling retry policies.
/// </summary>
internal sealed class MetadataRequestThrottleRetryPolicy : IDocumentClientRetryPolicy
{
/// <summary>
/// An instance of <see cref="GlobalEndpointManager"/>.
/// </summary>
private readonly GlobalEndpointManager globalEndpointManager;

/// <summary>
/// Defines the throttling retry policy that is used as the underlying retry policy.
/// </summary>
private readonly IDocumentClientRetryPolicy throttlingRetryPolicy;

/// <summary>
/// An instance of <see cref="Uri"/> containing the location endpoint to which the partition key
/// range HTTP request will be sent.
/// </summary>
private Uri metadataLocationEndpoint;

/// <summary>
/// The constructor to initialize an instance of <see cref="MetadataRequestThrottleRetryPolicy"/>.
/// </summary>
/// <param name="endpointManager">An instance of <see cref="GlobalEndpointManager"/></param>
/// <param name="maxRetryAttemptsOnThrottledRequests">An integer defining the maximum number
/// of attempts to retry when requests are throttled.</param>
/// <param name="maxRetryWaitTimeInSeconds">An integer defining the maximum wait time in seconds.</param>
public MetadataRequestThrottleRetryPolicy(
GlobalEndpointManager endpointManager,
int maxRetryAttemptsOnThrottledRequests,
int maxRetryWaitTimeInSeconds)
{
this.globalEndpointManager = endpointManager;
this.throttlingRetryPolicy = new ResourceThrottleRetryPolicy(
maxRetryAttemptsOnThrottledRequests,
maxRetryWaitTimeInSeconds);
}

/// <summary>
/// Should the caller retry the operation.
/// </summary>
/// <param name="exception">Exception that occurred when the operation was tried</param>
/// <param name="cancellationToken">An instance of <see cref="CancellationToken"/>.</param>
/// <returns>True indicates caller should retry, False otherwise</returns>
public Task<ShouldRetryResult> ShouldRetryAsync(
Exception exception,
CancellationToken cancellationToken)
{
if (exception is CosmosException cosmosException
&& cosmosException.StatusCode == HttpStatusCode.ServiceUnavailable
&& cosmosException.Headers.SubStatusCode == SubStatusCodes.PartitionKeyRangeGone)
{
if (!this.MarkEndpointUnavailableForRead())
{
return Task.FromResult(ShouldRetryResult.NoRetry());
}
}

return this.throttlingRetryPolicy.ShouldRetryAsync(exception, cancellationToken);
}

/// <summary>
/// Should the caller retry the operation.
/// </summary>
/// <param name="cosmosResponseMessage"><see cref="ResponseMessage"/> in return of the request</param>
/// <param name="cancellationToken">An instance of <see cref="CancellationToken"/>.</param>
/// <returns>True indicates caller should retry, False otherwise</returns>
public Task<ShouldRetryResult> ShouldRetryAsync(
ResponseMessage cosmosResponseMessage,
CancellationToken cancellationToken)
{
if (cosmosResponseMessage?.StatusCode == HttpStatusCode.ServiceUnavailable
&& cosmosResponseMessage?.Headers.SubStatusCode == SubStatusCodes.PartitionKeyRangeGone)
{
if (!this.MarkEndpointUnavailableForRead())
{
return Task.FromResult(ShouldRetryResult.NoRetry());
}
}

return this.throttlingRetryPolicy.ShouldRetryAsync(cosmosResponseMessage, cancellationToken);
}

/// <summary>
/// Method that is called before a request is sent to allow the retry policy implementation
/// to modify the state of the request.
/// </summary>
/// <param name="request">The request being sent to the service.</param>
public void OnBeforeSendRequest(DocumentServiceRequest request)
{
this.metadataLocationEndpoint = this.globalEndpointManager.ResolveServiceEndpoint(request);
}

/// <summary>
/// Marks an endpoint unavailable in the global endpoint manager for any future read requests.
/// </summary>
/// <returns>A boolean flag indicating if the operation was successful.</returns>
private bool MarkEndpointUnavailableForRead()
{
if (this.metadataLocationEndpoint != null)
{
DefaultTrace.TraceWarning("MetadataRequestThrottleRetryPolicy: Marking the following endpoint unavailable for reads: {0}.", this.metadataLocationEndpoint);
this.globalEndpointManager.MarkEndpointUnavailableForRead(this.metadataLocationEndpoint);
return true;
}
else
{
DefaultTrace.TraceWarning("MetadataRequestThrottleRetryPolicy: location endpoint couldn't be resolved. Skip marking endpoint unavailable for reads.");
return false;
}
}
}
}
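For context, a minimal usage sketch of the new policy, mirroring how PartitionKeyRangeCache wires it up below (globalEndpointManager, request, and SendPartitionKeyRangeReadAsync are assumed to exist; the last one is illustrative, not an SDK API):

    RetryOptions retryOptions = new RetryOptions();
    MetadataRequestThrottleRetryPolicy retryPolicy = new MetadataRequestThrottleRetryPolicy(
        endpointManager: globalEndpointManager,
        maxRetryAttemptsOnThrottledRequests: retryOptions.MaxRetryAttemptsOnThrottledRequests,
        maxRetryWaitTimeInSeconds: retryOptions.MaxRetryWaitTimeInSeconds);

    DocumentServiceResponse response = await BackoffRetryUtility<DocumentServiceResponse>.ExecuteAsync(
        () =>
        {
            // Capture the location endpoint the request resolves to, so that a later
            // 503/PartitionKeyRangeGone can mark that endpoint unavailable for reads and
            // the retried request is routed to the next preferred region.
            retryPolicy.OnBeforeSendRequest(request);
            return SendPartitionKeyRangeReadAsync(request);
        },
        retryPolicy: retryPolicy);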
55 changes: 18 additions & 37 deletions Microsoft.Azure.Cosmos/src/Routing/PartitionKeyRangeCache.cs
@@ -30,17 +30,20 @@ internal class PartitionKeyRangeCache : IRoutingMapProvider, ICollectionRoutingM
private readonly ICosmosAuthorizationTokenProvider authorizationTokenProvider;
private readonly IStoreModel storeModel;
private readonly CollectionCache collectionCache;
private readonly GlobalEndpointManager endpointManager;

public PartitionKeyRangeCache(
ICosmosAuthorizationTokenProvider authorizationTokenProvider,
IStoreModel storeModel,
CollectionCache collectionCache)
CollectionCache collectionCache,
GlobalEndpointManager endpointManager)
{
this.routingMapCache = new AsyncCacheNonBlocking<string, CollectionRoutingMap>(
keyEqualityComparer: StringComparer.Ordinal);
this.authorizationTokenProvider = authorizationTokenProvider;
this.storeModel = storeModel;
this.collectionCache = collectionCache;
this.endpointManager = endpointManager;
}

public virtual async Task<IReadOnlyList<PartitionKeyRange>> TryGetOverlappingRangesAsync(
@@ -121,10 +124,10 @@ public virtual async Task<CollectionRoutingMap> TryLookupAsync(
return await this.routingMapCache.GetAsync(
key: collectionRid,
singleValueInitFunc: (_) => this.GetRoutingMapForCollectionAsync(
collectionRid,
previousValue,
trace,
request?.RequestContext?.ClientRequestStatistics),
collectionRid: collectionRid,
previousRoutingMap: previousValue,
trace: trace,
clientSideRequestStatistics: request?.RequestContext?.ClientRequestStatistics),
forceRefresh: (currentValue) => PartitionKeyRangeCache.ShouldForceRefresh(previousValue, currentValue));
}
catch (DocumentClientException ex)
@@ -174,35 +177,6 @@ private static bool ShouldForceRefresh(
return previousValue.ChangeFeedNextIfNoneMatch == currentValue.ChangeFeedNextIfNoneMatch;
}

public async Task<PartitionKeyRange> TryGetRangeByPartitionKeyRangeIdAsync(string collectionRid,
string partitionKeyRangeId,
ITrace trace,
IClientSideRequestStatistics clientSideRequestStatistics)
{
try
{
CollectionRoutingMap routingMap = await this.routingMapCache.GetAsync(
key: collectionRid,
singleValueInitFunc: (_) => this.GetRoutingMapForCollectionAsync(
collectionRid: collectionRid,
previousRoutingMap: null,
trace: trace,
clientSideRequestStatistics: clientSideRequestStatistics),
forceRefresh: (_) => false);

return routingMap.TryGetRangeByPartitionKeyRangeId(partitionKeyRangeId);
}
catch (DocumentClientException ex)
{
if (ex.StatusCode == HttpStatusCode.NotFound)
{
return null;
}

throw;
}
}

private async Task<CollectionRoutingMap> GetRoutingMapForCollectionAsync(
string collectionRid,
CollectionRoutingMap previousRoutingMap,
@@ -225,9 +199,14 @@ private async Task<CollectionRoutingMap> GetRoutingMapForCollectionAsync(
}

RetryOptions retryOptions = new RetryOptions();
MetadataRequestThrottleRetryPolicy metadataRetryPolicy = new (
endpointManager: this.endpointManager,
maxRetryAttemptsOnThrottledRequests: retryOptions.MaxRetryAttemptsOnThrottledRequests,
maxRetryWaitTimeInSeconds: retryOptions.MaxRetryWaitTimeInSeconds);

using (DocumentServiceResponse response = await BackoffRetryUtility<DocumentServiceResponse>.ExecuteAsync(
() => this.ExecutePartitionKeyRangeReadChangeFeedAsync(collectionRid, headers, trace, clientSideRequestStatistics),
new ResourceThrottleRetryPolicy(retryOptions.MaxRetryAttemptsOnThrottledRequests, retryOptions.MaxRetryWaitTimeInSeconds)))
() => this.ExecutePartitionKeyRangeReadChangeFeedAsync(collectionRid, headers, trace, clientSideRequestStatistics, metadataRetryPolicy),
retryPolicy: metadataRetryPolicy))
{
lastStatusCode = response.StatusCode;
changeFeedNextIfNoneMatch = response.Headers[HttpConstants.HttpHeaders.ETag];
@@ -274,7 +253,8 @@ private async Task<CollectionRoutingMap> GetRoutingMapForCollectionAsync(
private async Task<DocumentServiceResponse> ExecutePartitionKeyRangeReadChangeFeedAsync(string collectionRid,
INameValueCollection headers,
ITrace trace,
IClientSideRequestStatistics clientSideRequestStatistics)
IClientSideRequestStatistics clientSideRequestStatistics,
IDocumentClientRetryPolicy retryPolicy)
{
using (ITrace childTrace = trace.StartChild("Read PartitionKeyRange Change Feed", TraceComponent.Transport, Tracing.TraceLevel.Info))
{
@@ -285,6 +265,7 @@ private async Task<DocumentServiceResponse> ExecutePartitionKeyRangeReadChangeFe
AuthorizationTokenType.PrimaryMasterKey,
headers))
{
retryPolicy?.OnBeforeSendRequest(request);
string authorizationToken = null;
try
{
@@ -228,7 +228,7 @@ private void Init()
},
string.Empty);

this.partitionKeyRangeCache = new Mock<PartitionKeyRangeCache>(null, null, null);
this.partitionKeyRangeCache = new Mock<PartitionKeyRangeCache>(null, null, null, null);
this.partitionKeyRangeCache.Setup(
m => m.TryLookupAsync(
It.IsAny<string>(),
@@ -798,7 +798,7 @@ private class ClientWithSplitDetection : MockDocumentClient

public ClientWithSplitDetection()
{
this.partitionKeyRangeCache = new Mock<PartitionKeyRangeCache>(MockBehavior.Strict, null, null, null);
this.partitionKeyRangeCache = new Mock<PartitionKeyRangeCache>(MockBehavior.Strict, null, null, null, null);
this.partitionKeyRangeCache.Setup(
m => m.TryGetOverlappingRangesAsync(
It.IsAny<string>(),
@@ -376,7 +376,7 @@ private class ClientWithSplitDetection : MockDocumentClient

public ClientWithSplitDetection()
{
this.partitionKeyRangeCache = new Mock<PartitionKeyRangeCache>(MockBehavior.Strict, null, null, null);
this.partitionKeyRangeCache = new Mock<PartitionKeyRangeCache>(MockBehavior.Strict, null, null, null, null);
this.partitionKeyRangeCache.Setup(
m => m.TryGetOverlappingRangesAsync(
It.IsAny<string>(),
@@ -287,7 +287,7 @@ private class ClientWithSplitDetection : MockDocumentClient

public ClientWithSplitDetection()
{
this.partitionKeyRangeCache = new Mock<PartitionKeyRangeCache>(MockBehavior.Strict, null, null, null);
this.partitionKeyRangeCache = new Mock<PartitionKeyRangeCache>(MockBehavior.Strict, null, null, null, null);
this.partitionKeyRangeCache.Setup(
m => m.TryGetOverlappingRangesAsync(
It.IsAny<string>(),
@@ -145,7 +145,7 @@ private class ClientWithSplitDetection : MockDocumentClient

public ClientWithSplitDetection()
{
this.partitionKeyRangeCache = new Mock<PartitionKeyRangeCache>(MockBehavior.Strict, null, null, null);
this.partitionKeyRangeCache = new Mock<PartitionKeyRangeCache>(MockBehavior.Strict, null, null, null, null);
this.partitionKeyRangeCache.Setup(
m => m.TryGetOverlappingRangesAsync(
It.IsAny<string>(),