diff --git a/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml b/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml index e5d369d3c111a..c9cf735bd6497 100755 --- a/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml +++ b/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml @@ -1489,6 +1489,13 @@ + + + + + + + diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/QueryMetrics.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/QueryMetrics.java index ffda700ed1828..e8a168058d345 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/QueryMetrics.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/QueryMetrics.java @@ -13,6 +13,8 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; /** * Query metrics in the Azure Cosmos database service. @@ -188,6 +190,21 @@ public QueryMetrics add(QueryMetrics... queryMetricsArgs) { return QueryMetrics.createFromCollection(queryMetricsList); } + /** + * Utility method to merge two query metrics map. + * @param base metrics map which will be updated with new values. + * @param addOn metrics map whose values will be merge in base map. + */ + public static void mergeQueryMetricsMap(ConcurrentMap base, ConcurrentMap addOn) { + for (ConcurrentMap.Entry entry : addOn.entrySet()) { + if (base.containsKey(entry.getKey())) { + base.get(entry.getKey()).add(entry.getValue()); + } else { + base.put(entry.getKey(), entry.getValue()); + } + } + } + private String toTextString(int indentLevel) { StringBuilder stringBuilder = new StringBuilder(); QueryMetricsTextWriter queryMetricsTextWriter = new QueryMetricsTextWriter(stringBuilder); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/GroupByDocumentQueryExecutionContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/GroupByDocumentQueryExecutionContext.java index be5074f4263f7..0f03e54d5d676 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/GroupByDocumentQueryExecutionContext.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/GroupByDocumentQueryExecutionContext.java @@ -7,10 +7,11 @@ import com.azure.cosmos.implementation.BadRequestException; import com.azure.cosmos.implementation.Document; import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.JsonSerializable; +import com.azure.cosmos.implementation.QueryMetrics; import com.azure.cosmos.implementation.Resource; import com.azure.cosmos.implementation.query.aggregation.AggregateOperator; import com.azure.cosmos.models.FeedResponse; -import com.azure.cosmos.implementation.JsonSerializable; import com.azure.cosmos.models.ModelBridgeInternal; import com.fasterxml.jackson.databind.node.ObjectNode; import reactor.core.publisher.Flux; @@ -20,6 +21,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.function.BiFunction; public final class GroupByDocumentQueryExecutionContext implements @@ -76,39 +79,50 @@ public Flux> drainAsync(int maxPageSize) { /* Do groupBy stuff here */ // Stage 1: // Drain the groupings fully from all continuation and all partitions + ConcurrentMap queryMetrics = new ConcurrentHashMap<>(); for (FeedResponse page : superList) { List results = (List) page.getResults(); documentList.addAll(results); requestCharge += page.getRequestCharge(); + QueryMetrics.mergeQueryMetricsMap(queryMetrics, BridgeInternal.queryMetricsFromFeedResponse(page)); } this.aggregateGroupings(documentList); // Stage 2: // Emit the results from the grouping table page by page - return createFeedResponseFromGroupingTable(maxPageSize, requestCharge); + List groupByResults = null; + if (this.groupingTable != null) { + groupByResults = this.groupingTable.drain(maxPageSize); + } + + return createFeedResponseFromGroupingTable(maxPageSize, requestCharge, queryMetrics, groupByResults); }).expand(tFeedResponse -> { // For groupBy query, we have already drained everything for the first page request // so for following requests, we will just need to drain page by page from the grouping table - FeedResponse response = createFeedResponseFromGroupingTable(maxPageSize, 0); - if (response == null) { + List groupByResults = null; + if (this.groupingTable != null) { + groupByResults = this.groupingTable.drain(maxPageSize); + } + + if (groupByResults == null || groupByResults.size() == 0) { return Mono.empty(); } + + FeedResponse response = createFeedResponseFromGroupingTable(maxPageSize, 0 , new ConcurrentHashMap<>(), groupByResults); return Mono.just(response); }); } @SuppressWarnings("unchecked") // safe to upcast - private FeedResponse createFeedResponseFromGroupingTable(int pageSize, double requestCharge) { + private FeedResponse createFeedResponseFromGroupingTable(int pageSize, + double requestCharge, + ConcurrentMap queryMetrics, + List groupByResults) { if (this.groupingTable != null) { - List groupByResults = groupingTable.drain(pageSize); - if (groupByResults.size() == 0) { - return null; - } - HashMap headers = new HashMap<>(); headers.put(HttpConstants.HttpHeaders.REQUEST_CHARGE, Double.toString(requestCharge)); - FeedResponse frp = BridgeInternal.createFeedResponse(groupByResults, headers); + FeedResponse frp = BridgeInternal.createFeedResponseWithQueryMetrics(groupByResults, headers, queryMetrics, null); return (FeedResponse) frp; } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/OrderByDocumentQueryExecutionContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/OrderByDocumentQueryExecutionContext.java index db9d90d718fea..f4ebf347e859a 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/OrderByDocumentQueryExecutionContext.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/OrderByDocumentQueryExecutionContext.java @@ -496,8 +496,8 @@ public Flux> apply(Flux> source) { ModelBridgeInternal.getQueryPlanDiagnosticsContext(feedOfOrderByRowResults)); }).switchIfEmpty(Flux.defer(() -> { // create an empty page if there is no result - return Flux.just(BridgeInternal.createFeedResponse(Utils.immutableListOf(), - headerResponse(tracker.getAndResetCharge()))); + return Flux.just(BridgeInternal.createFeedResponseWithQueryMetrics(Utils.immutableListOf(), + headerResponse(tracker.getAndResetCharge()), queryMetricMap, null)); })); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ParallelDocumentQueryExecutionContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ParallelDocumentQueryExecutionContext.java index 2114548945a93..53670467bc1c6 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ParallelDocumentQueryExecutionContext.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ParallelDocumentQueryExecutionContext.java @@ -8,6 +8,7 @@ import com.azure.cosmos.implementation.DocumentClientRetryPolicy; import com.azure.cosmos.implementation.HttpConstants; import com.azure.cosmos.implementation.PartitionKeyRange; +import com.azure.cosmos.implementation.QueryMetrics; import com.azure.cosmos.implementation.RequestChargeTracker; import com.azure.cosmos.implementation.Resource; import com.azure.cosmos.implementation.ResourceType; @@ -30,6 +31,8 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.function.Function; import java.util.stream.Collectors; /** @@ -198,6 +201,7 @@ private static class EmptyPagesFilterTransformer private final RequestChargeTracker tracker; private DocumentProducer.DocumentProducerFeedResponse previousPage; private final CosmosQueryRequestOptions cosmosQueryRequestOptions; + private ConcurrentMap emptyPageQueryMetricsMap = new ConcurrentHashMap<>(); public EmptyPagesFilterTransformer(RequestChargeTracker tracker, CosmosQueryRequestOptions options) { @@ -258,10 +262,21 @@ public Flux> apply(Flux.DocumentProducerFeed && !ModelBridgeInternal.getEmptyPagesAllowedFromQueryRequestOptions(this.cosmosQueryRequestOptions)) { // filter empty pages and accumulate charge tracker.addCharge(documentProducerFeedResponse.pageResult.getRequestCharge()); + ConcurrentMap currentQueryMetrics = + BridgeInternal.queryMetricsFromFeedResponse(documentProducerFeedResponse.pageResult); + QueryMetrics.mergeQueryMetricsMap(emptyPageQueryMetricsMap, currentQueryMetrics); return false; } return true; }).map(documentProducerFeedResponse -> { + //Combining previous empty page query metrics with current non empty page query metrics + if (!emptyPageQueryMetricsMap.isEmpty()) { + ConcurrentMap currentQueryMetrics = + BridgeInternal.queryMetricsFromFeedResponse(documentProducerFeedResponse.pageResult); + QueryMetrics.mergeQueryMetricsMap(currentQueryMetrics, emptyPageQueryMetricsMap); + emptyPageQueryMetricsMap.clear(); + } + // Add the request charge double charge = tracker.getAndResetCharge(); if (charge > 0) { @@ -319,8 +334,8 @@ public Flux> apply(Flux.DocumentProducerFeed return documentProducerFeedResponse.pageResult; }).switchIfEmpty(Flux.defer(() -> { // create an empty page if there is no result - return Flux.just(BridgeInternal.createFeedResponse(Utils.immutableListOf(), - headerResponse(tracker.getAndResetCharge()))); + return Flux.just(BridgeInternal.createFeedResponseWithQueryMetrics(Utils.immutableListOf(), + headerResponse(tracker.getAndResetCharge()), emptyPageQueryMetricsMap, null)); })); } } diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosDiagnosticsTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosDiagnosticsTest.java index f0ef952aacc03..49012196b5e40 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosDiagnosticsTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosDiagnosticsTest.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.node.ArrayNode; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import java.lang.reflect.Field; @@ -77,7 +78,18 @@ public void afterClass() { } } - @Test(groups = {"simple"}) + @DataProvider(name = "query") + private Object[][] query() { + return new Object[][]{ + {"Select * from c where c.id = 'wrongId'"}, + {"Select top 1 * from c where c.id = 'wrongId'"}, + {"Select * from c where c.id = 'wrongId' order by c.id"}, + {"Select count(1) from c where c.id = 'wrongId' group by c.pk"}, + {"Select distinct c.pk from c where c.id = 'wrongId'"}, + }; + } + + @Test(groups = {"simple"}, timeOut = TIMEOUT) public void gatewayDiagnostics() { CosmosClient testGatewayClient = null; try { @@ -108,7 +120,7 @@ public void gatewayDiagnostics() { } } - @Test(groups = {"simple"}) + @Test(groups = {"simple"}, timeOut = TIMEOUT) public void gatewayDiagnosticsOnException() { InternalObjectNode internalObjectNode = getInternalObjectNode(); CosmosItemResponse createResponse = null; @@ -135,7 +147,7 @@ public void gatewayDiagnosticsOnException() { } } - @Test(groups = {"simple"}) + @Test(groups = {"simple"}, timeOut = TIMEOUT) public void systemDiagnosticsForSystemStateInformation() { InternalObjectNode internalObjectNode = getInternalObjectNode(); CosmosItemResponse createResponse = this.container.createItem(internalObjectNode); @@ -149,7 +161,7 @@ public void systemDiagnosticsForSystemStateInformation() { assertThat(createResponse.getDiagnostics().getDuration()).isNotNull(); } - @Test(groups = {"simple"}) + @Test(groups = {"simple"}, timeOut = TIMEOUT) public void directDiagnostics() { CosmosClient testDirectClient = null; try { @@ -183,7 +195,7 @@ public void directDiagnostics() { } } - @Test(groups = {"simple"}) + @Test(groups = {"simple"}, timeOut = TIMEOUT) public void queryPlanDiagnostics() { CosmosContainer cosmosContainer = directClient.getDatabase(cosmosAsyncContainer.getDatabase().getId()).getContainer(cosmosAsyncContainer.getId()); List itemIdList = new ArrayList<>(); @@ -232,7 +244,32 @@ public void queryPlanDiagnostics() { } } - @Test(groups = {"simple"}) + @Test(groups = {"simple"}, dataProvider = "query", timeOut = TIMEOUT) + public void queryMetrics(String query) { + CosmosQueryRequestOptions options = new CosmosQueryRequestOptions(); + options.setQueryMetricsEnabled(true); + boolean qroupByFirstResponse = true; // TODO https://github.com/Azure/azure-sdk-for-java/issues/14142 + Iterator> iterator = this.container.queryItems(query, options, + InternalObjectNode.class).iterableByPage().iterator(); + assertThat(iterator.hasNext()).isTrue(); + while (iterator.hasNext()) { + FeedResponse feedResponse = iterator.next(); + String queryDiagnostics = feedResponse.getCosmosDiagnostics().toString(); + assertThat(feedResponse.getResults().size()).isEqualTo(0); + if (!query.contains("group by") || qroupByFirstResponse) { // TODO https://github + // .com/Azure/azure-sdk-for-java/issues/14142 + assertThat(queryDiagnostics).contains("Retrieved Document Count"); + assertThat(queryDiagnostics).contains("Query Preparation Times"); + assertThat(queryDiagnostics).contains("Runtime Execution Times"); + assertThat(queryDiagnostics).contains("Partition Execution Timeline"); + if (query.contains("group by")) { + qroupByFirstResponse = false; + } + } + } + } + + @Test(groups = {"simple"}, timeOut = TIMEOUT) public void directDiagnosticsOnException() { CosmosContainer cosmosContainer = directClient.getDatabase(cosmosAsyncContainer.getDatabase().getId()).getContainer(cosmosAsyncContainer.getId()); InternalObjectNode internalObjectNode = getInternalObjectNode(); @@ -270,7 +307,7 @@ public void directDiagnosticsOnException() { } } - @Test(groups = {"simple"}) + @Test(groups = {"simple"}, timeOut = TIMEOUT) public void supplementalResponseStatisticsList() throws Exception { ClientSideRequestStatistics clientSideRequestStatistics = new ClientSideRequestStatistics(); for (int i = 0; i < 15; i++) { @@ -320,7 +357,7 @@ public void supplementalResponseStatisticsList() throws Exception { } } - @Test(groups = {"simple"}) + @Test(groups = {"simple"}, timeOut = TIMEOUT) public void serializationOnVariousScenarios() { //checking database serialization CosmosDatabaseResponse cosmosDatabase = gatewayClient.getDatabase(cosmosAsyncContainer.getDatabase().getId()).read();