Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Capture stack traces while issuing IndexShard operations permits to easy debugging #28567

Merged
merged 4 commits into from
Feb 8, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ protected abstract PrimaryResult<ReplicaRequest, Response> shardOperationOnPrima

/**
* Synchronously execute the specified replica operation. This is done under a permit from
* {@link IndexShard#acquireReplicaOperationPermit(long, long, ActionListener, String)}.
* {@link IndexShard#acquireReplicaOperationPermit(long, long, ActionListener, String, Object)}.
*
* @param shardRequest the request to the replica shard
* @param replica the replica shard to perform the operation on
Expand Down Expand Up @@ -317,7 +317,7 @@ class AsyncPrimaryAction extends AbstractRunnable implements ActionListener<Prim

@Override
protected void doRun() throws Exception {
acquirePrimaryShardReference(request.shardId(), targetAllocationID, primaryTerm, this);
acquirePrimaryShardReference(request.shardId(), targetAllocationID, primaryTerm, this, request);
}

@Override
Expand Down Expand Up @@ -638,7 +638,7 @@ protected void doRun() throws Exception {
throw new ShardNotFoundException(this.replica.shardId(), "expected aID [{}] but found [{}]", targetAllocationID,
actualAllocationId);
}
replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, this, executor);
replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, this, executor, request);
}

/**
Expand Down Expand Up @@ -950,7 +950,7 @@ void retryBecauseUnavailable(ShardId shardId, String message) {
* and replication of the operation to all replica shards is completed / failed (see {@link ReplicationOperation}).
*/
private void acquirePrimaryShardReference(ShardId shardId, String allocationId, long primaryTerm,
ActionListener<PrimaryShardReference> onReferenceAcquired) {
ActionListener<PrimaryShardReference> onReferenceAcquired, Object debugInfo) {
IndexShard indexShard = getIndexShard(shardId);
// we may end up here if the cluster state used to route the primary is so stale that the underlying
// index shard was replaced with a replica. For example - in a two node cluster, if the primary fails
Expand Down Expand Up @@ -981,7 +981,7 @@ public void onFailure(Exception e) {
}
};

indexShard.acquirePrimaryOperationPermit(onAcquired, executor);
indexShard.acquirePrimaryOperationPermit(onAcquired, executor, debugInfo);
}

class ShardReference implements Releasable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,7 @@ private void maybeSyncGlobalCheckpoints() {
e);
}
}),
ThreadPool.Names.SAME);
ThreadPool.Names.SAME, "background global checkpoint sync");
} catch (final AlreadyClosedException | IndexShardClosedException e) {
// the shard was closed concurrently, continue
}
Expand Down
26 changes: 21 additions & 5 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -2192,19 +2192,23 @@ private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode, final boole
* Acquire a primary operation permit whenever the shard is ready for indexing. If a permit is directly available, the provided
* ActionListener will be called on the calling thread. During relocation hand-off, permit acquisition can be delayed. The provided
* ActionListener will then be called using the provided executor.
*
* @param debugInfo an extra information that can be useful when tracing an unreleased permit. When assertions are enabled
* the tracing will capture the supplied object's {@link Object#toString()} value. Otherwise the object
* isn't used
*/
public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcquired, String executorOnDelay) {
public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcquired, String executorOnDelay, Object debugInfo) {
verifyNotClosed();
verifyPrimary();

indexShardOperationPermits.acquire(onPermitAcquired, executorOnDelay, false);
indexShardOperationPermits.acquire(onPermitAcquired, executorOnDelay, false, debugInfo);
}

private final Object primaryTermMutex = new Object();

/**
* Acquire a replica operation permit whenever the shard is ready for indexing (see
* {@link #acquirePrimaryOperationPermit(ActionListener, String)}). If the given primary term is lower than then one in
* {@link #acquirePrimaryOperationPermit(ActionListener, String, Object)}). If the given primary term is lower than then one in
* {@link #shardRouting}, the {@link ActionListener#onFailure(Exception)} method of the provided listener is invoked with an
* {@link IllegalStateException}. If permit acquisition is delayed, the listener will be invoked on the executor with the specified
* name.
Expand All @@ -2213,9 +2217,13 @@ public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcq
* @param globalCheckpoint the global checkpoint associated with the request
* @param onPermitAcquired the listener for permit acquisition
* @param executorOnDelay the name of the executor to invoke the listener on if permit acquisition is delayed
* @param debugInfo an extra information that can be useful when tracing an unreleased permit. When assertions are enabled
* the tracing will capture the supplied object's {@link Object#toString()} value. Otherwise the object
* isn't used
*/
public void acquireReplicaOperationPermit(final long operationPrimaryTerm, final long globalCheckpoint,
final ActionListener<Releasable> onPermitAcquired, final String executorOnDelay) {
final ActionListener<Releasable> onPermitAcquired, final String executorOnDelay,
final Object debugInfo) {
verifyNotClosed();
verifyReplicationTarget();
final boolean globalCheckpointUpdated;
Expand Down Expand Up @@ -2301,13 +2309,21 @@ public void onFailure(final Exception e) {
}
},
executorOnDelay,
true);
true, debugInfo);
}

public int getActiveOperationsCount() {
return indexShardOperationPermits.getActiveOperationsCount(); // refCount is incremented on successful acquire and decremented on close
}

/**
* @return a list of containing an exceptions for each operation permit that wasn't released yet. The stack traces of the exceptions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there seems to be a missing word

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's an unneeded s in exceptions and a missing s in contain. Can you clarify?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I thought it meant to say "a list of items/things/XYZ containing an exception ..."

* was captured when the operation acquired the permit and their message contain the debug information supplied at the time.
*/
public List<RuntimeException> getActiveOperations() {
return indexShardOperationPermits.getActiveOperations();
}

private final AsyncIOProcessor<Translog.Location> translogSyncProcessor = new AsyncIOProcessor<Translog.Location>(logger, 1024) {
@Override
protected void write(List<Tuple<Translog.Location, Consumer<Exception>>> candidates) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.elasticsearch.index.shard;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.Assertions;
import org.elasticsearch.action.ActionListener;
Expand All @@ -33,6 +32,8 @@
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -53,10 +54,14 @@ final class IndexShardOperationPermits implements Closeable {

static final int TOTAL_PERMITS = Integer.MAX_VALUE;
final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true); // fair to ensure a blocking thread is not starved
private final List<ActionListener<Releasable>> delayedOperations = new ArrayList<>(); // operations that are delayed
private final List<DelayedOperation> delayedOperations = new ArrayList<>(); // operations that are delayed
private volatile boolean closed;
private boolean delayed; // does not need to be volatile as all accesses are done under a lock on this

// only valid when assertions are enabled. Key is AtomicBoolean associated with each permit to ensure close once semantics. Value is an
// exception with some extra info in the message + a stack trace of the acquirer
private final Map<AtomicBoolean, RuntimeException> issuedPermits;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any particular reason to use RuntimeException, and not just Throwable like in MockPageCacheRecycler

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Jason scared me into never using Throwable. will switch.


/**
* Construct operation permits for the specified shards.
*
Expand All @@ -66,6 +71,11 @@ final class IndexShardOperationPermits implements Closeable {
IndexShardOperationPermits(final ShardId shardId, final ThreadPool threadPool) {
this.shardId = shardId;
this.threadPool = threadPool;
if (Assertions.ENABLED) {
issuedPermits = new ConcurrentHashMap<>();
} else {
issuedPermits = null;
}
}

@Override
Expand Down Expand Up @@ -167,7 +177,7 @@ private <E extends Exception> void doBlockOperations(
}

private void releaseDelayedOperations() {
final List<ActionListener<Releasable>> queuedActions;
final List<DelayedOperation> queuedActions;
synchronized (this) {
assert delayed;
queuedActions = new ArrayList<>(delayedOperations);
Expand All @@ -185,8 +195,8 @@ private void releaseDelayedOperations() {
* recovery
*/
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
for (ActionListener<Releasable> queuedAction : queuedActions) {
acquire(queuedAction, null, false);
for (DelayedOperation queuedAction : queuedActions) {
acquire(queuedAction.listener, null, false, queuedAction.debugInfo);
}
});
}
Expand All @@ -204,8 +214,24 @@ private void releaseDelayedOperations() {
* @param onAcquired {@link ActionListener} that is invoked once acquisition is successful or failed
* @param executorOnDelay executor to use for the possibly delayed {@link ActionListener#onResponse(Object)} call
* @param forceExecution whether the runnable should force its execution in case it gets rejected
* @param debugInfo an extra information that can be useful when tracing an unreleased permit. When assertions are enabled
* the tracing will capture the supplied object's {@link Object#toString()} value. Otherwise the object
* isn't used
*
*/
public void acquire(final ActionListener<Releasable> onAcquired, final String executorOnDelay, final boolean forceExecution) {
public void acquire(final ActionListener<Releasable> onAcquired, final String executorOnDelay, final boolean forceExecution,
final Object debugInfo) {
final RuntimeException debugInfoWithStackTrace;
if (Assertions.ENABLED) {
debugInfoWithStackTrace = new RuntimeException(debugInfo.toString());
} else {
debugInfoWithStackTrace = null;
}
acquire(onAcquired, executorOnDelay, forceExecution, debugInfoWithStackTrace);
}

private void acquire(final ActionListener<Releasable> onAcquired, final String executorOnDelay, final boolean forceExecution,
final RuntimeException debugInfo) {
if (closed) {
onAcquired.onFailure(new IndexShardClosedException(shardId));
return;
Expand All @@ -215,16 +241,18 @@ public void acquire(final ActionListener<Releasable> onAcquired, final String ex
synchronized (this) {
if (delayed) {
final Supplier<StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(false);
final ActionListener<Releasable> wrappedListener;
if (executorOnDelay != null) {
delayedOperations.add(
new PermitAwareThreadedActionListener(threadPool, executorOnDelay,
new ContextPreservingActionListener<>(contextSupplier, onAcquired), forceExecution));
wrappedListener =
new PermitAwareThreadedActionListener(threadPool, executorOnDelay,
new ContextPreservingActionListener<>(contextSupplier, onAcquired), forceExecution);
} else {
delayedOperations.add(new ContextPreservingActionListener<>(contextSupplier, onAcquired));
wrappedListener = new ContextPreservingActionListener<>(contextSupplier, onAcquired);
}
delayedOperations.add(new DelayedOperation(wrappedListener, debugInfo));
return;
} else {
releasable = acquire();
releasable = acquire(debugInfo);
}
}
} catch (final InterruptedException e) {
Expand All @@ -235,15 +263,23 @@ public void acquire(final ActionListener<Releasable> onAcquired, final String ex
onAcquired.onResponse(releasable);
}

private Releasable acquire() throws InterruptedException {
private Releasable acquire(RuntimeException debugInfo) throws InterruptedException {
assert Thread.holdsLock(this);
if (semaphore.tryAcquire(1, 0, TimeUnit.SECONDS)) { // the un-timed tryAcquire methods do not honor the fairness setting
final AtomicBoolean closed = new AtomicBoolean();
return () -> {
final Releasable releasable = () -> {
if (closed.compareAndSet(false, true)) {
if (Assertions.ENABLED) {
RuntimeException e = issuedPermits.remove(closed);
assert e != null;
}
semaphore.release(1);
}
};
if (Assertions.ENABLED) {
issuedPermits.put(closed, debugInfo);
}
return releasable;
} else {
// this should never happen, if it does something is deeply wrong
throw new IllegalStateException("failed to obtain permit but operations are not delayed");
Expand All @@ -269,6 +305,28 @@ int getActiveOperationsCount() {
}
}

/**
* @return a list of containing an exceptions for each permit that wasn't released yet. The stack traces of the exceptions
* was captured when the operation acquired the permit and their message contain the debug information supplied at the time.
*/
List<RuntimeException> getActiveOperations() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you test this method in IndexShardOperationPermitsTests that it returns something meaningful?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, so I was doubting about that one. What is meaningful? I can check there is something for each open op and also the message contains the debug info. Is that what you mean?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes. To check that the active operations are actually captured.

return new ArrayList<>(issuedPermits.values());
}

private static class DelayedOperation {
private final ActionListener<Releasable> listener;
private final RuntimeException debugInfo;

private DelayedOperation(ActionListener<Releasable> listener, RuntimeException debugInfo) {
this.listener = listener;
if (Assertions.ENABLED) {
this.debugInfo = new RuntimeException("delayed", debugInfo);
} else {
this.debugInfo = null;
}
}
}

/**
* A permit-aware action listener wrapper that spawns onResponse listener invocations off on a configurable thread-pool.
* Being permit-aware, it also releases the permit when hitting thread-pool rejections and falls back to the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public RecoveryResponse recoverToTarget() throws IOException {
throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
}
assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting;
});
}, shardId + " validating recovery target registered");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add targetShardRouting??

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure


try (Closeable ignored = shard.acquireTranslogRetentionLock()) {

Expand Down Expand Up @@ -198,7 +198,8 @@ public RecoveryResponse recoverToTarget() throws IOException {
* make sure to do this before sampling the max sequence number in the next step, to ensure that we send
* all documents up to maxSeqNo in phase2.
*/
runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()));
runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()),
shardId + " initiating tracking of " + request.targetAllocationId());

final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
/*
Expand Down Expand Up @@ -229,10 +230,10 @@ private boolean isTargetSameHistory() {
return targetHistoryUUID != null && targetHistoryUUID.equals(shard.getHistoryUUID());
}

private void runUnderPrimaryPermit(CancellableThreads.Interruptable runnable) {
private void runUnderPrimaryPermit(CancellableThreads.Interruptable runnable, String reason) {
cancellableThreads.execute(() -> {
final PlainActionFuture<Releasable> onAcquired = new PlainActionFuture<>();
shard.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME);
shard.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME, reason);
try (Releasable ignored = onAcquired.actionGet()) {
// check that the IndexShard still has the primary authority. This needs to be checked under operation permit to prevent
// races, as IndexShard will change to RELOCATED only when it holds all operation permits, see IndexShard.relocated()
Expand Down Expand Up @@ -493,10 +494,12 @@ public void finalizeRecovery(final long targetLocalCheckpoint) throws IOExceptio
* marking the shard as in-sync. If the relocation handoff holds all the permits then after the handoff completes and we acquire
* the permit then the state of the shard will be relocated and this recovery will fail.
*/
runUnderPrimaryPermit(() -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint));
runUnderPrimaryPermit(() -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint),
shardId + " marking " + request.targetAllocationId() + " as in sync");
final long globalCheckpoint = shard.getGlobalCheckpoint();
cancellableThreads.executeIO(() -> recoveryTarget.finalizeRecovery(globalCheckpoint));
runUnderPrimaryPermit(() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint));
runUnderPrimaryPermit(() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint),
shardId + " updating " + request.targetAllocationId() + "'s global checkpoint");

if (request.isPrimaryRelocation()) {
logger.trace("performing relocation hand-off");
Expand Down
Loading