Skip to content

Commit

Permalink
AddSuportForLoggingEmptyPageDiagnostics- shortTermFix (#26869)
Browse files Browse the repository at this point in the history
* logEmptyPageDiagnostics

Co-authored-by: annie-mac <[email protected]>
Co-authored-by: annie-mac <[email protected]>
  • Loading branch information
3 people authored Feb 8, 2022
1 parent 071d6b3 commit 76d1704
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ public class Configs {
private static final String OPEN_CONNECTIONS_RETRIES_COUNT_NAME = "COSMOS.OPEN_CONNECTIONS_RETRIES_COUNT";
private static final int DEFAULT_OPEN_CONNECTIONS_RETRIES_COUNT = 1;

// whether to allow query empty page diagnostics logging
private static final String QUERY_EMPTY_PAGE_DIAGNOSTICS_ENABLED = "COSMOS.QUERY_EMPTY_PAGE_DIAGNOSTICS_ENABLED";
private static final boolean DEFAULT_QUERY_EMPTY_PAGE_DIAGNOSTICS_ENABLED = false;

public Configs() {
this.sslContext = sslContextInit();
}
Expand Down Expand Up @@ -272,6 +276,12 @@ public static int getOpenConnectionsRetriesCount() {
DEFAULT_OPEN_CONNECTIONS_RETRIES_COUNT);
}

public static boolean isEmptyPageDiagnosticsEnabled() {
return getJVMConfigAsBoolean(
QUERY_EMPTY_PAGE_DIAGNOSTICS_ENABLED,
DEFAULT_QUERY_EMPTY_PAGE_DIAGNOSTICS_ENABLED);
}

private static int getJVMConfigAsInt(String propName, int defaultValue) {
String propValue = System.getProperty(propName);
return getIntValue(propValue, defaultValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ public interface CosmosQueryRequestOptionsAccessor {
Map<String, String> getHeader(CosmosQueryRequestOptions queryRequestOptions);
boolean isQueryPlanRetrievalDisallowed(CosmosQueryRequestOptions queryRequestOptions);
CosmosQueryRequestOptions disallowQueryPlanRetrieval(CosmosQueryRequestOptions queryRequestOptions);
boolean isEmptyPageDiagnosticsEnabled(CosmosQueryRequestOptions queryRequestOptions);
CosmosQueryRequestOptions setEmptyPageDiagnosticsEnabled(CosmosQueryRequestOptions queryRequestOptions, boolean emptyPageDiagnosticsEnabled);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosDiagnostics;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.ClientSideRequestStatistics;
import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.DiagnosticsClientContext;
import com.azure.cosmos.implementation.DocumentClientRetryPolicy;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.PartitionKeyRange;
import com.azure.cosmos.implementation.QueryMetrics;
import com.azure.cosmos.implementation.RequestChargeTracker;
Expand All @@ -23,6 +25,9 @@
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.SqlQuerySpec;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.concurrent.Queues;
Expand All @@ -44,6 +49,8 @@
*/
public class ParallelDocumentQueryExecutionContext<T extends Resource>
extends ParallelDocumentQueryExecutionContextBase<T> {
private static final Logger logger = LoggerFactory.getLogger(ParallelDocumentQueryExecutionContext.class);

private final CosmosQueryRequestOptions cosmosQueryRequestOptions;
private final Map<FeedRangeEpkImpl, String> partitionKeyRangeToContinuationTokenMap;

Expand Down Expand Up @@ -213,10 +220,11 @@ private static class EmptyPagesFilterTransformer<T extends Resource>
private final RequestChargeTracker tracker;
private DocumentProducer<T>.DocumentProducerFeedResponse previousPage;
private final CosmosQueryRequestOptions cosmosQueryRequestOptions;
private final UUID correlatedActivityId;
private ConcurrentMap<String, QueryMetrics> emptyPageQueryMetricsMap = new ConcurrentHashMap<>();
private CosmosDiagnostics cosmosDiagnostics;

public EmptyPagesFilterTransformer(RequestChargeTracker tracker, CosmosQueryRequestOptions options) {
public EmptyPagesFilterTransformer(RequestChargeTracker tracker, CosmosQueryRequestOptions options, UUID correlatedActivityId) {

if (tracker == null) {
throw new IllegalArgumentException("Request Charge Tracker must not be null.");
Expand All @@ -225,6 +233,7 @@ public EmptyPagesFilterTransformer(RequestChargeTracker tracker, CosmosQueryRequ
this.tracker = tracker;
this.previousPage = null;
this.cosmosQueryRequestOptions = options;
this.correlatedActivityId = correlatedActivityId;
}

private DocumentProducer<T>.DocumentProducerFeedResponse plusCharge(
Expand Down Expand Up @@ -286,6 +295,18 @@ public Flux<FeedResponse<T>> apply(Flux<DocumentProducer<T>.DocumentProducerFeed
BridgeInternal.queryMetricsFromFeedResponse(documentProducerFeedResponse.pageResult);
QueryMetrics.mergeQueryMetricsMap(emptyPageQueryMetricsMap, currentQueryMetrics);
cosmosDiagnostics = documentProducerFeedResponse.pageResult.getCosmosDiagnostics();

if (ImplementationBridgeHelpers
.CosmosQueryRequestOptionsHelper
.getCosmosQueryRequestOptionsAccessor()
.isEmptyPageDiagnosticsEnabled(cosmosQueryRequestOptions)) {

logEmptyPageDiagnostics(
cosmosDiagnostics,
this.correlatedActivityId,
documentProducerFeedResponse.pageResult.getActivityId());
}

return false;
}
return true;
Expand Down Expand Up @@ -366,6 +387,27 @@ public Flux<FeedResponse<T>> apply(Flux<DocumentProducer<T>.DocumentProducerFeed
}
}

static void logEmptyPageDiagnostics(
CosmosDiagnostics cosmosDiagnostics,
UUID correlatedActivityId,
String activityId) {
List<ClientSideRequestStatistics> requestStatistics =
BridgeInternal.getClientSideRequestStatisticsList(cosmosDiagnostics);

try {
if (logger.isInfoEnabled()) {
logger.info(
"Empty page request diagnostics for correlatedActivityId [{}] - activityId [{}] - [{}]",
correlatedActivityId,
activityId,
Utils.getSimpleObjectMapper().writeValueAsString(requestStatistics));
}

} catch (JsonProcessingException e) {
logger.warn("Failed to log empty page diagnostics. ", e);
}
}

@Override
public Flux<FeedResponse<T>> drainAsync(
int maxPageSize) {
Expand All @@ -386,7 +428,7 @@ public Flux<FeedResponse<T>> drainAsync(
logger.debug("ParallelQuery: flux mergeSequential" +
" concurrency {}, prefetch {}", fluxConcurrency, fluxPrefetch);
return Flux.mergeSequential(obs, fluxConcurrency, fluxPrefetch)
.transformDeferred(new EmptyPagesFilterTransformer<>(new RequestChargeTracker(), this.cosmosQueryRequestOptions));
.transformDeferred(new EmptyPagesFilterTransformer<>(new RequestChargeTracker(), this.cosmosQueryRequestOptions, correlatedActivityId));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.SqlQuerySpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand All @@ -37,7 +35,6 @@
public abstract class ParallelDocumentQueryExecutionContextBase<T extends Resource>
extends DocumentQueryExecutionContextBase<T> implements IDocumentQueryExecutionComponent<T> {

protected final Logger logger;
protected final List<DocumentProducer<T>> documentProducers;
protected final SqlQuerySpec querySpec;
protected int pageSize;
Expand All @@ -51,7 +48,6 @@ protected ParallelDocumentQueryExecutionContextBase(DiagnosticsClientContext dia
super(diagnosticsClientContext, client, resourceTypeEnum, resourceType, query, cosmosQueryRequestOptions, resourceLink, getLazyFeedResponse,
correlatedActivityId);

logger = LoggerFactory.getLogger(this.getClass());
documentProducers = new ArrayList<>();
if (!Strings.isNullOrEmpty(rewrittenQuery)) {
this.querySpec = new SqlQuerySpec(rewrittenQuery, super.query.getParameters());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package com.azure.cosmos.models;

import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.spark.OperationContextAndListenerTuple;
import com.azure.cosmos.util.Beta;
Expand Down Expand Up @@ -39,12 +40,15 @@ public class CosmosQueryRequestOptions {
private Map<String, String> customOptions;
private boolean indexMetricsEnabled;
private boolean queryPlanRetrievalDisallowed;
private boolean emptyPageDiagnosticsEnabled;

/**
* Instantiates a new query request options.
*/
public CosmosQueryRequestOptions() {

this.queryMetricsEnabled = true;
this.emptyPageDiagnosticsEnabled = Configs.isEmptyPageDiagnosticsEnabled();
}

/**
Expand Down Expand Up @@ -72,6 +76,7 @@ public CosmosQueryRequestOptions() {
this.customOptions = options.customOptions;
this.indexMetricsEnabled = options.indexMetricsEnabled;
this.queryPlanRetrievalDisallowed = options.queryPlanRetrievalDisallowed;
this.emptyPageDiagnosticsEnabled = options.emptyPageDiagnosticsEnabled;
}

void setOperationContextAndListenerTuple(OperationContextAndListenerTuple operationContextAndListenerTuple) {
Expand Down Expand Up @@ -550,6 +555,13 @@ boolean isQueryPlanRetrievalDisallowed() {
return this.queryPlanRetrievalDisallowed;
}

boolean isEmptyPageDiagnosticsEnabled() { return this.emptyPageDiagnosticsEnabled; }

CosmosQueryRequestOptions setEmptyPageDiagnosticsEnabled(boolean emptyPageDiagnosticsEnabled) {
this.emptyPageDiagnosticsEnabled = emptyPageDiagnosticsEnabled;
return this;
}

///////////////////////////////////////////////////////////////////////////////////////////
// the following helper/accessor only helps to access this class outside of this package.//
///////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -591,6 +603,16 @@ public CosmosQueryRequestOptions disallowQueryPlanRetrieval(
public boolean isQueryPlanRetrievalDisallowed(CosmosQueryRequestOptions queryRequestOptions) {
return queryRequestOptions.isQueryPlanRetrievalDisallowed();
}

@Override
public boolean isEmptyPageDiagnosticsEnabled(CosmosQueryRequestOptions queryRequestOptions) {
return queryRequestOptions.isEmptyPageDiagnosticsEnabled();
}

@Override
public CosmosQueryRequestOptions setEmptyPageDiagnosticsEnabled(CosmosQueryRequestOptions queryRequestOptions, boolean emptyPageDiagnosticsEnabled) {
return queryRequestOptions.setEmptyPageDiagnosticsEnabled(emptyPageDiagnosticsEnabled);
}
});
}
}

0 comments on commit 76d1704

Please sign in to comment.