Skip to content

Commit

Permalink
Adding support for dcount queries. (#23725)
Browse files Browse the repository at this point in the history
* Adding support for dcount queries.

* cleanup
  • Loading branch information
mbhaskar authored Aug 31, 2021
1 parent 218cc1d commit 6dabf5b
Show file tree
Hide file tree
Showing 7 changed files with 257 additions and 68 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation.query;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.implementation.ClientSideRequestStatistics;
import com.azure.cosmos.implementation.Constants;
import com.azure.cosmos.implementation.Document;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.QueryMetrics;
import com.azure.cosmos.implementation.Resource;
import com.azure.cosmos.implementation.Strings;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiFunction;

/**
* Execution component that is able to aggregate COUNT(DISTINCT) from multiple continuations and partitions.
*
* @param <T> Resource generic type
*/
public class DCountDocumentQueryExecutionContext<T extends Resource> implements IDocumentQueryExecutionComponent<T> {
private final IDocumentQueryExecutionComponent<T> component;
private final QueryInfo info;
private long count;
private ConcurrentMap<String, QueryMetrics> queryMetricsMap = new ConcurrentHashMap<>();

private DCountDocumentQueryExecutionContext(
IDocumentQueryExecutionComponent<T> component,
QueryInfo info,
long count) {

if (component == null) {
throw new IllegalArgumentException("documentQueryExecutionComponent cannot be null");
}

this.component = component;
this.count = count;
this.info = info;
}

public static <T extends Resource> Flux<IDocumentQueryExecutionComponent<T>> createAsync(
BiFunction<String, PipelinedDocumentQueryParams<T>, Flux<IDocumentQueryExecutionComponent<T>>> createSourceComponentFunction,
QueryInfo info,
String continuationToken,
PipelinedDocumentQueryParams<T> documentQueryParams) {

return createSourceComponentFunction
.apply(continuationToken, documentQueryParams)
.map(component -> new DCountDocumentQueryExecutionContext<T>(component, info, 0 /*default count*/));
}

IDocumentQueryExecutionComponent<T> getComponent() {
return this.component;
}

@SuppressWarnings("unchecked")
@Override
public Flux<FeedResponse<T>> drainAsync(int maxPageSize) {
return this.component.drainAsync(maxPageSize)
.collectList()
.map(superList -> {
double requestCharge = 0;
Map<String, String> headers = new HashMap<>();
List<ClientSideRequestStatistics> diagnosticsList = new ArrayList<>();

for (FeedResponse<T> page : superList) {
diagnosticsList.addAll(BridgeInternal
.getClientSideRequestStatisticsList(page
.getCosmosDiagnostics()));
count += page.getResults().size();
requestCharge += page.getRequestCharge();
QueryMetrics.mergeQueryMetricsMap(queryMetricsMap,
BridgeInternal.queryMetricsFromFeedResponse(page));
}

Document result = new Document();
if (Strings.isNullOrEmpty(info.getDCountAlias())) {
if (info.hasSelectValue()) {
result.set(Constants.Properties.VALUE, count);
} else {
// Setting $1 as the key to be consistent with service results
result.set("$1", count);
}
} else {
result.set(info.getDCountAlias(), count);
}
headers.put(HttpConstants.HttpHeaders.REQUEST_CHARGE, Double.toString(requestCharge));
FeedResponse<Document> frp =
BridgeInternal.createFeedResponseWithQueryMetrics(Collections.singletonList(result), headers,
queryMetricsMap, null, false,
false, null);

BridgeInternal.addClientSideDiagnosticsToFeed(frp.getCosmosDiagnostics(), diagnosticsList);
return (FeedResponse<T>) BridgeInternal
.createFeedResponseWithQueryMetrics(Collections
.singletonList(result),
headers,
BridgeInternal
.queryMetricsFromFeedResponse(frp),
ModelBridgeInternal
.getQueryPlanDiagnosticsContext(frp),
false,
false,
frp.getCosmosDiagnostics());
})
.flux();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation.query;

import com.fasterxml.jackson.annotation.JsonProperty;

public class DCountInfo {
@JsonProperty("dCountAlias")
private String dCountAlias;

public String getDCountAlias() {
return dCountAlias;
}

public void setDCountAlias(String dCountAlias) {
this.dCountAlias = dCountAlias;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,18 @@ public static <T extends Resource> Flux<PipelinedDocumentQueryExecutionContext<T
createTakeComponentFunction = createTopComponentFunction;
}

BiFunction<String, PipelinedDocumentQueryParams<T>, Flux<IDocumentQueryExecutionComponent<T>>> createDCountComponentFunction;
if (queryInfo.hasDCount()) {
createDCountComponentFunction = (continuationToken, documentQueryParams) -> {
return DCountDocumentQueryExecutionContext.createAsync(createTakeComponentFunction,
queryInfo,
continuationToken,
documentQueryParams);
};
} else {
createDCountComponentFunction = createTakeComponentFunction;
}

int actualPageSize = Utils.getValueOrDefault(ModelBridgeInternal.getMaxItemCountFromQueryRequestOptions(cosmosQueryRequestOptions),
ParallelQueryConfig.ClientInternalPageSize);

Expand All @@ -167,7 +179,7 @@ public static <T extends Resource> Flux<PipelinedDocumentQueryExecutionContext<T
}

int pageSize = Math.min(actualPageSize, Utils.getValueOrDefault(queryInfo.getTop(), (actualPageSize)));
return createTakeComponentFunction.apply(ModelBridgeInternal.getRequestContinuationFromQueryRequestOptions(cosmosQueryRequestOptions), initParams)
return createDCountComponentFunction.apply(ModelBridgeInternal.getRequestContinuationFromQueryRequestOptions(cosmosQueryRequestOptions), initParams)
.map(c -> new PipelinedDocumentQueryExecutionContext<>(c, pageSize, correlatedActivityId, queryInfo));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ public enum QueryFeature {
OffsetAndLimit,
OrderBy,
Top,
NonValueAggregate
NonValueAggregate,
DCount
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.Strings;

import java.time.Instant;
import java.util.Collection;
Expand All @@ -32,6 +33,7 @@ public final class QueryInfo extends JsonSerializable {
private Integer limit;
private DistinctQueryType distinctQueryType;
private QueryPlanDiagnosticsContext queryPlanDiagnosticsContext;
private DCountInfo dCountInfo;

public QueryInfo() {
}
Expand Down Expand Up @@ -166,6 +168,23 @@ public List<String> getGroupByAliases() {
return super.getList("groupByAliases", String.class);
}

public boolean hasDCount() {
return this.getDCountInfo() != null;
}

public DCountInfo getDCountInfo() {
return this.dCountInfo != null ?
this.dCountInfo : (this.dCountInfo = super.getObject("dCountInfo", DCountInfo.class));
}

public String getDCountAlias() {
return this.dCountInfo.getDCountAlias();
}

public boolean isValueAggregate() {
return Strings.isNullOrEmpty(this.getDCountAlias());
}

public QueryPlanDiagnosticsContext getQueryPlanDiagnosticsContext() {
return queryPlanDiagnosticsContext;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class QueryPlanRetriever {
QueryFeature.Distinct.name() + ", " +
QueryFeature.GroupBy.name() + ", " +
QueryFeature.Top.name() + ", " +
QueryFeature.DCount.name() + ", " +
QueryFeature.NonValueAggregate.name();

static Mono<PartitionedQueryExecutionInfo> getQueryPlanThroughGatewayAsync(DiagnosticsClientContext diagnosticsClientContext,
Expand Down
Loading

0 comments on commit 6dabf5b

Please sign in to comment.