Capture stack traces while issuing IndexShard operation permits to ease debugging (#28567)

Today we acquire a permit from the shard to coordinate between indexing operations, recoveries, and other state transitions. When we leak a permit, it is practically impossible to find who the culprit is. This PR adds stack trace capturing for each permit so we can identify which part of the code is responsible for acquiring the unreleased permit. This code is only active when assertions are enabled.

The output is something like:
```
java.lang.AssertionError: shard [test][1] on node [node_s0] has pending operations:
--> java.lang.RuntimeException: something helpful 2
	at org.elasticsearch.index.shard.IndexShardOperationPermits.acquire(IndexShardOperationPermits.java:223)
	at org.elasticsearch.index.shard.IndexShard.<init>(IndexShard.java:322)
	at org.elasticsearch.index.IndexService.createShard(IndexService.java:382)
	at org.elasticsearch.indices.IndicesService.createShard(IndicesService.java:514)
	at org.elasticsearch.indices.IndicesService.createShard(IndicesService.java:143)
	at org.elasticsearch.indices.cluster.IndicesClusterStateService.createShard(IndicesClusterStateService.java:552)
	at org.elasticsearch.indices.cluster.IndicesClusterStateService.createOrUpdateShards(IndicesClusterStateService.java:529)
	at org.elasticsearch.indices.cluster.IndicesClusterStateService.applyClusterState(IndicesClusterStateService.java:231)
	at org.elasticsearch.cluster.service.ClusterApplierService.lambda$callClusterStateAppliers$6(ClusterApplierService.java:498)
	at java.base/java.lang.Iterable.forEach(Iterable.java:75)
	at org.elasticsearch.cluster.service.ClusterApplierService.callClusterStateAppliers(ClusterApplierService.java:495)
	at org.elasticsearch.cluster.service.ClusterApplierService.applyChanges(ClusterApplierService.java:482)
	at org.elasticsearch.cluster.service.ClusterApplierService.runTask(ClusterApplierService.java:432)
	at org.elasticsearch.cluster.service.ClusterApplierService$UpdateTask.run(ClusterApplierService.java:161)
	at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:566)
	at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:244)
	at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:207)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
	at java.base/java.lang.Thread.run(Thread.java:844)

--> java.lang.RuntimeException: something helpful
	at org.elasticsearch.index.shard.IndexShardOperationPermits.acquire(IndexShardOperationPermits.java:223)
	at org.elasticsearch.index.shard.IndexShard.<init>(IndexShard.java:311)
	at org.elasticsearch.index.IndexService.createShard(IndexService.java:382)
	at org.elasticsearch.indices.IndicesService.createShard(IndicesService.java:514)
	at org.elasticsearch.indices.IndicesService.createShard(IndicesService.java:143)
	at org.elasticsearch.indices.cluster.IndicesClusterStateService.createShard(IndicesClusterStateService.java:552)
	at org.elasticsearch.indices.cluster.IndicesClusterStateService.createOrUpdateShards(IndicesClusterStateService.java:529)
	at org.elasticsearch.indices.cluster.IndicesClusterStateService.applyClusterState(IndicesClusterStateService.java:231)
	at org.elasticsearch.cluster.service.ClusterApplierService.lambda$callClusterStateAppliers$6(ClusterApplierService.java:498)
	at java.base/java.lang.Iterable.forEach(Iterable.java:75)
	at org.elasticsearch.cluster.service.ClusterApplierService.callClusterStateAppliers(ClusterApplierService.java:495)
	at org.elasticsearch.cluster.service.ClusterApplierService.applyChanges(ClusterApplierService.java:482)
	at org.elasticsearch.cluster.service.ClusterApplierService.runTask(ClusterApplierService.java:432)
	at org.elasticsearch.cluster.service.ClusterApplierService$UpdateTask.run(ClusterApplierService.java:161)
	at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:566)
	at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:244)
	at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:207)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
	at java.base/java.lang.Thread.run(Thread.java:844)

```
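
For orientation, the heart of the change is an assertion-gated registry that maps each issued permit to a `Throwable` whose message is the caller-supplied debug info and whose stack trace identifies the acquirer. Below is a minimal, self-contained sketch of that idea; the class and method names are illustrative, not the production code shown in the diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch: track a stack trace per issued permit, but only when -ea is set.
final class PermitTracker {
    private static final boolean ASSERTIONS_ENABLED = PermitTracker.class.desiredAssertionStatus();

    private final Semaphore semaphore = new Semaphore(Integer.MAX_VALUE, true);
    // key: the per-permit "closed" flag (release-once semantics), value: a Throwable whose
    // message is the debug info and whose stack trace identifies the acquirer
    private final Map<AtomicBoolean, Throwable> issuedPermits =
            ASSERTIONS_ENABLED ? new ConcurrentHashMap<>() : null;

    Runnable acquire(Object debugInfo) throws InterruptedException {
        semaphore.acquire();
        final AtomicBoolean closed = new AtomicBoolean();
        if (ASSERTIONS_ENABLED) {
            issuedPermits.put(closed, new Throwable(String.valueOf(debugInfo)));
        }
        return () -> {
            if (closed.compareAndSet(false, true)) {
                if (ASSERTIONS_ENABLED) {
                    issuedPermits.remove(closed);
                }
                semaphore.release();
            }
        };
    }

    // one exception per permit that has not been released yet
    List<Throwable> activeOperations() {
        return ASSERTIONS_ENABLED ? new ArrayList<>(issuedPermits.values()) : new ArrayList<>();
    }
}
```

A leak check can then fail with the captured traces, producing output like the assertion error above.
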
bleskes authored Feb 8, 2018
1 parent 5b8870f commit ba59cf1
Showing 13 changed files with 196 additions and 97 deletions.
@@ -202,7 +202,7 @@ protected abstract PrimaryResult<ReplicaRequest, Response> shardOperationOnPrima

/**
* Synchronously execute the specified replica operation. This is done under a permit from
* {@link IndexShard#acquireReplicaOperationPermit(long, long, ActionListener, String)}.
* {@link IndexShard#acquireReplicaOperationPermit(long, long, ActionListener, String, Object)}.
*
* @param shardRequest the request to the replica shard
* @param replica the replica shard to perform the operation on
@@ -317,7 +317,7 @@ class AsyncPrimaryAction extends AbstractRunnable implements ActionListener<Prim

@Override
protected void doRun() throws Exception {
acquirePrimaryShardReference(request.shardId(), targetAllocationID, primaryTerm, this);
acquirePrimaryShardReference(request.shardId(), targetAllocationID, primaryTerm, this, request);
}

@Override
@@ -638,7 +638,7 @@ protected void doRun() throws Exception {
throw new ShardNotFoundException(this.replica.shardId(), "expected aID [{}] but found [{}]", targetAllocationID,
actualAllocationId);
}
replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, this, executor);
replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, this, executor, request);
}

/**
@@ -950,7 +950,7 @@ void retryBecauseUnavailable(ShardId shardId, String message) {
* and replication of the operation to all replica shards is completed / failed (see {@link ReplicationOperation}).
*/
private void acquirePrimaryShardReference(ShardId shardId, String allocationId, long primaryTerm,
ActionListener<PrimaryShardReference> onReferenceAcquired) {
ActionListener<PrimaryShardReference> onReferenceAcquired, Object debugInfo) {
IndexShard indexShard = getIndexShard(shardId);
// we may end up here if the cluster state used to route the primary is so stale that the underlying
// index shard was replaced with a replica. For example - in a two node cluster, if the primary fails
Expand Down Expand Up @@ -981,7 +981,7 @@ public void onFailure(Exception e) {
}
};

indexShard.acquirePrimaryOperationPermit(onAcquired, executor);
indexShard.acquirePrimaryOperationPermit(onAcquired, executor, debugInfo);
}

class ShardReference implements Releasable {
@@ -775,7 +775,7 @@ private void maybeSyncGlobalCheckpoints() {
e);
}
}),
ThreadPool.Names.SAME);
ThreadPool.Names.SAME, "background global checkpoint sync");
} catch (final AlreadyClosedException | IndexShardClosedException e) {
// the shard was closed concurrently, continue
}
26 changes: 21 additions & 5 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -2192,19 +2192,23 @@ private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode, final boole
* Acquire a primary operation permit whenever the shard is ready for indexing. If a permit is directly available, the provided
* ActionListener will be called on the calling thread. During relocation hand-off, permit acquisition can be delayed. The provided
* ActionListener will then be called using the provided executor.
*
* @param debugInfo extra information that can be useful when tracing an unreleased permit. When assertions are enabled
* the tracing will capture the supplied object's {@link Object#toString()} value. Otherwise the object
* isn't used
*/
public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcquired, String executorOnDelay) {
public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcquired, String executorOnDelay, Object debugInfo) {
verifyNotClosed();
verifyPrimary();

indexShardOperationPermits.acquire(onPermitAcquired, executorOnDelay, false);
indexShardOperationPermits.acquire(onPermitAcquired, executorOnDelay, false, debugInfo);
}

private final Object primaryTermMutex = new Object();

/**
* Acquire a replica operation permit whenever the shard is ready for indexing (see
* {@link #acquirePrimaryOperationPermit(ActionListener, String)}). If the given primary term is lower than then one in
* {@link #acquirePrimaryOperationPermit(ActionListener, String, Object)}). If the given primary term is lower than the one in
* {@link #shardRouting}, the {@link ActionListener#onFailure(Exception)} method of the provided listener is invoked with an
* {@link IllegalStateException}. If permit acquisition is delayed, the listener will be invoked on the executor with the specified
* name.
@@ -2213,9 +2217,13 @@ public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcq
* @param globalCheckpoint the global checkpoint associated with the request
* @param onPermitAcquired the listener for permit acquisition
* @param executorOnDelay the name of the executor to invoke the listener on if permit acquisition is delayed
* @param debugInfo extra information that can be useful when tracing an unreleased permit. When assertions are enabled
* the tracing will capture the supplied object's {@link Object#toString()} value. Otherwise the object
* isn't used
*/
public void acquireReplicaOperationPermit(final long operationPrimaryTerm, final long globalCheckpoint,
final ActionListener<Releasable> onPermitAcquired, final String executorOnDelay) {
final ActionListener<Releasable> onPermitAcquired, final String executorOnDelay,
final Object debugInfo) {
verifyNotClosed();
verifyReplicationTarget();
final boolean globalCheckpointUpdated;
@@ -2301,13 +2309,21 @@ public void onFailure(final Exception e) {
}
},
executorOnDelay,
true);
true, debugInfo);
}

public int getActiveOperationsCount() {
return indexShardOperationPermits.getActiveOperationsCount(); // refCount is incremented on successful acquire and decremented on close
}

/**
* @return a list containing an exception for each operation permit that hasn't been released yet. The stack trace of each exception
* was captured when the operation acquired the permit, and its message contains the debug information supplied at the time.
*/
public List<Throwable> getActiveOperations() {
return indexShardOperationPermits.getActiveOperations();
}

private final AsyncIOProcessor<Translog.Location> translogSyncProcessor = new AsyncIOProcessor<Translog.Location>(logger, 1024) {
@Override
protected void write(List<Tuple<Translog.Location, Consumer<Exception>>> candidates) throws IOException {
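
As an aside before the IndexShardOperationPermits changes, the new debugInfo parameter implies a calling pattern like the one below. This is a hedged sketch modeled on the runUnderPrimaryPermit helper in the RecoverySourceHandler hunks further down; the wrapper class name is illustrative.

```java
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.threadpool.ThreadPool;

// Sketch: run an action under a primary operation permit, passing a descriptive reason as debugInfo.
final class PermitUsageExample {
    static void runUnderPrimaryPermit(IndexShard shard, Runnable action, String reason) {
        final PlainActionFuture<Releasable> onAcquired = new PlainActionFuture<>();
        // the reason becomes the message of the captured stack trace when assertions are enabled
        shard.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME, reason);
        try (Releasable ignored = onAcquired.actionGet()) {
            action.run(); // the permit is held for the duration of the action and released on close
        }
    }
}
```
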
@@ -19,7 +19,6 @@

package org.elasticsearch.index.shard;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.Assertions;
import org.elasticsearch.action.ActionListener;
@@ -33,6 +32,8 @@
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -53,10 +54,14 @@ final class IndexShardOperationPermits implements Closeable {

static final int TOTAL_PERMITS = Integer.MAX_VALUE;
final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true); // fair to ensure a blocking thread is not starved
private final List<ActionListener<Releasable>> delayedOperations = new ArrayList<>(); // operations that are delayed
private final List<DelayedOperation> delayedOperations = new ArrayList<>(); // operations that are delayed
private volatile boolean closed;
private boolean delayed; // does not need to be volatile as all accesses are done under a lock on this

// only valid when assertions are enabled. Key is AtomicBoolean associated with each permit to ensure close once semantics. Value is an
// exception with some extra info in the message + a stack trace of the acquirer
private final Map<AtomicBoolean, Throwable> issuedPermits;

/**
* Construct operation permits for the specified shards.
*
@@ -66,6 +71,11 @@
IndexShardOperationPermits(final ShardId shardId, final ThreadPool threadPool) {
this.shardId = shardId;
this.threadPool = threadPool;
if (Assertions.ENABLED) {
issuedPermits = new ConcurrentHashMap<>();
} else {
issuedPermits = null;
}
}

@Override
@@ -167,7 +177,7 @@ private <E extends Exception> void doBlockOperations(
}

private void releaseDelayedOperations() {
final List<ActionListener<Releasable>> queuedActions;
final List<DelayedOperation> queuedActions;
synchronized (this) {
assert delayed;
queuedActions = new ArrayList<>(delayedOperations);
@@ -185,8 +195,8 @@ private void releaseDelayedOperations() {
* recovery
*/
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
for (ActionListener<Releasable> queuedAction : queuedActions) {
acquire(queuedAction, null, false);
for (DelayedOperation queuedAction : queuedActions) {
acquire(queuedAction.listener, null, false, queuedAction.debugInfo);
}
});
}
@@ -204,8 +214,24 @@ private void releaseDelayedOperations() {
* @param onAcquired {@link ActionListener} that is invoked once acquisition is successful or failed
* @param executorOnDelay executor to use for the possibly delayed {@link ActionListener#onResponse(Object)} call
* @param forceExecution whether the runnable should force its execution in case it gets rejected
* @param debugInfo extra information that can be useful when tracing an unreleased permit. When assertions are enabled
* the tracing will capture the supplied object's {@link Object#toString()} value. Otherwise the object
* isn't used
*
*/
public void acquire(final ActionListener<Releasable> onAcquired, final String executorOnDelay, final boolean forceExecution) {
public void acquire(final ActionListener<Releasable> onAcquired, final String executorOnDelay, final boolean forceExecution,
final Object debugInfo) {
final Throwable debugInfoWithStackTrace;
if (Assertions.ENABLED) {
debugInfoWithStackTrace = new Throwable(debugInfo.toString());
} else {
debugInfoWithStackTrace = null;
}
acquire(onAcquired, executorOnDelay, forceExecution, debugInfoWithStackTrace);
}

private void acquire(final ActionListener<Releasable> onAcquired, final String executorOnDelay, final boolean forceExecution,
final Throwable debugInfo) {
if (closed) {
onAcquired.onFailure(new IndexShardClosedException(shardId));
return;
@@ -215,16 +241,18 @@ public void acquire(final ActionListener<Releasable> onAcquired, final String ex
synchronized (this) {
if (delayed) {
final Supplier<StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(false);
final ActionListener<Releasable> wrappedListener;
if (executorOnDelay != null) {
delayedOperations.add(
new PermitAwareThreadedActionListener(threadPool, executorOnDelay,
new ContextPreservingActionListener<>(contextSupplier, onAcquired), forceExecution));
wrappedListener =
new PermitAwareThreadedActionListener(threadPool, executorOnDelay,
new ContextPreservingActionListener<>(contextSupplier, onAcquired), forceExecution);
} else {
delayedOperations.add(new ContextPreservingActionListener<>(contextSupplier, onAcquired));
wrappedListener = new ContextPreservingActionListener<>(contextSupplier, onAcquired);
}
delayedOperations.add(new DelayedOperation(wrappedListener, debugInfo));
return;
} else {
releasable = acquire();
releasable = acquire(debugInfo);
}
}
} catch (final InterruptedException e) {
@@ -235,15 +263,23 @@ public void acquire(final ActionListener<Releasable> onAcquired, final String ex
onAcquired.onResponse(releasable);
}

private Releasable acquire() throws InterruptedException {
private Releasable acquire(Throwable debugInfo) throws InterruptedException {
assert Thread.holdsLock(this);
if (semaphore.tryAcquire(1, 0, TimeUnit.SECONDS)) { // the un-timed tryAcquire methods do not honor the fairness setting
final AtomicBoolean closed = new AtomicBoolean();
return () -> {
final Releasable releasable = () -> {
if (closed.compareAndSet(false, true)) {
if (Assertions.ENABLED) {
Throwable e = issuedPermits.remove(closed);
assert e != null;
}
semaphore.release(1);
}
};
if (Assertions.ENABLED) {
issuedPermits.put(closed, debugInfo);
}
return releasable;
} else {
// this should never happen, if it does something is deeply wrong
throw new IllegalStateException("failed to obtain permit but operations are not delayed");
@@ -269,6 +305,28 @@ int getActiveOperationsCount() {
}
}

/**
* @return a list containing an exception for each permit that hasn't been released yet. The stack trace of each exception
* was captured when the operation acquired the permit, and its message contains the debug information supplied at the time.
*/
List<Throwable> getActiveOperations() {
return new ArrayList<>(issuedPermits.values());
}

private static class DelayedOperation {
private final ActionListener<Releasable> listener;
private final Throwable debugInfo;

private DelayedOperation(ActionListener<Releasable> listener, Throwable debugInfo) {
this.listener = listener;
if (Assertions.ENABLED) {
this.debugInfo = new Throwable("delayed", debugInfo);
} else {
this.debugInfo = null;
}
}
}

/**
* A permit-aware action listener wrapper that spawns onResponse listener invocations off on a configurable thread-pool.
* Being permit-aware, it also releases the permit when hitting thread-pool rejections and falls back to the
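
A natural consumer of the new getActiveOperations() accessors is test infrastructure that fails when a shard still holds unreleased permits, producing output like the assertion error in the commit message. Below is a rough sketch of such a check; the helper class and the exact message format are illustrative, not the actual test code.

```java
import java.util.List;

import org.elasticsearch.index.shard.IndexShard;

// Sketch: fail loudly when a shard still holds unreleased operation permits.
final class PermitLeakCheck {
    static void assertNoPendingPermits(IndexShard shard) {
        final List<Throwable> active = shard.getActiveOperations();
        if (active.isEmpty() == false) {
            final AssertionError error = new AssertionError(
                    "shard " + shard.shardId() + " has pending operations:");
            active.forEach(error::addSuppressed); // each entry carries the acquirer's stack trace
            throw error;
        }
    }
}
```
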
@@ -142,7 +142,7 @@ public RecoveryResponse recoverToTarget() throws IOException {
throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
}
assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting;
});
}, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ");

try (Closeable ignored = shard.acquireTranslogRetentionLock()) {

@@ -198,7 +198,8 @@ public RecoveryResponse recoverToTarget() throws IOException {
* make sure to do this before sampling the max sequence number in the next step, to ensure that we send
* all documents up to maxSeqNo in phase2.
*/
runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()));
runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()),
shardId + " initiating tracking of " + request.targetAllocationId());

final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
/*
@@ -229,10 +230,10 @@ private boolean isTargetSameHistory() {
return targetHistoryUUID != null && targetHistoryUUID.equals(shard.getHistoryUUID());
}

private void runUnderPrimaryPermit(CancellableThreads.Interruptable runnable) {
private void runUnderPrimaryPermit(CancellableThreads.Interruptable runnable, String reason) {
cancellableThreads.execute(() -> {
final PlainActionFuture<Releasable> onAcquired = new PlainActionFuture<>();
shard.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME);
shard.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME, reason);
try (Releasable ignored = onAcquired.actionGet()) {
// check that the IndexShard still has the primary authority. This needs to be checked under operation permit to prevent
// races, as IndexShard will change to RELOCATED only when it holds all operation permits, see IndexShard.relocated()
@@ -493,10 +494,12 @@ public void finalizeRecovery(final long targetLocalCheckpoint) throws IOExceptio
* marking the shard as in-sync. If the relocation handoff holds all the permits then after the handoff completes and we acquire
* the permit then the state of the shard will be relocated and this recovery will fail.
*/
runUnderPrimaryPermit(() -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint));
runUnderPrimaryPermit(() -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint),
shardId + " marking " + request.targetAllocationId() + " as in sync");
final long globalCheckpoint = shard.getGlobalCheckpoint();
cancellableThreads.executeIO(() -> recoveryTarget.finalizeRecovery(globalCheckpoint));
runUnderPrimaryPermit(() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint));
runUnderPrimaryPermit(() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint),
shardId + " updating " + request.targetAllocationId() + "'s global checkpoint");

if (request.isPrimaryRelocation()) {
logger.trace("performing relocation hand-off");