diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CosmosQueryExecutionContextFactory.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CosmosQueryExecutionContextFactory.cs index 08e6292a03..01be3ffc83 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CosmosQueryExecutionContextFactory.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CosmosQueryExecutionContextFactory.cs @@ -32,6 +32,7 @@ namespace Microsoft.Azure.Cosmos.Query.Core.ExecutionContext internal static class CosmosQueryExecutionContextFactory { + internal const string ClientDisableOptimisticDirectExecution = "clientDisableOptimisticDirectExecution"; private const string InternalPartitionKeyDefinitionProperty = "x-ms-query-partitionkey-definition"; private const string QueryInspectionPattern = @"\s+(GROUP\s+BY\s+|COUNT\s*\(|MIN\s*\(|MAX\s*\(|AVG\s*\(|SUM\s*\(|DISTINCT\s+)"; private const string OptimisticDirectExecution = "OptimisticDirectExecution"; @@ -250,9 +251,9 @@ private static async Task> TryCreateFromPartitione TryCatch tryCreatePipelineStage; Documents.PartitionKeyRange targetRange = await TryGetTargetRangeOptimisticDirectExecutionAsync( - inputParameters, - partitionedQueryExecutionInfo, - cosmosQueryContext, + inputParameters, + partitionedQueryExecutionInfo, + cosmosQueryContext, containerQueryProperties, trace); @@ -761,13 +762,18 @@ private static Documents.PartitionKeyDefinition GetPartitionKeyDefinition(InputP ContainerQueryProperties containerQueryProperties, ITrace trace) { - if (!inputParameters.EnableOptimisticDirectExecution) + bool clientDisableOptimisticDirectExecution = await cosmosQueryContext.QueryClient.GetClientDisableOptimisticDirectExecutionAsync(); + + // Use the Ode code path only if ClientDisableOptimisticDirectExecution is false and EnableOptimisticDirectExecution is true + if (clientDisableOptimisticDirectExecution || !inputParameters.EnableOptimisticDirectExecution) { - if (inputParameters.InitialUserContinuationToken != null - && OptimisticDirectExecutionContinuationToken.IsOptimisticDirectExecutionContinuationToken(inputParameters.InitialUserContinuationToken)) + if (inputParameters.InitialUserContinuationToken != null + && OptimisticDirectExecutionContinuationToken.IsOptimisticDirectExecutionContinuationToken(inputParameters.InitialUserContinuationToken)) { - throw new MalformedContinuationTokenException($"The continuation token supplied requires the Optimistic Direct Execution flag to be enabled in QueryRequestOptions for the query execution to resume. " + - $"{inputParameters.InitialUserContinuationToken}"); + string errorMessage = "Execution of this query using the supplied continuation token requires EnableOptimisticDirectExecution to be set in QueryRequestOptions. " + + "If the error persists after that, contact system administrator."; + + throw new MalformedContinuationTokenException($"{errorMessage} Continuation Token: {inputParameters.InitialUserContinuationToken}"); } return null; diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/QueryClient/CosmosQueryClient.cs b/Microsoft.Azure.Cosmos/src/Query/Core/QueryClient/CosmosQueryClient.cs index d9dc0ac976..f32ab1035c 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/QueryClient/CosmosQueryClient.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/QueryClient/CosmosQueryClient.cs @@ -63,6 +63,8 @@ public abstract Task> ExecuteItemQueryAsync( ITrace trace, CancellationToken cancellationToken); + public abstract Task GetClientDisableOptimisticDirectExecutionAsync(); + public abstract Task ExecuteQueryPlanRequestAsync( string resourceUri, Documents.ResourceType resourceType, diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/QueryPlan/QueryPartitionProvider.cs b/Microsoft.Azure.Cosmos/src/Query/Core/QueryPlan/QueryPartitionProvider.cs index 6f95673d80..dcb17d6463 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/QueryPlan/QueryPartitionProvider.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/QueryPlan/QueryPartitionProvider.cs @@ -12,6 +12,7 @@ namespace Microsoft.Azure.Cosmos.Query.Core.QueryPlan using System.Text; using Microsoft.Azure.Cosmos.Core.Trace; using Microsoft.Azure.Cosmos.Query.Core.Exceptions; + using Microsoft.Azure.Cosmos.Query.Core.ExecutionContext; using Microsoft.Azure.Cosmos.Query.Core.Monads; using Microsoft.Azure.Cosmos.Routing; using Microsoft.Azure.Cosmos.Tracing; @@ -46,6 +47,9 @@ internal sealed class QueryPartitionProvider : IDisposable private bool disposed; private string queryengineConfiguration; + // TODO: Move this into a config class of its own + public bool ClientDisableOptimisticDirectExecution { get; private set; } + public QueryPartitionProvider(IDictionary queryengineConfiguration) { if (queryengineConfiguration == null) @@ -60,6 +64,7 @@ public QueryPartitionProvider(IDictionary queryengineConfigurati this.disposed = false; this.queryengineConfiguration = JsonConvert.SerializeObject(queryengineConfiguration); + this.ClientDisableOptimisticDirectExecution = GetClientDisableOptimisticDirectExecution((IReadOnlyDictionary)queryengineConfiguration); this.serviceProvider = IntPtr.Zero; this.serviceProviderStateLock = new object(); @@ -92,6 +97,7 @@ public void Update(IDictionary queryengineConfiguration) if (!string.Equals(this.queryengineConfiguration, newConfiguration)) { this.queryengineConfiguration = newConfiguration; + this.ClientDisableOptimisticDirectExecution = GetClientDisableOptimisticDirectExecution((IReadOnlyDictionary)queryengineConfiguration); if (!this.disposed && this.serviceProvider != IntPtr.Zero) { @@ -132,6 +138,7 @@ public TryCatch TryGetPartitionedQueryExecutionIn allowDCount: allowDCount, useSystemPrefix: useSystemPrefix, geospatialType: geospatialType); + if (!tryGetInternalQueryInfo.Succeeded) { return TryCatch.FromException(tryGetInternalQueryInfo.Exception); @@ -141,6 +148,16 @@ public TryCatch TryGetPartitionedQueryExecutionIn return TryCatch.FromResult(queryInfo); } + private static bool GetClientDisableOptimisticDirectExecution(IReadOnlyDictionary queryengineConfiguration) + { + if (queryengineConfiguration.TryGetValue(CosmosQueryExecutionContextFactory.ClientDisableOptimisticDirectExecution, out object queryConfigProperty)) + { + return (bool)queryConfigProperty; + } + + return false; + } + internal PartitionedQueryExecutionInfo ConvertPartitionedQueryExecutionInfo( PartitionedQueryExecutionInfoInternal queryInfoInternal, PartitionKeyDefinition partitionKeyDefinition) diff --git a/Microsoft.Azure.Cosmos/src/Query/v3Query/CosmosQueryClientCore.cs b/Microsoft.Azure.Cosmos/src/Query/v3Query/CosmosQueryClientCore.cs index a392632649..78fe2c985d 100644 --- a/Microsoft.Azure.Cosmos/src/Query/v3Query/CosmosQueryClientCore.cs +++ b/Microsoft.Azure.Cosmos/src/Query/v3Query/CosmosQueryClientCore.cs @@ -209,6 +209,12 @@ public override async Task ExecuteQueryPlanReques return partitionedQueryExecutionInfo; } + public override async Task GetClientDisableOptimisticDirectExecutionAsync() + { + QueryPartitionProvider provider = await this.clientContext.DocumentClient.QueryPartitionProvider; + return provider.ClientDisableOptimisticDirectExecution; + } + public override async Task> GetTargetPartitionKeyRangeByFeedRangeAsync( string resourceLink, string collectionResourceId, diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Query/OptimisticDirectExecutionQueryTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Query/OptimisticDirectExecutionQueryTests.cs index 78d4ddb8ed..0ac41b6f39 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Query/OptimisticDirectExecutionQueryTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Query/OptimisticDirectExecutionQueryTests.cs @@ -17,6 +17,7 @@ public sealed class OptimisticDirectExecutionQueryTests : QueryTestsBase private const string PartitionKeyField = "key"; private const string NumberField = "numberField"; private const string NullField = "nullField"; + private const string ClientDisableOptimisticDirectExecution = "clientDisableOptimisticDirectExecution"; private static class PageSizeOptions { @@ -538,6 +539,22 @@ await this.CreateIngestQueryDeleteAsync( documents, (container, documents) => RunFailingTests(container, invalidQueries), "/" + PartitionKeyField); + } + + //TODO: Remove Ignore flag once emulator is updated to 1101 + [Ignore] + [TestMethod] + public async Task TestClientDisableOdeDefaultValue() + { + string authKey = Utils.ConfigurationManager.AppSettings["MasterKey"]; + string endpoint = Utils.ConfigurationManager.AppSettings["GatewayEndpoint"]; + + CosmosClient client = new CosmosClient($"AccountEndpoint={endpoint};AccountKey={authKey}"); + AccountProperties properties = await client.ReadAccountAsync(); + + bool success = bool.TryParse(properties.QueryEngineConfiguration[ClientDisableOptimisticDirectExecution].ToString(), out bool clientDisablOde); + Assert.IsTrue(success, $"Parsing must succeed. Value supplied '{ClientDisableOptimisticDirectExecution}'"); + Assert.IsFalse(clientDisablOde); } private static async Task RunTests(IEnumerable testCases, Container container) diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/OptimisticDirectExecutionQueryBaselineTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/OptimisticDirectExecutionQueryBaselineTests.cs index 824232c412..8ec63b59ae 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/OptimisticDirectExecutionQueryBaselineTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/OptimisticDirectExecutionQueryBaselineTests.cs @@ -29,6 +29,7 @@ using Microsoft.Azure.Documents; using Microsoft.Azure.Documents.Routing; using Microsoft.VisualStudio.TestTools.UnitTesting; + using Moq; using Newtonsoft.Json; [TestClass] @@ -162,6 +163,7 @@ public void TestDefaultQueryRequestOptionsSettings() public async Task TestPipelineForBackendDocumentsOnSinglePartitionAsync() { int numItems = 100; + int documentCountInSinglePartition = 0; OptimisticDirectExecutionTestInput input = CreateInput( description: @"Single Partition Key and Value Field", query: "SELECT VALUE COUNT(1) FROM c", @@ -172,7 +174,6 @@ public async Task TestPipelineForBackendDocumentsOnSinglePartitionAsync() QueryRequestOptions queryRequestOptions = GetQueryRequestOptions(enableOptimisticDirectExecution: true); DocumentContainer inMemoryCollection = await CreateDocumentContainerAsync(numItems, multiPartition: false); IQueryPipelineStage queryPipelineStage = await GetOdePipelineAsync(input, inMemoryCollection, queryRequestOptions); - int documentCountInSinglePartition = 0; while (await queryPipelineStage.MoveNextAsync(NoOpTrace.Singleton)) { @@ -214,13 +215,15 @@ public async Task TestOdeTokenWithSpecializedPipeline() DocumentContainer documentContainer = await CreateDocumentContainerAsync(numItems, multiPartition: false); QueryRequestOptions queryRequestOptions = GetQueryRequestOptions(enableOptimisticDirectExecution: input.ExpectedOptimisticDirectExecution); (CosmosQueryExecutionContextFactory.InputParameters inputParameters, CosmosQueryContextCore cosmosQueryContextCore) = CreateInputParamsAndQueryContext(input, queryRequestOptions); + IQueryPipelineStage queryPipelineStage = CosmosQueryExecutionContextFactory.Create( documentContainer, cosmosQueryContextCore, inputParameters, NoOpTrace.Singleton); - string expectedErrorMessage = "The continuation token supplied requires the Optimistic Direct Execution flag to be enabled in QueryRequestOptions for the query execution to resume. "; + string expectedErrorMessage = "Execution of this query using the supplied continuation token requires EnableOptimisticDirectExecution to be set in QueryRequestOptions. " + + "If the error persists after that, contact system administrator."; while (await queryPipelineStage.MoveNextAsync(NoOpTrace.Singleton)) { @@ -446,7 +449,7 @@ public async Task TestQueryValidityCheckWithODEAsync() } catch (Exception ex) { - Assert.IsTrue(ex.InnerException.Message.Contains(testCase.ExpectedMessage)); + Assert.IsTrue(ex.InnerException.InnerException.Message.Contains(testCase.ExpectedMessage)); continue; } } @@ -494,6 +497,88 @@ public async Task TestPipelineForDistributedQueryAsync() Assert.AreEqual(1, result); } + [TestMethod] + public async Task TestClientDisableOdeLogic() + { + // GetPipelineAndDrainAsyc() contains asserts to confirm that the Ode pipeline only gets picked if clientDisableOptimisticDirectExecution flag is false + int numItems = 100; + OptimisticDirectExecutionTestInput input = CreateInput( + description: @"Single Partition Key and Value Field", + query: "SELECT * FROM c", + expectedOptimisticDirectExecution: true, + partitionKeyPath: @"/pk", + partitionKeyValue: "a"); + + // Test with ClientDisableOde = true + int result = await this.GetPipelineAndDrainAsync( + input, + numItems: numItems, + isMultiPartition: false, + expectedContinuationTokenCount: 10, + requiresDist: false, + clientDisableOde: true); + + Assert.AreEqual(numItems, result); + + // Test with ClientDisableOde = false + result = await this.GetPipelineAndDrainAsync( + input, + numItems: numItems, + isMultiPartition: false, + expectedContinuationTokenCount: 10, + requiresDist: false, + clientDisableOde: false); + + Assert.AreEqual(numItems, result); + } + + [TestMethod] + public async Task TestOdeFlagsWithContinuationToken() + { + ParallelContinuationToken parallelContinuationToken = new ParallelContinuationToken( + token: Guid.NewGuid().ToString(), + range: new Range("A", "B", true, false)); + + OptimisticDirectExecutionContinuationToken optimisticDirectExecutionContinuationToken = new OptimisticDirectExecutionContinuationToken(parallelContinuationToken); + CosmosElement cosmosElementContinuationToken = OptimisticDirectExecutionContinuationToken.ToCosmosElement(optimisticDirectExecutionContinuationToken); + + OptimisticDirectExecutionTestInput input = CreateInput( + description: @"Single Partition Key and Ode continuation token", + query: "SELECT * FROM c", + expectedOptimisticDirectExecution: true, + partitionKeyPath: @"/pk", + partitionKeyValue: "a", + continuationToken: cosmosElementContinuationToken); + + // All of these cases should throw the same exception message. + await this.ValidateErrorMessageWithModifiedOdeFlags(input, enableOde: true, clientDisableOde: true); + await this.ValidateErrorMessageWithModifiedOdeFlags(input, enableOde: false, clientDisableOde: true); + await this.ValidateErrorMessageWithModifiedOdeFlags(input, enableOde: false, clientDisableOde: false); + } + + private async Task ValidateErrorMessageWithModifiedOdeFlags(OptimisticDirectExecutionTestInput input, bool enableOde, bool clientDisableOde) + { + string expectedErrorMessage = "Execution of this query using the supplied continuation token requires EnableOptimisticDirectExecution to be set in QueryRequestOptions. " + + "If the error persists after that, contact system administrator."; + try + { + int result = await this.GetPipelineAndDrainAsync( + input, + numItems: 100, + isMultiPartition: false, + expectedContinuationTokenCount: 10, + requiresDist: false, + enableOde, + clientDisableOde); + + Assert.Fail("A MalformedContinuationTokenException was expected in this scenario"); + } + catch (Exception ex) + { + Assert.IsTrue(ex.InnerException.Message.Contains(expectedErrorMessage)); + } + } + // Creates a gone exception after the first MoveNexyAsync() call. This allows for the pipeline to return some documents before failing private static async Task ExecuteGoneExceptionOnODEPipeline(bool isMultiPartition) { @@ -584,23 +669,28 @@ private static async Task TestHandlingOfFailedFallbackPipeline(bool isMult return (mergeTest, queryPipelineStage); } - private async Task GetPipelineAndDrainAsync(OptimisticDirectExecutionTestInput input, int numItems, bool isMultiPartition, int expectedContinuationTokenCount, bool requiresDist = false) + private async Task GetPipelineAndDrainAsync(OptimisticDirectExecutionTestInput input, int numItems, bool isMultiPartition, int expectedContinuationTokenCount, bool requiresDist = false, bool enableOptimisticDirectExecution = true, bool clientDisableOde = false) { - QueryRequestOptions queryRequestOptions = GetQueryRequestOptions(enableOptimisticDirectExecution: true); - DocumentContainer inMemoryCollection = await CreateDocumentContainerAsync(numItems, multiPartition: isMultiPartition, requiresDist: requiresDist); - IQueryPipelineStage queryPipelineStage = await GetOdePipelineAsync(input, inMemoryCollection, queryRequestOptions); - List documents = new List(); int continuationTokenCount = 0; + List documents = new List(); + QueryRequestOptions queryRequestOptions = GetQueryRequestOptions(enableOptimisticDirectExecution); + DocumentContainer inMemoryCollection = await CreateDocumentContainerAsync(numItems, multiPartition: isMultiPartition, requiresDist: requiresDist); + IQueryPipelineStage queryPipelineStage = await GetOdePipelineAsync(input, inMemoryCollection, queryRequestOptions, clientDisableOde); while (await queryPipelineStage.MoveNextAsync(NoOpTrace.Singleton)) { - if (!requiresDist) + TryCatch tryGetPage = queryPipelineStage.Current; + tryGetPage.ThrowIfFailed(); + + if (clientDisableOde || !enableOptimisticDirectExecution) { - Assert.AreEqual(TestInjections.PipelineType.OptimisticDirectExecution, queryRequestOptions.TestSettings.Stats.PipelineType.Value); + Assert.AreNotEqual(TestInjections.PipelineType.OptimisticDirectExecution, queryRequestOptions.TestSettings.Stats.PipelineType.Value); } - TryCatch tryGetPage = queryPipelineStage.Current; - tryGetPage.ThrowIfFailed(); + if (!clientDisableOde && enableOptimisticDirectExecution && !requiresDist) + { + Assert.AreEqual(TestInjections.PipelineType.OptimisticDirectExecution, queryRequestOptions.TestSettings.Stats.PipelineType.Value); + } documents.AddRange(tryGetPage.Result.Documents); @@ -628,9 +718,10 @@ private async Task GetPipelineAndDrainAsync(OptimisticDirectExecutionTestIn return documents.Count; } - internal static TryCatch TryGetPartitionedQueryExecutionInfo(string querySpecJsonString, PartitionKeyDefinition pkDefinition) + internal static Tuple GetPartitionedQueryExecutionInfoAndPartitionProvider(string querySpecJsonString, PartitionKeyDefinition pkDefinition, bool clientDisableOde = false) { - TryCatch tryGetQueryPlan = QueryPartitionProviderTestInstance.Object.TryGetPartitionedQueryExecutionInfo( + QueryPartitionProvider queryPartitionProvider = CreateCustomQueryPartitionProvider("clientDisableOptimisticDirectExecution", clientDisableOde.ToString().ToLower()); + TryCatch tryGetQueryPlan = queryPartitionProvider.TryGetPartitionedQueryExecutionInfo( querySpecJsonString: querySpecJsonString, partitionKeyDefinition: pkDefinition, requireFormattableOrderByQuery: true, @@ -640,14 +731,14 @@ internal static TryCatch TryGetPartitionedQueryEx allowDCount: true, useSystemPrefix: false, geospatialType: Cosmos.GeospatialType.Geography); - - return tryGetQueryPlan; + + PartitionedQueryExecutionInfo partitionedQueryExecutionInfo = tryGetQueryPlan.Succeeded ? tryGetQueryPlan.Result : throw tryGetQueryPlan.Exception; + return Tuple.Create(partitionedQueryExecutionInfo, queryPartitionProvider); } - private static Task GetOdePipelineAsync(OptimisticDirectExecutionTestInput input, DocumentContainer documentContainer, QueryRequestOptions queryRequestOptions) + private static async Task GetOdePipelineAsync(OptimisticDirectExecutionTestInput input, DocumentContainer documentContainer, QueryRequestOptions queryRequestOptions, bool clientDisableOde = false) { - (CosmosQueryExecutionContextFactory.InputParameters inputParameters, CosmosQueryContextCore cosmosQueryContextCore) = CreateInputParamsAndQueryContext(input, queryRequestOptions); - + (CosmosQueryExecutionContextFactory.InputParameters inputParameters, CosmosQueryContextCore cosmosQueryContextCore) = CreateInputParamsAndQueryContext(input, queryRequestOptions, clientDisableOde); IQueryPipelineStage queryPipelineStage = CosmosQueryExecutionContextFactory.Create( documentContainer, cosmosQueryContextCore, @@ -655,7 +746,7 @@ private static Task GetOdePipelineAsync(OptimisticDirectExe NoOpTrace.Singleton); Assert.IsNotNull(queryPipelineStage); - return Task.FromResult(queryPipelineStage); + return queryPipelineStage; } private static async Task CreateDocumentContainerAsync( @@ -725,6 +816,38 @@ private static async Task CreateDocumentContainerAsync( return documentContainer; } + private static QueryPartitionProvider CreateCustomQueryPartitionProvider(string key, string value) + { + Dictionary queryEngineConfiguration = new Dictionary() + { + {"maxSqlQueryInputLength", 262144}, + {"maxJoinsPerSqlQuery", 5}, + {"maxLogicalAndPerSqlQuery", 2000}, + {"maxLogicalOrPerSqlQuery", 2000}, + {"maxUdfRefPerSqlQuery", 10}, + {"maxInExpressionItemsCount", 16000}, + {"queryMaxGroupByTableCellCount", 500000 }, + {"queryMaxInMemorySortDocumentCount", 500}, + {"maxQueryRequestTimeoutFraction", 0.90}, + {"sqlAllowNonFiniteNumbers", false}, + {"sqlAllowAggregateFunctions", true}, + {"sqlAllowSubQuery", true}, + {"sqlAllowScalarSubQuery", true}, + {"allowNewKeywords", true}, + {"sqlAllowLike", true}, + {"sqlAllowGroupByClause", true}, + {"maxSpatialQueryCells", 12}, + {"spatialMaxGeometryPointCount", 256}, + {"sqlDisableQueryILOptimization", false}, + {"sqlDisableFilterPlanOptimization", false}, + {"clientDisableOptimisticDirectExecution", false} + }; + + queryEngineConfiguration[key] = bool.TryParse(value, out bool boolValue) ? boolValue : value; + + return new QueryPartitionProvider(queryEngineConfiguration); + } + private static OptimisticDirectExecutionTestInput CreateInput( string description, string query, @@ -755,11 +878,8 @@ public override OptimisticDirectExecutionTestOutput ExecuteTest(OptimisticDirect // gets DocumentContainer IMonadicDocumentContainer monadicDocumentContainer = new InMemoryContainer(input.PartitionKeyDefinition); DocumentContainer documentContainer = new DocumentContainer(monadicDocumentContainer); - QueryRequestOptions queryRequestOptions = GetQueryRequestOptions(enableOptimisticDirectExecution: true); - (CosmosQueryExecutionContextFactory.InputParameters inputParameters, CosmosQueryContextCore cosmosQueryContextCore) = CreateInputParamsAndQueryContext(input, queryRequestOptions); - IQueryPipelineStage queryPipelineStage = CosmosQueryExecutionContextFactory.Create( documentContainer, cosmosQueryContextCore, @@ -783,12 +903,13 @@ public override OptimisticDirectExecutionTestOutput ExecuteTest(OptimisticDirect return new OptimisticDirectExecutionTestOutput(input.ExpectedOptimisticDirectExecution); } - private static Tuple CreateInputParamsAndQueryContext(OptimisticDirectExecutionTestInput input, QueryRequestOptions queryRequestOptions) + private static Tuple CreateInputParamsAndQueryContext(OptimisticDirectExecutionTestInput input, QueryRequestOptions queryRequestOptions, bool clientDisableOde = false) { CosmosSerializerCore serializerCore = new(); using StreamReader streamReader = new(serializerCore.ToStreamSqlQuerySpec(new SqlQuerySpec(input.Query), Documents.ResourceType.Document)); string sqlQuerySpecJsonString = streamReader.ReadToEnd(); + (PartitionedQueryExecutionInfo partitionedQueryExecutionInfo, QueryPartitionProvider queryPartitionProvider) = GetPartitionedQueryExecutionInfoAndPartitionProvider(sqlQuerySpecJsonString, input.PartitionKeyDefinition, clientDisableOde); CosmosQueryExecutionContextFactory.InputParameters inputParameters = new CosmosQueryExecutionContextFactory.InputParameters( sqlQuerySpec: new SqlQuerySpec(input.Query), initialUserContinuationToken: input.ContinuationToken, @@ -807,7 +928,7 @@ public override OptimisticDirectExecutionTestOutput ExecuteTest(OptimisticDirect string databaseId = "db1234"; string resourceLink = $"dbs/{databaseId}/colls"; CosmosQueryContextCore cosmosQueryContextCore = new CosmosQueryContextCore( - client: new TestCosmosQueryClient(), + client: new TestCosmosQueryClient(queryPartitionProvider), resourceTypeEnum: Documents.ResourceType.Document, operationType: Documents.OperationType.Query, resourceType: typeof(QueryResponseCore), @@ -1032,6 +1153,13 @@ public override void SerializeAsXml(XmlWriter xmlWriter) internal class TestCosmosQueryClient : CosmosQueryClient { + private readonly QueryPartitionProvider queryPartitionProvider; + + public TestCosmosQueryClient(QueryPartitionProvider queryPartitionProvider) + { + this.queryPartitionProvider = queryPartitionProvider; + } + public override Action OnExecuteScalarQueryCallback => throw new NotImplementedException(); public override bool BypassQueryParsing() @@ -1075,6 +1203,11 @@ public override Task GetCachedContainerQueryProperties Cosmos.GeospatialType.Geometry)); } + public override async Task GetClientDisableOptimisticDirectExecutionAsync() + { + return this.queryPartitionProvider.ClientDisableOptimisticDirectExecution; + } + public override Task> GetTargetPartitionKeyRangeByFeedRangeAsync(string resourceLink, string collectionResourceId, PartitionKeyDefinition partitionKeyDefinition, FeedRangeInternal feedRangeInternal, bool forceRefresh, ITrace trace) { throw new NotImplementedException(); @@ -1095,15 +1228,14 @@ public override Task> TryGetOverlappingRangesAs throw new NotImplementedException(); } - public override Task> TryGetPartitionedQueryExecutionInfoAsync(SqlQuerySpec sqlQuerySpec, ResourceType resourceType, PartitionKeyDefinition partitionKeyDefinition, bool requireFormattableOrderByQuery, bool isContinuationExpected, bool allowNonValueAggregateQuery, bool hasLogicalPartitionKey, bool allowDCount, bool useSystemPrefix, Cosmos.GeospatialType geospatialType, CancellationToken cancellationToken) + public override async Task> TryGetPartitionedQueryExecutionInfoAsync(SqlQuerySpec sqlQuerySpec, ResourceType resourceType, PartitionKeyDefinition partitionKeyDefinition, bool requireFormattableOrderByQuery, bool isContinuationExpected, bool allowNonValueAggregateQuery, bool hasLogicalPartitionKey, bool allowDCount, bool useSystemPrefix, Cosmos.GeospatialType geospatialType, CancellationToken cancellationToken) { CosmosSerializerCore serializerCore = new(); using StreamReader streamReader = new(serializerCore.ToStreamSqlQuerySpec(sqlQuerySpec, Documents.ResourceType.Document)); string sqlQuerySpecJsonString = streamReader.ReadToEnd(); - - TryCatch queryPlan = OptimisticDirectExecutionQueryBaselineTests.TryGetPartitionedQueryExecutionInfo(sqlQuerySpecJsonString, partitionKeyDefinition); - PartitionedQueryExecutionInfo partitionedQueryExecutionInfo = queryPlan.Succeeded ? queryPlan.Result : throw queryPlan.Exception; - return Task.FromResult(TryCatch.FromResult(partitionedQueryExecutionInfo)); + + (PartitionedQueryExecutionInfo partitionedQueryExecutionInfo, QueryPartitionProvider queryPartitionProvider) = OptimisticDirectExecutionQueryBaselineTests.GetPartitionedQueryExecutionInfoAndPartitionProvider(sqlQuerySpecJsonString, partitionKeyDefinition); + return TryCatch.FromResult(partitionedQueryExecutionInfo); } } } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/FullPipelineTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/FullPipelineTests.cs index b02f19374a..ce2d808304 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/FullPipelineTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/FullPipelineTests.cs @@ -390,8 +390,7 @@ private async Task TestPageSizeAsync(string query, int expectedPageSize, int exp using StreamReader streamReader = new(serializerCore.ToStreamSqlQuerySpec(sqlQuerySpec, Documents.ResourceType.Document)); string sqlQuerySpecJsonString = streamReader.ReadToEnd(); - TryCatch queryPlan = OptimisticDirectExecutionQueryBaselineTests.TryGetPartitionedQueryExecutionInfo(sqlQuerySpecJsonString, partitionKeyDefinition); - PartitionedQueryExecutionInfo partitionedQueryExecutionInfo = queryPlan.Succeeded ? queryPlan.Result : throw queryPlan.Exception; + (PartitionedQueryExecutionInfo partitionedQueryExecutionInfo, QueryPartitionProvider queryPartitionProvider) = OptimisticDirectExecutionQueryBaselineTests.GetPartitionedQueryExecutionInfoAndPartitionProvider(sqlQuerySpecJsonString, partitionKeyDefinition); return Task.FromResult(TryCatch.FromResult(partitionedQueryExecutionInfo)); } );