From c5556853dbc8a63466d5bea5bc1f69ccdd0014e2 Mon Sep 17 00:00:00 2001 From: Debdatta Kunda <87335885+kundadebdatta@users.noreply.github.com> Date: Sat, 30 Dec 2023 04:51:20 -0800 Subject: [PATCH] [Internal] Per Partition Automatic Failover: Fixes Metadata Requests Retry Policy (#4205) * Code changes to retry on next preferred region for metadata reads on gateway timeouts. * Code changes to add retry for PK Ranges call. * Code changes to mark endpoint unavailable for read when cosmos exception occurs! * Code changes to fix unit tests. Added global endpoint manager in Pk Range Cache ctor. * Code changes to fix unit tests. * Code changes to fix build break. * Minor code clean-up. * Code changes to capture metadata location endpoint within on before send request. * Code changes to address review comments. * Code changes to fix build failure. * Code changes to refactor metadata timeout policy. * Code changes to add retry for request timeout. Fix emulator tests. * Code changes to add metadata retry policy unit tests. * Code changes to add more tests. * Code changes to refactor metadata retry policy logic to increment location index. Addressed review comments. * Code changes to address review comments. * Code changes to address review comments. * Code changes to add separate condition for pk range requests. --- .../src/ClientRetryPolicy.cs | 14 ++ Microsoft.Azure.Cosmos/src/DocumentClient.cs | 4 +- .../src/HttpClient/HttpTimeoutPolicy.cs | 13 +- .../src/MetadataRequestThrottleRetryPolicy.cs | 190 ++++++++++++++++++ .../src/Routing/PartitionKeyRangeCache.cs | 62 +++--- .../Mocks/MockDocumentClient.cs | 2 +- .../Batch/BatchAsyncBatcherTests.cs | 2 +- .../Batch/BatchAsyncContainerExecutorTests.cs | 2 +- .../Batch/BatchAsyncOperationContextTests.cs | 2 +- ...lkPartitionKeyRangeGoneRetryPolicyTests.cs | 2 +- .../PartitionSynchronizerCoreTests.cs | 43 +++- .../GatewayAddressCacheTests.cs | 4 +- .../GatewayStoreModelTest.cs | 24 +-- ...MetadataRequestThrottleRetryPolicyTests.cs | 129 ++++++++++++ .../PartitionKeyRangeCacheTest.cs | 141 ++++++++++++- .../PartitionKeyRangeHandlerTests.cs | 7 +- .../Utils/MockDocumentClient.cs | 9 +- 17 files changed, 578 insertions(+), 72 deletions(-) create mode 100644 Microsoft.Azure.Cosmos/src/MetadataRequestThrottleRetryPolicy.cs create mode 100644 Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/MetadataRequestThrottleRetryPolicyTests.cs diff --git a/Microsoft.Azure.Cosmos/src/ClientRetryPolicy.cs b/Microsoft.Azure.Cosmos/src/ClientRetryPolicy.cs index 147977e130..e19433c17e 100644 --- a/Microsoft.Azure.Cosmos/src/ClientRetryPolicy.cs +++ b/Microsoft.Azure.Cosmos/src/ClientRetryPolicy.cs @@ -98,6 +98,20 @@ public async Task ShouldRetryAsync( } } + // Any metadata request will throw a cosmos exception from CosmosHttpClientCore if + // it receives a 503 service unavailable from gateway. This check is to add retry + // mechanism for the metadata requests in such cases. 
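// Illustration (not part of the diff): with the HttpTimeoutPolicy change further below, a gateway
// timeout or 503 on a metadata read surfaces here as a CosmosException whose StatusCode is
// HttpStatusCode.ServiceUnavailable (with SubStatusCodes.TransportGenerated503 when the 503 was
// generated from a transport timeout); the check that follows unwraps that status/sub-status pair
// and hands it to ShouldRetryInternalAsync.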
+ if (exception is CosmosException cosmosException) + { + ShouldRetryResult shouldRetryResult = await this.ShouldRetryInternalAsync( + cosmosException.StatusCode, + cosmosException.Headers.SubStatusCode); + if (shouldRetryResult != null) + { + return shouldRetryResult; + } + } + return await this.throttlingRetry.ShouldRetryAsync(exception, cancellationToken); } diff --git a/Microsoft.Azure.Cosmos/src/DocumentClient.cs b/Microsoft.Azure.Cosmos/src/DocumentClient.cs index 632c158f96..8340df8207 100644 --- a/Microsoft.Azure.Cosmos/src/DocumentClient.cs +++ b/Microsoft.Azure.Cosmos/src/DocumentClient.cs @@ -662,7 +662,7 @@ private async Task OpenPrivateAsync(CancellationToken cancellationToken) tokenProvider: this, retryPolicy: this.retryPolicy, telemetryToServiceHelper: this.telemetryToServiceHelper); - this.partitionKeyRangeCache = new PartitionKeyRangeCache(this, this.GatewayStoreModel, this.collectionCache); + this.partitionKeyRangeCache = new PartitionKeyRangeCache(this, this.GatewayStoreModel, this.collectionCache, this.GlobalEndpointManager); DefaultTrace.TraceWarning("{0} occurred while OpenAsync. Exception Message: {1}", ex.ToString(), ex.Message); } @@ -1033,7 +1033,7 @@ private async Task GetInitializationTaskAsync(IStoreClientFactory storeCli tokenProvider: this, retryPolicy: this.retryPolicy, telemetryToServiceHelper: this.telemetryToServiceHelper); - this.partitionKeyRangeCache = new PartitionKeyRangeCache(this, this.GatewayStoreModel, this.collectionCache); + this.partitionKeyRangeCache = new PartitionKeyRangeCache(this, this.GatewayStoreModel, this.collectionCache, this.GlobalEndpointManager); this.ResetSessionTokenRetryPolicy = new ResetSessionTokenRetryPolicyFactory(this.sessionContainer, this.collectionCache, this.retryPolicy); gatewayStoreModel.SetCaches(this.partitionKeyRangeCache, this.collectionCache); diff --git a/Microsoft.Azure.Cosmos/src/HttpClient/HttpTimeoutPolicy.cs b/Microsoft.Azure.Cosmos/src/HttpClient/HttpTimeoutPolicy.cs index 42742f86c1..5b39a2c113 100644 --- a/Microsoft.Azure.Cosmos/src/HttpClient/HttpTimeoutPolicy.cs +++ b/Microsoft.Azure.Cosmos/src/HttpClient/HttpTimeoutPolicy.cs @@ -29,8 +29,15 @@ public static HttpTimeoutPolicy GetTimeoutPolicy( return HttpTimeoutPolicyControlPlaneRetriableHotPath.InstanceShouldThrow503OnTimeout; } - //Partition Key Requests - if (documentServiceRequest.ResourceType == ResourceType.PartitionKeyRange) + //Get Partition Key Range Requests + if (documentServiceRequest.ResourceType == ResourceType.PartitionKeyRange + && documentServiceRequest.OperationType == OperationType.ReadFeed) + { + return HttpTimeoutPolicyControlPlaneRetriableHotPath.InstanceShouldThrow503OnTimeout; + } + + //Get Addresses Requests + if (documentServiceRequest.ResourceType == ResourceType.Address) { return HttpTimeoutPolicyControlPlaneRetriableHotPath.Instance; } @@ -44,7 +51,7 @@ public static HttpTimeoutPolicy GetTimeoutPolicy( //Meta Data Read if (HttpTimeoutPolicy.IsMetaData(documentServiceRequest) && documentServiceRequest.IsReadOnlyRequest) { - return HttpTimeoutPolicyDefault.InstanceShouldThrow503OnTimeout; + return HttpTimeoutPolicyControlPlaneRetriableHotPath.InstanceShouldThrow503OnTimeout; } //Default behavior diff --git a/Microsoft.Azure.Cosmos/src/MetadataRequestThrottleRetryPolicy.cs b/Microsoft.Azure.Cosmos/src/MetadataRequestThrottleRetryPolicy.cs new file mode 100644 index 0000000000..928d2f2e87 --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/MetadataRequestThrottleRetryPolicy.cs @@ -0,0 +1,190 @@ 
+//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos +{ + using System; + using System.Net; + using System.Threading; + using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.Core.Trace; + using Microsoft.Azure.Cosmos.Routing; + using Microsoft.Azure.Documents; + + /// + /// Metadata Request Throttle Retry Policy is a combination of endpoint change retry + throttling retry. + /// + internal sealed class MetadataRequestThrottleRetryPolicy : IDocumentClientRetryPolicy + { + /// + /// A constant integer defining the default maximum retry wait time in seconds. + /// + private const int DefaultMaxWaitTimeInSeconds = 60; + + /// + /// A constant integer defining the default maximum retry count on service unavailable. + /// + private const int DefaultMaxServiceUnavailableRetryCount = 1; + + /// + /// An instance of . + /// + private readonly IGlobalEndpointManager globalEndpointManager; + + /// + /// Defines the throttling retry policy that is used as the underlying retry policy. + /// + private readonly IDocumentClientRetryPolicy throttlingRetryPolicy; + + /// + /// An integer defining the maximum retry count on service unavailable. + /// + private readonly int maxServiceUnavailableRetryCount; + + /// + /// An instance of containing the location endpoint where the partition key + /// range http request will be sent over. + /// + private MetadataRetryContext retryContext; + + /// + /// An integer capturing the current retry count on service unavailable. + /// + private int serviceUnavailableRetryCount; + + /// + /// The constructor to initialize an instance of . + /// + /// An instance of + /// An integer defining the maximum number + /// of attempts to retry when requests are throttled. + /// An integer defining the maximum wait time in seconds. + public MetadataRequestThrottleRetryPolicy( + IGlobalEndpointManager endpointManager, + int maxRetryAttemptsOnThrottledRequests, + int maxRetryWaitTimeInSeconds = DefaultMaxWaitTimeInSeconds) + { + this.globalEndpointManager = endpointManager; + this.maxServiceUnavailableRetryCount = Math.Max( + MetadataRequestThrottleRetryPolicy.DefaultMaxServiceUnavailableRetryCount, + this.globalEndpointManager.PreferredLocationCount); + + this.throttlingRetryPolicy = new ResourceThrottleRetryPolicy( + maxRetryAttemptsOnThrottledRequests, + maxRetryWaitTimeInSeconds); + + this.retryContext = new MetadataRetryContext + { + RetryLocationIndex = 0, + RetryRequestOnPreferredLocations = true, + }; + } + + /// + /// Should the caller retry the operation. + /// + /// Exception that occurred when the operation was tried + /// An instance of . + /// True indicates caller should retry, False otherwise + public Task ShouldRetryAsync( + Exception exception, + CancellationToken cancellationToken) + { + if (exception is CosmosException cosmosException + && cosmosException.StatusCode == HttpStatusCode.ServiceUnavailable + && cosmosException.Headers.SubStatusCode == SubStatusCodes.TransportGenerated503) + { + if (this.IncrementRetryIndexOnServiceUnavailableForMetadataRead()) + { + return Task.FromResult(ShouldRetryResult.RetryAfter(TimeSpan.Zero)); + } + } + + return this.throttlingRetryPolicy.ShouldRetryAsync(exception, cancellationToken); + } + + /// + /// Should the caller retry the operation. + /// + /// in return of the request + /// An instance of .
+ /// True indicates caller should retry, False otherwise + public Task ShouldRetryAsync( + ResponseMessage cosmosResponseMessage, + CancellationToken cancellationToken) + { + if (cosmosResponseMessage?.StatusCode == HttpStatusCode.ServiceUnavailable + && cosmosResponseMessage?.Headers?.SubStatusCode == SubStatusCodes.TransportGenerated503) + { + if (this.IncrementRetryIndexOnServiceUnavailableForMetadataRead()) + { + return Task.FromResult(ShouldRetryResult.RetryAfter(TimeSpan.Zero)); + } + } + + return this.throttlingRetryPolicy.ShouldRetryAsync(cosmosResponseMessage, cancellationToken); + } + + /// + /// Method that is called before a request is sent to allow the retry policy implementation + /// to modify the state of the request. + /// + /// The request being sent to the service. + public void OnBeforeSendRequest(DocumentServiceRequest request) + { + // Clear the previous location-based routing directive. + request.RequestContext.ClearRouteToLocation(); + request.RequestContext.RouteToLocation( + this.retryContext.RetryLocationIndex, + this.retryContext.RetryRequestOnPreferredLocations); + + Uri metadataLocationEndpoint = this.globalEndpointManager.ResolveServiceEndpoint(request); + + DefaultTrace.TraceInformation("MetadataRequestThrottleRetryPolicy: Routing the metadata request to: {0} for operation type: {1} and resource type: {2}.", metadataLocationEndpoint, request.OperationType, request.ResourceType); + request.RequestContext.RouteToLocation(metadataLocationEndpoint); + } + + /// + /// Increments the location index when a service unavailable exception occurs, for any future read requests. + /// + /// A boolean flag indicating if the operation was successful. + private bool IncrementRetryIndexOnServiceUnavailableForMetadataRead() + { + if (this.serviceUnavailableRetryCount++ >= this.maxServiceUnavailableRetryCount) + { + DefaultTrace.TraceWarning("MetadataRequestThrottleRetryPolicy: Retry count: {0} has exceeded the maximum permitted retry count on service unavailable: {1}.", this.serviceUnavailableRetryCount, this.maxServiceUnavailableRetryCount); + return false; + } + + // Retrying on the next preferred location. + // RetryCount is used as a zero-based index. + DefaultTrace.TraceWarning("MetadataRequestThrottleRetryPolicy: Incrementing the metadata retry location index to: {0}.", this.serviceUnavailableRetryCount); + this.retryContext = new MetadataRetryContext() + { + RetryLocationIndex = this.serviceUnavailableRetryCount, + RetryRequestOnPreferredLocations = true, + }; + + return true; + } + + /// + /// A helper class containing the required attributes for + /// metadata retry context. + /// + internal sealed class MetadataRetryContext + { + /// + /// An integer defining the current retry location index. + /// + public int RetryLocationIndex { get; set; } + + /// + /// A boolean flag indicating if the request should retry on + /// preferred locations.
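Taken together, the policy is meant to drive a retry loop: pin the request to the preferred location at the current retry index, send it, and on a transport-generated 503 advance the index so the next attempt resolves the next preferred region. A minimal sketch of that flow, where endpointManager, request and executeRequestAsync are illustrative placeholders rather than names from this patch (the real wiring is the PartitionKeyRangeCache change further down):

RetryOptions retryOptions = new RetryOptions();
MetadataRequestThrottleRetryPolicy metadataRetryPolicy = new MetadataRequestThrottleRetryPolicy(
    endpointManager: endpointManager,
    maxRetryAttemptsOnThrottledRequests: retryOptions.MaxRetryAttemptsOnThrottledRequests,
    maxRetryWaitTimeInSeconds: retryOptions.MaxRetryWaitTimeInSeconds);

DocumentServiceResponse response = await BackoffRetryUtility<DocumentServiceResponse>.ExecuteAsync(
    () =>
    {
        // Route this attempt to the preferred location at the current retry index.
        metadataRetryPolicy.OnBeforeSendRequest(request);
        return executeRequestAsync(request);
    },
    retryPolicy: metadataRetryPolicy);

// A 503 with SubStatusCodes.TransportGenerated503 makes ShouldRetryAsync bump RetryLocationIndex,
// so the next OnBeforeSendRequest call resolves the next preferred region; any other failure falls
// through to the wrapped ResourceThrottleRetryPolicy.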
+ /// + public bool RetryRequestOnPreferredLocations { get; set; } + } + } +} \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/src/Routing/PartitionKeyRangeCache.cs b/Microsoft.Azure.Cosmos/src/Routing/PartitionKeyRangeCache.cs index d9e2854b2b..d849e4d3c0 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/PartitionKeyRangeCache.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/PartitionKeyRangeCache.cs @@ -30,17 +30,20 @@ internal class PartitionKeyRangeCache : IRoutingMapProvider, ICollectionRoutingM private readonly ICosmosAuthorizationTokenProvider authorizationTokenProvider; private readonly IStoreModel storeModel; private readonly CollectionCache collectionCache; + private readonly IGlobalEndpointManager endpointManager; public PartitionKeyRangeCache( ICosmosAuthorizationTokenProvider authorizationTokenProvider, IStoreModel storeModel, - CollectionCache collectionCache) + CollectionCache collectionCache, + IGlobalEndpointManager endpointManager) { this.routingMapCache = new AsyncCacheNonBlocking( keyEqualityComparer: StringComparer.Ordinal); this.authorizationTokenProvider = authorizationTokenProvider; this.storeModel = storeModel; this.collectionCache = collectionCache; + this.endpointManager = endpointManager; } public virtual async Task> TryGetOverlappingRangesAsync( @@ -121,10 +124,10 @@ public virtual async Task TryLookupAsync( return await this.routingMapCache.GetAsync( key: collectionRid, singleValueInitFunc: (_) => this.GetRoutingMapForCollectionAsync( - collectionRid, - previousValue, - trace, - request?.RequestContext?.ClientRequestStatistics), + collectionRid: collectionRid, + previousRoutingMap: previousValue, + trace: trace, + clientSideRequestStatistics: request?.RequestContext?.ClientRequestStatistics), forceRefresh: (currentValue) => PartitionKeyRangeCache.ShouldForceRefresh(previousValue, currentValue)); } catch (DocumentClientException ex) @@ -174,35 +177,6 @@ private static bool ShouldForceRefresh( return previousValue.ChangeFeedNextIfNoneMatch == currentValue.ChangeFeedNextIfNoneMatch; } - public async Task TryGetRangeByPartitionKeyRangeIdAsync(string collectionRid, - string partitionKeyRangeId, - ITrace trace, - IClientSideRequestStatistics clientSideRequestStatistics) - { - try - { - CollectionRoutingMap routingMap = await this.routingMapCache.GetAsync( - key: collectionRid, - singleValueInitFunc: (_) => this.GetRoutingMapForCollectionAsync( - collectionRid: collectionRid, - previousRoutingMap: null, - trace: trace, - clientSideRequestStatistics: clientSideRequestStatistics), - forceRefresh: (_) => false); - - return routingMap.TryGetRangeByPartitionKeyRangeId(partitionKeyRangeId); - } - catch (DocumentClientException ex) - { - if (ex.StatusCode == HttpStatusCode.NotFound) - { - return null; - } - - throw; - } - } - private async Task GetRoutingMapForCollectionAsync( string collectionRid, CollectionRoutingMap previousRoutingMap, @@ -213,6 +187,12 @@ private async Task GetRoutingMapForCollectionAsync( string changeFeedNextIfNoneMatch = previousRoutingMap?.ChangeFeedNextIfNoneMatch; HttpStatusCode lastStatusCode = HttpStatusCode.OK; + + RetryOptions retryOptions = new RetryOptions(); + MetadataRequestThrottleRetryPolicy metadataRetryPolicy = new ( + endpointManager: this.endpointManager, + maxRetryAttemptsOnThrottledRequests: retryOptions.MaxRetryAttemptsOnThrottledRequests, + maxRetryWaitTimeInSeconds: retryOptions.MaxRetryWaitTimeInSeconds); do { INameValueCollection headers = new RequestNameValueCollection(); @@ -224,10 +204,9 @@ private async Task 
GetRoutingMapForCollectionAsync( headers.Set(HttpConstants.HttpHeaders.IfNoneMatch, changeFeedNextIfNoneMatch); } - RetryOptions retryOptions = new RetryOptions(); using (DocumentServiceResponse response = await BackoffRetryUtility.ExecuteAsync( - () => this.ExecutePartitionKeyRangeReadChangeFeedAsync(collectionRid, headers, trace, clientSideRequestStatistics), - new ResourceThrottleRetryPolicy(retryOptions.MaxRetryAttemptsOnThrottledRequests, retryOptions.MaxRetryWaitTimeInSeconds))) + () => this.ExecutePartitionKeyRangeReadChangeFeedAsync(collectionRid, headers, trace, clientSideRequestStatistics, metadataRetryPolicy), + retryPolicy: metadataRetryPolicy)) { lastStatusCode = response.StatusCode; changeFeedNextIfNoneMatch = response.Headers[HttpConstants.HttpHeaders.ETag]; @@ -274,7 +253,8 @@ private async Task GetRoutingMapForCollectionAsync( private async Task ExecutePartitionKeyRangeReadChangeFeedAsync(string collectionRid, INameValueCollection headers, ITrace trace, - IClientSideRequestStatistics clientSideRequestStatistics) + IClientSideRequestStatistics clientSideRequestStatistics, + IDocumentClientRetryPolicy retryPolicy) { using (ITrace childTrace = trace.StartChild("Read PartitionKeyRange Change Feed", TraceComponent.Transport, Tracing.TraceLevel.Info)) { @@ -285,6 +265,7 @@ private async Task ExecutePartitionKeyRangeReadChangeFe AuthorizationTokenType.PrimaryMasterKey, headers)) { + retryPolicy.OnBeforeSendRequest(request); string authorizationToken = null; try { @@ -333,6 +314,11 @@ private async Task ExecutePartitionKeyRangeReadChangeFe childTrace.AddDatum("Exception Message", ex.Message); throw; } + catch (CosmosException ce) + { + childTrace.AddDatum("Exception Message", ce.Message); + throw; + } } } } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Performance.Tests/Mocks/MockDocumentClient.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Performance.Tests/Mocks/MockDocumentClient.cs index 7d268407de..c6b59b8cea 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Performance.Tests/Mocks/MockDocumentClient.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Performance.Tests/Mocks/MockDocumentClient.cs @@ -228,7 +228,7 @@ private void Init() }, string.Empty); - this.partitionKeyRangeCache = new Mock(null, null, null); + this.partitionKeyRangeCache = new Mock(null, null, null, null); this.partitionKeyRangeCache.Setup( m => m.TryLookupAsync( It.IsAny(), diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncBatcherTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncBatcherTests.cs index 044a735734..acc83d3ff4 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncBatcherTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncBatcherTests.cs @@ -798,7 +798,7 @@ private class ClientWithSplitDetection : MockDocumentClient public ClientWithSplitDetection() { - this.partitionKeyRangeCache = new Mock(MockBehavior.Strict, null, null, null); + this.partitionKeyRangeCache = new Mock(MockBehavior.Strict, null, null, null, null); this.partitionKeyRangeCache.Setup( m => m.TryGetOverlappingRangesAsync( It.IsAny(), diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncContainerExecutorTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncContainerExecutorTests.cs index 993afa449b..c6bca20950 100644 --- 
a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncContainerExecutorTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncContainerExecutorTests.cs @@ -376,7 +376,7 @@ private class ClientWithSplitDetection : MockDocumentClient public ClientWithSplitDetection() { - this.partitionKeyRangeCache = new Mock(MockBehavior.Strict, null, null, null); + this.partitionKeyRangeCache = new Mock(MockBehavior.Strict, null, null, null, null); this.partitionKeyRangeCache.Setup( m => m.TryGetOverlappingRangesAsync( It.IsAny(), diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncOperationContextTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncOperationContextTests.cs index 5ec655d386..8e3c6bcbc3 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncOperationContextTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/BatchAsyncOperationContextTests.cs @@ -287,7 +287,7 @@ private class ClientWithSplitDetection : MockDocumentClient public ClientWithSplitDetection() { - this.partitionKeyRangeCache = new Mock(MockBehavior.Strict, null, null, null); + this.partitionKeyRangeCache = new Mock(MockBehavior.Strict, null, null, null, null); this.partitionKeyRangeCache.Setup( m => m.TryGetOverlappingRangesAsync( It.IsAny(), diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/BulkPartitionKeyRangeGoneRetryPolicyTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/BulkPartitionKeyRangeGoneRetryPolicyTests.cs index e1c8ad0cdf..1f87d40635 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/BulkPartitionKeyRangeGoneRetryPolicyTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/BulkPartitionKeyRangeGoneRetryPolicyTests.cs @@ -145,7 +145,7 @@ private class ClientWithSplitDetection : MockDocumentClient public ClientWithSplitDetection() { - this.partitionKeyRangeCache = new Mock(MockBehavior.Strict, null, null, null); + this.partitionKeyRangeCache = new Mock(MockBehavior.Strict, null, null, null, null); this.partitionKeyRangeCache.Setup( m => m.TryGetOverlappingRangesAsync( It.IsAny(), diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/PartitionSynchronizerCoreTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/PartitionSynchronizerCoreTests.cs index b56a9b377a..166b0c52cf 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/PartitionSynchronizerCoreTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/PartitionSynchronizerCoreTests.cs @@ -20,6 +20,28 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.Tests [TestCategory("ChangeFeed")] public class PartitionSynchronizerCoreTests { + private GlobalEndpointManager endpointManager; + + [TestInitialize] + public void TestInitialize() + { + Mock mockDocumentClient = new(); + + mockDocumentClient + .Setup(client => client.ServiceEndpoint) + .Returns(new Uri("https://foo")); + + this.endpointManager = new( + mockDocumentClient.Object, + new ConnectionPolicy()); + } + + [TestCleanup] + public void Cleanup() + { + this.endpointManager.Dispose(); + } + /// /// Verifies handling of Splits on PKRange based leases /// @@ -39,7 +61,8 @@ public async Task HandlePartitionGoneAsync_PKRangeBasedLease_Split() Mock pkRangeCache = new Mock( Mock.Of(), Mock.Of(), - Mock.Of()); + Mock.Of(), + this.endpointManager); List 
resultingRanges = new List() { @@ -102,7 +125,8 @@ public async Task HandlePartitionGoneAsync_EpkBasedLease_Split() Mock pkRangeCache = new Mock( Mock.Of(), Mock.Of(), - Mock.Of()); + Mock.Of(), + this.endpointManager); List resultingRanges = new List() { @@ -170,7 +194,8 @@ public async Task HandlePartitionGoneAsync_PKRangeBasedLease_Merge() Mock pkRangeCache = new Mock( Mock.Of(), Mock.Of(), - Mock.Of()); + Mock.Of(), + this.endpointManager); List resultingRanges = new List() { @@ -228,7 +253,8 @@ public async Task HandlePartitionGoneAsync_EpkBasedLease_Merge() Mock pkRangeCache = new Mock( Mock.Of(), Mock.Of(), - Mock.Of()); + Mock.Of(), + this.endpointManager); List resultingRanges = new List() { @@ -276,7 +302,8 @@ public async Task CreateMissingLeases_NoLeases() Mock pkRangeCache = new Mock( Mock.Of(), Mock.Of(), - Mock.Of()); + Mock.Of(), + this.endpointManager); List resultingRanges = new List() { @@ -321,7 +348,8 @@ public async Task CreateMissingLeases_SomePKRangeLeases() Mock pkRangeCache = new Mock( Mock.Of(), Mock.Of(), - Mock.Of()); + Mock.Of(), + this.endpointManager); List resultingRanges = new List() { @@ -372,7 +400,8 @@ public async Task CreateMissingLeases_SomePKRangeAndEPKLeases() Mock pkRangeCache = new Mock( Mock.Of(), Mock.Of(), - Mock.Of()); + Mock.Of(), + this.endpointManager); List resultingRanges = new List() { diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayAddressCacheTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayAddressCacheTests.cs index fded897e42..4b49c29d8e 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayAddressCacheTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayAddressCacheTests.cs @@ -60,7 +60,7 @@ public GatewayAddressCacheTests() } }; - this.partitionKeyRangeCache = new Mock(null, null, null); + this.partitionKeyRangeCache = new Mock(null, null, null, null); this.partitionKeyRangeCache .Setup(m => m.TryGetOverlappingRangesAsync( It.IsAny(), @@ -755,7 +755,7 @@ public async Task GlobalAddressResolver_OpenConnectionsToAllReplicasAsync_WhenIn .Returns(Task.FromResult(containerProperties)); string exceptionMessage = "Failed to lookup partition key ranges."; - Mock partitionKeyRangeCache = new (null, null, null); + Mock partitionKeyRangeCache = new (null, null, null, null); partitionKeyRangeCache .Setup(m => m.TryGetOverlappingRangesAsync( It.IsAny(), diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayStoreModelTest.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayStoreModelTest.cs index fd9a8a3efd..c578aef620 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayStoreModelTest.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayStoreModelTest.cs @@ -274,7 +274,7 @@ await GatewayStoreModel.ApplySessionTokenAsync( dsr, ConsistencyLevel.Session, new Mock().Object, - partitionKeyRangeCache: new Mock(null, null, null).Object, + partitionKeyRangeCache: new Mock(null, null, null, null).Object, clientCollectionCache: new Mock(new SessionContainer("testhost"), gatewayStoreModel, null, null, null).Object, globalEndpointManager: Mock.Of()); @@ -301,7 +301,7 @@ await GatewayStoreModel.ApplySessionTokenAsync( dsrQueryPlan, ConsistencyLevel.Session, new Mock().Object, - partitionKeyRangeCache: new Mock(null, null, null).Object, + partitionKeyRangeCache: new Mock(null, null, null, null).Object, clientCollectionCache: new Mock(new 
SessionContainer("testhost"), gatewayStoreModel, null, null, null).Object, globalEndpointManager: Mock.Of()); @@ -355,7 +355,7 @@ await GatewayStoreModel.ApplySessionTokenAsync( dsr, ConsistencyLevel.Session, new Mock().Object, - partitionKeyRangeCache: new Mock(null, null, null).Object, + partitionKeyRangeCache: new Mock(null, null, null, null).Object, clientCollectionCache: new Mock(new SessionContainer("testhost"), gatewayStoreModel, null, null, null).Object, globalEndpointManager: Mock.Of()); @@ -385,7 +385,7 @@ await GatewayStoreModel.ApplySessionTokenAsync( dsrNoSessionToken, ConsistencyLevel.Session, sessionContainer, - partitionKeyRangeCache: new Mock(null, null, null).Object, + partitionKeyRangeCache: new Mock(null, null, null, null).Object, clientCollectionCache: new Mock(new SessionContainer("testhost"), gatewayStoreModel, null, null, null).Object, globalEndpointManager: globalEndpointManager.Object); @@ -426,7 +426,7 @@ await GatewayStoreModel.ApplySessionTokenAsync( It.IsAny(), NoOpTrace.Singleton)).Returns(Task.FromResult(containerProperties)); - Mock mockPartitionKeyRangeCache = new Mock(MockBehavior.Strict, null, null, null); + Mock mockPartitionKeyRangeCache = new Mock(MockBehavior.Strict, null, null, null, null); mockPartitionKeyRangeCache.Setup(x => x.TryGetPartitionKeyRangeByIdAsync( containerProperties.ResourceId, partitionKeyRangeId, @@ -473,7 +473,7 @@ await GatewayStoreModel.ApplySessionTokenAsync( dsrSprocExecute, ConsistencyLevel.Session, new Mock().Object, - partitionKeyRangeCache: new Mock(null, null, null).Object, + partitionKeyRangeCache: new Mock(null, null, null, null).Object, clientCollectionCache: new Mock(new SessionContainer("testhost"), gatewayStoreModel, null, null, null).Object, globalEndpointManager: Mock.Of()); @@ -512,7 +512,7 @@ await GatewayStoreModel.ApplySessionTokenAsync( dsrNoSessionToken, ConsistencyLevel.Session, sessionContainer, - partitionKeyRangeCache: new Mock(null, null, null).Object, + partitionKeyRangeCache: new Mock(null, null, null, null).Object, clientCollectionCache: new Mock(new SessionContainer("testhost"), gatewayStoreModel, null, null, null).Object, globalEndpointManager: globalEndpointManager.Object); @@ -972,7 +972,7 @@ public async Task GatewayStoreModel_AvoidGlobalSessionToken() null, MockCosmosUtil.CreateCosmosHttpClient(() => new HttpClient())); Mock clientCollectionCache = new Mock(new SessionContainer("testhost"), storeModel, null, null, null); - Mock partitionKeyRangeCache = new Mock(null, storeModel, clientCollectionCache.Object); + Mock partitionKeyRangeCache = new Mock(null, storeModel, clientCollectionCache.Object, endpointManager); sessionContainer.SetSessionToken( ResourceId.NewDocumentCollectionId(42, 129).DocumentCollectionId.ToString(), @@ -1068,7 +1068,7 @@ Task sendFunc(HttpRequestMessage request) Mock clientCollectionCache = new Mock(new SessionContainer("testhost"), storeModel, null, null, null); - Mock partitionKeyRangeCache = new Mock(null, storeModel, clientCollectionCache.Object); + Mock partitionKeyRangeCache = new Mock(null, storeModel, clientCollectionCache.Object, endpointManager); storeModel.SetCaches(partitionKeyRangeCache.Object, clientCollectionCache.Object); INameValueCollection headers = new RequestNameValueCollection(); @@ -1136,7 +1136,7 @@ await GatewayStoreModel.ApplySessionTokenAsync( documentServiceRequestToChild, ConsistencyLevel.Session, sessionContainer, - partitionKeyRangeCache: new Mock(null, null, null).Object, + partitionKeyRangeCache: new Mock(null, null, null, null).Object, 
clientCollectionCache: new Mock(sessionContainer, gatewayStoreModel, null, null, null).Object, globalEndpointManager: globalEndpointManager.Object); @@ -1202,7 +1202,7 @@ await GatewayStoreModel.ApplySessionTokenAsync( documentServiceRequestToChild, ConsistencyLevel.Session, sessionContainer, - partitionKeyRangeCache: new Mock(null, null, null).Object, + partitionKeyRangeCache: new Mock(null, null, null, null).Object, clientCollectionCache: new Mock(sessionContainer, gatewayStoreModel, null, null, null).Object, globalEndpointManager: globalEndpointManager.Object); @@ -1273,7 +1273,7 @@ static async Task messageHandler(HttpRequestMessage request MockCosmosUtil.CreateCosmosHttpClient(() => new HttpClient(httpMessageHandler))); ClientCollectionCache clientCollectionCache = new Mock(new SessionContainer("testhost"), storeModel, null, null, null).Object; - PartitionKeyRangeCache partitionKeyRangeCache = new Mock(null, storeModel, clientCollectionCache).Object; + PartitionKeyRangeCache partitionKeyRangeCache = new Mock(null, storeModel, clientCollectionCache, endpointManager).Object; storeModel.SetCaches(partitionKeyRangeCache, clientCollectionCache); await executeWithGatewayStoreModel(storeModel); diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/MetadataRequestThrottleRetryPolicyTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/MetadataRequestThrottleRetryPolicyTests.cs new file mode 100644 index 0000000000..40faac51e2 --- /dev/null +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/MetadataRequestThrottleRetryPolicyTests.cs @@ -0,0 +1,129 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos.Tests +{ + using System; + using System.Threading.Tasks; + using Microsoft.Azure.Documents; + using Microsoft.Azure.Cosmos.Routing; + using Microsoft.VisualStudio.TestTools.UnitTesting; + using Moq; + using Microsoft.Azure.Cosmos.Resource.CosmosExceptions; + using Microsoft.Azure.Cosmos.Tracing; + using System.Net; + using System.Reflection; + using static Microsoft.Azure.Cosmos.MetadataRequestThrottleRetryPolicy; + + /// + /// Unit tests for . + /// + [TestClass] + public class MetadataRequestThrottleRetryPolicyTests + { + [TestMethod] + [Owner("dkunda")] + [DataRow(true, true, DisplayName = "Test when a response message with a valid substatus code was used.")] + [DataRow(false, true, DisplayName = "Test when an exception was thrown with a valid substatus code.")] + [DataRow(true, false, DisplayName = "Test when a response message with an invalid substatus code was used.")] + [DataRow(false, false, DisplayName = "Test when an exception was thrown with an invalid substatus code.")] + public async Task ShouldRetryAsync_WithValidAndInvalidSubStatusCodes_ShouldIncrementLocationIndexOrSkip( + bool useResponseMessage, + bool isValidSubStatusCode) + { + // Arrange. 
+ ShouldRetryResult retryResult; + string collectionRid = "test-collection"; + Uri primaryServiceEndpoint = new ("https://default-endpoint-region1.net/"); + Uri routedServiceEndpoint = new("https://default-endpoint-region2.net/"); + + Documents.Collections.INameValueCollection headers = new Documents.Collections.RequestNameValueCollection(); + + headers.Set(HttpConstants.HttpHeaders.PageSize, "10"); + headers.Set(HttpConstants.HttpHeaders.A_IM, HttpConstants.A_IMHeaderValues.IncrementalFeed); + + DocumentServiceRequest request = DocumentServiceRequest.Create( + OperationType.ReadFeed, + collectionRid, + Documents.ResourceType.PartitionKeyRange, + AuthorizationTokenType.PrimaryMasterKey, + headers); + + Mock mockedGlobalEndpointManager = new (); + mockedGlobalEndpointManager + .SetupSequence(gem => gem.ResolveServiceEndpoint(It.IsAny())) + .Returns(primaryServiceEndpoint) + .Returns(isValidSubStatusCode ? routedServiceEndpoint : primaryServiceEndpoint); + + MetadataRequestThrottleRetryPolicy policy = new (mockedGlobalEndpointManager.Object, 0); + policy.OnBeforeSendRequest(request); + + Assert.AreEqual(primaryServiceEndpoint, request.RequestContext.LocationEndpointToRoute); + + // Act. + if (useResponseMessage) + { + Headers responseHeaders = new() + { + SubStatusCode = isValidSubStatusCode + ? SubStatusCodes.TransportGenerated503 + : SubStatusCodes.BWTermCountLimitExceeded + }; + + ResponseMessage responseMessage = new( + statusCode: HttpStatusCode.ServiceUnavailable, + requestMessage: null, + headers: responseHeaders, + cosmosException: null, + trace: NoOpTrace.Singleton); + + retryResult = await policy.ShouldRetryAsync(responseMessage, default); + } + else + { + CosmosException exception = CosmosExceptionFactory.CreateServiceUnavailableException( + message: "Service Unavailable at the moment.", + headers: new Headers() + { + ActivityId = System.Diagnostics.Trace.CorrelationManager.ActivityId.ToString(), + SubStatusCode = isValidSubStatusCode + ? SubStatusCodes.TransportGenerated503 + : SubStatusCodes.BWTermCountLimitExceeded + }, + trace: NoOpTrace.Singleton, + innerException: null); + + + retryResult = await policy.ShouldRetryAsync(exception, default); + } + + policy.OnBeforeSendRequest(request); + + // Assert. 
+ FieldInfo fieldInfo = policy + .GetType() + .GetField( + name: "retryContext", + bindingAttr: BindingFlags.Instance | BindingFlags.NonPublic); + + MetadataRetryContext retryContext = (MetadataRetryContext)fieldInfo + .GetValue( + obj: policy); + + Assert.IsNotNull(retryResult); + if (isValidSubStatusCode) + { + Assert.AreEqual(true, retryResult.ShouldRetry, "MetadataRequestThrottleRetryPolicy should return true since the sub status code indicates to retry the request in the next preferred read region."); + Assert.AreEqual(1, retryContext.RetryLocationIndex, "Indicates that the retry location index was incremented."); + Assert.AreEqual(routedServiceEndpoint, request.RequestContext.LocationEndpointToRoute); + } + else + { + Assert.AreEqual(false, retryResult.ShouldRetry, "ResourceThrottleRetryPolicy should return false since the status code does not indicate the request was throttled."); + Assert.AreEqual(0, retryContext.RetryLocationIndex, "Indicates that the retry location index remains unchanged."); + Assert.AreEqual(primaryServiceEndpoint, request.RequestContext.LocationEndpointToRoute); + } + } + } +} diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/PartitionKeyRangeCacheTest.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/PartitionKeyRangeCacheTest.cs index 8d7b32a7b7..44eb2fc565 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/PartitionKeyRangeCacheTest.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/PartitionKeyRangeCacheTest.cs @@ -21,6 +21,8 @@ namespace Microsoft.Azure.Cosmos.Tests using TraceLevel = Cosmos.Tracing.TraceLevel; using Newtonsoft.Json.Linq; using Newtonsoft.Json; + using System; + using Microsoft.Azure.Cosmos.Resource.CosmosExceptions; /// /// Unit Tests for . /// @@ -77,6 +79,11 @@ public async Task TryGetOverlappingRangesAsync_WithFreshContainer_ShouldNotAddSa [HttpConstants.HttpHeaders.ETag] = eTag, }; + Mock mockDocumentClient = new Mock(); + mockDocumentClient.Setup(client => client.ServiceEndpoint).Returns(new Uri("https://foo")); + + using GlobalEndpointManager endpointManager = new (mockDocumentClient.Object, new ConnectionPolicy()); + mockStoreModel.SetupSequence(x => x.ProcessMessageAsync(It.IsAny(), It.IsAny())) .ReturnsAsync(new DocumentServiceResponse(new MemoryStream(singlePkCollectionCacheByte), new StoreResponseNameValueCollection() @@ -91,7 +98,7 @@ public async Task TryGetOverlappingRangesAsync_WithFreshContainer_ShouldNotAddSa .Returns(new ValueTask(authToken)); // Act. - PartitionKeyRangeCache partitionKeyRangeCache = new(mockTokenProvider.Object, mockStoreModel.Object, mockCollectioNCache.Object); + PartitionKeyRangeCache partitionKeyRangeCache = new(mockTokenProvider.Object, mockStoreModel.Object, mockCollectioNCache.Object, endpointManager); IReadOnlyList partitionKeyRanges = await partitionKeyRangeCache.TryGetOverlappingRangesAsync( containerRId, FeedRangeEpk.FullRange.Range, @@ -119,5 +126,137 @@ public async Task TryGetOverlappingRangesAsync_WithFreshContainer_ShouldNotAddSa Assert.AreEqual(eTag, secondPkRangeValue); } } + + /// + /// Test to validate that when the gateway service is unavailable, the partition key range cache is able to retry + /// the Get PK Range calls to different regions, based on the preferred locations count. Therefore, the cache keeps + /// retrying until the maximum number of retry attempts is exhausted.
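The DataRow cases of the test below follow directly from how the policy caps failover retries against the three consecutive 503s queued on the mocked store model. A standalone sketch of that arithmetic (the values mirror the test; the variable names are only illustrative):

int preferredLocationCount = 3; // 1, 2 or 3 in the DataRows below
int maxServiceUnavailableRetryCount = Math.Max(1, preferredLocationCount);
// The mock throws TransportGenerated503 three times before returning 200 OK, so:
//   preferredLocationCount = 1 -> 1 failover retry   -> attempts 1-2 fail  -> expected failure
//   preferredLocationCount = 2 -> 2 failover retries -> attempts 1-3 fail  -> expected failure
//   preferredLocationCount = 3 -> 3 failover retries -> attempt 4 succeeds -> expected success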
+ /// + [TestMethod] + [DataRow(1, false, DisplayName = "Validates when the preferred location count is just one, the cache retries only once and fails.")] + [DataRow(2, false, DisplayName = "Validates when the preferred location count is two, the cache retries twice and fails.")] + [DataRow(3, true, DisplayName = "Validates when the preferred location count is three, the cache retries thrice and succeeds on the last attempt.")] + public async Task TryGetOverlappingRangesAsync_WhenGatewayThrowsServiceUnavailable_ShouldMarkReadEndpointAsUnavailable2(int preferredLocationsCount, bool shouldSucceed) + { + // Arrange. + string eTag = "483"; + string authToken = "token!"; + string containerRId = "kjhsAA=="; + string singlePkCollectionCache = "{\"_rid\":\"3FIlAOzjvyg=\",\"PartitionKeyRanges\":[{\"_rid\":\"3FIlAOzjvygCAAAAAAAAUA==\",\"id\":\"0\",\"_etag\":\"\\\"00005565-0000-0800-0000-621fd98a0000\\\"\",\"minInclusive\":\"\",\"maxExclusive\":\"FF\",\"ridPrefix\":0,\"_self\":\"dbs/3FIlAA==/colls/3FIlAOzjvyg=/pkranges/3FIlAOzjvygCAAAAAAAAUA==/\",\"throughputFraction\":1,\"status\":\"splitting\",\"parents\":[],\"_ts\":1646254474,\"_lsn\":44}],\"_count\":1}"; + byte[] singlePkCollectionCacheByte = Encoding.UTF8.GetBytes(singlePkCollectionCache); + using (ITrace trace = Trace.GetRootTrace(this.TestContext.TestName, TraceComponent.Unknown, TraceLevel.Info)) + { + Mock mockStoreModel = new(); + Mock mockCollectioNCache = new(); + Mock mockTokenProvider = new(); + NameValueCollectionWrapper headers = new() + { + [HttpConstants.HttpHeaders.ETag] = eTag, + }; + + Uri serviceUri = new("https://foo"); + Mock mockDocumentClient = new Mock(); + mockDocumentClient.Setup(client => client.ServiceEndpoint).Returns(serviceUri); + + Mock mockedEndpointManager = new Mock(); + mockedEndpointManager + .Setup(gem => gem.ResolveServiceEndpoint(It.IsAny())) + .Returns(serviceUri); + + mockedEndpointManager + .Setup(gem => gem.PreferredLocationCount) + .Returns(preferredLocationsCount); + + mockStoreModel.SetupSequence(x => x.ProcessMessageAsync(It.IsAny(), It.IsAny())) + .ThrowsAsync(CosmosExceptionFactory.CreateServiceUnavailableException( + message: "Service is Unavailable.", + headers: new Headers() + { + ActivityId = System.Diagnostics.Trace.CorrelationManager.ActivityId.ToString(), + SubStatusCode = SubStatusCodes.TransportGenerated503 + }, + trace: trace, + innerException: null)) + .ThrowsAsync(CosmosExceptionFactory.CreateServiceUnavailableException( + message: "Service is Unavailable.", + headers: new Headers() + { + ActivityId = System.Diagnostics.Trace.CorrelationManager.ActivityId.ToString(), + SubStatusCode = SubStatusCodes.TransportGenerated503 + }, + trace: trace, + innerException: null)) + .ThrowsAsync(CosmosExceptionFactory.CreateServiceUnavailableException( + message: "Service is Unavailable.", + headers: new Headers() + { + ActivityId = System.Diagnostics.Trace.CorrelationManager.ActivityId.ToString(), + SubStatusCode = SubStatusCodes.TransportGenerated503 + }, + trace: trace, + innerException: null)) + .ReturnsAsync(new DocumentServiceResponse(new MemoryStream(singlePkCollectionCacheByte), + new StoreResponseNameValueCollection() + { + ETag = eTag, + }, + HttpStatusCode.OK)) + .ReturnsAsync(new DocumentServiceResponse(null, headers, HttpStatusCode.NotModified, null)) + .ReturnsAsync(new DocumentServiceResponse(null, headers, HttpStatusCode.NotModified, null)); + + mockTokenProvider.Setup(x => x.GetUserAuthorizationTokenAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + 
.Returns(new ValueTask(authToken)); + + PartitionKeyRangeCache partitionKeyRangeCache = new(mockTokenProvider.Object, mockStoreModel.Object, mockCollectioNCache.Object, mockedEndpointManager.Object); + + if (shouldSucceed) + { + // Act. + IReadOnlyList partitionKeyRanges = await partitionKeyRangeCache.TryGetOverlappingRangesAsync( + containerRId, + FeedRangeEpk.FullRange.Range, + trace, + forceRefresh: true); + + // Assert. + string dataPath = "children[0].data", previousContinuationTokenKey = "['Previous Continuation Token']", diagnostics = new CosmosTraceDiagnostics(trace).ToString(); + JObject traceData = JsonConvert.DeserializeObject(diagnostics); + + Assert.IsNotNull(traceData); + + string firstPkRangeCacheKeyName = ((JProperty)traceData.SelectToken(dataPath)?.First).Name; + string secondPkRangeCacheKeyName = ((JProperty)traceData.SelectToken(dataPath)?.Last).Name; + + Assert.IsTrue(!string.IsNullOrWhiteSpace(firstPkRangeCacheKeyName)); + Assert.IsTrue(!string.IsNullOrWhiteSpace(secondPkRangeCacheKeyName)); + Assert.AreNotEqual(firstPkRangeCacheKeyName, secondPkRangeCacheKeyName); + + string firstPkRangeValue = ((JProperty)traceData.SelectToken(dataPath).First)?.Value?.SelectToken(previousContinuationTokenKey)?.ToString(); + string secondPkRangeValue = ((JProperty)traceData.SelectToken(dataPath).Last)?.Value?.SelectToken(previousContinuationTokenKey)?.ToString(); + + Assert.IsTrue(!string.IsNullOrWhiteSpace(secondPkRangeValue)); + Assert.IsTrue(string.IsNullOrEmpty(firstPkRangeValue)); + Assert.AreEqual(eTag, secondPkRangeValue); + } + else + { + // Act. + CosmosException cosmosException = await Assert.ThrowsExceptionAsync(() => partitionKeyRangeCache.TryGetOverlappingRangesAsync( + containerRId, + FeedRangeEpk.FullRange.Range, + trace, + forceRefresh: true)); + + // Assert. 
+ string diagnostics = new CosmosTraceDiagnostics(trace).ToString(); + JObject traceData = JsonConvert.DeserializeObject(diagnostics); + + Assert.IsNotNull(cosmosException); + Assert.IsNotNull(traceData); + Assert.AreEqual(HttpStatusCode.ServiceUnavailable, cosmosException.StatusCode); + Assert.AreEqual(SubStatusCodes.TransportGenerated503, cosmosException.Headers.SubStatusCode); + } + } + } } } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/PartitionKeyRangeHandlerTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/PartitionKeyRangeHandlerTests.cs index 66eed8a723..2db6f9e9d7 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/PartitionKeyRangeHandlerTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/PartitionKeyRangeHandlerTests.cs @@ -651,6 +651,10 @@ public async Task PartitionKeyRangeGoneTracePlumbingTest() containerProperties.Id = "TestContainer"; containerProperties.PartitionKey = partitionKeyDefinition; + Mock mockDocumentClient = new Mock(); + mockDocumentClient.Setup(client => client.ServiceEndpoint).Returns(new Uri("https://foo")); + + using GlobalEndpointManager endpointManager = new(mockDocumentClient.Object, new ConnectionPolicy()); Mock collectionCache = new Mock(MockBehavior.Strict); collectionCache.Setup(c => c.ResolveCollectionAsync(It.IsAny(), default, trace)) @@ -661,7 +665,8 @@ public async Task PartitionKeyRangeGoneTracePlumbingTest() MockBehavior.Strict, new Mock().Object, new Mock().Object, - collectionCache.Object); + collectionCache.Object, + endpointManager); partitionKeyRangeCache.Setup(c => c.TryLookupAsync(collectionRid, null, It.IsAny(), trace)) .ReturnsAsync(collectionRoutingMap); diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Utils/MockDocumentClient.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Utils/MockDocumentClient.cs index b0c29eb016..b6498dec6f 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Utils/MockDocumentClient.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Utils/MockDocumentClient.cs @@ -227,7 +227,14 @@ private void Init() return Task.FromResult(cosmosContainerSetting); }); - this.partitionKeyRangeCache = new Mock(null, null, null); + Mock mockDocumentClient = new(); + mockDocumentClient + .Setup(client => client.ServiceEndpoint) + .Returns(new Uri("https://foo")); + + using GlobalEndpointManager endpointManager = new(mockDocumentClient.Object, new ConnectionPolicy()); + + this.partitionKeyRangeCache = new Mock(null, null, null, endpointManager); this.partitionKeyRangeCache.Setup( m => m.TryLookupAsync( It.IsAny(),