diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 1a57b6a5d9500..4398c56f26c77 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -202,7 +202,7 @@ protected abstract PrimaryResult shardOperationOnPrima /** * Synchronously execute the specified replica operation. This is done under a permit from - * {@link IndexShard#acquireReplicaOperationPermit(long, long, ActionListener, String)}. + * {@link IndexShard#acquireReplicaOperationPermit(long, long, ActionListener, String, Object)}. * * @param shardRequest the request to the replica shard * @param replica the replica shard to perform the operation on @@ -317,7 +317,7 @@ class AsyncPrimaryAction extends AbstractRunnable implements ActionListener onReferenceAcquired) { + ActionListener onReferenceAcquired, Object debugInfo) { IndexShard indexShard = getIndexShard(shardId); // we may end up here if the cluster state used to route the primary is so stale that the underlying // index shard was replaced with a replica. For example - in a two node cluster, if the primary fails @@ -981,7 +981,7 @@ public void onFailure(Exception e) { } }; - indexShard.acquirePrimaryOperationPermit(onAcquired, executor); + indexShard.acquirePrimaryOperationPermit(onAcquired, executor, debugInfo); } class ShardReference implements Releasable { diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index 0285dcf93c1ea..cdcefa5e02bf2 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -775,7 +775,7 @@ private void maybeSyncGlobalCheckpoints() { e); } }), - ThreadPool.Names.SAME); + ThreadPool.Names.SAME, "background global checkpoint sync"); } catch (final AlreadyClosedException | IndexShardClosedException e) { // the shard was closed concurrently, continue } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 8a203cb430888..29cd4abc8236b 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2192,19 +2192,23 @@ private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode, final boole * Acquire a primary operation permit whenever the shard is ready for indexing. If a permit is directly available, the provided * ActionListener will be called on the calling thread. During relocation hand-off, permit acquisition can be delayed. The provided * ActionListener will then be called using the provided executor. + * + * @param debugInfo an extra information that can be useful when tracing an unreleased permit. When assertions are enabled + * the tracing will capture the supplied object's {@link Object#toString()} value. Otherwise the object + * isn't used */ - public void acquirePrimaryOperationPermit(ActionListener onPermitAcquired, String executorOnDelay) { + public void acquirePrimaryOperationPermit(ActionListener onPermitAcquired, String executorOnDelay, Object debugInfo) { verifyNotClosed(); verifyPrimary(); - indexShardOperationPermits.acquire(onPermitAcquired, executorOnDelay, false); + indexShardOperationPermits.acquire(onPermitAcquired, executorOnDelay, false, debugInfo); } private final Object primaryTermMutex = new Object(); /** * Acquire a replica operation permit whenever the shard is ready for indexing (see - * {@link #acquirePrimaryOperationPermit(ActionListener, String)}). If the given primary term is lower than then one in + * {@link #acquirePrimaryOperationPermit(ActionListener, String, Object)}). If the given primary term is lower than then one in * {@link #shardRouting}, the {@link ActionListener#onFailure(Exception)} method of the provided listener is invoked with an * {@link IllegalStateException}. If permit acquisition is delayed, the listener will be invoked on the executor with the specified * name. @@ -2213,9 +2217,13 @@ public void acquirePrimaryOperationPermit(ActionListener onPermitAcq * @param globalCheckpoint the global checkpoint associated with the request * @param onPermitAcquired the listener for permit acquisition * @param executorOnDelay the name of the executor to invoke the listener on if permit acquisition is delayed + * @param debugInfo an extra information that can be useful when tracing an unreleased permit. When assertions are enabled + * the tracing will capture the supplied object's {@link Object#toString()} value. Otherwise the object + * isn't used */ public void acquireReplicaOperationPermit(final long operationPrimaryTerm, final long globalCheckpoint, - final ActionListener onPermitAcquired, final String executorOnDelay) { + final ActionListener onPermitAcquired, final String executorOnDelay, + final Object debugInfo) { verifyNotClosed(); verifyReplicationTarget(); final boolean globalCheckpointUpdated; @@ -2301,13 +2309,21 @@ public void onFailure(final Exception e) { } }, executorOnDelay, - true); + true, debugInfo); } public int getActiveOperationsCount() { return indexShardOperationPermits.getActiveOperationsCount(); // refCount is incremented on successful acquire and decremented on close } + /** + * @return a list of containing an exception for each operation permit that wasn't released yet. The stack traces of the exceptions + * was captured when the operation acquired the permit and their message contains the debug information supplied at the time. + */ + public List getActiveOperations() { + return indexShardOperationPermits.getActiveOperations(); + } + private final AsyncIOProcessor translogSyncProcessor = new AsyncIOProcessor(logger, 1024) { @Override protected void write(List>> candidates) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java index 75645198f5b6a..338b330a39418 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java @@ -19,7 +19,6 @@ package org.elasticsearch.index.shard; -import org.apache.logging.log4j.Logger; import org.apache.lucene.util.IOUtils; import org.elasticsearch.Assertions; import org.elasticsearch.action.ActionListener; @@ -33,6 +32,8 @@ import java.io.Closeable; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -53,10 +54,14 @@ final class IndexShardOperationPermits implements Closeable { static final int TOTAL_PERMITS = Integer.MAX_VALUE; final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true); // fair to ensure a blocking thread is not starved - private final List> delayedOperations = new ArrayList<>(); // operations that are delayed + private final List delayedOperations = new ArrayList<>(); // operations that are delayed private volatile boolean closed; private boolean delayed; // does not need to be volatile as all accesses are done under a lock on this + // only valid when assertions are enabled. Key is AtomicBoolean associated with each permit to ensure close once semantics. Value is an + // exception with some extra info in the message + a stack trace of the acquirer + private final Map issuedPermits; + /** * Construct operation permits for the specified shards. * @@ -66,6 +71,11 @@ final class IndexShardOperationPermits implements Closeable { IndexShardOperationPermits(final ShardId shardId, final ThreadPool threadPool) { this.shardId = shardId; this.threadPool = threadPool; + if (Assertions.ENABLED) { + issuedPermits = new ConcurrentHashMap<>(); + } else { + issuedPermits = null; + } } @Override @@ -167,7 +177,7 @@ private void doBlockOperations( } private void releaseDelayedOperations() { - final List> queuedActions; + final List queuedActions; synchronized (this) { assert delayed; queuedActions = new ArrayList<>(delayedOperations); @@ -185,8 +195,8 @@ private void releaseDelayedOperations() { * recovery */ threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { - for (ActionListener queuedAction : queuedActions) { - acquire(queuedAction, null, false); + for (DelayedOperation queuedAction : queuedActions) { + acquire(queuedAction.listener, null, false, queuedAction.debugInfo); } }); } @@ -204,8 +214,24 @@ private void releaseDelayedOperations() { * @param onAcquired {@link ActionListener} that is invoked once acquisition is successful or failed * @param executorOnDelay executor to use for the possibly delayed {@link ActionListener#onResponse(Object)} call * @param forceExecution whether the runnable should force its execution in case it gets rejected + * @param debugInfo an extra information that can be useful when tracing an unreleased permit. When assertions are enabled + * the tracing will capture the supplied object's {@link Object#toString()} value. Otherwise the object + * isn't used + * */ - public void acquire(final ActionListener onAcquired, final String executorOnDelay, final boolean forceExecution) { + public void acquire(final ActionListener onAcquired, final String executorOnDelay, final boolean forceExecution, + final Object debugInfo) { + final Throwable debugInfoWithStackTrace; + if (Assertions.ENABLED) { + debugInfoWithStackTrace = new Throwable(debugInfo.toString()); + } else { + debugInfoWithStackTrace = null; + } + acquire(onAcquired, executorOnDelay, forceExecution, debugInfoWithStackTrace); + } + + private void acquire(final ActionListener onAcquired, final String executorOnDelay, final boolean forceExecution, + final Throwable debugInfo) { if (closed) { onAcquired.onFailure(new IndexShardClosedException(shardId)); return; @@ -215,16 +241,18 @@ public void acquire(final ActionListener onAcquired, final String ex synchronized (this) { if (delayed) { final Supplier contextSupplier = threadPool.getThreadContext().newRestorableContext(false); + final ActionListener wrappedListener; if (executorOnDelay != null) { - delayedOperations.add( - new PermitAwareThreadedActionListener(threadPool, executorOnDelay, - new ContextPreservingActionListener<>(contextSupplier, onAcquired), forceExecution)); + wrappedListener = + new PermitAwareThreadedActionListener(threadPool, executorOnDelay, + new ContextPreservingActionListener<>(contextSupplier, onAcquired), forceExecution); } else { - delayedOperations.add(new ContextPreservingActionListener<>(contextSupplier, onAcquired)); + wrappedListener = new ContextPreservingActionListener<>(contextSupplier, onAcquired); } + delayedOperations.add(new DelayedOperation(wrappedListener, debugInfo)); return; } else { - releasable = acquire(); + releasable = acquire(debugInfo); } } } catch (final InterruptedException e) { @@ -235,15 +263,23 @@ public void acquire(final ActionListener onAcquired, final String ex onAcquired.onResponse(releasable); } - private Releasable acquire() throws InterruptedException { + private Releasable acquire(Throwable debugInfo) throws InterruptedException { assert Thread.holdsLock(this); if (semaphore.tryAcquire(1, 0, TimeUnit.SECONDS)) { // the un-timed tryAcquire methods do not honor the fairness setting final AtomicBoolean closed = new AtomicBoolean(); - return () -> { + final Releasable releasable = () -> { if (closed.compareAndSet(false, true)) { + if (Assertions.ENABLED) { + Throwable e = issuedPermits.remove(closed); + assert e != null; + } semaphore.release(1); } }; + if (Assertions.ENABLED) { + issuedPermits.put(closed, debugInfo); + } + return releasable; } else { // this should never happen, if it does something is deeply wrong throw new IllegalStateException("failed to obtain permit but operations are not delayed"); @@ -269,6 +305,28 @@ int getActiveOperationsCount() { } } + /** + * @return a list of containing an exception for each permit that wasn't released yet. The stack traces of the exceptions + * was captured when the operation acquired the permit and their message contains the debug information supplied at the time. + */ + List getActiveOperations() { + return new ArrayList<>(issuedPermits.values()); + } + + private static class DelayedOperation { + private final ActionListener listener; + private final Throwable debugInfo; + + private DelayedOperation(ActionListener listener, Throwable debugInfo) { + this.listener = listener; + if (Assertions.ENABLED) { + this.debugInfo = new Throwable("delayed", debugInfo); + } else { + this.debugInfo = null; + } + } + } + /** * A permit-aware action listener wrapper that spawns onResponse listener invocations off on a configurable thread-pool. * Being permit-aware, it also releases the permit when hitting thread-pool rejections and falls back to the diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 47c3d073f1076..3f0408a7c2957 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -142,7 +142,7 @@ public RecoveryResponse recoverToTarget() throws IOException { throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); } assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting; - }); + }, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered "); try (Closeable ignored = shard.acquireTranslogRetentionLock()) { @@ -198,7 +198,8 @@ public RecoveryResponse recoverToTarget() throws IOException { * make sure to do this before sampling the max sequence number in the next step, to ensure that we send * all documents up to maxSeqNo in phase2. */ - runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId())); + runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()), + shardId + " initiating tracking of " + request.targetAllocationId()); final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); /* @@ -229,10 +230,10 @@ private boolean isTargetSameHistory() { return targetHistoryUUID != null && targetHistoryUUID.equals(shard.getHistoryUUID()); } - private void runUnderPrimaryPermit(CancellableThreads.Interruptable runnable) { + private void runUnderPrimaryPermit(CancellableThreads.Interruptable runnable, String reason) { cancellableThreads.execute(() -> { final PlainActionFuture onAcquired = new PlainActionFuture<>(); - shard.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME); + shard.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME, reason); try (Releasable ignored = onAcquired.actionGet()) { // check that the IndexShard still has the primary authority. This needs to be checked under operation permit to prevent // races, as IndexShard will change to RELOCATED only when it holds all operation permits, see IndexShard.relocated() @@ -493,10 +494,12 @@ public void finalizeRecovery(final long targetLocalCheckpoint) throws IOExceptio * marking the shard as in-sync. If the relocation handoff holds all the permits then after the handoff completes and we acquire * the permit then the state of the shard will be relocated and this recovery will fail. */ - runUnderPrimaryPermit(() -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint)); + runUnderPrimaryPermit(() -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint), + shardId + " marking " + request.targetAllocationId() + " as in sync"); final long globalCheckpoint = shard.getGlobalCheckpoint(); cancellableThreads.executeIO(() -> recoveryTarget.finalizeRecovery(globalCheckpoint)); - runUnderPrimaryPermit(() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint)); + runUnderPrimaryPermit(() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint), + shardId + " updating " + request.targetAllocationId() + "'s global checkpoint"); if (request.isPrimaryRelocation()) { logger.trace("performing relocation hand-off"); diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 2112a231d37a7..b9688053fba2d 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -117,6 +117,7 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; @@ -694,7 +695,7 @@ public void testSeqNoIsSetOnPrimary() throws Exception { doAnswer(invocation -> { ((ActionListener)invocation.getArguments()[0]).onResponse(() -> {}); return null; - }).when(shard).acquirePrimaryOperationPermit(any(), anyString()); + }).when(shard).acquirePrimaryOperationPermit(any(), anyString(), anyObject()); AtomicBoolean closed = new AtomicBoolean(); Releasable releasable = () -> { @@ -1194,7 +1195,7 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService count.incrementAndGet(); callback.onResponse(count::decrementAndGet); return null; - }).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString()); + }).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject()); doAnswer(invocation -> { long term = (Long)invocation.getArguments()[0]; ActionListener callback = (ActionListener) invocation.getArguments()[2]; @@ -1206,7 +1207,7 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService count.incrementAndGet(); callback.onResponse(count::decrementAndGet); return null; - }).when(indexShard).acquireReplicaOperationPermit(anyLong(), anyLong(), any(ActionListener.class), anyString()); + }).when(indexShard).acquireReplicaOperationPermit(anyLong(), anyLong(), any(ActionListener.class), anyString(), anyObject()); when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> { final ClusterState state = clusterService.state(); final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId()); diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index 805553b4a6103..bed1b5de03750 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -80,6 +80,7 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; @@ -449,7 +450,7 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService count.incrementAndGet(); callback.onResponse(count::decrementAndGet); return null; - }).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString()); + }).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject()); doAnswer(invocation -> { long term = (Long)invocation.getArguments()[0]; ActionListener callback = (ActionListener) invocation.getArguments()[1]; @@ -461,7 +462,7 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService count.incrementAndGet(); callback.onResponse(count::decrementAndGet); return null; - }).when(indexShard).acquireReplicaOperationPermit(anyLong(), anyLong(), any(ActionListener.class), anyString()); + }).when(indexShard).acquireReplicaOperationPermit(anyLong(), anyLong(), any(ActionListener.class), anyString(), anyObject()); when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> { final ClusterState state = clusterService.state(); final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId()); diff --git a/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index a67e843e46862..f74ffdc4b4dc4 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -535,7 +535,7 @@ public void onFailure(Exception e) { listener.onFailure(e); } }, - ThreadPool.Names.INDEX); + ThreadPool.Names.INDEX, request); } @Override diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java index ab5192dfc3e32..169f18da09abf 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java @@ -35,6 +35,7 @@ import org.junit.BeforeClass; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Set; @@ -48,10 +49,15 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import java.util.stream.Collectors; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.either; +import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; @@ -122,7 +128,7 @@ public void onResponse(Releasable releasable) { public void run() { latch.countDown(); try { - permits.acquire(future, threadPoolName, forceExecution); + permits.acquire(future, threadPoolName, forceExecution, ""); } catch (DummyException dummyException) { // ok, notify future assertTrue(failingListener); @@ -176,7 +182,7 @@ public void run() { public void testOperationsInvokedImmediatelyIfNoBlock() throws ExecutionException, InterruptedException { PlainActionFuture future = new PlainActionFuture<>(); - permits.acquire(future, ThreadPool.Names.GENERIC, true); + permits.acquire(future, ThreadPool.Names.GENERIC, true, ""); assertTrue(future.isDone()); future.get().close(); } @@ -184,7 +190,7 @@ public void testOperationsInvokedImmediatelyIfNoBlock() throws ExecutionExceptio public void testOperationsIfClosed() throws ExecutionException, InterruptedException { PlainActionFuture future = new PlainActionFuture<>(); permits.close(); - permits.acquire(future, ThreadPool.Names.GENERIC, true); + permits.acquire(future, ThreadPool.Names.GENERIC, true, ""); ExecutionException exception = expectThrows(ExecutionException.class, future::get); assertThat(exception.getCause(), instanceOf(IndexShardClosedException.class)); } @@ -198,7 +204,7 @@ public void testBlockIfClosed() throws ExecutionException, InterruptedException public void testOperationsDelayedIfBlock() throws ExecutionException, InterruptedException, TimeoutException { PlainActionFuture future = new PlainActionFuture<>(); try (Releasable ignored = blockAndWait()) { - permits.acquire(future, ThreadPool.Names.GENERIC, true); + permits.acquire(future, ThreadPool.Names.GENERIC, true, ""); assertFalse(future.isDone()); } future.get(1, TimeUnit.HOURS).close(); @@ -245,8 +251,8 @@ public void onResponse(Releasable releasable) { context.putHeader("foo", "bar"); context.putTransient("bar", "baz"); // test both with and without a executor name - permits.acquire(future, ThreadPool.Names.GENERIC, true); - permits.acquire(future2, null, true); + permits.acquire(future, ThreadPool.Names.GENERIC, true, ""); + permits.acquire(future2, null, true, ""); } assertFalse(future.isDone()); } @@ -329,7 +335,7 @@ public void onFailure(Exception e) { } }, ThreadPool.Names.GENERIC, - false)); + false, "")); thread.start(); assertFalse(delayed.get()); releaseBlock.countDown(); @@ -387,7 +393,7 @@ public void onFailure(Exception e) { } }, ThreadPool.Names.GENERIC, - false); + false, ""); }); secondOperationThread.start(); @@ -436,7 +442,7 @@ public void onFailure(Exception e) { } }, ThreadPool.Names.GENERIC, - false); + false, ""); }); thread.start(); threads.add(thread); @@ -490,12 +496,12 @@ public void onFailure(Exception e) { public void testActiveOperationsCount() throws ExecutionException, InterruptedException { PlainActionFuture future1 = new PlainActionFuture<>(); - permits.acquire(future1, ThreadPool.Names.GENERIC, true); + permits.acquire(future1, ThreadPool.Names.GENERIC, true, ""); assertTrue(future1.isDone()); assertThat(permits.getActiveOperationsCount(), equalTo(1)); PlainActionFuture future2 = new PlainActionFuture<>(); - permits.acquire(future2, ThreadPool.Names.GENERIC, true); + permits.acquire(future2, ThreadPool.Names.GENERIC, true, ""); assertTrue(future2.isDone()); assertThat(permits.getActiveOperationsCount(), equalTo(2)); @@ -511,7 +517,7 @@ public void testActiveOperationsCount() throws ExecutionException, InterruptedEx } PlainActionFuture future3 = new PlainActionFuture<>(); - permits.acquire(future3, ThreadPool.Names.GENERIC, true); + permits.acquire(future3, ThreadPool.Names.GENERIC, true, ""); assertTrue(future3.isDone()); assertThat(permits.getActiveOperationsCount(), equalTo(1)); future3.get().close(); @@ -594,7 +600,7 @@ public void onFailure(Exception e) { } }, ThreadPool.Names.GENERIC, - false)); + false, "")); assertThat(e, hasToString(containsString("failed to obtain permit but operations are not delayed"))); permits.semaphore.release(IndexShardOperationPermits.TOTAL_PERMITS); } @@ -645,8 +651,37 @@ public void onFailure(Exception e) { } }, ThreadPool.Names.GENERIC, - false); + false, ""); }; } + public void testPermitTraceCapturing() throws ExecutionException, InterruptedException { + final PlainActionFuture listener1 = new PlainActionFuture<>(); + permits.acquire(listener1, null, false, "listener1"); + final PlainActionFuture listener2 = new PlainActionFuture<>(); + permits.acquire(listener2, null, false, "listener2"); + + assertThat(permits.getActiveOperationsCount(), equalTo(2)); + List messages = permits.getActiveOperations().stream().map(Throwable::getMessage).collect(Collectors.toList()); + assertThat(messages, hasSize(2)); + assertThat(messages, containsInAnyOrder(Arrays.asList(containsString("listener1"), containsString("listener2")))); + + if (randomBoolean()) { + listener1.get().close(); + assertThat(permits.getActiveOperationsCount(), equalTo(1)); + messages = permits.getActiveOperations().stream().map(Throwable::getMessage).collect(Collectors.toList()); + assertThat(messages, hasSize(1)); + assertThat(messages, contains(containsString("listener2"))); + listener2.get().close(); + } else { + listener2.get().close(); + assertThat(permits.getActiveOperationsCount(), equalTo(1)); + messages = permits.getActiveOperations().stream().map(Throwable::getMessage).collect(Collectors.toList()); + assertThat(messages, hasSize(1)); + assertThat(messages, contains(containsString("listener1"))); + listener1.get().close(); + } + assertThat(permits.getActiveOperationsCount(), equalTo(0)); + assertThat(permits.getActiveOperations(), emptyIterable()); + } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 77f6149c4a3c4..1370dad06d7df 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -150,7 +150,6 @@ import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasKey; -import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.lessThan; @@ -282,14 +281,14 @@ public void testClosesPreventsNewOperations() throws InterruptedException, Execu closeShards(indexShard); assertThat(indexShard.getActiveOperationsCount(), equalTo(0)); try { - indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.INDEX); + indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.INDEX, ""); fail("we should not be able to increment anymore"); } catch (IndexShardClosedException e) { // expected } try { indexShard.acquireReplicaOperationPermit(indexShard.getPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO, null, - ThreadPool.Names.INDEX); + ThreadPool.Names.INDEX, ""); fail("we should not be able to increment anymore"); } catch (IndexShardClosedException e) { // expected @@ -300,7 +299,7 @@ public void testRejectOperationPermitWithHigherTermWhenNotStarted() throws IOExc IndexShard indexShard = newShard(false); expectThrows(IndexShardNotStartedException.class, () -> indexShard.acquireReplicaOperationPermit(indexShard.getPrimaryTerm() + randomIntBetween(1, 100), - SequenceNumbers.UNASSIGNED_SEQ_NO, null, ThreadPool.Names.INDEX)); + SequenceNumbers.UNASSIGNED_SEQ_NO, null, ThreadPool.Names.INDEX, "")); closeShards(indexShard); } @@ -313,6 +312,7 @@ public void testPrimaryPromotionDelaysOperations() throws IOException, BrokenBar final CountDownLatch operationLatch = new CountDownLatch(1); final List threads = new ArrayList<>(); for (int i = 0; i < operations; i++) { + final String id = "t_" + i; final Thread thread = new Thread(() -> { try { barrier.await(); @@ -339,7 +339,7 @@ public void onFailure(Exception e) { throw new RuntimeException(e); } }, - ThreadPool.Names.INDEX); + ThreadPool.Names.INDEX, id); }); thread.start(); threads.add(thread); @@ -369,6 +369,7 @@ public void onFailure(Exception e) { final AtomicLong counter = new AtomicLong(); final List delayedThreads = new ArrayList<>(); for (int i = 0; i < delayedOperations; i++) { + final String id = "d_" + i; final Thread thread = new Thread(() -> { try { delayedOperationsBarrier.await(); @@ -389,7 +390,7 @@ public void onFailure(Exception e) { throw new RuntimeException(e); } }, - ThreadPool.Names.INDEX); + ThreadPool.Names.INDEX, id); }); thread.start(); delayedThreads.add(thread); @@ -503,7 +504,7 @@ public void onFailure(Exception e) { throw new RuntimeException(e); } }, - ThreadPool.Names.GENERIC); + ThreadPool.Names.GENERIC, ""); latch.await(); assertThat(indexShard.getLocalCheckpoint(), equalTo((long) maxSeqNo)); @@ -547,7 +548,7 @@ public void onFailure(Exception e) { throw new RuntimeException(e); } }, - ThreadPool.Names.GENERIC); + ThreadPool.Names.GENERIC, ""); latch.await(); assertThat(indexShard.getTranslog().getGeneration().translogFileGeneration, equalTo(currentTranslogGeneration + 1)); @@ -580,7 +581,7 @@ public void testOperationPermitsOnPrimaryShards() throws InterruptedException, E assertEquals(0, indexShard.getActiveOperationsCount()); if (indexShard.routingEntry().isRelocationTarget() == false) { try { - indexShard.acquireReplicaOperationPermit(primaryTerm, indexShard.getGlobalCheckpoint(), null, ThreadPool.Names.INDEX); + indexShard.acquireReplicaOperationPermit(primaryTerm, indexShard.getGlobalCheckpoint(), null, ThreadPool.Names.INDEX, ""); fail("shard shouldn't accept operations as replica"); } catch (IllegalStateException ignored) { @@ -599,14 +600,14 @@ public void testOperationPermitsOnPrimaryShards() throws InterruptedException, E private Releasable acquirePrimaryOperationPermitBlockingly(IndexShard indexShard) throws ExecutionException, InterruptedException { PlainActionFuture fut = new PlainActionFuture<>(); - indexShard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.INDEX); + indexShard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.INDEX, ""); return fut.get(); } private Releasable acquireReplicaOperationPermitBlockingly(IndexShard indexShard, long opPrimaryTerm) throws ExecutionException, InterruptedException { PlainActionFuture fut = new PlainActionFuture<>(); - indexShard.acquireReplicaOperationPermit(opPrimaryTerm, indexShard.getGlobalCheckpoint(), fut, ThreadPool.Names.INDEX); + indexShard.acquireReplicaOperationPermit(opPrimaryTerm, indexShard.getGlobalCheckpoint(), fut, ThreadPool.Names.INDEX, ""); return fut.get(); } @@ -653,7 +654,8 @@ public void testOperationPermitOnReplicaShards() throws Exception { assertEquals(0, indexShard.getActiveOperationsCount()); if (shardRouting.primary() == false) { final IllegalStateException e = - expectThrows(IllegalStateException.class, () -> indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.INDEX)); + expectThrows(IllegalStateException.class, + () -> indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.INDEX, "")); assertThat(e, hasToString(containsString("shard " + shardRouting + " is not a primary"))); } @@ -690,7 +692,7 @@ public void onFailure(Exception e) { }; indexShard.acquireReplicaOperationPermit(primaryTerm - 1, SequenceNumbers.UNASSIGNED_SEQ_NO, onLockAcquired, - ThreadPool.Names.INDEX); + ThreadPool.Names.INDEX, ""); assertFalse(onResponse.get()); assertTrue(onFailure.get()); @@ -762,7 +764,7 @@ private void finish() { newPrimaryTerm, newGlobalCheckPoint, listener, - ThreadPool.Names.SAME); + ThreadPool.Names.SAME, ""); } catch (Exception e) { listener.onFailure(e); } @@ -900,7 +902,7 @@ public void onFailure(Exception e) { } }, - ThreadPool.Names.SAME); + ThreadPool.Names.SAME, ""); latch.await(); @@ -954,7 +956,7 @@ public void onFailure(final Exception e) { } }, - ThreadPool.Names.SAME); + ThreadPool.Names.SAME, ""); latch.await(); if (globalCheckpointOnReplica == SequenceNumbers.UNASSIGNED_SEQ_NO @@ -1006,7 +1008,7 @@ public void onFailure(Exception e) { latch.countDown(); } }, - ThreadPool.Names.INDEX); + ThreadPool.Names.INDEX, ""); }; final long firstIncrement = 1 + (randomBoolean() ? 0 : 1); @@ -1367,7 +1369,7 @@ public void onResponse(Releasable releasable) { super.onResponse(releasable); } }; - shard.acquirePrimaryOperationPermit(onLockAcquired, ThreadPool.Names.INDEX); + shard.acquirePrimaryOperationPermit(onLockAcquired, ThreadPool.Names.INDEX, "i_" + i); onLockAcquiredActions.add(onLockAcquired); } diff --git a/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java b/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java index 3d7ef82ea1256..6561001ad7d86 100644 --- a/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java +++ b/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java @@ -27,7 +27,6 @@ import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexService; -import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; @@ -38,7 +37,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; -import java.util.stream.Collectors; public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase { @@ -115,7 +113,7 @@ public void testSyncFailsIfOperationIsInFlight() throws InterruptedException, Ex SyncedFlushService flushService = getInstanceFromNode(SyncedFlushService.class); final ShardId shardId = shard.shardId(); PlainActionFuture fut = new PlainActionFuture<>(); - shard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.INDEX); + shard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.INDEX, ""); try (Releasable operationLock = fut.get()) { SyncedFlushUtil.LatchedListener listener = new SyncedFlushUtil.LatchedListener<>(); flushService.attemptSyncedFlush(shardId, listener); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 7ab6925ce57b9..4312757f03cc0 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -88,6 +88,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -400,7 +401,7 @@ public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOE doAnswer(invocation -> { ((ActionListener)invocation.getArguments()[0]).onResponse(() -> {}); return null; - }).when(shard).acquirePrimaryOperationPermit(any(), anyString()); + }).when(shard).acquirePrimaryOperationPermit(any(), anyString(), anyObject()); final AtomicBoolean phase1Called = new AtomicBoolean(); final AtomicBoolean prepareTargetForTranslogCalled = new AtomicBoolean(); final AtomicBoolean phase2Called = new AtomicBoolean(); diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 29b0abc51581c..5d9cc7e1d32b0 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -28,11 +28,10 @@ import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; -import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags.Flag; -import org.elasticsearch.action.support.replication.ReplicationTask; import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.cluster.ClusterName; @@ -66,8 +65,6 @@ import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.ZenDiscovery; @@ -93,8 +90,6 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.SearchService; -import org.elasticsearch.tasks.TaskInfo; -import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.disruption.ServiceDisruptionScheme; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.MockTransportClient; @@ -1106,7 +1101,7 @@ public void beforeIndexDeletion() throws Exception { // test that have ongoing write operations after the test (for example because ttl is used // and not all docs have been purged after the test) and inherit from // ElasticsearchIntegrationTest must override beforeIndexDeletion() to avoid failures. - assertShardIndexCounter(); + assertNoPendingIndexOperations(); //check that shards that have same sync id also contain same number of documents assertSameSyncIdSameDocs(); assertOpenTranslogReferences(); @@ -1136,30 +1131,19 @@ private void assertSameSyncIdSameDocs() { } } - private void assertShardIndexCounter() throws Exception { + private void assertNoPendingIndexOperations() throws Exception { assertBusy(() -> { final Collection nodesAndClients = nodes.values(); for (NodeAndClient nodeAndClient : nodesAndClients) { IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name); for (IndexService indexService : indexServices) { for (IndexShard indexShard : indexService) { - int activeOperationsCount = indexShard.getActiveOperationsCount(); - if (activeOperationsCount > 0) { - TaskManager taskManager = getInstance(TransportService.class, nodeAndClient.name).getTaskManager(); - DiscoveryNode localNode = getInstance(ClusterService.class, nodeAndClient.name).localNode(); - List taskInfos = taskManager.getTasks().values().stream() - .filter(task -> task instanceof ReplicationTask) - .map(task -> task.taskInfo(localNode.getId(), true)) - .collect(Collectors.toList()); - ListTasksResponse response = new ListTasksResponse(taskInfos, Collections.emptyList(), Collections.emptyList()); - try { - XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint().value(response); - throw new AssertionError("expected index shard counter on shard " + indexShard.shardId() + " on node " + - nodeAndClient.name + " to be 0 but was " + activeOperationsCount + ". Current replication tasks on node:\n" + - builder.string()); - } catch (IOException e) { - throw new RuntimeException("caught exception while building response [" + response + "]", e); - } + List operations = indexShard.getActiveOperations(); + if (operations.size() > 0) { + throw new AssertionError( + "shard " + indexShard.shardId() + " on node [" + nodeAndClient.name + "] has pending operations:\n" + + operations.stream().map(e -> "--> " + ExceptionsHelper.stackTrace(e)).collect(Collectors.joining("\n")) + ); } } }