diff --git a/qa/smoke-test-http/src/test/java/org/elasticsearch/http/IndexingPressureRestIT.java b/qa/smoke-test-http/src/test/java/org/elasticsearch/http/IndexingPressureRestIT.java index b096107e0a327..6e796e0f9162a 100644 --- a/qa/smoke-test-http/src/test/java/org/elasticsearch/http/IndexingPressureRestIT.java +++ b/qa/smoke-test-http/src/test/java/org/elasticsearch/http/IndexingPressureRestIT.java @@ -76,32 +76,44 @@ public void testIndexingPressureStats() throws IOException { ArrayList values = new ArrayList<>(((Map) nodeStatsMap.get("nodes")).values()); assertThat(values.size(), equalTo(2)); XContentTestUtils.JsonMapView node1 = new XContentTestUtils.JsonMapView((Map) values.get(0)); - Integer node1IndexingBytes = node1.get("indexing_pressure.total.coordinating_and_primary_bytes"); + Integer node1CombinedBytes = node1.get("indexing_pressure.total.combined_coordinating_and_primary_bytes"); + Integer node1PrimaryBytes = node1.get("indexing_pressure.total.primary_bytes"); Integer node1ReplicaBytes = node1.get("indexing_pressure.total.replica_bytes"); - Integer node1Rejections = node1.get("indexing_pressure.total.coordinating_and_primary_memory_limit_rejections"); + Integer node1CoordinatingRejections = node1.get("indexing_pressure.total.coordinating_rejections"); + Integer node1PrimaryRejections = node1.get("indexing_pressure.total.primary_rejections"); XContentTestUtils.JsonMapView node2 = new XContentTestUtils.JsonMapView((Map) values.get(1)); - Integer node2IndexingBytes = node2.get("indexing_pressure.total.coordinating_and_primary_bytes"); + Integer node2IndexingBytes = node2.get("indexing_pressure.total.combined_coordinating_and_primary_bytes"); + Integer node2PrimaryBytes = node2.get("indexing_pressure.total.primary_bytes"); Integer node2ReplicaBytes = node2.get("indexing_pressure.total.replica_bytes"); - Integer node2Rejections = node2.get("indexing_pressure.total.coordinating_and_primary_memory_limit_rejections"); + Integer node2CoordinatingRejections = 
node2.get("indexing_pressure.total.coordinating_rejections"); + Integer node2PrimaryRejections = node2.get("indexing_pressure.total.primary_rejections"); - if (node1IndexingBytes == 0) { + if (node1CombinedBytes == 0) { assertThat(node2IndexingBytes, greaterThan(0)); assertThat(node2IndexingBytes, lessThan(1024)); } else { - assertThat(node1IndexingBytes, greaterThan(0)); - assertThat(node1IndexingBytes, lessThan(1024)); + assertThat(node1CombinedBytes, greaterThan(0)); + assertThat(node1CombinedBytes, lessThan(1024)); } if (node1ReplicaBytes == 0) { + assertThat(node1PrimaryBytes, greaterThan(0)); + assertThat(node1PrimaryBytes, lessThan(1024)); + assertThat(node2ReplicaBytes, greaterThan(0)); assertThat(node2ReplicaBytes, lessThan(1024)); } else { + assertThat(node2PrimaryBytes, greaterThan(0)); + assertThat(node2PrimaryBytes, lessThan(1024)); + assertThat(node2ReplicaBytes, equalTo(0)); assertThat(node1ReplicaBytes, lessThan(1024)); } - assertThat(node1Rejections, equalTo(0)); - assertThat(node2Rejections, equalTo(0)); + assertThat(node1CoordinatingRejections, equalTo(0)); + assertThat(node1PrimaryRejections, equalTo(0)); + assertThat(node2CoordinatingRejections, equalTo(0)); + assertThat(node2PrimaryRejections, equalTo(0)); Request failedIndexingRequest = new Request("POST", "/index_name/_doc/"); String largeString = randomAlphaOfLength(10000); @@ -116,14 +128,19 @@ public void testIndexingPressureStats() throws IOException { ArrayList values2 = new ArrayList<>(((Map) nodeStatsMap2.get("nodes")).values()); assertThat(values2.size(), equalTo(2)); XContentTestUtils.JsonMapView node1AfterRejection = new XContentTestUtils.JsonMapView((Map) values2.get(0)); - node1Rejections = node1AfterRejection.get("indexing_pressure.total.coordinating_and_primary_memory_limit_rejections"); + node1CoordinatingRejections = node1AfterRejection.get("indexing_pressure.total.coordinating_rejections"); + node1PrimaryRejections = node1AfterRejection.get("indexing_pressure.total.primary_rejections");
XContentTestUtils.JsonMapView node2AfterRejection = new XContentTestUtils.JsonMapView((Map) values2.get(1)); - node2Rejections = node2AfterRejection.get("indexing_pressure.total.coordinating_and_primary_memory_limit_rejections"); + node2CoordinatingRejections = node2AfterRejection.get("indexing_pressure.total.coordinating_rejections"); + node2PrimaryRejections = node2AfterRejection.get("indexing_pressure.total.primary_rejections"); - if (node1Rejections == 0) { - assertThat(node2Rejections, equalTo(1)); + if (node1CoordinatingRejections == 0) { + assertThat(node2CoordinatingRejections, equalTo(1)); } else { - assertThat(node1Rejections, equalTo(1)); + assertThat(node1CoordinatingRejections, equalTo(1)); } + + assertThat(node1PrimaryRejections, equalTo(0)); + assertThat(node2PrimaryRejections, equalTo(0)); } } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/50_indexing_pressure.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/50_indexing_pressure.yml index bf85c9e2ac1af..f990f2d69921d 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/50_indexing_pressure.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/50_indexing_pressure.yml @@ -14,12 +14,19 @@ nodes.stats: metric: [ indexing_pressure ] - - gte: { nodes.$node_id.indexing_pressure.total.coordinating_and_primary_bytes: 0 } + - gte: { nodes.$node_id.indexing_pressure.total.combined_coordinating_and_primary_bytes: 0 } + - gte: { nodes.$node_id.indexing_pressure.total.coordinating_bytes: 0 } + - gte: { nodes.$node_id.indexing_pressure.total.primary_bytes: 0 } - gte: { nodes.$node_id.indexing_pressure.total.replica_bytes: 0 } - gte: { nodes.$node_id.indexing_pressure.total.all_bytes: 0 } - - gte: { nodes.$node_id.indexing_pressure.total.coordinating_and_primary_memory_limit_rejections: 0 } - - gte: { nodes.$node_id.indexing_pressure.total.replica_memory_limit_rejections: 0 } - - gte: { 
nodes.$node_id.indexing_pressure.current.coordinating_and_primary_bytes: 0 } + + - gte: { nodes.$node_id.indexing_pressure.total.coordinating_rejections: 0 } + - gte: { nodes.$node_id.indexing_pressure.total.primary_rejections: 0 } + - gte: { nodes.$node_id.indexing_pressure.total.replica_rejections: 0 } + + - gte: { nodes.$node_id.indexing_pressure.current.combined_coordinating_and_primary_bytes: 0 } + - gte: { nodes.$node_id.indexing_pressure.current.coordinating_bytes: 0 } + - gte: { nodes.$node_id.indexing_pressure.current.primary_bytes: 0 } - gte: { nodes.$node_id.indexing_pressure.current.replica_bytes: 0 } - gte: { nodes.$node_id.indexing_pressure.current.all_bytes: 0 } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/IndexingPressureIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/IndexingPressureIT.java index 43ccf33db1e00..4d5c4aa06d68a 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/IndexingPressureIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/IndexingPressureIT.java @@ -56,8 +56,6 @@ @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 2, numClientNodes = 1) public class IndexingPressureIT extends ESIntegTestCase { - // TODO: Add additional REST tests when metrics are exposed - public static final String INDEX_NAME = "test"; private static final Settings unboundedWriteQueue = Settings.builder().put("thread_pool.write.queue_size", -1).build(); @@ -140,11 +138,19 @@ public void testWriteBytesAreIncremented() throws Exception { IndexingPressure replicaWriteLimits = internalCluster().getInstance(IndexingPressure.class, replicaName); IndexingPressure coordinatingWriteLimits = internalCluster().getInstance(IndexingPressure.class, coordinatingOnlyNode); - assertThat(primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize)); + 
assertThat(primaryWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize)); + assertThat(primaryWriteLimits.getCurrentPrimaryBytes(), greaterThan(bulkShardRequestSize)); + assertEquals(0, primaryWriteLimits.getCurrentCoordinatingBytes()); assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes()); - assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + + assertEquals(0, replicaWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(0, replicaWriteLimits.getCurrentCoordinatingBytes()); + assertEquals(0, replicaWriteLimits.getCurrentPrimaryBytes()); assertEquals(0, replicaWriteLimits.getCurrentReplicaBytes()); - assertEquals(bulkRequestSize, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + + assertEquals(bulkRequestSize, coordinatingWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(bulkRequestSize, coordinatingWriteLimits.getCurrentCoordinatingBytes()); + assertEquals(0, coordinatingWriteLimits.getCurrentPrimaryBytes()); assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes()); latchBlockingReplicationSend.countDown(); @@ -167,14 +173,25 @@ public void testWriteBytesAreIncremented() throws Exception { final long secondBulkShardRequestSize = request.ramBytesUsed(); if (usePrimaryAsCoordinatingNode) { - assertThat(primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes(), - greaterThan(bulkShardRequestSize + secondBulkRequestSize)); - assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertBusy(() -> { + assertThat(primaryWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes(), + greaterThan(bulkShardRequestSize + secondBulkRequestSize)); + assertEquals(secondBulkRequestSize, primaryWriteLimits.getCurrentCoordinatingBytes()); + assertThat(primaryWriteLimits.getCurrentPrimaryBytes(), + greaterThan(bulkShardRequestSize + secondBulkRequestSize)); + + assertEquals(0, 
replicaWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(0, replicaWriteLimits.getCurrentCoordinatingBytes()); + assertEquals(0, replicaWriteLimits.getCurrentPrimaryBytes()); + }); } else { - assertThat(primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize)); - assertEquals(secondBulkRequestSize, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertThat(primaryWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize)); + + assertEquals(secondBulkRequestSize, replicaWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(secondBulkRequestSize, replicaWriteLimits.getCurrentCoordinatingBytes()); + assertEquals(0, replicaWriteLimits.getCurrentPrimaryBytes()); } - assertEquals(bulkRequestSize, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(bulkRequestSize, coordinatingWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes()); assertBusy(() -> assertThat(replicaWriteLimits.getCurrentReplicaBytes(), greaterThan(bulkShardRequestSize + secondBulkShardRequestSize))); @@ -183,11 +200,19 @@ public void testWriteBytesAreIncremented() throws Exception { successFuture.actionGet(); secondFuture.actionGet(); - assertEquals(0, primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, primaryWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(0, primaryWriteLimits.getCurrentCoordinatingBytes()); + assertEquals(0, primaryWriteLimits.getCurrentPrimaryBytes()); assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes()); - assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + + assertEquals(0, replicaWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(0, replicaWriteLimits.getCurrentCoordinatingBytes()); + assertEquals(0, replicaWriteLimits.getCurrentPrimaryBytes()); assertEquals(0, 
replicaWriteLimits.getCurrentReplicaBytes()); - assertEquals(0, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + + assertEquals(0, coordinatingWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(0, coordinatingWriteLimits.getCurrentCoordinatingBytes()); + assertEquals(0, coordinatingWriteLimits.getCurrentPrimaryBytes()); assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes()); } finally { if (replicationSendPointReached.getCount() > 0) { @@ -237,11 +262,11 @@ public void testWriteCanBeRejectedAtCoordinatingLevel() throws Exception { IndexingPressure coordinatingWriteLimits = internalCluster().getInstance(IndexingPressure.class, coordinatingOnlyNode); assertBusy(() -> { - assertThat(primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize)); + assertThat(primaryWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize)); assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes()); - assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, replicaWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes()); assertThat(replicaWriteLimits.getCurrentReplicaBytes(), greaterThan(bulkShardRequestSize)); - assertEquals(bulkRequestSize, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(bulkRequestSize, coordinatingWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes()); assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes()); }); @@ -259,11 +284,11 @@ public void testWriteCanBeRejectedAtCoordinatingLevel() throws Exception { successFuture.actionGet(); - assertEquals(0, primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, primaryWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes()); assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes()); - assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + 
assertEquals(0, replicaWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes()); assertEquals(0, replicaWriteLimits.getCurrentReplicaBytes()); - assertEquals(0, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, coordinatingWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes()); assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes()); } } @@ -301,11 +326,11 @@ public void testWriteCanBeRejectedAtPrimaryLevel() throws Exception { IndexingPressure coordinatingWriteLimits = internalCluster().getInstance(IndexingPressure.class, coordinatingOnlyNode); assertBusy(() -> { - assertThat(primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize)); + assertThat(primaryWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize)); assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes()); - assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, replicaWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes()); assertThat(replicaWriteLimits.getCurrentReplicaBytes(), greaterThan(bulkShardRequestSize)); - assertEquals(0, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, coordinatingWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes()); assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes()); }); @@ -317,11 +342,11 @@ public void testWriteCanBeRejectedAtPrimaryLevel() throws Exception { successFuture.actionGet(); - assertEquals(0, primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, primaryWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes()); assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes()); - assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, replicaWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes()); assertEquals(0, replicaWriteLimits.getCurrentReplicaBytes()); - 
assertEquals(0, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes()); + assertEquals(0, coordinatingWriteLimits.getCurrentCombinedCoordinatingAndPrimaryBytes()); assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes()); } } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 3500bad94476a..8b8804f57685e 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -163,7 +163,7 @@ public static IndexRequest getIndexWriteRequest(DocWriteRequest docWriteReque @Override protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener listener) { long indexingBytes = bulkRequest.ramBytesUsed(); - final Releasable releasable = indexingPressure.markIndexingOperationStarted(indexingBytes); + final Releasable releasable = indexingPressure.markCoordinatingOperationStarted(indexingBytes); final ActionListener releasingListener = ActionListener.runBefore(listener, releasable::close); try { doInternalExecute(task, bulkRequest, releasingListener); diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index bce9517e3ef64..0daafbf4ce887 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -179,7 +179,15 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans @Override protected void doExecute(Task task, Request request, ActionListener listener) { assert request.shardId() != null : "request shardId must be set"; - new ReroutePhase((ReplicationTask) task, request, 
listener).run(); + runReroutePhase(task, request, listener, true); + } + + private void runReroutePhase(Task task, Request request, ActionListener listener, boolean initiatedByNodeClient) { + try { + new ReroutePhase((ReplicationTask) task, request, listener, initiatedByNodeClient).run(); + } catch (RuntimeException e) { + listener.onFailure(e); + } } protected ReplicationOperation.Replicas newReplicasProxy() { @@ -277,7 +285,7 @@ private void handleOperationRequest(final Request request, final TransportChanne Releasable releasable = checkOperationLimits(request); ActionListener listener = ActionListener.runBefore(new ChannelActionListener<>(channel, actionName, request), releasable::close); - execute(task, request, listener); + runReroutePhase(task, request, listener, false); } protected Releasable checkOperationLimits(final Request request) { @@ -285,7 +293,8 @@ protected Releasable checkOperationLimits(final Request request) { } protected void handlePrimaryRequest(final ConcreteShardRequest request, final TransportChannel channel, final Task task) { - Releasable releasable = checkPrimaryLimits(request.getRequest(), request.sentFromLocalReroute()); + Releasable releasable = checkPrimaryLimits(request.getRequest(), request.sentFromLocalReroute(), + request.localRerouteInitiatedByNodeClient()); ActionListener listener = ActionListener.runBefore(new ChannelActionListener<>(channel, transportPrimaryAction, request), releasable::close); @@ -296,7 +305,7 @@ protected void handlePrimaryRequest(final ConcreteShardRequest request, } } - protected Releasable checkPrimaryLimits(final Request request, boolean rerouteWasLocal) { + protected Releasable checkPrimaryLimits(final Request request, boolean rerouteWasLocal, boolean localRerouteInitiatedByNodeClient) { return () -> {}; } @@ -658,12 +667,18 @@ private IndexShard getIndexShard(final ShardId shardId) { final class ReroutePhase extends AbstractRunnable { private final ActionListener listener; private final Request 
request; + private final boolean initiatedByNodeClient; private final ReplicationTask task; private final ClusterStateObserver observer; private final AtomicBoolean finished = new AtomicBoolean(); ReroutePhase(ReplicationTask task, Request request, ActionListener listener) { + this(task, request, listener, false); + } + + ReroutePhase(ReplicationTask task, Request request, ActionListener listener, boolean initiatedByNodeClient) { this.request = request; + this.initiatedByNodeClient = initiatedByNodeClient; if (task != null) { this.request.setParentTask(clusterService.localNode().getId(), task.getId()); } @@ -749,7 +764,8 @@ private void performLocalAction(ClusterState state, ShardRouting primary, Discov transportPrimaryAction, request.shardId(), request, state.version(), primary.currentNodeId()); } performAction(node, transportPrimaryAction, true, - new ConcreteShardRequest<>(request, primary.allocationId().getId(), indexMetadata.primaryTerm(primary.id()), true)); + new ConcreteShardRequest<>(request, primary.allocationId().getId(), indexMetadata.primaryTerm(primary.id()), true, + initiatedByNodeClient)); } private void performRemoteAction(ClusterState state, ShardRouting primary, DiscoveryNode node) { @@ -1103,25 +1119,31 @@ public static class ConcreteShardRequest extends Tra private final R request; // Indicates if this primary shard request originated by a reroute on this local node. private final boolean sentFromLocalReroute; + // Indicates if this local reroute was initiated by the NodeClient executing a transport action. This + // is only true if sentFromLocalReroute is true. 
+ private final boolean localRerouteInitiatedByNodeClient; public ConcreteShardRequest(Writeable.Reader requestReader, StreamInput in) throws IOException { targetAllocationID = in.readString(); primaryTerm = in.readVLong(); sentFromLocalReroute = false; + localRerouteInitiatedByNodeClient = false; request = requestReader.read(in); } public ConcreteShardRequest(R request, String targetAllocationID, long primaryTerm) { - this(request, targetAllocationID, primaryTerm, false); + this(request, targetAllocationID, primaryTerm, false, false); } - public ConcreteShardRequest(R request, String targetAllocationID, long primaryTerm, boolean sentFromLocalReroute) { + public ConcreteShardRequest(R request, String targetAllocationID, long primaryTerm, boolean sentFromLocalReroute, + boolean localRerouteInitiatedByNodeClient) { Objects.requireNonNull(request); Objects.requireNonNull(targetAllocationID); this.request = request; this.targetAllocationID = targetAllocationID; this.primaryTerm = primaryTerm; this.sentFromLocalReroute = sentFromLocalReroute; + this.localRerouteInitiatedByNodeClient = localRerouteInitiatedByNodeClient; } @Override @@ -1154,6 +1176,7 @@ public void writeTo(StreamOutput out) throws IOException { // the local transport. It should never be serialized to be sent over the wire. If it is sent over // the wire, then it was NOT sent from a local reroute. 
assert sentFromLocalReroute == false; + assert localRerouteInitiatedByNodeClient == false; out.writeString(targetAllocationID); out.writeVLong(primaryTerm); request.writeTo(out); @@ -1163,6 +1186,10 @@ public boolean sentFromLocalReroute() { return sentFromLocalReroute; } + public boolean localRerouteInitiatedByNodeClient() { + return localRerouteInitiatedByNodeClient; + } + public R getRequest() { return request; } diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index e33857eb560a7..5a2820fd6de00 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -80,17 +80,24 @@ protected TransportWriteAction(Settings settings, String actionName, TransportSe @Override protected Releasable checkOperationLimits(Request request) { - return indexingPressure.markIndexingOperationStarted(primaryOperationSize(request), forceExecution); + return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), forceExecution); } @Override - protected Releasable checkPrimaryLimits(Request request, boolean rerouteWasLocal) { - // If this primary request was submitted by a reroute performed on this local node, we have already - // accounted the bytes. + protected Releasable checkPrimaryLimits(Request request, boolean rerouteWasLocal, boolean localRerouteInitiatedByNodeClient) { if (rerouteWasLocal) { - return () -> {}; + // If this primary request was received from a local reroute initiated by the node client, we + // must mark a new primary operation local to the coordinating node. 
+ if (localRerouteInitiatedByNodeClient) { + return indexingPressure.markPrimaryOperationLocalToCoordinatingNodeStarted(primaryOperationSize(request)); + } else { + return () -> {}; + } } else { - return indexingPressure.markIndexingOperationStarted(primaryOperationSize(request), forceExecution); + // If this primary request was received directly from the network, we must mark a new primary + // operation. This happens if the write action skips the reroute step (ex: rsync) or during + // primary delegation, after the primary relocation hand-off. + return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), forceExecution); } } diff --git a/server/src/main/java/org/elasticsearch/index/IndexingPressure.java b/server/src/main/java/org/elasticsearch/index/IndexingPressure.java index 9c8fb83fe4ffc..d3e56362a9b9e 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexingPressure.java +++ b/server/src/main/java/org/elasticsearch/index/IndexingPressure.java @@ -33,11 +33,18 @@ public class IndexingPressure { public static final Setting MAX_INDEXING_BYTES = Setting.memorySizeSetting("indexing_pressure.memory.limit", "10%", Setting.Property.NodeScope); - private final AtomicLong currentCoordinatingAndPrimaryBytes = new AtomicLong(0); + private final AtomicLong currentCombinedCoordinatingAndPrimaryBytes = new AtomicLong(0); + private final AtomicLong currentCoordinatingBytes = new AtomicLong(0); + private final AtomicLong currentPrimaryBytes = new AtomicLong(0); private final AtomicLong currentReplicaBytes = new AtomicLong(0); - private final AtomicLong totalCoordinatingAndPrimaryBytes = new AtomicLong(0); + + private final AtomicLong totalCombinedCoordinatingAndPrimaryBytes = new AtomicLong(0); + private final AtomicLong totalCoordinatingBytes = new AtomicLong(0); + private final AtomicLong totalPrimaryBytes = new AtomicLong(0); private final AtomicLong totalReplicaBytes = new AtomicLong(0); - private final AtomicLong 
coordinatingAndPrimaryRejections = new AtomicLong(0); + + private final AtomicLong coordinatingRejections = new AtomicLong(0); + private final AtomicLong primaryRejections = new AtomicLong(0); private final AtomicLong replicaRejections = new AtomicLong(0); private final long primaryAndCoordinatingLimits; @@ -48,28 +55,60 @@ public IndexingPressure(Settings settings) { this.replicaLimits = (long) (this.primaryAndCoordinatingLimits * 1.5); } - public Releasable markIndexingOperationStarted(long bytes) { - return markIndexingOperationStarted(bytes, false); + public Releasable markCoordinatingOperationStarted(long bytes) { + long combinedBytes = this.currentCombinedCoordinatingAndPrimaryBytes.addAndGet(bytes); + long replicaWriteBytes = this.currentReplicaBytes.get(); + long totalBytes = combinedBytes + replicaWriteBytes; + if (totalBytes > primaryAndCoordinatingLimits) { + long bytesWithoutOperation = combinedBytes - bytes; + long totalBytesWithoutOperation = totalBytes - bytes; + this.currentCombinedCoordinatingAndPrimaryBytes.getAndAdd(-bytes); + this.coordinatingRejections.getAndIncrement(); + throw new EsRejectedExecutionException("rejected execution of coordinating operation [" + + "coordinating_and_primary_bytes=" + bytesWithoutOperation + ", " + + "replica_bytes=" + replicaWriteBytes + ", " + + "all_bytes=" + totalBytesWithoutOperation + ", " + + "operation_bytes=" + bytes + ", " + + "max_coordinating_and_primary_bytes=" + primaryAndCoordinatingLimits + "]", false); + } + currentCoordinatingBytes.getAndAdd(bytes); + totalCombinedCoordinatingAndPrimaryBytes.getAndAdd(bytes); + totalCoordinatingBytes.getAndAdd(bytes); + return () -> { + this.currentCombinedCoordinatingAndPrimaryBytes.getAndAdd(-bytes); + this.currentCoordinatingBytes.getAndAdd(-bytes); + }; + } + + public Releasable markPrimaryOperationLocalToCoordinatingNodeStarted(long bytes) { + currentPrimaryBytes.getAndAdd(bytes); + totalPrimaryBytes.getAndAdd(bytes); + return () -> 
this.currentPrimaryBytes.getAndAdd(-bytes); } - public Releasable markIndexingOperationStarted(long bytes, boolean forceExecution) { - long writeBytes = this.currentCoordinatingAndPrimaryBytes.addAndGet(bytes); + public Releasable markPrimaryOperationStarted(long bytes, boolean forceExecution) { + long combinedBytes = this.currentCombinedCoordinatingAndPrimaryBytes.addAndGet(bytes); long replicaWriteBytes = this.currentReplicaBytes.get(); - long totalBytes = writeBytes + replicaWriteBytes; + long totalBytes = combinedBytes + replicaWriteBytes; if (forceExecution == false && totalBytes > primaryAndCoordinatingLimits) { - long bytesWithoutOperation = writeBytes - bytes; + long bytesWithoutOperation = combinedBytes - bytes; long totalBytesWithoutOperation = totalBytes - bytes; - this.currentCoordinatingAndPrimaryBytes.getAndAdd(-bytes); - this.coordinatingAndPrimaryRejections.getAndIncrement(); - throw new EsRejectedExecutionException("rejected execution of operation [" + + this.currentCombinedCoordinatingAndPrimaryBytes.getAndAdd(-bytes); + this.primaryRejections.getAndIncrement(); + throw new EsRejectedExecutionException("rejected execution of primary operation [" + "coordinating_and_primary_bytes=" + bytesWithoutOperation + ", " + "replica_bytes=" + replicaWriteBytes + ", " + "all_bytes=" + totalBytesWithoutOperation + ", " + "operation_bytes=" + bytes + ", " + "max_coordinating_and_primary_bytes=" + primaryAndCoordinatingLimits + "]", false); } - totalCoordinatingAndPrimaryBytes.getAndAdd(bytes); - return () -> this.currentCoordinatingAndPrimaryBytes.getAndAdd(-bytes); + currentPrimaryBytes.getAndAdd(bytes); + totalCombinedCoordinatingAndPrimaryBytes.getAndAdd(bytes); + totalPrimaryBytes.getAndAdd(bytes); + return () -> { + this.currentCombinedCoordinatingAndPrimaryBytes.getAndAdd(-bytes); + this.currentPrimaryBytes.getAndAdd(-bytes); + }; } public Releasable markReplicaOperationStarted(long bytes, boolean forceExecution) { @@ -87,25 +126,26 @@ public Releasable 
markReplicaOperationStarted(long bytes, boolean forceExecution return () -> this.currentReplicaBytes.getAndAdd(-bytes); } - public long getCurrentCoordinatingAndPrimaryBytes() { - return currentCoordinatingAndPrimaryBytes.get(); + public long getCurrentCombinedCoordinatingAndPrimaryBytes() { + return currentCombinedCoordinatingAndPrimaryBytes.get(); } - public long getCurrentReplicaBytes() { - return currentReplicaBytes.get(); + public long getCurrentCoordinatingBytes() { + return currentCoordinatingBytes.get(); } - public long getTotalCoordinatingAndPrimaryBytes() { - return totalCoordinatingAndPrimaryBytes.get(); + public long getCurrentPrimaryBytes() { + return currentPrimaryBytes.get(); } - public long getTotalReplicaBytes() { - return totalReplicaBytes.get(); + public long getCurrentReplicaBytes() { + return currentReplicaBytes.get(); } public IndexingPressureStats stats() { - return new IndexingPressureStats(totalCoordinatingAndPrimaryBytes.get(), totalReplicaBytes.get(), - currentCoordinatingAndPrimaryBytes.get(), currentReplicaBytes.get(), coordinatingAndPrimaryRejections.get(), - replicaRejections.get()); + return new IndexingPressureStats(totalCombinedCoordinatingAndPrimaryBytes.get(), totalCoordinatingBytes.get(), + totalPrimaryBytes.get(), totalReplicaBytes.get(), currentCombinedCoordinatingAndPrimaryBytes.get(), + currentCoordinatingBytes.get(), currentPrimaryBytes.get(), currentReplicaBytes.get(), coordinatingRejections.get(), + primaryRejections.get(), replicaRejections.get()); } } diff --git a/server/src/main/java/org/elasticsearch/index/stats/IndexingPressureStats.java b/server/src/main/java/org/elasticsearch/index/stats/IndexingPressureStats.java index 309cf863b6324..df08ee9f1cd3b 100644 --- a/server/src/main/java/org/elasticsearch/index/stats/IndexingPressureStats.java +++ b/server/src/main/java/org/elasticsearch/index/stats/IndexingPressureStats.java @@ -29,39 +29,67 @@ public class IndexingPressureStats implements Writeable, ToXContentFragment 
{ - private final long totalCoordinatingAndPrimaryBytes; + + private final long totalCombinedCoordinatingAndPrimaryBytes; + private final long totalCoordinatingBytes; + private final long totalPrimaryBytes; private final long totalReplicaBytes; - private final long currentCoordinatingAndPrimaryBytes; + + private final long currentCombinedCoordinatingAndPrimaryBytes; + private final long currentCoordinatingBytes; + private final long currentPrimaryBytes; private final long currentReplicaBytes; - private final long coordinatingAndPrimaryRejections; + private final long coordinatingRejections; + private final long primaryRejections; private final long replicaRejections; public IndexingPressureStats(StreamInput in) throws IOException { - totalCoordinatingAndPrimaryBytes = in.readVLong(); + totalCombinedCoordinatingAndPrimaryBytes = in.readVLong(); + totalCoordinatingBytes = in.readVLong(); + totalPrimaryBytes = in.readVLong(); totalReplicaBytes = in.readVLong(); - currentCoordinatingAndPrimaryBytes = in.readVLong(); + + currentCombinedCoordinatingAndPrimaryBytes = in.readVLong(); + currentCoordinatingBytes = in.readVLong(); + currentPrimaryBytes = in.readVLong(); currentReplicaBytes = in.readVLong(); - coordinatingAndPrimaryRejections = in.readVLong(); + + coordinatingRejections = in.readVLong(); + primaryRejections = in.readVLong(); replicaRejections = in.readVLong(); } - public IndexingPressureStats(long totalCoordinatingAndPrimaryBytes, long totalReplicaBytes, long currentCoordinatingAndPrimaryBytes, - long currentReplicaBytes, long coordinatingAndPrimaryRejections, long replicaRejections) { - this.totalCoordinatingAndPrimaryBytes = totalCoordinatingAndPrimaryBytes; + public IndexingPressureStats(long totalCombinedCoordinatingAndPrimaryBytes, long totalCoordinatingBytes, long totalPrimaryBytes, + long totalReplicaBytes, long currentCombinedCoordinatingAndPrimaryBytes, long currentCoordinatingBytes, + long currentPrimaryBytes, long currentReplicaBytes, long 
coordinatingRejections, long primaryRejections, + long replicaRejections) { + this.totalCombinedCoordinatingAndPrimaryBytes = totalCombinedCoordinatingAndPrimaryBytes; + this.totalCoordinatingBytes = totalCoordinatingBytes; + this.totalPrimaryBytes = totalPrimaryBytes; this.totalReplicaBytes = totalReplicaBytes; - this.currentCoordinatingAndPrimaryBytes = currentCoordinatingAndPrimaryBytes; + this.currentCombinedCoordinatingAndPrimaryBytes = currentCombinedCoordinatingAndPrimaryBytes; + this.currentCoordinatingBytes = currentCoordinatingBytes; + this.currentPrimaryBytes = currentPrimaryBytes; this.currentReplicaBytes = currentReplicaBytes; - this.coordinatingAndPrimaryRejections = coordinatingAndPrimaryRejections; + this.coordinatingRejections = coordinatingRejections; + this.primaryRejections = primaryRejections; this.replicaRejections = replicaRejections; } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeVLong(totalCoordinatingAndPrimaryBytes); + out.writeVLong(totalCombinedCoordinatingAndPrimaryBytes); + out.writeVLong(totalCoordinatingBytes); + out.writeVLong(totalPrimaryBytes); out.writeVLong(totalReplicaBytes); - out.writeVLong(currentCoordinatingAndPrimaryBytes); + + out.writeVLong(currentCombinedCoordinatingAndPrimaryBytes); + out.writeVLong(currentCoordinatingBytes); + out.writeVLong(currentPrimaryBytes); out.writeVLong(currentReplicaBytes); - out.writeVLong(coordinatingAndPrimaryRejections); + + out.writeVLong(coordinatingRejections); + out.writeVLong(primaryRejections); out.writeVLong(replicaRejections); } @@ -69,16 +97,21 @@ public void writeTo(StreamOutput out) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject("indexing_pressure"); builder.startObject("total"); - builder.field("coordinating_and_primary_bytes", totalCoordinatingAndPrimaryBytes); + builder.field("combined_coordinating_and_primary_bytes", 
totalCombinedCoordinatingAndPrimaryBytes); + builder.field("coordinating_bytes", totalCoordinatingBytes); + builder.field("primary_bytes", totalPrimaryBytes); builder.field("replica_bytes", totalReplicaBytes); - builder.field("all_bytes", totalReplicaBytes + totalCoordinatingAndPrimaryBytes); - builder.field("coordinating_and_primary_memory_limit_rejections", coordinatingAndPrimaryRejections); - builder.field("replica_memory_limit_rejections", replicaRejections); + builder.field("all_bytes", totalReplicaBytes + totalCombinedCoordinatingAndPrimaryBytes); + builder.field("coordinating_rejections", coordinatingRejections); + builder.field("primary_rejections", primaryRejections); + builder.field("replica_rejections", replicaRejections); builder.endObject(); builder.startObject("current"); - builder.field("coordinating_and_primary_bytes", currentCoordinatingAndPrimaryBytes); + builder.field("combined_coordinating_and_primary_bytes", currentCombinedCoordinatingAndPrimaryBytes); + builder.field("coordinating_bytes", currentCoordinatingBytes); + builder.field("primary_bytes", currentPrimaryBytes); builder.field("replica_bytes", currentReplicaBytes); - builder.field("all_bytes", currentCoordinatingAndPrimaryBytes + currentReplicaBytes); + builder.field("all_bytes", currentCombinedCoordinatingAndPrimaryBytes + currentReplicaBytes); builder.endObject(); return builder.endObject(); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 4f14ac5815507..3ec52bef0a1d4 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -1166,14 +1166,24 @@ private void assertAllPendingWriteLimitsReleased() throws Exception { assertBusy(() -> { for (NodeAndClient nodeAndClient : nodes.values()) { IndexingPressure indexingPressure = 
getInstance(IndexingPressure.class, nodeAndClient.name); - final long writeBytes = indexingPressure.getCurrentCoordinatingAndPrimaryBytes(); - if (writeBytes > 0) { - throw new AssertionError("pending write bytes [" + writeBytes + "] bytes on node [" + final long combinedBytes = indexingPressure.getCurrentCombinedCoordinatingAndPrimaryBytes(); + if (combinedBytes > 0) { + throw new AssertionError("pending combined bytes [" + combinedBytes + "] bytes on node [" + + nodeAndClient.name + "]."); + } + final long coordinatingBytes = indexingPressure.getCurrentCoordinatingBytes(); + if (coordinatingBytes > 0) { + throw new AssertionError("pending coordinating bytes [" + coordinatingBytes + "] bytes on node [" + + nodeAndClient.name + "]."); + } + final long primaryBytes = indexingPressure.getCurrentPrimaryBytes(); + if (primaryBytes > 0) { + throw new AssertionError("pending primary bytes [" + primaryBytes + "] bytes on node [" + nodeAndClient.name + "]."); } final long replicaWriteBytes = indexingPressure.getCurrentReplicaBytes(); if (replicaWriteBytes > 0) { - throw new AssertionError("pending replica write bytes [" + writeBytes + "] bytes on node [" + throw new AssertionError("pending replica write bytes [" + replicaWriteBytes + "] bytes on node [" + nodeAndClient.name + "]."); } } diff --git a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java index 08e309cf443c9..9adab9f64302d 100644 --- a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java @@ -122,7 +122,7 @@ public void testWriteLimitsIncremented() throws Exception { assertBusy(() -> { // The actual write bytes will be greater due to other request fields.
However, this test is // just spot checking that the bytes are incremented at all. - assertTrue(memoryLimits.getCurrentCoordinatingAndPrimaryBytes() > finalSourceSize); + assertTrue(memoryLimits.getCurrentCombinedCoordinatingAndPrimaryBytes() > finalSourceSize); }); blocker.countDown(); assertBusy(() -> { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java index 9da48580702fa..48fc9f5a96954 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java @@ -68,7 +68,7 @@ public TransportBulkShardOperationsAction( @Override protected void doExecute(Task task, BulkShardOperationsRequest request, ActionListener listener) { // This is executed on the follower coordinator node and we need to mark the bytes. - Releasable releasable = indexingPressure.markIndexingOperationStarted(primaryOperationSize(request)); + Releasable releasable = indexingPressure.markCoordinatingOperationStarted(primaryOperationSize(request)); ActionListener releasingListener = ActionListener.runBefore(listener, releasable::close); try { super.doExecute(task, request, releasingListener);