Skip to content

Commit

Permalink
Fixing query metric issue for zero item response (#14143)
Browse files Browse the repository at this point in the history
* Fixing query metrics for emptry page result

* Fixing query metrics for group by and order by

* spot bug fix and merge with master

* fixing spot bugs

* adding data provider for different query
  • Loading branch information
simplynaveen20 authored Aug 19, 2020
1 parent 33f92f3 commit f416970
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1489,6 +1489,13 @@
<Bug pattern="RV_RETURN_VALUE_IGNORED_INFERRED"/>
</Match>

<!-- Bug: https://github.com/Azure/azure-sdk-for-java/issues/9092 -->
<Match>
<Class name="com.azure.cosmos.implementation.QueryMetrics"/>
<Method name="mergeQueryMetricsMap"/>
<Bug pattern="RV_RETURN_VALUE_IGNORED_INFERRED"/>
</Match>

<!-- Bug: https://github.com/Azure/azure-sdk-for-java/issues/9093 -->
<Match>
<Class name="com.azure.cosmos.implementation.PathsHelper"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;

/**
* Query metrics in the Azure Cosmos database service.
Expand Down Expand Up @@ -188,6 +190,21 @@ public QueryMetrics add(QueryMetrics... queryMetricsArgs) {
return QueryMetrics.createFromCollection(queryMetricsList);
}

/**
* Utility method to merge two query metrics map.
* @param base metrics map which will be updated with new values.
* @param addOn metrics map whose values will be merge in base map.
*/
public static void mergeQueryMetricsMap(ConcurrentMap<String, QueryMetrics> base, ConcurrentMap<String, QueryMetrics> addOn) {
for (ConcurrentMap.Entry<String, QueryMetrics> entry : addOn.entrySet()) {
if (base.containsKey(entry.getKey())) {
base.get(entry.getKey()).add(entry.getValue());
} else {
base.put(entry.getKey(), entry.getValue());
}
}
}

private String toTextString(int indentLevel) {
StringBuilder stringBuilder = new StringBuilder();
QueryMetricsTextWriter queryMetricsTextWriter = new QueryMetricsTextWriter(stringBuilder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@
import com.azure.cosmos.implementation.BadRequestException;
import com.azure.cosmos.implementation.Document;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.JsonSerializable;
import com.azure.cosmos.implementation.QueryMetrics;
import com.azure.cosmos.implementation.Resource;
import com.azure.cosmos.implementation.query.aggregation.AggregateOperator;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.implementation.JsonSerializable;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.fasterxml.jackson.databind.node.ObjectNode;
import reactor.core.publisher.Flux;
Expand All @@ -20,6 +21,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiFunction;

public final class GroupByDocumentQueryExecutionContext<T extends Resource> implements
Expand Down Expand Up @@ -76,39 +79,50 @@ public Flux<FeedResponse<T>> drainAsync(int maxPageSize) {
/* Do groupBy stuff here */
// Stage 1:
// Drain the groupings fully from all continuation and all partitions
ConcurrentMap<String, QueryMetrics> queryMetrics = new ConcurrentHashMap<>();
for (FeedResponse<T> page : superList) {
List<Document> results = (List<Document>) page.getResults();
documentList.addAll(results);
requestCharge += page.getRequestCharge();
QueryMetrics.mergeQueryMetricsMap(queryMetrics, BridgeInternal.queryMetricsFromFeedResponse(page));
}

this.aggregateGroupings(documentList);

// Stage 2:
// Emit the results from the grouping table page by page
return createFeedResponseFromGroupingTable(maxPageSize, requestCharge);
List<Document> groupByResults = null;
if (this.groupingTable != null) {
groupByResults = this.groupingTable.drain(maxPageSize);
}

return createFeedResponseFromGroupingTable(maxPageSize, requestCharge, queryMetrics, groupByResults);
}).expand(tFeedResponse -> {
// For groupBy query, we have already drained everything for the first page request
// so for following requests, we will just need to drain page by page from the grouping table
FeedResponse<T> response = createFeedResponseFromGroupingTable(maxPageSize, 0);
if (response == null) {
List<Document> groupByResults = null;
if (this.groupingTable != null) {
groupByResults = this.groupingTable.drain(maxPageSize);
}

if (groupByResults == null || groupByResults.size() == 0) {
return Mono.empty();
}

FeedResponse<T> response = createFeedResponseFromGroupingTable(maxPageSize, 0 , new ConcurrentHashMap<>(), groupByResults);
return Mono.just(response);
});
}

@SuppressWarnings("unchecked") // safe to upcast
private FeedResponse<T> createFeedResponseFromGroupingTable(int pageSize, double requestCharge) {
private FeedResponse<T> createFeedResponseFromGroupingTable(int pageSize,
double requestCharge,
ConcurrentMap<String, QueryMetrics> queryMetrics,
List<Document> groupByResults) {
if (this.groupingTable != null) {
List<Document> groupByResults = groupingTable.drain(pageSize);
if (groupByResults.size() == 0) {
return null;
}

HashMap<String, String> headers = new HashMap<>();
headers.put(HttpConstants.HttpHeaders.REQUEST_CHARGE, Double.toString(requestCharge));
FeedResponse<Document> frp = BridgeInternal.createFeedResponse(groupByResults, headers);
FeedResponse<Document> frp = BridgeInternal.createFeedResponseWithQueryMetrics(groupByResults, headers, queryMetrics, null);
return (FeedResponse<T>) frp;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -496,8 +496,8 @@ public Flux<FeedResponse<T>> apply(Flux<OrderByRowResult<T>> source) {
ModelBridgeInternal.getQueryPlanDiagnosticsContext(feedOfOrderByRowResults));
}).switchIfEmpty(Flux.defer(() -> {
// create an empty page if there is no result
return Flux.just(BridgeInternal.createFeedResponse(Utils.immutableListOf(),
headerResponse(tracker.getAndResetCharge())));
return Flux.just(BridgeInternal.createFeedResponseWithQueryMetrics(Utils.immutableListOf(),
headerResponse(tracker.getAndResetCharge()), queryMetricMap, null));
}));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.azure.cosmos.implementation.DocumentClientRetryPolicy;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.PartitionKeyRange;
import com.azure.cosmos.implementation.QueryMetrics;
import com.azure.cosmos.implementation.RequestChargeTracker;
import com.azure.cosmos.implementation.Resource;
import com.azure.cosmos.implementation.ResourceType;
Expand All @@ -30,6 +31,8 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
Expand Down Expand Up @@ -198,6 +201,7 @@ private static class EmptyPagesFilterTransformer<T extends Resource>
private final RequestChargeTracker tracker;
private DocumentProducer<T>.DocumentProducerFeedResponse previousPage;
private final CosmosQueryRequestOptions cosmosQueryRequestOptions;
private ConcurrentMap<String, QueryMetrics> emptyPageQueryMetricsMap = new ConcurrentHashMap<>();

public EmptyPagesFilterTransformer(RequestChargeTracker tracker, CosmosQueryRequestOptions options) {

Expand Down Expand Up @@ -258,10 +262,21 @@ public Flux<FeedResponse<T>> apply(Flux<DocumentProducer<T>.DocumentProducerFeed
&& !ModelBridgeInternal.getEmptyPagesAllowedFromQueryRequestOptions(this.cosmosQueryRequestOptions)) {
// filter empty pages and accumulate charge
tracker.addCharge(documentProducerFeedResponse.pageResult.getRequestCharge());
ConcurrentMap<String, QueryMetrics> currentQueryMetrics =
BridgeInternal.queryMetricsFromFeedResponse(documentProducerFeedResponse.pageResult);
QueryMetrics.mergeQueryMetricsMap(emptyPageQueryMetricsMap, currentQueryMetrics);
return false;
}
return true;
}).map(documentProducerFeedResponse -> {
//Combining previous empty page query metrics with current non empty page query metrics
if (!emptyPageQueryMetricsMap.isEmpty()) {
ConcurrentMap<String, QueryMetrics> currentQueryMetrics =
BridgeInternal.queryMetricsFromFeedResponse(documentProducerFeedResponse.pageResult);
QueryMetrics.mergeQueryMetricsMap(currentQueryMetrics, emptyPageQueryMetricsMap);
emptyPageQueryMetricsMap.clear();
}

// Add the request charge
double charge = tracker.getAndResetCharge();
if (charge > 0) {
Expand Down Expand Up @@ -319,8 +334,8 @@ public Flux<FeedResponse<T>> apply(Flux<DocumentProducer<T>.DocumentProducerFeed
return documentProducerFeedResponse.pageResult;
}).switchIfEmpty(Flux.defer(() -> {
// create an empty page if there is no result
return Flux.just(BridgeInternal.createFeedResponse(Utils.immutableListOf(),
headerResponse(tracker.getAndResetCharge())));
return Flux.just(BridgeInternal.createFeedResponseWithQueryMetrics(Utils.immutableListOf(),
headerResponse(tracker.getAndResetCharge()), emptyPageQueryMetricsMap, null));
}));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.fasterxml.jackson.databind.node.ArrayNode;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.lang.reflect.Field;
Expand Down Expand Up @@ -77,7 +78,18 @@ public void afterClass() {
}
}

@Test(groups = {"simple"})
@DataProvider(name = "query")
private Object[][] query() {
return new Object[][]{
{"Select * from c where c.id = 'wrongId'"},
{"Select top 1 * from c where c.id = 'wrongId'"},
{"Select * from c where c.id = 'wrongId' order by c.id"},
{"Select count(1) from c where c.id = 'wrongId' group by c.pk"},
{"Select distinct c.pk from c where c.id = 'wrongId'"},
};
}

@Test(groups = {"simple"}, timeOut = TIMEOUT)
public void gatewayDiagnostics() {
CosmosClient testGatewayClient = null;
try {
Expand Down Expand Up @@ -108,7 +120,7 @@ public void gatewayDiagnostics() {
}
}

@Test(groups = {"simple"})
@Test(groups = {"simple"}, timeOut = TIMEOUT)
public void gatewayDiagnosticsOnException() {
InternalObjectNode internalObjectNode = getInternalObjectNode();
CosmosItemResponse<InternalObjectNode> createResponse = null;
Expand All @@ -135,7 +147,7 @@ public void gatewayDiagnosticsOnException() {
}
}

@Test(groups = {"simple"})
@Test(groups = {"simple"}, timeOut = TIMEOUT)
public void systemDiagnosticsForSystemStateInformation() {
InternalObjectNode internalObjectNode = getInternalObjectNode();
CosmosItemResponse<InternalObjectNode> createResponse = this.container.createItem(internalObjectNode);
Expand All @@ -149,7 +161,7 @@ public void systemDiagnosticsForSystemStateInformation() {
assertThat(createResponse.getDiagnostics().getDuration()).isNotNull();
}

@Test(groups = {"simple"})
@Test(groups = {"simple"}, timeOut = TIMEOUT)
public void directDiagnostics() {
CosmosClient testDirectClient = null;
try {
Expand Down Expand Up @@ -183,7 +195,7 @@ public void directDiagnostics() {
}
}

@Test(groups = {"simple"})
@Test(groups = {"simple"}, timeOut = TIMEOUT)
public void queryPlanDiagnostics() {
CosmosContainer cosmosContainer = directClient.getDatabase(cosmosAsyncContainer.getDatabase().getId()).getContainer(cosmosAsyncContainer.getId());
List<String> itemIdList = new ArrayList<>();
Expand Down Expand Up @@ -232,7 +244,32 @@ public void queryPlanDiagnostics() {
}
}

@Test(groups = {"simple"})
@Test(groups = {"simple"}, dataProvider = "query", timeOut = TIMEOUT)
public void queryMetrics(String query) {
CosmosQueryRequestOptions options = new CosmosQueryRequestOptions();
options.setQueryMetricsEnabled(true);
boolean qroupByFirstResponse = true; // TODO https://github.com/Azure/azure-sdk-for-java/issues/14142
Iterator<FeedResponse<InternalObjectNode>> iterator = this.container.queryItems(query, options,
InternalObjectNode.class).iterableByPage().iterator();
assertThat(iterator.hasNext()).isTrue();
while (iterator.hasNext()) {
FeedResponse<InternalObjectNode> feedResponse = iterator.next();
String queryDiagnostics = feedResponse.getCosmosDiagnostics().toString();
assertThat(feedResponse.getResults().size()).isEqualTo(0);
if (!query.contains("group by") || qroupByFirstResponse) { // TODO https://github
// .com/Azure/azure-sdk-for-java/issues/14142
assertThat(queryDiagnostics).contains("Retrieved Document Count");
assertThat(queryDiagnostics).contains("Query Preparation Times");
assertThat(queryDiagnostics).contains("Runtime Execution Times");
assertThat(queryDiagnostics).contains("Partition Execution Timeline");
if (query.contains("group by")) {
qroupByFirstResponse = false;
}
}
}
}

@Test(groups = {"simple"}, timeOut = TIMEOUT)
public void directDiagnosticsOnException() {
CosmosContainer cosmosContainer = directClient.getDatabase(cosmosAsyncContainer.getDatabase().getId()).getContainer(cosmosAsyncContainer.getId());
InternalObjectNode internalObjectNode = getInternalObjectNode();
Expand Down Expand Up @@ -270,7 +307,7 @@ public void directDiagnosticsOnException() {
}
}

@Test(groups = {"simple"})
@Test(groups = {"simple"}, timeOut = TIMEOUT)
public void supplementalResponseStatisticsList() throws Exception {
ClientSideRequestStatistics clientSideRequestStatistics = new ClientSideRequestStatistics();
for (int i = 0; i < 15; i++) {
Expand Down Expand Up @@ -320,7 +357,7 @@ public void supplementalResponseStatisticsList() throws Exception {
}
}

@Test(groups = {"simple"})
@Test(groups = {"simple"}, timeOut = TIMEOUT)
public void serializationOnVariousScenarios() {
//checking database serialization
CosmosDatabaseResponse cosmosDatabase = gatewayClient.getDatabase(cosmosAsyncContainer.getDatabase().getId()).read();
Expand Down

0 comments on commit f416970

Please sign in to comment.