Skip to content

Commit

Permalink
** DONOT MERGE ** Enabling queryplan cache by default (#24673)
Browse files Browse the repository at this point in the history
* Enabling queryplan cache by default
- Queryplan cache, an LRU cache of size 1000 would try to cache query plan for simple queries
  • Loading branch information
mbhaskar authored Oct 14, 2021
1 parent e0d5e3e commit 01ad248
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.Map;

/**
* Provides a client-side logical representation of the Azure Cosmos DB
Expand Down Expand Up @@ -1570,7 +1570,7 @@ Flux<FeedResponse<Document>> readAllDocuments(
CosmosQueryRequestOptions options
);

ConcurrentMap<String, PartitionedQueryExecutionInfo> getQueryPlanCache();
Map<String, PartitionedQueryExecutionInfo> getQueryPlanCache();

/**
* Gets the collection cache.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ public static String getEnvironmentName() {

public static boolean isQueryPlanCachingEnabled() {
// Queryplan caching will be disabled by default
return getJVMConfigAsBoolean(QUERYPLAN_CACHING_ENABLED, false);
return getJVMConfigAsBoolean(QUERYPLAN_CACHING_ENABLED, true);
}

public static int getAddressRefreshResponseTimeoutInSeconds() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,4 +245,6 @@ public static final class PartitionedQueryExecutionInfo {
public static final class QueryExecutionContext {
public static final String INCREMENTAL_FEED_HEADER_VALUE = "Incremental feed";
}

public static final int QUERYPLAN_CACHE_SIZE = 1000;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.azure.cosmos.implementation.batch.PartitionKeyRangeServerBatchRequest;
import com.azure.cosmos.implementation.batch.ServerBatchRequest;
import com.azure.cosmos.implementation.batch.SinglePartitionKeyServerBatchRequest;
import com.azure.cosmos.implementation.caches.SizeLimitingLRUCache;
import com.azure.cosmos.implementation.caches.RxClientCollectionCache;
import com.azure.cosmos.implementation.caches.RxCollectionCache;
import com.azure.cosmos.implementation.caches.RxPartitionKeyRangeCache;
Expand Down Expand Up @@ -142,7 +143,7 @@ public class RxDocumentClientImpl implements AsyncDocumentClient, IAuthorization
private RxPartitionKeyRangeCache partitionKeyRangeCache;
private Map<String, List<PartitionKeyAndResourceTokenPair>> resourceTokensMap;
private final boolean contentResponseOnWriteEnabled;
private ConcurrentMap<String, PartitionedQueryExecutionInfo> queryPlanCache;
private Map<String, PartitionedQueryExecutionInfo> queryPlanCache;

private final AtomicBoolean closed = new AtomicBoolean(false);
private final int clientId;
Expand Down Expand Up @@ -362,7 +363,7 @@ private RxDocumentClientImpl(URI serviceEndpoint,
this.retryPolicy = new RetryPolicy(this, this.globalEndpointManager, this.connectionPolicy);
this.resetSessionTokenRetryPolicy = retryPolicy;
CpuMemoryMonitor.register(this);
this.queryPlanCache = new ConcurrentHashMap<>();
this.queryPlanCache = Collections.synchronizedMap(new SizeLimitingLRUCache(Constants.QUERYPLAN_CACHE_SIZE));
} catch (RuntimeException e) {
logger.error("unexpected failure in initializing client.", e);
close();
Expand Down Expand Up @@ -2462,7 +2463,7 @@ public Flux<FeedResponse<Document>> readAllDocuments(
}

@Override
public ConcurrentMap<String, PartitionedQueryExecutionInfo> getQueryPlanCache() {
public Map<String, PartitionedQueryExecutionInfo> getQueryPlanCache() {
return queryPlanCache;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation.caches;

import com.azure.cosmos.implementation.query.PartitionedQueryExecutionInfo;

import java.util.LinkedHashMap;
import java.util.Map;

/**
* LRU Cache using LinkedHashMap that limits the number of entries
*/
public class SizeLimitingLRUCache extends LinkedHashMap<String, PartitionedQueryExecutionInfo> {

private static final long serialVersionUID = 1L;
private final int maxEntries;

public SizeLimitingLRUCache(int maxEntries) {
this.maxEntries = maxEntries;
}

public SizeLimitingLRUCache(int initialCapacity, float loadFactor, int maxEntries) {
super(initialCapacity, loadFactor);
this.maxEntries = maxEntries;
}

public SizeLimitingLRUCache(
Map<? extends String, ? extends PartitionedQueryExecutionInfo> m, int maxEntries) {
super(m);
this.maxEntries = maxEntries;
}

public SizeLimitingLRUCache(int initialCapacity, float loadFactor, boolean accessOrder, int maxEntries) {
super(initialCapacity, loadFactor, accessOrder);
this.maxEntries = maxEntries;
}

public SizeLimitingLRUCache(int initialCapacity, int maxEntries) {
super(initialCapacity);
this.maxEntries = maxEntries;
}

@Override
protected boolean removeEldestEntry(
Map.Entry<String, PartitionedQueryExecutionInfo> eldest) {
return size() > maxEntries;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -71,7 +70,7 @@ private static <T extends Resource> Mono<Pair<List<Range<String>>,QueryInfo>> ge
String resourceLink,
DocumentCollection collection,
DefaultDocumentQueryExecutionContext<T> queryExecutionContext, boolean queryPlanCachingEnabled,
ConcurrentMap<String, PartitionedQueryExecutionInfo> queryPlanCache) {
Map<String, PartitionedQueryExecutionInfo> queryPlanCache) {

// The partitionKeyRangeIdInternal is no more a public API on
// FeedOptions, but have the below condition
Expand Down Expand Up @@ -99,6 +98,7 @@ private static <T extends Resource> Mono<Pair<List<Range<String>>,QueryInfo>> ge
Instant endTime = Instant.now(); // endTime for query plan diagnostics
PartitionedQueryExecutionInfo partitionedQueryExecutionInfo = queryPlanCache.get(query.getQueryText());
if (partitionedQueryExecutionInfo != null) {
logger.info("Skipping query plan round trip by using the cached plan");
return getTargetRangesFromQueryPlan(cosmosQueryRequestOptions, collection, queryExecutionContext,
partitionedQueryExecutionInfo, startTime, endTime);
}
Expand All @@ -117,7 +117,7 @@ private static <T extends Resource> Mono<Pair<List<Range<String>>,QueryInfo>> ge

Instant endTime = Instant.now();

if (queryPlanCachingEnabled) {
if (queryPlanCachingEnabled && isScopedToSinglePartition(cosmosQueryRequestOptions)) {
tryCacheQueryPlan(query, partitionedQueryExecutionInfo, queryPlanCache);
}

Expand Down Expand Up @@ -169,14 +169,9 @@ private static <T extends Resource> Mono<Pair<List<Range<String>>, QueryInfo>> g
private static void tryCacheQueryPlan(
SqlQuerySpec query,
PartitionedQueryExecutionInfo partitionedQueryExecutionInfo,
ConcurrentMap<String, PartitionedQueryExecutionInfo> queryPlanCache) {
Map<String, PartitionedQueryExecutionInfo> queryPlanCache) {
QueryInfo queryInfo = partitionedQueryExecutionInfo.getQueryInfo();
if (canCacheQuery(queryInfo) && !queryPlanCache.containsKey(query.getQueryText())) {
if (queryPlanCache.size() > MAX_CACHE_SIZE) {
// Clearing query plan cache if size is above max size. This can be optimized in future by using
// a threadsafe LRU cache
queryPlanCache.clear();
}
queryPlanCache.put(query.getQueryText(), partitionedQueryExecutionInfo);
}
}
Expand All @@ -188,7 +183,9 @@ private static boolean canCacheQuery(QueryInfo queryInfo) {
&& !queryInfo.hasGroupBy()
&& !queryInfo.hasLimit()
&& !queryInfo.hasTop()
&& !queryInfo.hasOffset();
&& !queryInfo.hasOffset()
&& !queryInfo.hasDCount()
&& !queryInfo.hasOrderBy();
}

private static boolean isScopedToSinglePartition(CosmosQueryRequestOptions cosmosQueryRequestOptions) {
Expand All @@ -208,7 +205,7 @@ public static <T extends Resource> Flux<? extends IDocumentQueryExecutionContext
boolean isContinuationExpected,
UUID correlatedActivityId,
boolean queryPlanCachingEnabled,
ConcurrentMap<String, PartitionedQueryExecutionInfo> queryPlanCache) {
Map<String, PartitionedQueryExecutionInfo> queryPlanCache) {

// return proxy
Flux<Utils.ValueHolder<DocumentCollection>> collectionObs = Flux.just(new Utils.ValueHolder<>(null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,6 @@ public void queryOptionNullValidation() {
private Object[][] query() {
return new Object[][]{
new Object[] { "Select * from c "},
new Object[] { "select * from c order by c.prop ASC"},
};
}

Expand Down Expand Up @@ -299,7 +298,6 @@ public void queryPlanCacheSinglePartitionParameterizedQueriesCorrectness() {
createdContainer);
documentsInserted.addAll(pk2Docs);


CosmosQueryRequestOptions options = new CosmosQueryRequestOptions();
options.setPartitionKey(new PartitionKey(pk2));

Expand All @@ -325,6 +323,20 @@ public void queryPlanCacheSinglePartitionParameterizedQueriesCorrectness() {
// Top query should not be cached
assertThat(contextClient.getQueryPlanCache().containsKey(sqlQuerySpec.getQueryText())).isFalse();

// group by should not be cached
sqlQuerySpec.setQueryText("select max(c.id) from c order by c.name group by c.name");
values1 = queryAndGetResults(sqlQuerySpec, options, TestObject.class);
assertThat(contextClient.getQueryPlanCache().containsKey(sqlQuerySpec.getQueryText())).isFalse();

// distinct queries should not be cached
sqlQuerySpec.setQueryText("SELECT distinct c.name from c");
values1 = queryAndGetResults(sqlQuerySpec, options, TestObject.class);
assertThat(contextClient.getQueryPlanCache().containsKey(sqlQuerySpec.getQueryText())).isFalse();

//order by query should not be cached
sqlQuerySpec.setQueryText("select * from c order by c.name");
values1 = queryAndGetResults(sqlQuerySpec, options, TestObject.class);
assertThat(contextClient.getQueryPlanCache().containsKey(sqlQuerySpec.getQueryText())).isFalse();
}

@Test(groups = {"simple"}, timeOut = TIMEOUT * 40)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;

import static com.azure.spring.data.cosmos.common.TestConstants.ADDRESSES;
import static com.azure.spring.data.cosmos.common.TestConstants.FIRST_NAME;
Expand Down Expand Up @@ -144,7 +144,7 @@ public void testFindWithPartitionWithQueryPlanCachingEnabled() {

CosmosAsyncClient cosmosAsyncClient = cosmosFactory.getCosmosAsyncClient();
AsyncDocumentClient asyncDocumentClient = CosmosBridgeInternal.getAsyncDocumentClient(cosmosAsyncClient);
ConcurrentMap<String, PartitionedQueryExecutionInfo> initialCache = asyncDocumentClient.getQueryPlanCache();
Map<String, PartitionedQueryExecutionInfo> initialCache = asyncDocumentClient.getQueryPlanCache();
assertThat(initialCache.containsKey(sqlQuerySpec.getQueryText())).isTrue();
int initialSize = initialCache.size();

Expand All @@ -158,7 +158,7 @@ public void testFindWithPartitionWithQueryPlanCachingEnabled() {
result = TestUtils.toList(cosmosTemplate.find(query, PartitionPerson.class,
PartitionPerson.class.getSimpleName()));

ConcurrentMap<String, PartitionedQueryExecutionInfo> postQueryCallCache = asyncDocumentClient.getQueryPlanCache();
Map<String, PartitionedQueryExecutionInfo> postQueryCallCache = asyncDocumentClient.getQueryPlanCache();
assertThat(postQueryCallCache.containsKey(sqlQuerySpec.getQueryText())).isTrue();
assertThat(postQueryCallCache.size()).isEqualTo(initialSize);
assertThat(result.size()).isEqualTo(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
import reactor.test.StepVerifier;

import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;

import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -126,7 +126,7 @@ public void testFindWithPartitionWithQueryPlanCachingEnabled() {

CosmosAsyncClient cosmosAsyncClient = cosmosFactory.getCosmosAsyncClient();
AsyncDocumentClient asyncDocumentClient = CosmosBridgeInternal.getAsyncDocumentClient(cosmosAsyncClient);
ConcurrentMap<String, PartitionedQueryExecutionInfo> initialCache = asyncDocumentClient.getQueryPlanCache();
Map<String, PartitionedQueryExecutionInfo> initialCache = asyncDocumentClient.getQueryPlanCache();
assertThat(initialCache.containsKey(sqlQuerySpec.getQueryText())).isTrue();
int initialSize = initialCache.size();

Expand All @@ -145,7 +145,7 @@ public void testFindWithPartitionWithQueryPlanCachingEnabled() {
Assert.assertThat(actual.getZipCode(), is(equalTo(TEST_PERSON_2.getZipCode())));
}).verifyComplete();

ConcurrentMap<String, PartitionedQueryExecutionInfo> postQueryCallCache = asyncDocumentClient.getQueryPlanCache();
Map<String, PartitionedQueryExecutionInfo> postQueryCallCache = asyncDocumentClient.getQueryPlanCache();
assertThat(postQueryCallCache.containsKey(sqlQuerySpec.getQueryText())).isTrue();
assertThat(postQueryCallCache.size()).isEqualTo(initialSize);
}
Expand Down

0 comments on commit 01ad248

Please sign in to comment.