Skip to content

Commit

Permalink
Add more logs to identify why shard indexing pressure limit is not breached
Browse files Browse the repository at this point in the history
  • Loading branch information
dreamer-89 committed Jan 14, 2022
1 parent 78c02dd commit 03b0bf7
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,8 @@ public void testShardIndexingPressureEnforcedEnabledDisabledSetting() throws Exc
String primaryName = primaryReplicaNodeNames.v1();
String coordinatingOnlyNode = getCoordinatingOnlyNode();

logger.info("Coordinating only node ===> " + coordinatingOnlyNode);
logger.info("primaryName ===> " + primaryName);
logger.info("coordinatingOnlyNode ===> " + coordinatingOnlyNode);

IndexService indexService = internalCluster().getInstance(IndicesService.class, primaryName).iterator().next();
Index index = indexService.getIndexSettings().getIndex();
Expand All @@ -340,9 +341,11 @@ public void testShardIndexingPressureEnforcedEnabledDisabledSetting() throws Exc
successFuture.actionGet();

// Send a couple more requests which remain outstanding
logger.info("Sending bulk requests ==>");
Releasable primaryReleasable = blockPrimary(primaryName);
successFuture = client(coordinatingOnlyNode).bulk(bulkRequest);
ActionFuture<BulkResponse> secondSuccessFuture = client(coordinatingOnlyNode).bulk(bulkRequest);

// Delay to breach the success time stamp threshold
Thread.sleep(25);

Expand Down Expand Up @@ -374,6 +377,7 @@ public void testShardIndexingPressureEnforcedEnabledDisabledSetting() throws Exc
successFuture.actionGet();

// Send a couple of requests which remain outstanding
logger.info("Sending bulk requests post disabling shard indexing pressure");
primaryReleasable = blockPrimary(primaryName);
successFuture = client(coordinatingOnlyNode).bulk(bulkRequest);
secondSuccessFuture = client(coordinatingOnlyNode).bulk(bulkRequest);
Expand Down Expand Up @@ -411,14 +415,11 @@ public void testShardIndexingPressureEnforcedEnabledDisabledSetting() throws Exc
successFuture.actionGet();

// Send a couple of requests which remain outstanding
logger.info("Sending bulk requests post re-enabling shard indexing pressure");
primaryReleasable = blockPrimary(primaryName);
successFuture = client(coordinatingOnlyNode).bulk(bulkRequest);
secondSuccessFuture = client(coordinatingOnlyNode).bulk(bulkRequest);
Thread.sleep(10);

// logger.info("=================== " + new Throwable().getStackTrace()[0].getLineNumber() + " ================================");
// logger.info(this.threadDump());
// logger.info("===================================================");
Thread.sleep(25);

// This request breaches the threshold and hence will be rejected
expectThrows(OpenSearchRejectedExecutionException.class, () -> client(coordinatingOnlyNode).bulk(bulkRequest).actionGet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,7 @@ protected void doRun() {
}
// Add the shard level accounting for coordinating and supply the listener
final boolean isOnlySystem = isOnlySystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(), systemIndices);
logger.info("isShardIndexingPressureEnabled() ==> " + indexingPressureService.isShardIndexingPressureEnabled());
final Releasable releasable = indexingPressureService.markCoordinatingOperationStarted(
shardId,
bulkShardRequest.ramBytesUsed(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public ShardIndexingPressureStats shardStats(CommonStatsFlags statsFlags) {
return shardIndexingPressure.shardStats(statsFlags);
}

private boolean isShardIndexingPressureEnabled() {
public boolean isShardIndexingPressureEnabled() {
return shardIndexingPressure.isShardIndexingPressureEnabled();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public Releasable markCoordinatingOperationStarted(ShardId shardId, long bytes,
shardLevelLimitBreached = memoryManager.isCoordinatingShardLimitBreached(tracker, nodeTotalBytes, requestStartTime);
}

logger.info("nodeLevelLimitBreached => " + nodeLevelLimitBreached + " , shardLevelLimitBreached ==> " + shardLevelLimitBreached);

if (shouldRejectRequest(nodeLevelLimitBreached, shardLevelLimitBreached)) {
coordinatingRejections.getAndIncrement();
currentCombinedCoordinatingAndPrimaryBytes.addAndGet(-bytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,18 +390,22 @@ private boolean onShardLimitBreached(
// Secondary Parameters (i.e. LastSuccessfulRequestDuration and Throughput) are taken into consideration when
// the current node utilization is greater than primary_parameter.node.soft_limit of total node limits.
if (((double) nodeTotalBytes / nodeLimit) < this.nodeSoftLimit) {
logger.info("Soft limit not breached");
boolean isShardLimitsIncreased = increaseShardLimitSupplier.getAsBoolean();
if (isShardLimitsIncreased == false) {
incrementNodeLimitBreachedRejectionCount(operationTracker.getRejectionTracker());
}
return !isShardLimitsIncreased;
} else {
logger.info("Soft limit breached");
logger.info("nodeTotalBytes ==> " + nodeTotalBytes + ", nodeLimit ==> " + nodeLimit);
boolean shardLastSuccessfulRequestDurationLimitsBreached = evaluateLastSuccessfulRequestDurationLimitsBreached(
operationTracker.getPerformanceTracker(),
requestStartTime
);

if (shardLastSuccessfulRequestDurationLimitsBreached) {
logger.info("shardLastSuccessfulRequestDurationLimitsBreached ==============");
operationTracker.getRejectionTracker().incrementLastSuccessfulRequestLimitsBreachedRejections();
this.totalLastSuccessfulRequestLimitsBreachedRejections.incrementAndGet();
return true;
Expand Down

0 comments on commit 03b0bf7

Please sign in to comment.