Skip to content

Commit

Permalink
Ensure IndexingPressure memory is re-adjusted once (#67673)
Browse files Browse the repository at this point in the history
We have seen a case where the memory of IndexingPressure was
re-adjusted twice. With this commit, we will log that error with a
stack trace so that we can figure out the source of the issue.
  • Loading branch information
dnhatn committed Jan 25, 2021
1 parent 847b6f4 commit 968e68d
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 8 deletions.
30 changes: 24 additions & 6 deletions server/src/main/java/org/elasticsearch/index/IndexingPressure.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,25 @@

package org.elasticsearch.index;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.stats.IndexingPressureStats;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class IndexingPressure {

public static final Setting<ByteSizeValue> MAX_INDEXING_BYTES =
Setting.memorySizeSetting("indexing_pressure.memory.limit", "10%", Setting.Property.NodeScope);

private static final Logger logger = LogManager.getLogger(IndexingPressure.class);

private final AtomicLong currentCombinedCoordinatingAndPrimaryBytes = new AtomicLong(0);
private final AtomicLong currentCoordinatingBytes = new AtomicLong(0);
private final AtomicLong currentPrimaryBytes = new AtomicLong(0);
Expand All @@ -55,6 +60,19 @@ public IndexingPressure(Settings settings) {
this.replicaLimits = (long) (this.primaryAndCoordinatingLimits * 1.5);
}


private static Releasable wrapReleasable(Releasable releasable) {
final AtomicBoolean called = new AtomicBoolean();
return () -> {
if (called.compareAndSet(false, true)) {
releasable.close();
} else {
logger.error("IndexingPressure memory is adjusted twice", new IllegalStateException("Releasable is called twice"));
assert false : "IndexingPressure is adjusted twice";
}
};
}

public Releasable markCoordinatingOperationStarted(long bytes, boolean forceExecution) {
long combinedBytes = this.currentCombinedCoordinatingAndPrimaryBytes.addAndGet(bytes);
long replicaWriteBytes = this.currentReplicaBytes.get();
Expand All @@ -74,16 +92,16 @@ public Releasable markCoordinatingOperationStarted(long bytes, boolean forceExec
currentCoordinatingBytes.getAndAdd(bytes);
totalCombinedCoordinatingAndPrimaryBytes.getAndAdd(bytes);
totalCoordinatingBytes.getAndAdd(bytes);
return () -> {
return wrapReleasable(() -> {
this.currentCombinedCoordinatingAndPrimaryBytes.getAndAdd(-bytes);
this.currentCoordinatingBytes.getAndAdd(-bytes);
};
});
}

public Releasable markPrimaryOperationLocalToCoordinatingNodeStarted(long bytes) {
currentPrimaryBytes.getAndAdd(bytes);
totalPrimaryBytes.getAndAdd(bytes);
return () -> this.currentPrimaryBytes.getAndAdd(-bytes);
return wrapReleasable(() -> this.currentPrimaryBytes.getAndAdd(-bytes));
}

public Releasable markPrimaryOperationStarted(long bytes, boolean forceExecution) {
Expand All @@ -105,10 +123,10 @@ public Releasable markPrimaryOperationStarted(long bytes, boolean forceExecution
currentPrimaryBytes.getAndAdd(bytes);
totalCombinedCoordinatingAndPrimaryBytes.getAndAdd(bytes);
totalPrimaryBytes.getAndAdd(bytes);
return () -> {
return wrapReleasable(() -> {
this.currentCombinedCoordinatingAndPrimaryBytes.getAndAdd(-bytes);
this.currentPrimaryBytes.getAndAdd(-bytes);
};
});
}

public Releasable markReplicaOperationStarted(long bytes, boolean forceExecution) {
Expand All @@ -123,7 +141,7 @@ public Releasable markReplicaOperationStarted(long bytes, boolean forceExecution
"max_replica_bytes=" + replicaLimits + "]", false);
}
totalReplicaBytes.getAndAdd(bytes);
return () -> this.currentReplicaBytes.getAndAdd(-bytes);
return wrapReleasable(() -> this.currentReplicaBytes.getAndAdd(-bytes));
}

public long getCurrentCombinedCoordinatingAndPrimaryBytes() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,7 @@ public void testReplicaRejections() {
assertEquals(1, indexingPressure.stats().getReplicaRejections());
assertEquals(1024 * 14, indexingPressure.stats().getCurrentReplicaBytes());
forced.close();

replica2.close();
forced.close();
}

assertEquals(1024 * 14, indexingPressure.stats().getTotalReplicaBytes());
Expand Down

0 comments on commit 968e68d

Please sign in to comment.