diff --git a/server/src/main/java/org/elasticsearch/index/IndexingPressure.java b/server/src/main/java/org/elasticsearch/index/IndexingPressure.java index d3e56362a9b9e..8c2c9cb757d62 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexingPressure.java +++ b/server/src/main/java/org/elasticsearch/index/IndexingPressure.java @@ -112,7 +112,7 @@ public Releasable markPrimaryOperationStarted(long bytes, boolean forceExecution } public Releasable markReplicaOperationStarted(long bytes, boolean forceExecution) { - long replicaWriteBytes = this.currentReplicaBytes.getAndAdd(bytes); + long replicaWriteBytes = this.currentReplicaBytes.addAndGet(bytes); if (forceExecution == false && replicaWriteBytes > replicaLimits) { long replicaBytesWithoutOperation = replicaWriteBytes - bytes; this.currentReplicaBytes.getAndAdd(-bytes); diff --git a/server/src/main/java/org/elasticsearch/index/stats/IndexingPressureStats.java b/server/src/main/java/org/elasticsearch/index/stats/IndexingPressureStats.java index 5654a84251ce1..2396e047da243 100644 --- a/server/src/main/java/org/elasticsearch/index/stats/IndexingPressureStats.java +++ b/server/src/main/java/org/elasticsearch/index/stats/IndexingPressureStats.java @@ -94,6 +94,50 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVLong(replicaRejections); } + public long getTotalCombinedCoordinatingAndPrimaryBytes() { + return totalCombinedCoordinatingAndPrimaryBytes; + } + + public long getTotalCoordinatingBytes() { + return totalCoordinatingBytes; + } + + public long getTotalPrimaryBytes() { + return totalPrimaryBytes; + } + + public long getTotalReplicaBytes() { + return totalReplicaBytes; + } + + public long getCurrentCombinedCoordinatingAndPrimaryBytes() { + return currentCombinedCoordinatingAndPrimaryBytes; + } + + public long getCurrentCoordinatingBytes() { + return currentCoordinatingBytes; + } + + public long getCurrentPrimaryBytes() { + return currentPrimaryBytes; + } + + public long getCurrentReplicaBytes() { + return currentReplicaBytes; + } + + public long getCoordinatingRejections() { + return coordinatingRejections; + } + + public long getPrimaryRejections() { + return primaryRejections; + } + + public long getReplicaRejections() { + return replicaRejections; + } + private static final String COMBINED = "combined_coordinating_and_primary"; private static final String COMBINED_IN_BYTES = "combined_coordinating_and_primary_in_bytes"; private static final String COORDINATING = "coordinating"; diff --git a/server/src/test/java/org/elasticsearch/index/IndexingPressureTests.java b/server/src/test/java/org/elasticsearch/index/IndexingPressureTests.java new file mode 100644 index 0000000000000..ad4c78ad730cb --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/IndexingPressureTests.java @@ -0,0 +1,136 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index; + +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.index.stats.IndexingPressureStats; +import org.elasticsearch.test.ESTestCase; + +public class IndexingPressureTests extends ESTestCase { + + private final Settings settings = Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "10KB").build(); + + public void testMemoryBytesMarkedAndReleased() { + IndexingPressure indexingPressure = new IndexingPressure(settings); + try (Releasable coordinating = indexingPressure.markCoordinatingOperationStarted(10); + Releasable coordinating2 = indexingPressure.markCoordinatingOperationStarted(50); + Releasable primary = indexingPressure.markPrimaryOperationStarted(15, true); + Releasable primary2 = indexingPressure.markPrimaryOperationStarted(5, false); + Releasable replica = indexingPressure.markReplicaOperationStarted(25, true); + Releasable replica2 = indexingPressure.markReplicaOperationStarted(10, false)) { + IndexingPressureStats stats = indexingPressure.stats(); + assertEquals(60, stats.getCurrentCoordinatingBytes()); + assertEquals(20, stats.getCurrentPrimaryBytes()); + assertEquals(80, stats.getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(35, stats.getCurrentReplicaBytes()); + } + IndexingPressureStats stats = indexingPressure.stats(); + assertEquals(0, stats.getCurrentCoordinatingBytes()); + assertEquals(0, stats.getCurrentPrimaryBytes()); + assertEquals(0, stats.getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(0, stats.getCurrentReplicaBytes()); + assertEquals(60, stats.getTotalCoordinatingBytes()); + assertEquals(20, stats.getTotalPrimaryBytes()); + assertEquals(80, stats.getTotalCombinedCoordinatingAndPrimaryBytes()); + assertEquals(35, stats.getTotalReplicaBytes()); + } + + public void testAvoidDoubleAccounting() { + IndexingPressure indexingPressure = new IndexingPressure(settings); + try (Releasable coordinating = indexingPressure.markCoordinatingOperationStarted(10); + Releasable primary = indexingPressure.markPrimaryOperationLocalToCoordinatingNodeStarted(15)) { + IndexingPressureStats stats = indexingPressure.stats(); + assertEquals(10, stats.getCurrentCoordinatingBytes()); + assertEquals(15, stats.getCurrentPrimaryBytes()); + assertEquals(10, stats.getCurrentCombinedCoordinatingAndPrimaryBytes()); + } + IndexingPressureStats stats = indexingPressure.stats(); + assertEquals(0, stats.getCurrentCoordinatingBytes()); + assertEquals(0, stats.getCurrentPrimaryBytes()); + assertEquals(0, stats.getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(10, stats.getTotalCoordinatingBytes()); + assertEquals(15, stats.getTotalPrimaryBytes()); + assertEquals(10, stats.getTotalCombinedCoordinatingAndPrimaryBytes()); + } + + public void testCoordinatingPrimaryRejections() { + IndexingPressure indexingPressure = new IndexingPressure(settings); + try (Releasable coordinating = indexingPressure.markCoordinatingOperationStarted(1024 * 3); + Releasable primary = indexingPressure.markPrimaryOperationStarted(1024 * 3, false); + Releasable replica = indexingPressure.markReplicaOperationStarted(1024 * 3, false)) { + if (randomBoolean()) { + expectThrows(EsRejectedExecutionException.class, () -> indexingPressure.markCoordinatingOperationStarted(1024 * 2)); + IndexingPressureStats stats = indexingPressure.stats(); + assertEquals(1, stats.getCoordinatingRejections()); + assertEquals(1024 * 6, stats.getCurrentCombinedCoordinatingAndPrimaryBytes()); + } else { + expectThrows(EsRejectedExecutionException.class, () -> indexingPressure.markPrimaryOperationStarted(1024 * 2, false)); + IndexingPressureStats stats = indexingPressure.stats(); + assertEquals(1, stats.getPrimaryRejections()); + assertEquals(1024 * 6, stats.getCurrentCombinedCoordinatingAndPrimaryBytes()); + } + long preForceRejections = indexingPressure.stats().getPrimaryRejections(); + // Primary can be forced + Releasable forced = indexingPressure.markPrimaryOperationStarted(1024 * 2, true); + assertEquals(preForceRejections, indexingPressure.stats().getPrimaryRejections()); + assertEquals(1024 * 8, indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes()); + forced.close(); + + // Local to coordinating node primary actions not rejected + IndexingPressureStats preLocalStats = indexingPressure.stats(); + Releasable local = indexingPressure.markPrimaryOperationLocalToCoordinatingNodeStarted(1024 * 2); + assertEquals(preLocalStats.getPrimaryRejections(), indexingPressure.stats().getPrimaryRejections()); + assertEquals(1024 * 6, indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes()); + assertEquals(preLocalStats.getCurrentPrimaryBytes() + 1024 * 2, indexingPressure.stats().getCurrentPrimaryBytes()); + local.close(); + } + + assertEquals(1024 * 8, indexingPressure.stats().getTotalCombinedCoordinatingAndPrimaryBytes()); + } + + public void testReplicaRejections() { + IndexingPressure indexingPressure = new IndexingPressure(settings); + try (Releasable coordinating = indexingPressure.markCoordinatingOperationStarted(1024 * 3); + Releasable primary = indexingPressure.markPrimaryOperationStarted(1024 * 3, false); + Releasable replica = indexingPressure.markReplicaOperationStarted(1024 * 3, false)) { + // Replica will not be rejected until replica bytes > 15KB + Releasable replica2 = indexingPressure.markReplicaOperationStarted(1024 * 11, false); + assertEquals(1024 * 14, indexingPressure.stats().getCurrentReplicaBytes()); + // Replica will be rejected once we cross 15KB + expectThrows(EsRejectedExecutionException.class, () -> indexingPressure.markReplicaOperationStarted(1024 * 2, false)); + IndexingPressureStats stats = indexingPressure.stats(); + assertEquals(1, stats.getReplicaRejections()); + assertEquals(1024 * 14, stats.getCurrentReplicaBytes()); + + // Replica can be forced + Releasable forced = indexingPressure.markPrimaryOperationStarted(1024 * 2, true); + assertEquals(1, indexingPressure.stats().getReplicaRejections()); + assertEquals(1024 * 14, indexingPressure.stats().getCurrentReplicaBytes()); + forced.close(); + + replica2.close(); + forced.close(); + } + + assertEquals(1024 * 14, indexingPressure.stats().getTotalReplicaBytes()); + } +}