diff --git a/server/src/main/java/org/elasticsearch/index/IndexingPressure.java b/server/src/main/java/org/elasticsearch/index/IndexingPressure.java index 585a0b7f4571e..f7bd40d009dbf 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexingPressure.java +++ b/server/src/main/java/org/elasticsearch/index/IndexingPressure.java @@ -19,6 +19,8 @@ package org.elasticsearch.index; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -26,6 +28,7 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.stats.IndexingPressureStats; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; public class IndexingPressure { @@ -33,6 +36,8 @@ public class IndexingPressure { public static final Setting MAX_INDEXING_BYTES = Setting.memorySizeSetting("indexing_pressure.memory.limit", "10%", Setting.Property.NodeScope); + private static final Logger logger = LogManager.getLogger(IndexingPressure.class); + private final AtomicLong currentCombinedCoordinatingAndPrimaryBytes = new AtomicLong(0); private final AtomicLong currentCoordinatingBytes = new AtomicLong(0); private final AtomicLong currentPrimaryBytes = new AtomicLong(0); @@ -55,6 +60,19 @@ public IndexingPressure(Settings settings) { this.replicaLimits = (long) (this.primaryAndCoordinatingLimits * 1.5); } + + private static Releasable wrapReleasable(Releasable releasable) { + final AtomicBoolean called = new AtomicBoolean(); + return () -> { + if (called.compareAndSet(false, true)) { + releasable.close(); + } else { + logger.error("IndexingPressure memory is adjusted twice", new IllegalStateException("Releasable is called twice")); + assert false : "IndexingPressure is adjusted twice"; + } + }; + } + public Releasable markCoordinatingOperationStarted(long bytes, boolean forceExecution) { long combinedBytes = this.currentCombinedCoordinatingAndPrimaryBytes.addAndGet(bytes); long replicaWriteBytes = this.currentReplicaBytes.get(); @@ -74,16 +92,16 @@ public Releasable markCoordinatingOperationStarted(long bytes, boolean forceExec currentCoordinatingBytes.getAndAdd(bytes); totalCombinedCoordinatingAndPrimaryBytes.getAndAdd(bytes); totalCoordinatingBytes.getAndAdd(bytes); - return () -> { + return wrapReleasable(() -> { this.currentCombinedCoordinatingAndPrimaryBytes.getAndAdd(-bytes); this.currentCoordinatingBytes.getAndAdd(-bytes); - }; + }); } public Releasable markPrimaryOperationLocalToCoordinatingNodeStarted(long bytes) { currentPrimaryBytes.getAndAdd(bytes); totalPrimaryBytes.getAndAdd(bytes); - return () -> this.currentPrimaryBytes.getAndAdd(-bytes); + return wrapReleasable(() -> this.currentPrimaryBytes.getAndAdd(-bytes)); } public Releasable markPrimaryOperationStarted(long bytes, boolean forceExecution) { @@ -105,10 +123,10 @@ public Releasable markPrimaryOperationStarted(long bytes, boolean forceExecution currentPrimaryBytes.getAndAdd(bytes); totalCombinedCoordinatingAndPrimaryBytes.getAndAdd(bytes); totalPrimaryBytes.getAndAdd(bytes); - return () -> { + return wrapReleasable(() -> { this.currentCombinedCoordinatingAndPrimaryBytes.getAndAdd(-bytes); this.currentPrimaryBytes.getAndAdd(-bytes); - }; + }); } public Releasable markReplicaOperationStarted(long bytes, boolean forceExecution) { @@ -123,7 +141,7 @@ public Releasable markReplicaOperationStarted(long bytes, boolean forceExecution "max_replica_bytes=" + replicaLimits + "]", false); } totalReplicaBytes.getAndAdd(bytes); - return () -> this.currentReplicaBytes.getAndAdd(-bytes); + return wrapReleasable(() -> this.currentReplicaBytes.getAndAdd(-bytes)); } public long getCurrentCombinedCoordinatingAndPrimaryBytes() { diff --git a/server/src/test/java/org/elasticsearch/index/IndexingPressureTests.java b/server/src/test/java/org/elasticsearch/index/IndexingPressureTests.java index 6cfcc4f329c96..bf38f90fe3ffd 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexingPressureTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexingPressureTests.java @@ -126,9 +126,7 @@ public void testReplicaRejections() { assertEquals(1, indexingPressure.stats().getReplicaRejections()); assertEquals(1024 * 14, indexingPressure.stats().getCurrentReplicaBytes()); forced.close(); - replica2.close(); - forced.close(); } assertEquals(1024 * 14, indexingPressure.stats().getTotalReplicaBytes());