Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enabling queryplan cache by default #24673

Merged
merged 2 commits into from
Oct 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.Map;

/**
* Provides a client-side logical representation of the Azure Cosmos DB
Expand Down Expand Up @@ -1570,7 +1570,7 @@ Flux<FeedResponse<Document>> readAllDocuments(
CosmosQueryRequestOptions options
);

ConcurrentMap<String, PartitionedQueryExecutionInfo> getQueryPlanCache();
Map<String, PartitionedQueryExecutionInfo> getQueryPlanCache();

/**
* Gets the collection cache.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ public static String getEnvironmentName() {

public static boolean isQueryPlanCachingEnabled() {
// Queryplan caching will be disabled by default
return getJVMConfigAsBoolean(QUERYPLAN_CACHING_ENABLED, false);
return getJVMConfigAsBoolean(QUERYPLAN_CACHING_ENABLED, true);
}

public static int getAddressRefreshResponseTimeoutInSeconds() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,4 +245,6 @@ public static final class PartitionedQueryExecutionInfo {
public static final class QueryExecutionContext {
public static final String INCREMENTAL_FEED_HEADER_VALUE = "Incremental feed";
}

public static final int QUERYPLAN_CACHE_SIZE = 1000;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.azure.cosmos.implementation.batch.PartitionKeyRangeServerBatchRequest;
import com.azure.cosmos.implementation.batch.ServerBatchRequest;
import com.azure.cosmos.implementation.batch.SinglePartitionKeyServerBatchRequest;
import com.azure.cosmos.implementation.caches.SizeLimitingLRUCache;
import com.azure.cosmos.implementation.caches.RxClientCollectionCache;
import com.azure.cosmos.implementation.caches.RxCollectionCache;
import com.azure.cosmos.implementation.caches.RxPartitionKeyRangeCache;
Expand Down Expand Up @@ -142,7 +143,7 @@ public class RxDocumentClientImpl implements AsyncDocumentClient, IAuthorization
private RxPartitionKeyRangeCache partitionKeyRangeCache;
private Map<String, List<PartitionKeyAndResourceTokenPair>> resourceTokensMap;
private final boolean contentResponseOnWriteEnabled;
private ConcurrentMap<String, PartitionedQueryExecutionInfo> queryPlanCache;
private Map<String, PartitionedQueryExecutionInfo> queryPlanCache;

private final AtomicBoolean closed = new AtomicBoolean(false);
private final int clientId;
Expand Down Expand Up @@ -362,7 +363,7 @@ private RxDocumentClientImpl(URI serviceEndpoint,
this.retryPolicy = new RetryPolicy(this, this.globalEndpointManager, this.connectionPolicy);
this.resetSessionTokenRetryPolicy = retryPolicy;
CpuMemoryMonitor.register(this);
this.queryPlanCache = new ConcurrentHashMap<>();
this.queryPlanCache = Collections.synchronizedMap(new SizeLimitingLRUCache(Constants.QUERYPLAN_CACHE_SIZE));
} catch (RuntimeException e) {
logger.error("unexpected failure in initializing client.", e);
close();
Expand Down Expand Up @@ -2462,7 +2463,7 @@ public Flux<FeedResponse<Document>> readAllDocuments(
}

@Override
public ConcurrentMap<String, PartitionedQueryExecutionInfo> getQueryPlanCache() {
public Map<String, PartitionedQueryExecutionInfo> getQueryPlanCache() {
return queryPlanCache;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation.caches;

import com.azure.cosmos.implementation.query.PartitionedQueryExecutionInfo;

import java.util.LinkedHashMap;
import java.util.Map;

/**
* LRU Cache using LinkedHashMap that limits the number of entries
*/
public class SizeLimitingLRUCache extends LinkedHashMap<String, PartitionedQueryExecutionInfo> {
mbhaskar marked this conversation as resolved.
Show resolved Hide resolved

private static final long serialVersionUID = 1L;
private final int maxEntries;

public SizeLimitingLRUCache(int maxEntries) {
this.maxEntries = maxEntries;
}

public SizeLimitingLRUCache(int initialCapacity, float loadFactor, int maxEntries) {
super(initialCapacity, loadFactor);
this.maxEntries = maxEntries;
}

public SizeLimitingLRUCache(
Map<? extends String, ? extends PartitionedQueryExecutionInfo> m, int maxEntries) {
super(m);
this.maxEntries = maxEntries;
}

public SizeLimitingLRUCache(int initialCapacity, float loadFactor, boolean accessOrder, int maxEntries) {
super(initialCapacity, loadFactor, accessOrder);
this.maxEntries = maxEntries;
}

public SizeLimitingLRUCache(int initialCapacity, int maxEntries) {
super(initialCapacity);
this.maxEntries = maxEntries;
}

@Override
protected boolean removeEldestEntry(
Map.Entry<String, PartitionedQueryExecutionInfo> eldest) {
return size() > maxEntries;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -71,7 +70,7 @@ private static <T extends Resource> Mono<Pair<List<Range<String>>,QueryInfo>> ge
String resourceLink,
DocumentCollection collection,
DefaultDocumentQueryExecutionContext<T> queryExecutionContext, boolean queryPlanCachingEnabled,
ConcurrentMap<String, PartitionedQueryExecutionInfo> queryPlanCache) {
Map<String, PartitionedQueryExecutionInfo> queryPlanCache) {

// The partitionKeyRangeIdInternal is no more a public API on
// FeedOptions, but have the below condition
Expand Down Expand Up @@ -99,6 +98,7 @@ private static <T extends Resource> Mono<Pair<List<Range<String>>,QueryInfo>> ge
Instant endTime = Instant.now(); // endTime for query plan diagnostics
PartitionedQueryExecutionInfo partitionedQueryExecutionInfo = queryPlanCache.get(query.getQueryText());
if (partitionedQueryExecutionInfo != null) {
logger.info("Skipping query plan round trip by using the cached plan");
return getTargetRangesFromQueryPlan(cosmosQueryRequestOptions, collection, queryExecutionContext,
partitionedQueryExecutionInfo, startTime, endTime);
}
Expand All @@ -117,7 +117,7 @@ private static <T extends Resource> Mono<Pair<List<Range<String>>,QueryInfo>> ge

Instant endTime = Instant.now();

if (queryPlanCachingEnabled) {
if (queryPlanCachingEnabled && isScopedToSinglePartition(cosmosQueryRequestOptions)) {
mbhaskar marked this conversation as resolved.
Show resolved Hide resolved
tryCacheQueryPlan(query, partitionedQueryExecutionInfo, queryPlanCache);
}

Expand Down Expand Up @@ -169,14 +169,9 @@ private static <T extends Resource> Mono<Pair<List<Range<String>>, QueryInfo>> g
private static void tryCacheQueryPlan(
SqlQuerySpec query,
PartitionedQueryExecutionInfo partitionedQueryExecutionInfo,
ConcurrentMap<String, PartitionedQueryExecutionInfo> queryPlanCache) {
Map<String, PartitionedQueryExecutionInfo> queryPlanCache) {
QueryInfo queryInfo = partitionedQueryExecutionInfo.getQueryInfo();
if (canCacheQuery(queryInfo) && !queryPlanCache.containsKey(query.getQueryText())) {
if (queryPlanCache.size() > MAX_CACHE_SIZE) {
// Clearing query plan cache if size is above max size. This can be optimized in future by using
// a threadsafe LRU cache
queryPlanCache.clear();
}
queryPlanCache.put(query.getQueryText(), partitionedQueryExecutionInfo);
mbhaskar marked this conversation as resolved.
Show resolved Hide resolved
}
}
Expand All @@ -188,7 +183,9 @@ private static boolean canCacheQuery(QueryInfo queryInfo) {
&& !queryInfo.hasGroupBy()
&& !queryInfo.hasLimit()
&& !queryInfo.hasTop()
&& !queryInfo.hasOffset();
&& !queryInfo.hasOffset()
&& !queryInfo.hasDCount()
&& !queryInfo.hasOrderBy();
}

private static boolean isScopedToSinglePartition(CosmosQueryRequestOptions cosmosQueryRequestOptions) {
Expand All @@ -208,7 +205,7 @@ public static <T extends Resource> Flux<? extends IDocumentQueryExecutionContext
boolean isContinuationExpected,
UUID correlatedActivityId,
boolean queryPlanCachingEnabled,
ConcurrentMap<String, PartitionedQueryExecutionInfo> queryPlanCache) {
Map<String, PartitionedQueryExecutionInfo> queryPlanCache) {

// return proxy
Flux<Utils.ValueHolder<DocumentCollection>> collectionObs = Flux.just(new Utils.ValueHolder<>(null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,6 @@ public void queryOptionNullValidation() {
private Object[][] query() {
return new Object[][]{
new Object[] { "Select * from c "},
new Object[] { "select * from c order by c.prop ASC"},
};
}

Expand Down Expand Up @@ -299,7 +298,6 @@ public void queryPlanCacheSinglePartitionParameterizedQueriesCorrectness() {
createdContainer);
documentsInserted.addAll(pk2Docs);


CosmosQueryRequestOptions options = new CosmosQueryRequestOptions();
options.setPartitionKey(new PartitionKey(pk2));

Expand All @@ -325,6 +323,20 @@ public void queryPlanCacheSinglePartitionParameterizedQueriesCorrectness() {
// Top query should not be cached
assertThat(contextClient.getQueryPlanCache().containsKey(sqlQuerySpec.getQueryText())).isFalse();

// group by should not be cached
sqlQuerySpec.setQueryText("select max(c.id) from c order by c.name group by c.name");
values1 = queryAndGetResults(sqlQuerySpec, options, TestObject.class);
assertThat(contextClient.getQueryPlanCache().containsKey(sqlQuerySpec.getQueryText())).isFalse();

// distinct queries should not be cached
sqlQuerySpec.setQueryText("SELECT distinct c.name from c");
values1 = queryAndGetResults(sqlQuerySpec, options, TestObject.class);
assertThat(contextClient.getQueryPlanCache().containsKey(sqlQuerySpec.getQueryText())).isFalse();

//order by query should not be cached
sqlQuerySpec.setQueryText("select * from c order by c.name");
values1 = queryAndGetResults(sqlQuerySpec, options, TestObject.class);
assertThat(contextClient.getQueryPlanCache().containsKey(sqlQuerySpec.getQueryText())).isFalse();
}

@Test(groups = {"simple"}, timeOut = TIMEOUT * 40)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;

import static com.azure.spring.data.cosmos.common.TestConstants.ADDRESSES;
import static com.azure.spring.data.cosmos.common.TestConstants.FIRST_NAME;
Expand Down Expand Up @@ -144,7 +144,7 @@ public void testFindWithPartitionWithQueryPlanCachingEnabled() {

CosmosAsyncClient cosmosAsyncClient = cosmosFactory.getCosmosAsyncClient();
AsyncDocumentClient asyncDocumentClient = CosmosBridgeInternal.getAsyncDocumentClient(cosmosAsyncClient);
ConcurrentMap<String, PartitionedQueryExecutionInfo> initialCache = asyncDocumentClient.getQueryPlanCache();
Map<String, PartitionedQueryExecutionInfo> initialCache = asyncDocumentClient.getQueryPlanCache();
assertThat(initialCache.containsKey(sqlQuerySpec.getQueryText())).isTrue();
int initialSize = initialCache.size();

Expand All @@ -158,7 +158,7 @@ public void testFindWithPartitionWithQueryPlanCachingEnabled() {
result = TestUtils.toList(cosmosTemplate.find(query, PartitionPerson.class,
PartitionPerson.class.getSimpleName()));

ConcurrentMap<String, PartitionedQueryExecutionInfo> postQueryCallCache = asyncDocumentClient.getQueryPlanCache();
Map<String, PartitionedQueryExecutionInfo> postQueryCallCache = asyncDocumentClient.getQueryPlanCache();
assertThat(postQueryCallCache.containsKey(sqlQuerySpec.getQueryText())).isTrue();
assertThat(postQueryCallCache.size()).isEqualTo(initialSize);
assertThat(result.size()).isEqualTo(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
import reactor.test.StepVerifier;

import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;

import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -126,7 +126,7 @@ public void testFindWithPartitionWithQueryPlanCachingEnabled() {

CosmosAsyncClient cosmosAsyncClient = cosmosFactory.getCosmosAsyncClient();
AsyncDocumentClient asyncDocumentClient = CosmosBridgeInternal.getAsyncDocumentClient(cosmosAsyncClient);
ConcurrentMap<String, PartitionedQueryExecutionInfo> initialCache = asyncDocumentClient.getQueryPlanCache();
Map<String, PartitionedQueryExecutionInfo> initialCache = asyncDocumentClient.getQueryPlanCache();
assertThat(initialCache.containsKey(sqlQuerySpec.getQueryText())).isTrue();
int initialSize = initialCache.size();

Expand All @@ -145,7 +145,7 @@ public void testFindWithPartitionWithQueryPlanCachingEnabled() {
Assert.assertThat(actual.getZipCode(), is(equalTo(TEST_PERSON_2.getZipCode())));
}).verifyComplete();

ConcurrentMap<String, PartitionedQueryExecutionInfo> postQueryCallCache = asyncDocumentClient.getQueryPlanCache();
Map<String, PartitionedQueryExecutionInfo> postQueryCallCache = asyncDocumentClient.getQueryPlanCache();
assertThat(postQueryCallCache.containsKey(sqlQuerySpec.getQueryText())).isTrue();
assertThat(postQueryCallCache.size()).isEqualTo(initialSize);
}
Expand Down