diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/IndexingPressureIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/IndexingPressureIT.java index 4d5c4aa06d68a..ddecedd0aab40 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/IndexingPressureIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/IndexingPressureIT.java @@ -50,7 +50,9 @@ import java.util.stream.Stream; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 2, numClientNodes = 1) @@ -83,7 +85,7 @@ protected int numberOfShards() { return 1; } - public void testWriteBytesAreIncremented() throws Exception { + public void testWriteIndexingPressureMetricsAreIncremented() throws Exception { assertAcked(prepareCreate(INDEX_NAME, Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))); @@ -129,6 +131,7 @@ public void testWriteBytesAreIncremented() throws Exception { final long bulkRequestSize = bulkRequest.ramBytesUsed(); final long bulkShardRequestSize = totalRequestSize; + final long bulkOps = bulkRequest.numberOfActions(); try { final ActionFuture<BulkResponse> successFuture = client(coordinatingOnlyNode).bulk(bulkRequest); @@ -138,20 +141,29 @@ public void testWriteBytesAreIncremented() throws Exception { IndexingPressure replicaWriteLimits = internalCluster().getInstance(IndexingPressure.class, replicaName); IndexingPressure coordinatingWriteLimits = internalCluster().getInstance(IndexingPressure.class, coordinatingOnlyNode); - assertThat(primaryWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize)); - assertThat(primaryWriteLimits.getCurrentPrimaryBytes(), greaterThan(bulkShardRequestSize)); - assertEquals(0, primaryWriteLimits.getCurrentCoordinatingBytes()); - assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes()); - - assertEquals(0, replicaWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes()); - assertEquals(0, replicaWriteLimits.getCurrentCoordinatingBytes()); - assertEquals(0, replicaWriteLimits.getCurrentPrimaryBytes()); - assertEquals(0, replicaWriteLimits.getCurrentReplicaBytes()); - - assertEquals(bulkRequestSize, coordinatingWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes()); - assertEquals(bulkRequestSize, coordinatingWriteLimits.getCurrentCoordinatingBytes()); - assertEquals(0, coordinatingWriteLimits.getCurrentPrimaryBytes()); - assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes()); + assertThat(primaryWriteLimits.stats().getCurrentCombinedCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize)); + assertThat(primaryWriteLimits.stats().getCurrentPrimaryBytes(), greaterThan(bulkShardRequestSize)); + assertThat(primaryWriteLimits.stats().getCurrentPrimaryOps(), greaterThanOrEqualTo(bulkOps)); + assertEquals(0, primaryWriteLimits.stats().getCurrentCoordinatingBytes()); + assertEquals(0, primaryWriteLimits.stats().getCurrentCoordinatingOps()); + assertEquals(0, primaryWriteLimits.stats().getCurrentReplicaBytes()); + assertEquals(0, primaryWriteLimits.stats().getCurrentReplicaOps()); + + assertEquals(0, replicaWriteLimits.stats().getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(0, replicaWriteLimits.stats().getCurrentCoordinatingBytes()); + assertEquals(0, replicaWriteLimits.stats().getCurrentCoordinatingOps()); + assertEquals(0, replicaWriteLimits.stats().getCurrentPrimaryBytes()); + assertEquals(0, replicaWriteLimits.stats().getCurrentPrimaryOps()); + assertEquals(0, replicaWriteLimits.stats().getCurrentReplicaBytes()); + assertEquals(0, replicaWriteLimits.stats().getCurrentReplicaOps()); + + assertEquals(bulkRequestSize, coordinatingWriteLimits.stats().getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(bulkRequestSize, coordinatingWriteLimits.stats().getCurrentCoordinatingBytes()); + assertEquals(bulkOps, coordinatingWriteLimits.stats().getCurrentCoordinatingOps()); + assertEquals(0, coordinatingWriteLimits.stats().getCurrentPrimaryBytes()); + assertEquals(0, coordinatingWriteLimits.stats().getCurrentPrimaryOps()); + assertEquals(0, coordinatingWriteLimits.stats().getCurrentReplicaBytes()); + assertEquals(0, coordinatingWriteLimits.stats().getCurrentReplicaOps()); latchBlockingReplicationSend.countDown(); @@ -171,49 +183,68 @@ public void testWriteBytesAreIncremented() throws Exception { final long secondBulkRequestSize = secondBulkRequest.ramBytesUsed(); final long secondBulkShardRequestSize = request.ramBytesUsed(); + final long secondBulkOps = secondBulkRequest.numberOfActions(); if (usePrimaryAsCoordinatingNode) { assertBusy(() -> { - assertThat(primaryWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes(), + assertThat(primaryWriteLimits.stats().getCurrentCombinedCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize + secondBulkRequestSize)); - assertEquals(secondBulkRequestSize, primaryWriteLimits.getCurrentCoordinatingBytes()); - assertThat(primaryWriteLimits.getCurrentPrimaryBytes(), + assertEquals(secondBulkRequestSize, primaryWriteLimits.stats().getCurrentCoordinatingBytes()); + assertEquals(secondBulkOps, primaryWriteLimits.stats().getCurrentCoordinatingOps()); + assertThat(primaryWriteLimits.stats().getCurrentPrimaryBytes(), greaterThan(bulkShardRequestSize + secondBulkRequestSize)); - - assertEquals(0, replicaWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes()); - assertEquals(0, replicaWriteLimits.getCurrentCoordinatingBytes()); - assertEquals(0, replicaWriteLimits.getCurrentPrimaryBytes()); + assertThat(primaryWriteLimits.stats().getCurrentPrimaryOps(), + equalTo(bulkOps + secondBulkOps)); + + assertEquals(0, replicaWriteLimits.stats().getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(0, replicaWriteLimits.stats().getCurrentCoordinatingBytes()); + assertEquals(0, replicaWriteLimits.stats().getCurrentCoordinatingOps()); + assertEquals(0, replicaWriteLimits.stats().getCurrentPrimaryBytes()); + assertEquals(0, replicaWriteLimits.stats().getCurrentPrimaryOps()); }); } else { - assertThat(primaryWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize)); + assertThat(primaryWriteLimits.stats().getCurrentCombinedCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize)); - assertEquals(secondBulkRequestSize, replicaWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes()); - assertEquals(secondBulkRequestSize, replicaWriteLimits.getCurrentCoordinatingBytes()); - assertEquals(0, replicaWriteLimits.getCurrentPrimaryBytes()); + assertEquals(secondBulkRequestSize, replicaWriteLimits.stats().getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(secondBulkRequestSize, replicaWriteLimits.stats().getCurrentCoordinatingBytes()); + assertEquals(secondBulkOps, replicaWriteLimits.stats().getCurrentCoordinatingOps()); + assertEquals(0, replicaWriteLimits.stats().getCurrentPrimaryBytes()); + assertEquals(0, replicaWriteLimits.stats().getCurrentPrimaryOps()); } - assertEquals(bulkRequestSize, coordinatingWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes()); - assertBusy(() -> assertThat(replicaWriteLimits.getCurrentReplicaBytes(), + assertEquals(bulkRequestSize, coordinatingWriteLimits.stats().getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertBusy(() -> assertThat(replicaWriteLimits.stats().getCurrentReplicaBytes(), greaterThan(bulkShardRequestSize + secondBulkShardRequestSize))); + assertBusy(() -> assertThat(replicaWriteLimits.stats().getCurrentReplicaOps(), + equalTo(bulkOps + secondBulkOps))); replicaRelease.close(); successFuture.actionGet(); secondFuture.actionGet(); - assertEquals(0, primaryWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes()); - assertEquals(0, primaryWriteLimits.getCurrentCoordinatingBytes()); - assertEquals(0, primaryWriteLimits.getCurrentPrimaryBytes()); - assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes()); - - assertEquals(0, replicaWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes()); - assertEquals(0, replicaWriteLimits.getCurrentCoordinatingBytes()); - assertEquals(0, replicaWriteLimits.getCurrentPrimaryBytes()); - assertEquals(0, replicaWriteLimits.getCurrentReplicaBytes()); - - assertEquals(0, coordinatingWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes()); - assertEquals(0, coordinatingWriteLimits.getCurrentCoordinatingBytes()); - assertEquals(0, coordinatingWriteLimits.getCurrentPrimaryBytes()); - assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes()); + assertEquals(0, primaryWriteLimits.stats().getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(0, primaryWriteLimits.stats().getCurrentCoordinatingBytes()); + assertEquals(0, primaryWriteLimits.stats().getCurrentCoordinatingOps()); + assertEquals(0, primaryWriteLimits.stats().getCurrentPrimaryBytes()); + assertEquals(0, primaryWriteLimits.stats().getCurrentPrimaryOps()); + assertEquals(0, primaryWriteLimits.stats().getCurrentReplicaBytes()); + assertEquals(0, primaryWriteLimits.stats().getCurrentReplicaOps()); + + assertEquals(0, replicaWriteLimits.stats().getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(0, replicaWriteLimits.stats().getCurrentCoordinatingBytes()); + assertEquals(0, replicaWriteLimits.stats().getCurrentCoordinatingOps()); + assertEquals(0, replicaWriteLimits.stats().getCurrentPrimaryBytes()); + assertEquals(0, replicaWriteLimits.stats().getCurrentPrimaryOps()); + assertEquals(0, replicaWriteLimits.stats().getCurrentReplicaBytes()); + assertEquals(0, replicaWriteLimits.stats().getCurrentReplicaOps()); + + assertEquals(0, coordinatingWriteLimits.stats().getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(0, coordinatingWriteLimits.stats().getCurrentCoordinatingBytes()); + assertEquals(0, coordinatingWriteLimits.stats().getCurrentCoordinatingOps()); + assertEquals(0, coordinatingWriteLimits.stats().getCurrentPrimaryBytes()); + assertEquals(0, coordinatingWriteLimits.stats().getCurrentPrimaryOps()); + assertEquals(0, coordinatingWriteLimits.stats().getCurrentReplicaBytes()); + assertEquals(0, coordinatingWriteLimits.stats().getCurrentReplicaOps()); } finally { if (replicationSendPointReached.getCount() > 0) { replicationSendPointReached.countDown(); @@ -262,12 +293,12 @@ public void testWriteCanBeRejectedAtCoordinatingLevel() throws Exception { IndexingPressure coordinatingWriteLimits = internalCluster().getInstance(IndexingPressure.class, coordinatingOnlyNode); assertBusy(() -> { - assertThat(primaryWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize)); - assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes()); - assertEquals(0, replicaWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes()); - assertThat(replicaWriteLimits.getCurrentReplicaBytes(), greaterThan(bulkShardRequestSize)); - assertEquals(bulkRequestSize, coordinatingWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes()); - assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes()); + assertThat(primaryWriteLimits.stats().getCurrentCombinedCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize)); + assertEquals(0, primaryWriteLimits.stats().getCurrentReplicaBytes()); + assertEquals(0, replicaWriteLimits.stats().getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertThat(replicaWriteLimits.stats().getCurrentReplicaBytes(), greaterThan(bulkShardRequestSize)); + assertEquals(bulkRequestSize, coordinatingWriteLimits.stats().getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(0, coordinatingWriteLimits.stats().getCurrentReplicaBytes()); }); expectThrows(EsRejectedExecutionException.class, () -> { @@ -284,12 +315,12 @@ public void testWriteCanBeRejectedAtCoordinatingLevel() throws Exception { successFuture.actionGet(); - assertEquals(0, primaryWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes()); - assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes()); - assertEquals(0, replicaWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes()); - assertEquals(0, replicaWriteLimits.getCurrentReplicaBytes()); - assertEquals(0, coordinatingWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes()); - assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes()); + assertEquals(0, primaryWriteLimits.stats().getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(0, primaryWriteLimits.stats().getCurrentReplicaBytes()); + assertEquals(0, replicaWriteLimits.stats().getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(0, replicaWriteLimits.stats().getCurrentReplicaBytes()); + assertEquals(0, coordinatingWriteLimits.stats().getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(0, coordinatingWriteLimits.stats().getCurrentReplicaBytes()); } } @@ -326,12 +357,12 @@ public void testWriteCanBeRejectedAtPrimaryLevel() throws Exception { IndexingPressure coordinatingWriteLimits = internalCluster().getInstance(IndexingPressure.class, coordinatingOnlyNode); assertBusy(() -> { - assertThat(primaryWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize)); - assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes()); - assertEquals(0, replicaWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes()); - assertThat(replicaWriteLimits.getCurrentReplicaBytes(), greaterThan(bulkShardRequestSize)); - assertEquals(0, coordinatingWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes()); - assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes()); + assertThat(primaryWriteLimits.stats().getCurrentCombinedCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize)); + assertEquals(0, primaryWriteLimits.stats().getCurrentReplicaBytes()); + assertEquals(0, replicaWriteLimits.stats().getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertThat(replicaWriteLimits.stats().getCurrentReplicaBytes(), greaterThan(bulkShardRequestSize)); + assertEquals(0, coordinatingWriteLimits.stats().getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(0, coordinatingWriteLimits.stats().getCurrentReplicaBytes()); }); BulkResponse responses = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet(); @@ -342,12 +373,12 @@ public void testWriteCanBeRejectedAtPrimaryLevel() throws Exception { successFuture.actionGet(); - assertEquals(0, primaryWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes()); - assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes()); - assertEquals(0, replicaWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes()); - assertEquals(0, replicaWriteLimits.getCurrentReplicaBytes()); - assertEquals(0, coordinatingWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes()); - assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes()); + assertEquals(0, primaryWriteLimits.stats().getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(0, primaryWriteLimits.stats().getCurrentReplicaBytes()); + assertEquals(0, replicaWriteLimits.stats().getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(0, replicaWriteLimits.stats().getCurrentReplicaBytes()); + assertEquals(0, coordinatingWriteLimits.stats().getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(0, coordinatingWriteLimits.stats().getCurrentReplicaBytes()); } } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 1948e86193b57..106086b17e1fe 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -159,9 +159,10 @@ public static IndexRequest getIndexWriteRequest(DocWriteRequest<?> docWriteReque @Override protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) { + final int indexingOps = bulkRequest.numberOfActions(); final long indexingBytes = bulkRequest.ramBytesUsed(); final boolean isOnlySystem = isOnlySystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(), systemIndices); - final Releasable releasable = indexingPressure.markCoordinatingOperationStarted(indexingBytes, isOnlySystem); + final Releasable releasable = indexingPressure.markCoordinatingOperationStarted(indexingOps, indexingBytes, isOnlySystem); final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close); final String executorName = isOnlySystem ? Names.SYSTEM_WRITE : Names.WRITE; try { diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 56a6fe63cc0b8..bd3c272214e21 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -155,6 +155,11 @@ protected long primaryOperationSize(BulkShardRequest request) { return request.ramBytesUsed(); } + @Override + protected int primaryOperationCount(BulkShardRequest request) { + return request.items().length; + } + public static void performOnPrimary( BulkShardRequest request, IndexShard primary, @@ -439,6 +444,11 @@ protected long replicaOperationSize(BulkShardRequest request) { return request.ramBytesUsed(); } + @Override + protected int replicaOperationCount(BulkShardRequest request) { + return request.items().length; + } + public static Translog.Location performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception { Translog.Location location = null; for (int i = 0; i < request.items().length; i++) { diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index e0e2e8331c607..1c865839ab334 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -113,6 +113,11 @@ protected long primaryOperationSize(ResyncReplicationRequest request) { return Stream.of(request.getOperations()).mapToLong(Translog.Operation::estimateSize).sum(); } + @Override + protected int primaryOperationCount(ResyncReplicationRequest request) { + return request.getOperations().length; + } + public static ResyncReplicationRequest performOnPrimary(ResyncReplicationRequest request) { return request; } @@ -131,6 +136,11 @@ protected long replicaOperationSize(ResyncReplicationRequest request) { return Stream.of(request.getOperations()).mapToLong(Translog.Operation::estimateSize).sum(); } + @Override + protected int replicaOperationCount(ResyncReplicationRequest request) { + return request.getOperations().length; + } + public static Translog.Location performOnReplica(ResyncReplicationRequest request, IndexShard replica) throws Exception { Translog.Location location = null; /* diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index 108f2159300f9..77a1e21f37c00 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -88,7 +88,7 @@ protected String executor(IndexShard shard) { @Override protected Releasable checkOperationLimits(Request request) { - return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), force(request)); + return indexingPressure.markPrimaryOperationStarted(primaryOperationCount(request), primaryOperationSize(request), force(request)); } protected boolean force(ReplicatedWriteRequest<?> request) { @@ -106,7 +106,8 @@ protected Releasable checkPrimaryLimits(Request request, boolean rerouteWasLocal // If this primary request was received from a local reroute initiated by the node client, we // must mark a new primary operation local to the coordinating node. if (localRerouteInitiatedByNodeClient) { - return indexingPressure.markPrimaryOperationLocalToCoordinatingNodeStarted(primaryOperationSize(request)); + return indexingPressure.markPrimaryOperationLocalToCoordinatingNodeStarted(primaryOperationCount(request), + primaryOperationSize(request)); } else { return () -> {}; } @@ -114,7 +115,8 @@ protected Releasable checkPrimaryLimits(Request request, boolean rerouteWasLocal // If this primary request was received directly from the network, we must mark a new primary // operation. This happens if the write action skips the reroute step (ex: rsync) or during // primary delegation, after the primary relocation hand-off. - return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), force(request)); + return indexingPressure.markPrimaryOperationStarted(primaryOperationCount(request), primaryOperationSize(request), + force(request)); } } @@ -122,15 +124,23 @@ protected long primaryOperationSize(Request request) { return 0; } + protected int primaryOperationCount(Request request) { + return 0; + } + @Override protected Releasable checkReplicaLimits(ReplicaRequest request) { - return indexingPressure.markReplicaOperationStarted(replicaOperationSize(request), force(request)); + return indexingPressure.markReplicaOperationStarted(replicaOperationCount(request), replicaOperationSize(request), force(request)); } protected long replicaOperationSize(ReplicaRequest request) { return 0; } + protected int replicaOperationCount(ReplicaRequest request) { + return 0; + } + /** Syncs operation result to the translog or throws a shard not available failure */ protected static Location syncOperationResultOrThrow(final Engine.Result operationResult, final Location currentLocation) throws Exception { diff --git a/server/src/main/java/org/elasticsearch/index/IndexingPressure.java b/server/src/main/java/org/elasticsearch/index/IndexingPressure.java index 585a0b7f4571e..7dc3401b3c221 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexingPressure.java +++ b/server/src/main/java/org/elasticsearch/index/IndexingPressure.java @@ -38,11 +38,19 @@ public class IndexingPressure { private final AtomicLong currentPrimaryBytes = new AtomicLong(0); private final AtomicLong currentReplicaBytes = new AtomicLong(0); + private final AtomicLong currentCoordinatingOps = new AtomicLong(0); + private final AtomicLong currentPrimaryOps = new AtomicLong(0); + private final AtomicLong currentReplicaOps = new AtomicLong(0); + private final AtomicLong totalCombinedCoordinatingAndPrimaryBytes = new AtomicLong(0); private final AtomicLong totalCoordinatingBytes = new AtomicLong(0); private final AtomicLong totalPrimaryBytes = new AtomicLong(0); private final AtomicLong totalReplicaBytes = new AtomicLong(0); + private final AtomicLong totalCoordinatingOps = new AtomicLong(0); + private final AtomicLong totalPrimaryOps = new AtomicLong(0); + private final AtomicLong totalReplicaOps = new AtomicLong(0); + private final AtomicLong coordinatingRejections = new AtomicLong(0); private final AtomicLong primaryRejections = new AtomicLong(0); private final AtomicLong replicaRejections = new AtomicLong(0); @@ -55,7 +63,7 @@ public IndexingPressure(Settings settings) { this.replicaLimits = (long) (this.primaryAndCoordinatingLimits * 1.5); } - public Releasable markCoordinatingOperationStarted(long bytes, boolean forceExecution) { + public Releasable markCoordinatingOperationStarted(int operations, long bytes, boolean forceExecution) { long combinedBytes = this.currentCombinedCoordinatingAndPrimaryBytes.addAndGet(bytes); long replicaWriteBytes = this.currentReplicaBytes.get(); long totalBytes = combinedBytes + replicaWriteBytes; @@ -72,21 +80,29 @@ public Releasable markCoordinatingOperationStarted(long bytes, boolean forceExec "max_coordinating_and_primary_bytes=" + primaryAndCoordinatingLimits + "]", false); } currentCoordinatingBytes.getAndAdd(bytes); + currentCoordinatingOps.getAndAdd(operations); totalCombinedCoordinatingAndPrimaryBytes.getAndAdd(bytes); totalCoordinatingBytes.getAndAdd(bytes); + totalCoordinatingOps.getAndAdd(operations); return () -> { this.currentCombinedCoordinatingAndPrimaryBytes.getAndAdd(-bytes); this.currentCoordinatingBytes.getAndAdd(-bytes); + this.currentCoordinatingOps.getAndAdd(-operations); }; } - public Releasable markPrimaryOperationLocalToCoordinatingNodeStarted(long bytes) { + public Releasable markPrimaryOperationLocalToCoordinatingNodeStarted(int operations, long bytes) { currentPrimaryBytes.getAndAdd(bytes); + currentPrimaryOps.getAndAdd(operations); totalPrimaryBytes.getAndAdd(bytes); - return () -> this.currentPrimaryBytes.getAndAdd(-bytes); + totalPrimaryOps.getAndAdd(operations); + return () -> { + this.currentPrimaryBytes.getAndAdd(-bytes); + this.currentPrimaryOps.getAndAdd(-operations); + }; } - public Releasable markPrimaryOperationStarted(long bytes, boolean forceExecution) { + public Releasable markPrimaryOperationStarted(int operations, long bytes, boolean forceExecution) { long combinedBytes = this.currentCombinedCoordinatingAndPrimaryBytes.addAndGet(bytes); long replicaWriteBytes = this.currentReplicaBytes.get(); long totalBytes = combinedBytes + replicaWriteBytes; @@ -103,15 +119,18 @@ public Releasable markPrimaryOperationStarted(long bytes, boolean forceExecution "max_coordinating_and_primary_bytes=" + primaryAndCoordinatingLimits + "]", false); } currentPrimaryBytes.getAndAdd(bytes); + currentPrimaryOps.getAndAdd(operations); totalCombinedCoordinatingAndPrimaryBytes.getAndAdd(bytes); totalPrimaryBytes.getAndAdd(bytes); + totalPrimaryOps.getAndAdd(operations); return () -> { this.currentCombinedCoordinatingAndPrimaryBytes.getAndAdd(-bytes); this.currentPrimaryBytes.getAndAdd(-bytes); + this.currentPrimaryOps.getAndAdd(-operations); }; } - public Releasable markReplicaOperationStarted(long bytes, boolean forceExecution) { + public Releasable markReplicaOperationStarted(int operations, long bytes, boolean forceExecution) { long replicaWriteBytes = this.currentReplicaBytes.addAndGet(bytes); if (forceExecution == false && replicaWriteBytes > replicaLimits) { long replicaBytesWithoutOperation = replicaWriteBytes - bytes; @@ -122,30 +141,20 @@ public Releasable markReplicaOperationStarted(long bytes, boolean forceExecution "replica_operation_bytes=" + bytes + ", " + "max_replica_bytes=" + replicaLimits + "]", false); } + currentReplicaOps.getAndAdd(operations); totalReplicaBytes.getAndAdd(bytes); - return () -> this.currentReplicaBytes.getAndAdd(-bytes); - } - - public long getCurrentCombinedCoordinatingAndPrimaryBytes() { - return currentCombinedCoordinatingAndPrimaryBytes.get(); - } - - public long getCurrentCoordinatingBytes() { - return currentCoordinatingBytes.get(); - } - - public long getCurrentPrimaryBytes() { - return currentPrimaryBytes.get(); - } - - public long getCurrentReplicaBytes() { - return currentReplicaBytes.get(); + totalReplicaOps.getAndAdd(operations); + return () -> { + this.currentReplicaBytes.getAndAdd(-bytes); + this.currentReplicaOps.getAndAdd(-operations); + }; } public IndexingPressureStats stats() { return new IndexingPressureStats(totalCombinedCoordinatingAndPrimaryBytes.get(), totalCoordinatingBytes.get(), totalPrimaryBytes.get(), totalReplicaBytes.get(), currentCombinedCoordinatingAndPrimaryBytes.get(), currentCoordinatingBytes.get(), currentPrimaryBytes.get(), currentReplicaBytes.get(), coordinatingRejections.get(), - primaryRejections.get(), replicaRejections.get(), primaryAndCoordinatingLimits); + primaryRejections.get(), replicaRejections.get(), primaryAndCoordinatingLimits, totalCoordinatingOps.get(), + totalPrimaryOps.get(), totalReplicaOps.get(), currentCoordinatingOps.get(), currentPrimaryOps.get(), currentReplicaOps.get()); } } diff --git a/server/src/main/java/org/elasticsearch/index/stats/IndexingPressureStats.java b/server/src/main/java/org/elasticsearch/index/stats/IndexingPressureStats.java index 27afc8dfa6fd9..28ca2c7cbdae4 100644 --- a/server/src/main/java/org/elasticsearch/index/stats/IndexingPressureStats.java +++ b/server/src/main/java/org/elasticsearch/index/stats/IndexingPressureStats.java @@ -46,6 +46,14 @@ public class IndexingPressureStats implements Writeable, ToXContentFragment { private final long replicaRejections; private final long memoryLimit; + // These fields will be used for additional back-pressure and metrics in the future + private final long totalCoordinatingOps; + private final long totalPrimaryOps; + private final long totalReplicaOps; + private final long currentCoordinatingOps; + private final long currentPrimaryOps; + private final long currentReplicaOps; + public IndexingPressureStats(StreamInput in) throws IOException { totalCombinedCoordinatingAndPrimaryBytes = in.readVLong(); totalCoordinatingBytes = in.readVLong(); @@ -66,12 +74,21 @@ public IndexingPressureStats(StreamInput in) throws IOException { } else { memoryLimit = -1L; } + + // These are not currently propagated across the network yet + this.totalCoordinatingOps = 0; + this.totalPrimaryOps = 0; + this.totalReplicaOps = 0; + this.currentCoordinatingOps = 0; + this.currentPrimaryOps = 0; + this.currentReplicaOps = 0; } public IndexingPressureStats(long totalCombinedCoordinatingAndPrimaryBytes, long totalCoordinatingBytes, long totalPrimaryBytes, long totalReplicaBytes, long currentCombinedCoordinatingAndPrimaryBytes, long currentCoordinatingBytes, long currentPrimaryBytes, long currentReplicaBytes, long coordinatingRejections, long primaryRejections, - long replicaRejections, long memoryLimit) { + long replicaRejections, long memoryLimit, long totalCoordinatingOps, long totalPrimaryOps, + long totalReplicaOps, long currentCoordinatingOps, long currentPrimaryOps, long currentReplicaOps) { this.totalCombinedCoordinatingAndPrimaryBytes = totalCombinedCoordinatingAndPrimaryBytes; this.totalCoordinatingBytes = totalCoordinatingBytes; this.totalPrimaryBytes = totalPrimaryBytes; @@ -84,6 +101,13 @@ public IndexingPressureStats(long totalCombinedCoordinatingAndPrimaryBytes, long this.primaryRejections = primaryRejections; this.replicaRejections = replicaRejections; this.memoryLimit = memoryLimit; + + this.totalCoordinatingOps = totalCoordinatingOps; + this.totalPrimaryOps = totalPrimaryOps; + this.totalReplicaOps = totalReplicaOps; + this.currentCoordinatingOps = currentCoordinatingOps; + this.currentPrimaryOps = currentPrimaryOps; + this.currentReplicaOps = currentReplicaOps; } @Override @@ -151,6 +175,30 @@ public long getReplicaRejections() { return replicaRejections; } + public long getTotalCoordinatingOps() { + return totalCoordinatingOps; + } + + public long getTotalPrimaryOps() { + return totalPrimaryOps; + } + + public long getTotalReplicaOps() { + return totalReplicaOps; + } + + public long getCurrentCoordinatingOps() { + return currentCoordinatingOps; + } + + public long getCurrentPrimaryOps() { + return currentPrimaryOps; + } + + public long getCurrentReplicaOps() { + return currentReplicaOps; + } + private static final String COMBINED = "combined_coordinating_and_primary"; private static final String COMBINED_IN_BYTES = "combined_coordinating_and_primary_in_bytes"; private static final String COORDINATING = "coordinating"; diff --git a/server/src/test/java/org/elasticsearch/index/IndexingPressureTests.java b/server/src/test/java/org/elasticsearch/index/IndexingPressureTests.java index 6cfcc4f329c96..b3119736f508e 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexingPressureTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexingPressureTests.java @@ -29,35 +29,48 @@ public class IndexingPressureTests extends ESTestCase { private final Settings settings = Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "10KB").build(); - public void testMemoryBytesMarkedAndReleased() { + public void testMemoryBytesAndOpsMarkedAndReleased() { IndexingPressure indexingPressure = new IndexingPressure(settings); - try (Releasable coordinating = indexingPressure.markCoordinatingOperationStarted(10, false); - Releasable coordinating2 = indexingPressure.markCoordinatingOperationStarted(50, false); - Releasable primary = indexingPressure.markPrimaryOperationStarted(15, true); - Releasable primary2 = indexingPressure.markPrimaryOperationStarted(5, false); - Releasable replica = indexingPressure.markReplicaOperationStarted(25, true); - Releasable replica2 = indexingPressure.markReplicaOperationStarted(10, false)) { + try (Releasable coordinating = indexingPressure.markCoordinatingOperationStarted(1, 10, false); + Releasable coordinating2 = indexingPressure.markCoordinatingOperationStarted(5, 50, false); + Releasable primary = indexingPressure.markPrimaryOperationStarted(2, 15, true); + Releasable primary2 = indexingPressure.markPrimaryOperationStarted(1, 5, false); + Releasable replica = indexingPressure.markReplicaOperationStarted(3, 25, true); + Releasable replica2 = indexingPressure.markReplicaOperationStarted(1, 10, false)) { IndexingPressureStats stats = indexingPressure.stats(); assertEquals(60, stats.getCurrentCoordinatingBytes()); assertEquals(20, stats.getCurrentPrimaryBytes()); assertEquals(80, stats.getCurrentCombinedCoordinatingAndPrimaryBytes()); assertEquals(35, stats.getCurrentReplicaBytes()); + + assertEquals(6, stats.getTotalCoordinatingOps()); + assertEquals(3, stats.getTotalPrimaryOps()); + assertEquals(4, stats.getTotalReplicaOps()); } IndexingPressureStats stats = indexingPressure.stats(); assertEquals(0, stats.getCurrentCoordinatingBytes()); assertEquals(0, stats.getCurrentPrimaryBytes()); assertEquals(0, stats.getCurrentCombinedCoordinatingAndPrimaryBytes()); assertEquals(0, stats.getCurrentReplicaBytes()); + + assertEquals(0, stats.getCurrentCoordinatingOps()); + assertEquals(0, stats.getCurrentPrimaryOps()); + assertEquals(0, stats.getCurrentReplicaOps()); + assertEquals(60, stats.getTotalCoordinatingBytes()); assertEquals(20, stats.getTotalPrimaryBytes()); assertEquals(80, stats.getTotalCombinedCoordinatingAndPrimaryBytes()); assertEquals(35, stats.getTotalReplicaBytes()); + + assertEquals(6, stats.getTotalCoordinatingOps()); + assertEquals(3, stats.getTotalPrimaryOps()); + assertEquals(4, stats.getTotalReplicaOps()); } - public void testAvoidDoubleAccounting() { + public void testAvoidDoubleMemoryAccounting() { IndexingPressure indexingPressure = new IndexingPressure(settings); - try (Releasable coordinating = indexingPressure.markCoordinatingOperationStarted(10, false); - Releasable primary = indexingPressure.markPrimaryOperationLocalToCoordinatingNodeStarted(15)) { + try (Releasable coordinating = indexingPressure.markCoordinatingOperationStarted(1, 10, false); + Releasable primary = indexingPressure.markPrimaryOperationLocalToCoordinatingNodeStarted(1, 15)) { IndexingPressureStats stats = indexingPressure.stats(); assertEquals(10, stats.getCurrentCoordinatingBytes()); assertEquals(15, stats.getCurrentPrimaryBytes()); @@ -74,30 +87,31 @@ public void testAvoidDoubleAccounting() { public void testCoordinatingPrimaryRejections() { IndexingPressure indexingPressure = new IndexingPressure(settings); - try (Releasable coordinating = indexingPressure.markCoordinatingOperationStarted(1024 * 3, false); - Releasable primary = indexingPressure.markPrimaryOperationStarted(1024 * 3, false); - Releasable replica = indexingPressure.markReplicaOperationStarted(1024 * 3, false)) { + try (Releasable coordinating = indexingPressure.markCoordinatingOperationStarted(1, 1024 * 3, false); + Releasable primary = indexingPressure.markPrimaryOperationStarted(1, 1024 * 3, false); + Releasable replica = indexingPressure.markReplicaOperationStarted(1, 1024 * 3, false)) { if (randomBoolean()) { - expectThrows(EsRejectedExecutionException.class, () -> indexingPressure.markCoordinatingOperationStarted(1024 * 2, false)); + expectThrows(EsRejectedExecutionException.class, + () -> indexingPressure.markCoordinatingOperationStarted(1, 1024 * 2, false)); IndexingPressureStats stats = indexingPressure.stats(); assertEquals(1, stats.getCoordinatingRejections()); assertEquals(1024 * 6, stats.getCurrentCombinedCoordinatingAndPrimaryBytes()); } else { - expectThrows(EsRejectedExecutionException.class, () -> indexingPressure.markPrimaryOperationStarted(1024 * 2, false)); + expectThrows(EsRejectedExecutionException.class, () -> indexingPressure.markPrimaryOperationStarted(1, 1024 * 2, false)); IndexingPressureStats stats = indexingPressure.stats(); assertEquals(1, stats.getPrimaryRejections()); assertEquals(1024 * 6, stats.getCurrentCombinedCoordinatingAndPrimaryBytes()); } long preForceRejections = indexingPressure.stats().getPrimaryRejections(); // Primary can be forced - Releasable forced = indexingPressure.markPrimaryOperationStarted(1024 * 2, true); + Releasable forced = indexingPressure.markPrimaryOperationStarted(1, 1024 * 2, true); assertEquals(preForceRejections, indexingPressure.stats().getPrimaryRejections()); assertEquals(1024 * 8, indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes()); forced.close(); // Local to coordinating node primary actions not rejected IndexingPressureStats preLocalStats = indexingPressure.stats(); - Releasable local = indexingPressure.markPrimaryOperationLocalToCoordinatingNodeStarted(1024 * 2); + Releasable local = indexingPressure.markPrimaryOperationLocalToCoordinatingNodeStarted(1, 1024 * 2); assertEquals(preLocalStats.getPrimaryRejections(), indexingPressure.stats().getPrimaryRejections()); assertEquals(1024 * 6, indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes()); assertEquals(preLocalStats.getCurrentPrimaryBytes() + 1024 * 2, indexingPressure.stats().getCurrentPrimaryBytes()); @@ -109,20 +123,20 @@ public void testCoordinatingPrimaryRejections() { public void testReplicaRejections() { IndexingPressure indexingPressure = new IndexingPressure(settings); - try (Releasable coordinating = indexingPressure.markCoordinatingOperationStarted(1024 * 3, false); - Releasable primary = indexingPressure.markPrimaryOperationStarted(1024 * 3, false); - Releasable replica = indexingPressure.markReplicaOperationStarted(1024 * 3, false)) { + try (Releasable coordinating = indexingPressure.markCoordinatingOperationStarted(1, 1024 * 3, false); + Releasable primary = indexingPressure.markPrimaryOperationStarted(1, 1024 * 3, false); + Releasable replica = indexingPressure.markReplicaOperationStarted(1, 1024 * 3, false)) { // Replica will not be rejected until replica bytes > 15KB - Releasable replica2 = indexingPressure.markReplicaOperationStarted(1024 * 11, false); + Releasable replica2 = indexingPressure.markReplicaOperationStarted(1, 1024 * 11, false); assertEquals(1024 * 14, indexingPressure.stats().getCurrentReplicaBytes()); // Replica will be rejected once we cross 15KB - expectThrows(EsRejectedExecutionException.class, () -> indexingPressure.markReplicaOperationStarted(1024 * 2, false)); + expectThrows(EsRejectedExecutionException.class, () -> indexingPressure.markReplicaOperationStarted(1, 1024 * 2, false)); IndexingPressureStats stats = indexingPressure.stats(); assertEquals(1, stats.getReplicaRejections()); assertEquals(1024 * 14, stats.getCurrentReplicaBytes()); // Replica can be forced - Releasable forced = indexingPressure.markPrimaryOperationStarted(1024 * 2, true); + Releasable forced = indexingPressure.markPrimaryOperationStarted(1, 1024 * 2, true); assertEquals(1, indexingPressure.stats().getReplicaRejections()); assertEquals(1024 * 14, indexingPressure.stats().getCurrentReplicaBytes()); forced.close(); @@ -136,8 +150,8 @@ public void testReplicaRejections() { public void testForceExecutionOnCoordinating() { IndexingPressure indexingPressure = new IndexingPressure(settings); - expectThrows(EsRejectedExecutionException.class, () -> indexingPressure.markCoordinatingOperationStarted(1024 * 11, false)); - try (Releasable ignore = indexingPressure.markCoordinatingOperationStarted(1024 * 11, true)) { + expectThrows(EsRejectedExecutionException.class, () -> indexingPressure.markCoordinatingOperationStarted(1, 1024 * 11, false)); + try (Releasable ignore = indexingPressure.markCoordinatingOperationStarted(1, 1024 * 11, true)) { assertEquals(1024 * 11, indexingPressure.stats().getCurrentCoordinatingBytes()); } assertEquals(0, indexingPressure.stats().getCurrentCoordinatingBytes()); diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 835e957e46483..a85d7a12c51eb 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -1171,22 +1171,22 @@ private void assertAllPendingWriteLimitsReleased() throws Exception { assertBusy(() -> { for (NodeAndClient nodeAndClient : nodes.values()) { IndexingPressure indexingPressure = getInstance(IndexingPressure.class, nodeAndClient.name); - final long combinedBytes = indexingPressure.getCurrentCombinedCoordinatingAndPrimaryBytes(); + final long combinedBytes = indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes(); if (combinedBytes > 0) { throw new AssertionError("pending combined bytes [" + combinedBytes + "] bytes on node [" + nodeAndClient.name + "]."); } - final long coordinatingBytes = indexingPressure.getCurrentCoordinatingBytes(); + final long coordinatingBytes = indexingPressure.stats().getCurrentCoordinatingBytes(); if (coordinatingBytes > 0) { throw new AssertionError("pending coordinating bytes [" + coordinatingBytes + "] bytes on node [" + nodeAndClient.name + "]."); } - final long primaryBytes = indexingPressure.getCurrentPrimaryBytes(); + final long primaryBytes = indexingPressure.stats().getCurrentPrimaryBytes(); if (primaryBytes > 0) { throw new AssertionError("pending primary bytes [" + primaryBytes + "] bytes on node [" + nodeAndClient.name + "]."); } - final long replicaWriteBytes = indexingPressure.getCurrentReplicaBytes(); + final long replicaWriteBytes = indexingPressure.stats().getCurrentReplicaBytes(); if (replicaWriteBytes > 0) { throw new AssertionError("pending replica write bytes [" + combinedBytes + "] bytes on node [" + nodeAndClient.name + "]."); diff --git a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java index 82f3d9c6872bc..419b813e25dae 100644 --- a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java @@ -82,7 +82,7 @@ public void testFollowIndex() throws Exception { ensureEmptyWriteBuffers(); } - public void testWriteLimitsIncremented() throws Exception { + public void testIndexingMetricsIncremented() throws Exception { final String leaderIndexSettings = getIndexSettings(1, 0, Collections.emptyMap()); assertAcked(client().admin().indices().prepareCreate("leader").setSource(leaderIndexSettings, XContentType.JSON)); ensureGreen("leader"); @@ -120,12 +120,13 @@ public void testWriteLimitsIncremented() throws Exception { final PutFollowAction.Request followRequest = getPutFollowRequest("leader", "follower"); client().execute(PutFollowAction.INSTANCE, followRequest).get(); - IndexingPressure memoryLimits = getInstanceFromNode(IndexingPressure.class); + IndexingPressure indexingPressure = getInstanceFromNode(IndexingPressure.class); final long finalSourceSize = sourceSize; assertBusy(() -> { // The actual write bytes will be greater due to other request fields. However, this test is // just spot checking that the bytes are incremented at all. - assertTrue(memoryLimits.getCurrentCombinedCoordinatingAndPrimaryBytes() > finalSourceSize); + assertTrue(indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes() > finalSourceSize); + assertEquals(firstBatchNumDocs, indexingPressure.stats().getCurrentPrimaryOps()); }); blocker.countDown(); assertBusy(() -> { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java index d8c4e29097f16..bec4975f11bb5 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java @@ -77,7 +77,8 @@ public TransportBulkShardOperationsAction( @Override protected void doExecute(Task task, BulkShardOperationsRequest request, ActionListener<BulkShardOperationsResponse> listener) { // This is executed on the follower coordinator node and we need to mark the bytes. - Releasable releasable = indexingPressure.markCoordinatingOperationStarted(primaryOperationSize(request), false); + Releasable releasable = indexingPressure.markCoordinatingOperationStarted(primaryOperationCount(request), + primaryOperationSize(request), false); ActionListener<BulkShardOperationsResponse> releasingListener = ActionListener.runBefore(listener, releasable::close); try { super.doExecute(task, request, releasingListener); @@ -101,6 +102,11 @@ protected long primaryOperationSize(BulkShardOperationsRequest request) { return request.getOperations().stream().mapToLong(Translog.Operation::estimateSize).sum(); } + @Override + protected int primaryOperationCount(BulkShardOperationsRequest request) { + return request.getOperations().size(); + } + public static Translog.Operation rewriteOperationWithPrimaryTerm(Translog.Operation operation, long primaryTerm) { final Translog.Operation operationWithPrimaryTerm; switch (operation.opType()) { @@ -205,6 +211,11 @@ protected long replicaOperationSize(BulkShardOperationsRequest request) { return request.getOperations().stream().mapToLong(Translog.Operation::estimateSize).sum(); } + @Override + protected int replicaOperationCount(BulkShardOperationsRequest request) { + return request.getOperations().size(); + } + // public for testing purposes only public static WriteReplicaResult<BulkShardOperationsRequest> shardOperationOnReplica( final BulkShardOperationsRequest request, final IndexShard replica, final Logger logger) throws IOException {