Skip to content

Commit

Permalink
Query: Add Optimistic Direct Execution configuration override support…
Browse files Browse the repository at this point in the history
… on the Client (#4122)

* Added ability to accept the AllowOptimisticDirectExecution flag from the backend and use that flag to decide if the Ode pipeline should be used or not.

* Added comment and removed extra spacing

* Added test coverage

* Added exception handling logic

* Resolved comments

* Added null check for key parameter

* Removed changes to common test infra

* Removed all changes from QueryPartitionProviderTestInstance

* Remove changes pt2

* Removed the dictionary in QueryPartitionProvider and added a bool instead

* Updated GetClientDisableOptimisticDirectExecution()

* Fixed comments

* Revert QueryIterator.cs

* Undoing changes to settings.json

* Undoing changes to QueryIterator.cs

* Updated error message

* Made functions static

* Cast to bool instead of recasting in GetClientDisableOptimisticDirectExecution()

* Added ignore flag

* Fixed merge conflicts

* Updated GetPartitionedQueryExecutionInfoAndPartitionProvider()

* Updated return type in OffsetLimitPageSize()
  • Loading branch information
akotalwar authored Dec 13, 2023
1 parent 07aa28e commit 6258e24
Show file tree
Hide file tree
Showing 7 changed files with 219 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ namespace Microsoft.Azure.Cosmos.Query.Core.ExecutionContext

internal static class CosmosQueryExecutionContextFactory
{
internal const string ClientDisableOptimisticDirectExecution = "clientDisableOptimisticDirectExecution";
private const string InternalPartitionKeyDefinitionProperty = "x-ms-query-partitionkey-definition";
private const string QueryInspectionPattern = @"\s+(GROUP\s+BY\s+|COUNT\s*\(|MIN\s*\(|MAX\s*\(|AVG\s*\(|SUM\s*\(|DISTINCT\s+)";
private const string OptimisticDirectExecution = "OptimisticDirectExecution";
Expand Down Expand Up @@ -250,9 +251,9 @@ private static async Task<TryCatch<IQueryPipelineStage>> TryCreateFromPartitione

TryCatch<IQueryPipelineStage> tryCreatePipelineStage;
Documents.PartitionKeyRange targetRange = await TryGetTargetRangeOptimisticDirectExecutionAsync(
inputParameters,
partitionedQueryExecutionInfo,
cosmosQueryContext,
inputParameters,
partitionedQueryExecutionInfo,
cosmosQueryContext,
containerQueryProperties,
trace);

Expand Down Expand Up @@ -761,13 +762,18 @@ private static Documents.PartitionKeyDefinition GetPartitionKeyDefinition(InputP
ContainerQueryProperties containerQueryProperties,
ITrace trace)
{
if (!inputParameters.EnableOptimisticDirectExecution)
bool clientDisableOptimisticDirectExecution = await cosmosQueryContext.QueryClient.GetClientDisableOptimisticDirectExecutionAsync();

// Use the Ode code path only if ClientDisableOptimisticDirectExecution is false and EnableOptimisticDirectExecution is true
if (clientDisableOptimisticDirectExecution || !inputParameters.EnableOptimisticDirectExecution)
{
if (inputParameters.InitialUserContinuationToken != null
&& OptimisticDirectExecutionContinuationToken.IsOptimisticDirectExecutionContinuationToken(inputParameters.InitialUserContinuationToken))
if (inputParameters.InitialUserContinuationToken != null
&& OptimisticDirectExecutionContinuationToken.IsOptimisticDirectExecutionContinuationToken(inputParameters.InitialUserContinuationToken))
{
throw new MalformedContinuationTokenException($"The continuation token supplied requires the Optimistic Direct Execution flag to be enabled in QueryRequestOptions for the query execution to resume. " +
$"{inputParameters.InitialUserContinuationToken}");
string errorMessage = "Execution of this query using the supplied continuation token requires EnableOptimisticDirectExecution to be set in QueryRequestOptions. " +
"If the error persists after that, contact system administrator.";

throw new MalformedContinuationTokenException($"{errorMessage} Continuation Token: {inputParameters.InitialUserContinuationToken}");
}

return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public abstract Task<TryCatch<QueryPage>> ExecuteItemQueryAsync(
ITrace trace,
CancellationToken cancellationToken);

public abstract Task<bool> GetClientDisableOptimisticDirectExecutionAsync();

public abstract Task<PartitionedQueryExecutionInfo> ExecuteQueryPlanRequestAsync(
string resourceUri,
Documents.ResourceType resourceType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ namespace Microsoft.Azure.Cosmos.Query.Core.QueryPlan
using System.Text;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Cosmos.Query.Core.Exceptions;
using Microsoft.Azure.Cosmos.Query.Core.ExecutionContext;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Cosmos.Tracing;
Expand Down Expand Up @@ -46,6 +47,9 @@ internal sealed class QueryPartitionProvider : IDisposable
private bool disposed;
private string queryengineConfiguration;

// TODO: Move this into a config class of its own
public bool ClientDisableOptimisticDirectExecution { get; private set; }

public QueryPartitionProvider(IDictionary<string, object> queryengineConfiguration)
{
if (queryengineConfiguration == null)
Expand All @@ -60,6 +64,7 @@ public QueryPartitionProvider(IDictionary<string, object> queryengineConfigurati

this.disposed = false;
this.queryengineConfiguration = JsonConvert.SerializeObject(queryengineConfiguration);
this.ClientDisableOptimisticDirectExecution = GetClientDisableOptimisticDirectExecution((IReadOnlyDictionary<string, object>)queryengineConfiguration);
this.serviceProvider = IntPtr.Zero;

this.serviceProviderStateLock = new object();
Expand Down Expand Up @@ -92,6 +97,7 @@ public void Update(IDictionary<string, object> queryengineConfiguration)
if (!string.Equals(this.queryengineConfiguration, newConfiguration))
{
this.queryengineConfiguration = newConfiguration;
this.ClientDisableOptimisticDirectExecution = GetClientDisableOptimisticDirectExecution((IReadOnlyDictionary<string, object>)queryengineConfiguration);

if (!this.disposed && this.serviceProvider != IntPtr.Zero)
{
Expand Down Expand Up @@ -132,6 +138,7 @@ public TryCatch<PartitionedQueryExecutionInfo> TryGetPartitionedQueryExecutionIn
allowDCount: allowDCount,
useSystemPrefix: useSystemPrefix,
geospatialType: geospatialType);

if (!tryGetInternalQueryInfo.Succeeded)
{
return TryCatch<PartitionedQueryExecutionInfo>.FromException(tryGetInternalQueryInfo.Exception);
Expand All @@ -141,6 +148,16 @@ public TryCatch<PartitionedQueryExecutionInfo> TryGetPartitionedQueryExecutionIn
return TryCatch<PartitionedQueryExecutionInfo>.FromResult(queryInfo);
}

private static bool GetClientDisableOptimisticDirectExecution(IReadOnlyDictionary<string, object> queryengineConfiguration)
{
if (queryengineConfiguration.TryGetValue(CosmosQueryExecutionContextFactory.ClientDisableOptimisticDirectExecution, out object queryConfigProperty))
{
return (bool)queryConfigProperty;
}

return false;
}

internal PartitionedQueryExecutionInfo ConvertPartitionedQueryExecutionInfo(
PartitionedQueryExecutionInfoInternal queryInfoInternal,
PartitionKeyDefinition partitionKeyDefinition)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,12 @@ public override async Task<PartitionedQueryExecutionInfo> ExecuteQueryPlanReques
return partitionedQueryExecutionInfo;
}

public override async Task<bool> GetClientDisableOptimisticDirectExecutionAsync()
{
QueryPartitionProvider provider = await this.clientContext.DocumentClient.QueryPartitionProvider;
return provider.ClientDisableOptimisticDirectExecution;
}

public override async Task<List<PartitionKeyRange>> GetTargetPartitionKeyRangeByFeedRangeAsync(
string resourceLink,
string collectionResourceId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public sealed class OptimisticDirectExecutionQueryTests : QueryTestsBase
private const string PartitionKeyField = "key";
private const string NumberField = "numberField";
private const string NullField = "nullField";
private const string ClientDisableOptimisticDirectExecution = "clientDisableOptimisticDirectExecution";

private static class PageSizeOptions
{
Expand Down Expand Up @@ -538,6 +539,22 @@ await this.CreateIngestQueryDeleteAsync(
documents,
(container, documents) => RunFailingTests(container, invalidQueries),
"/" + PartitionKeyField);
}

//TODO: Remove Ignore flag once emulator is updated to 1101
[Ignore]
[TestMethod]
public async Task TestClientDisableOdeDefaultValue()
{
string authKey = Utils.ConfigurationManager.AppSettings["MasterKey"];
string endpoint = Utils.ConfigurationManager.AppSettings["GatewayEndpoint"];

CosmosClient client = new CosmosClient($"AccountEndpoint={endpoint};AccountKey={authKey}");
AccountProperties properties = await client.ReadAccountAsync();

bool success = bool.TryParse(properties.QueryEngineConfiguration[ClientDisableOptimisticDirectExecution].ToString(), out bool clientDisablOde);
Assert.IsTrue(success, $"Parsing must succeed. Value supplied '{ClientDisableOptimisticDirectExecution}'");
Assert.IsFalse(clientDisablOde);
}

private static async Task RunTests(IEnumerable<DirectExecutionTestCase> testCases, Container container)
Expand Down
Loading

0 comments on commit 6258e24

Please sign in to comment.