diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index c639afb7fc5cf..24c54512b67c0 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -265,7 +265,7 @@ protected abstract void shardOperationOnPrimary( /** * Execute the specified replica operation. This is done under a permit from - * {@link IndexShard#acquireReplicaOperationPermit(long, long, long, ActionListener, String, Object)}. + * {@link IndexShard#acquireReplicaOperationPermit(long, long, long, ActionListener, String)}. * * @param shardRequest the request to the replica shard * @param replica the replica shard to perform the operation on @@ -1057,7 +1057,7 @@ protected void acquirePrimaryOperationPermit( final Request request, final ActionListener onAcquired ) { - primary.acquirePrimaryOperationPermit(onAcquired, executor, request, forceExecutionOnPrimary); + primary.acquirePrimaryOperationPermit(onAcquired, executor, forceExecutionOnPrimary); } /** @@ -1072,7 +1072,7 @@ protected void acquireReplicaOperationPermit( final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes ) { - replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onAcquired, executor, request); + replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onAcquired, executor); } class PrimaryShardReference diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index 6f442d1bd84e6..71471b5be296e 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -978,7 +978,7 @@ private void sync(final Consumer sync, final String source) { && e instanceof ShardNotInPrimaryModeException == false) { logger.warn(() -> format("%s failed to execute %s sync", shard.shardId(), source), e); } - }, ThreadPool.Names.SAME, source + " sync"); + }, ThreadPool.Names.SAME); } catch (final AlreadyClosedException | IndexShardClosedException e) { // the shard was closed concurrently, continue } diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java index 9df543f06e705..f6f5412526355 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java @@ -86,7 +86,7 @@ protected void asyncShardOperation(T request, ShardId shardId, final ActionListe try (Releasable ignore = releasable) { doRetentionLeaseAction(indexShard, request, delegatedListener); } - }), ThreadPool.Names.SAME, request); + }), ThreadPool.Names.SAME); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 3b8f77d1872ec..94bd1caa30515 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2143,7 +2143,7 @@ private void ensureWriteAllowed(Engine.Operation.Origin origin) throws IllegalIn } else { assert origin == Engine.Operation.Origin.LOCAL_RESET; assert getActiveOperationsCount() == OPERATIONS_BLOCKED - : "locally resetting without blocking operations, active operations are [" + getActiveOperations() + "]"; + : "locally resetting without blocking operations, active operations [" + getActiveOperationsCount() + "]"; } if (writeAllowedStates.contains(state) == false) { throw new IllegalIndexShardStateException( @@ -3301,29 +3301,15 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { * ActionListener will be called on the calling thread. During relocation hand-off, permit acquisition can be delayed. The provided * ActionListener will then be called using the provided executor. * - * @param debugInfo an extra information that can be useful when tracing an unreleased permit. When assertions are enabled - * the tracing will capture the supplied object's {@link Object#toString()} value. Otherwise the object - * isn't used */ - public void acquirePrimaryOperationPermit(ActionListener onPermitAcquired, String executorOnDelay, Object debugInfo) { - acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, debugInfo, false); + public void acquirePrimaryOperationPermit(ActionListener onPermitAcquired, String executorOnDelay) { + acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, false); } - public void acquirePrimaryOperationPermit( - ActionListener onPermitAcquired, - String executorOnDelay, - Object debugInfo, - boolean forceExecution - ) { + public void acquirePrimaryOperationPermit(ActionListener onPermitAcquired, String executorOnDelay, boolean forceExecution) { verifyNotClosed(); assert shardRouting.primary() : "acquirePrimaryOperationPermit should only be called on primary shard: " + shardRouting; - - indexShardOperationPermits.acquire( - wrapPrimaryOperationPermitListener(onPermitAcquired), - executorOnDelay, - forceExecution, - debugInfo - ); + indexShardOperationPermits.acquire(wrapPrimaryOperationPermitListener(onPermitAcquired), executorOnDelay, forceExecution); } /** @@ -3374,21 +3360,15 @@ private void asyncBlockOperations(ActionListener onPermitAcquired, l /** * Runs the specified runnable under a permit and otherwise calling back the specified failure callback. This method is really a - * convenience for {@link #acquirePrimaryOperationPermit(ActionListener, String, Object)} where the listener equates to + * convenience for {@link #acquirePrimaryOperationPermit(ActionListener, String)} where the listener equates to * try-with-resources closing the releasable after executing the runnable on successfully acquiring the permit, an otherwise calling * back the failure callback. * * @param runnable the runnable to execute under permit * @param onFailure the callback on failure * @param executorOnDelay the executor to execute the runnable on if permit acquisition is blocked - * @param debugInfo debug info */ - public void runUnderPrimaryPermit( - final Runnable runnable, - final Consumer onFailure, - final String executorOnDelay, - final Object debugInfo - ) { + public void runUnderPrimaryPermit(final Runnable runnable, final Consumer onFailure, final String executorOnDelay) { verifyNotClosed(); assert shardRouting.primary() : "runUnderPrimaryPermit should only be called on primary shard but was " + shardRouting; final ActionListener onPermitAcquired = ActionListener.wrap(releasable -> { @@ -3396,7 +3376,7 @@ public void runUnderPrimaryPermit( runnable.run(); } }, onFailure); - acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, debugInfo); + acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay); } private void bumpPrimaryTerm( @@ -3461,7 +3441,7 @@ public void onResponse(final Releasable releasable) { /** * Acquire a replica operation permit whenever the shard is ready for indexing (see - * {@link #acquirePrimaryOperationPermit(ActionListener, String, Object)}). If the given primary term is lower than then one in + * {@link #acquirePrimaryOperationPermit(ActionListener, String)}). If the given primary term is lower than then one in * {@link #shardRouting}, the {@link ActionListener#onFailure(Exception)} method of the provided listener is invoked with an * {@link IllegalStateException}. If permit acquisition is delayed, the listener will be invoked on the executor with the specified * name. @@ -3472,17 +3452,13 @@ public void onResponse(final Releasable releasable) { * after this replication request was executed on it (see {@link #getMaxSeqNoOfUpdatesOrDeletes()} * @param onPermitAcquired the listener for permit acquisition * @param executorOnDelay the name of the executor to invoke the listener on if permit acquisition is delayed - * @param debugInfo an extra information that can be useful when tracing an unreleased permit. When assertions are - * enabled the tracing will capture the supplied object's {@link Object#toString()} value. - * Otherwise the object isn't used */ public void acquireReplicaOperationPermit( final long opPrimaryTerm, final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes, final ActionListener onPermitAcquired, - final String executorOnDelay, - final Object debugInfo + final String executorOnDelay ) { innerAcquireReplicaOperationPermit( opPrimaryTerm, @@ -3490,7 +3466,7 @@ public void acquireReplicaOperationPermit( maxSeqNoOfUpdatesOrDeletes, onPermitAcquired, false, - (listener) -> indexShardOperationPermits.acquire(listener, executorOnDelay, true, debugInfo) + (listener) -> indexShardOperationPermits.acquire(listener, executorOnDelay, true) ); } @@ -3621,14 +3597,6 @@ public int getActiveOperationsCount() { return indexShardOperationPermits.getActiveOperationsCount(); } - /** - * @return a list of describing each permit that wasn't released yet. The description consist of the debugInfo supplied - * when the permit was acquired plus a stack traces that was captured when the permit was request. - */ - public List getActiveOperations() { - return indexShardOperationPermits.getActiveOperations(); - } - /** * Syncs the given location with the underlying storage unless already synced. This method might return immediately without * actually fsyncing the location until the sync listener is called. Yet, unless there is already another thread fsyncing @@ -3978,7 +3946,7 @@ public void afterRefresh(boolean didRefresh) { void resetEngineToGlobalCheckpoint() throws IOException { assert Thread.holdsLock(mutex) == false : "resetting engine under mutex"; assert getActiveOperationsCount() == OPERATIONS_BLOCKED - : "resetting engine without blocking operations; active operations are [" + getActiveOperations() + ']'; + : "resetting engine without blocking operations; active operations are [" + getActiveOperationsCount() + ']'; sync(); // persist the global checkpoint to disk final SeqNoStats seqNoStats = seqNoStats(); final TranslogStats translogStats = translogStats(); @@ -4085,7 +4053,7 @@ public long getMaxSeqNoOfUpdatesOrDeletes() { * These transfers guarantee that every index/delete operation when executing on a replica engine will observe this marker a value * which is at least the value of the max_seq_no_of_updates marker on the primary after that operation was executed on the primary. * - * @see #acquireReplicaOperationPermit(long, long, long, ActionListener, String, Object) + * @see #acquireReplicaOperationPermit(long, long, long, ActionListener, String) * @see RecoveryTarget#indexTranslogOperations(List, int, long, long, RetentionLeases, long, ActionListener) */ public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) { diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java index 2976d0f032acc..ad4c1ce9097fa 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java @@ -11,7 +11,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchTimeoutException; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.support.ContextPreservingActionListener; @@ -22,18 +21,14 @@ import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.core.Tuple; import org.elasticsearch.threadpool.ThreadPool; import java.io.Closeable; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; /** @@ -51,14 +46,10 @@ final class IndexShardOperationPermits implements Closeable { static final int TOTAL_PERMITS = Integer.MAX_VALUE; final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true); // fair to ensure a blocking thread is not starved - private final List delayedOperations = new ArrayList<>(); // operations that are delayed + private final List> delayedOperations = new ArrayList<>(); // operations that are delayed private volatile boolean closed; private int queuedBlockOperations; // does not need to be volatile as all accesses are done under a lock on this - // only valid when assertions are enabled. Key is AtomicBoolean associated with each permit to ensure close once semantics. - // Value is a tuple, with a some debug information supplied by the caller and a stack trace of the acquiring thread - private final Map> issuedPermits; - /** * Construct operation permits for the specified shards. * @@ -68,11 +59,6 @@ final class IndexShardOperationPermits implements Closeable { IndexShardOperationPermits(final ShardId shardId, final ThreadPool threadPool) { this.shardId = shardId; this.threadPool = threadPool; - if (Assertions.ENABLED) { - issuedPermits = new ConcurrentHashMap<>(); - } else { - issuedPermits = null; - } } @Override @@ -150,18 +136,17 @@ private Releasable acquireAll(final long timeout, final TimeUnit timeUnit) throw } } if (semaphore.tryAcquire(TOTAL_PERMITS, timeout, timeUnit)) { - final Releasable release = Releasables.releaseOnce(() -> { + return Releasables.releaseOnce(() -> { assert semaphore.availablePermits() == 0; semaphore.release(TOTAL_PERMITS); }); - return release; } else { throw new ElasticsearchTimeoutException("timeout while blocking operations after [" + new TimeValue(timeout, timeUnit) + "]"); } } private void releaseDelayedOperations() { - final List queuedActions; + final List> queuedActions; synchronized (this) { assert queuedBlockOperations > 0; queuedBlockOperations--; @@ -183,8 +168,8 @@ private void releaseDelayedOperations() { * recovery */ threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { - for (DelayedOperation queuedAction : queuedActions) { - acquire(queuedAction.listener, null, false, queuedAction.debugInfo, queuedAction.stackTrace); + for (final var queuedAction : queuedActions) { + acquire(queuedAction, null, false); } }); } @@ -201,33 +186,12 @@ private void releaseDelayedOperations() { * @param onAcquired {@link ActionListener} that is invoked once acquisition is successful or failed * @param executorOnDelay executor to use for the possibly delayed {@link ActionListener#onResponse(Object)} call * @param forceExecution whether the runnable should force its execution in case it gets rejected - * @param debugInfo an extra information that can be useful when tracing an unreleased permit. When assertions are enabled - * the tracing will capture the supplied object's {@link Object#toString()} value. Otherwise the object - * isn't used - * */ - public void acquire( - final ActionListener onAcquired, - final String executorOnDelay, - final boolean forceExecution, - final Object debugInfo - ) { - final StackTraceElement[] stackTrace; - if (Assertions.ENABLED) { - stackTrace = Thread.currentThread().getStackTrace(); - } else { - stackTrace = null; - } - acquire(ActionListener.assertOnce(onAcquired), executorOnDelay, forceExecution, debugInfo, stackTrace); + public void acquire(final ActionListener onAcquired, final String executorOnDelay, final boolean forceExecution) { + innerAcquire(ActionListener.assertOnce(onAcquired), executorOnDelay, forceExecution); } - private void acquire( - final ActionListener onAcquired, - final String executorOnDelay, - final boolean forceExecution, - final Object debugInfo, - final StackTraceElement[] stackTrace - ) { + private void innerAcquire(final ActionListener onAcquired, final String executorOnDelay, final boolean forceExecution) { if (closed) { onAcquired.onFailure(new IndexShardClosedException(shardId)); return; @@ -261,10 +225,10 @@ public void onRejection(Exception e) { } else { wrappedListener = new ContextPreservingActionListener<>(contextSupplier, onAcquired); } - delayedOperations.add(new DelayedOperation(wrappedListener, debugInfo, stackTrace)); + delayedOperations.add(wrappedListener); return; } else { - releasable = acquire(debugInfo, stackTrace); + releasable = acquire(); } } } catch (final InterruptedException e) { @@ -275,25 +239,13 @@ public void onRejection(Exception e) { onAcquired.onResponse(releasable); } - private Releasable acquire(Object debugInfo, StackTraceElement[] stackTrace) throws InterruptedException { + private Releasable acquire() throws InterruptedException { assert Thread.holdsLock(this); if (semaphore.tryAcquire(1, 0, TimeUnit.SECONDS)) { // the un-timed tryAcquire methods do not honor the fairness setting - final AtomicBoolean closed = new AtomicBoolean(); - final Releasable releasable = () -> { - if (closed.compareAndSet(false, true)) { - if (Assertions.ENABLED) { - Tuple existing = issuedPermits.remove(closed); - assert existing != null; - } - semaphore.release(1); - } - }; - if (Assertions.ENABLED) { - issuedPermits.put(closed, new Tuple<>(debugInfo.toString(), stackTrace)); - } - return releasable; + return Releasables.releaseOnce(semaphore::release); } else { // this should never happen, if it does something is deeply wrong + assert false; throw new IllegalStateException("failed to obtain permit but operations are not delayed"); } } @@ -315,29 +267,4 @@ int getActiveOperationsCount() { synchronized boolean isBlocked() { return queuedBlockOperations > 0; } - - /** - * @return a list of describing each permit that wasn't released yet. The description consist of the debugInfo supplied - * when the permit was acquired plus a stack traces that was captured when the permit was request. - */ - List getActiveOperations() { - return issuedPermits.values().stream().map(t -> t.v1() + "\n" + ExceptionsHelper.formatStackTrace(t.v2())).toList(); - } - - private static class DelayedOperation { - private final ActionListener listener; - private final String debugInfo; - private final StackTraceElement[] stackTrace; - - private DelayedOperation(ActionListener listener, Object debugInfo, StackTraceElement[] stackTrace) { - this.listener = listener; - if (Assertions.ENABLED) { - this.debugInfo = "[delayed] " + debugInfo; - this.stackTrace = stackTrace; - } else { - this.debugInfo = null; - this.stackTrace = null; - } - } - } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 3fcc51a689174..533fb2cb527e4 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -200,7 +200,7 @@ public void recoverToTarget(ActionListener listener) { retentionLeaseRef.set( shard.getRetentionLeases().get(ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting)) ); - }, shardId + " validating recovery target [" + request.targetAllocationId() + "] registered ", shard, cancellableThreads); + }, shard, cancellableThreads); final Closeable retentionLock = shard.acquireHistoryRetentionLock(); resources.add(retentionLock); final long startingSeqNo; @@ -287,7 +287,7 @@ && isTargetSameHistory() logger.debug("no peer-recovery retention lease for " + request.targetAllocationId()); deleteRetentionLeaseStep.onResponse(null); } - }, shardId + " removing retention lease for [" + request.targetAllocationId() + "]", shard, cancellableThreads); + }, shard, cancellableThreads); deleteRetentionLeaseStep.whenComplete(ignored -> { assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase1]"); @@ -314,12 +314,7 @@ && isTargetSameHistory() * make sure to do this before sampling the max sequence number in the next step, to ensure that we send * all documents up to maxSeqNo in phase2. */ - runUnderPrimaryPermit( - () -> shard.initiateTracking(request.targetAllocationId()), - shardId + " initiating tracking of " + request.targetAllocationId(), - shard, - cancellableThreads - ); + runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()), shard, cancellableThreads); final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); logger.trace("snapshot for recovery; current size is [{}]", estimateNumberOfHistoryOperations(startingSeqNo)); @@ -397,7 +392,6 @@ private int estimateNumberOfHistoryOperations(long startingSeqNo) throws IOExcep static void runUnderPrimaryPermit( CancellableThreads.Interruptible runnable, - String reason, IndexShard primary, CancellableThreads cancellableThreads ) { @@ -406,7 +400,7 @@ static void runUnderPrimaryPermit( final var future = new PlainActionFuture(); listener.addListener(future); - primary.acquirePrimaryOperationPermit(listener, ThreadPool.Names.SAME, reason); + primary.acquirePrimaryOperationPermit(listener, ThreadPool.Names.SAME); try (var ignored = FutureUtils.get(future)) { ensureNotRelocatedPrimary(primary); runnable.run(); @@ -423,7 +417,6 @@ static void runUnderPrimaryPermit( */ static void runUnderPrimaryPermit( Consumer> action, - String reason, IndexShard primary, CancellableThreads cancellableThreads, ActionListener listener @@ -451,7 +444,7 @@ public void onFailure(Exception e) { cancellableThreads.checkForCancel(); ensureNotRelocatedPrimary(primary); action.accept(l2); - })), ThreadPool.Names.GENERIC, reason); + })), ThreadPool.Names.GENERIC); } private static void ensureNotRelocatedPrimary(IndexShard indexShard) { @@ -1003,7 +996,7 @@ void createRetentionLease(final long startingSeqNo, ActionListener newLease)); logger.trace("created retention lease with estimated checkpoint of [{}]", estimatedGlobalCheckpoint); } - }, shardId + " establishing retention lease for [" + request.targetAllocationId() + "]", shard, cancellableThreads, outerListener); + }, shard, cancellableThreads, outerListener); } boolean hasSameLegacySyncId(Store.MetadataSnapshot source, Store.MetadataSnapshot target) { @@ -1231,7 +1224,6 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis */ runUnderPrimaryPermit( () -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint), - shardId + " marking " + request.targetAllocationId() + " as in sync", shard, cancellableThreads ); @@ -1242,7 +1234,6 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis finalizeListener.whenComplete(r -> { runUnderPrimaryPermit( () -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint), - shardId + " updating " + request.targetAllocationId() + "'s global checkpoint", shard, cancellableThreads ); diff --git a/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java index 919737caf2c7a..daaaeb729ab13 100644 --- a/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java @@ -62,7 +62,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; -import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; @@ -147,7 +146,7 @@ public void testResyncDoesNotBlockOnPrimaryAction() throws Exception { acquiredPermits.incrementAndGet(); callback.onResponse(acquiredPermits::decrementAndGet); return null; - }).when(indexShard).acquirePrimaryOperationPermit(anyActionListener(), anyString(), any(), eq(true)); + }).when(indexShard).acquirePrimaryOperationPermit(anyActionListener(), anyString(), eq(true)); when(indexShard.getReplicationGroup()).thenReturn( new ReplicationGroup( shardRoutingTable, diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index e64dddff3cdd3..e269fda62bb37 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -828,7 +828,7 @@ public void testPrimaryReference() { primary.close(); assertTrue(closed.get()); - })), Assert::assertNotNull, null, null); + })), Assert::assertNotNull, null); } public void testReplicaProxy() throws InterruptedException, ExecutionException { @@ -944,7 +944,7 @@ public void testSeqNoIsSetOnPrimary() { ActionListener argument = (ActionListener) invocation.getArguments()[0]; argument.onResponse(count::decrementAndGet); return null; - }).when(shard).acquirePrimaryOperationPermit(any(), anyString(), any(), eq(forceExecute)); + }).when(shard).acquirePrimaryOperationPermit(any(), anyString(), eq(forceExecute)); when(shard.getActiveOperationsCount()).thenAnswer(i -> count.get()); final IndexService indexService = mock(IndexService.class); @@ -1580,7 +1580,7 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService callback.onFailure(new ShardNotInPrimaryModeException(shardId, IndexShardState.STARTED)); } return null; - }).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), any(), eq(forceExecute)); + }).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), eq(forceExecute)); doAnswer(invocation -> { long term = (Long) invocation.getArguments()[0]; ActionListener callback = (ActionListener) invocation.getArguments()[3]; @@ -1593,7 +1593,7 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService count.incrementAndGet(); callback.onResponse(count::decrementAndGet); return null; - }).when(indexShard).acquireReplicaOperationPermit(anyLong(), anyLong(), anyLong(), any(ActionListener.class), anyString(), any()); + }).when(indexShard).acquireReplicaOperationPermit(anyLong(), anyLong(), anyLong(), any(ActionListener.class), anyString()); when(indexShard.getActiveOperationsCount()).thenAnswer(i -> count.get()); when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> { diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index 6a25867f02d71..ff480b9979a83 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -547,7 +547,7 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService count.incrementAndGet(); callback.onResponse(count::decrementAndGet); return null; - }).when(indexShard).acquireReplicaOperationPermit(anyLong(), anyLong(), anyLong(), anyActionListener(), anyString(), any()); + }).when(indexShard).acquireReplicaOperationPermit(anyLong(), anyLong(), anyLong(), anyActionListener(), anyString()); when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> { final ClusterState state = clusterService.state(); final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId()); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java index 60a83219340f1..924ef5ec7acfb 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java @@ -27,7 +27,6 @@ import org.junit.BeforeClass; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Set; @@ -42,13 +41,9 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.either; -import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; @@ -117,7 +112,7 @@ public void onResponse(Releasable releasable) { }; Thread thread = new Thread(() -> { latch.countDown(); - permits.acquire(future, threadPoolName, forceExecution, ""); + permits.acquire(future, threadPoolName, forceExecution); }); futures.add(future); operationThreads.add(thread); @@ -166,7 +161,7 @@ public void onResponse(Releasable releasable) { public void testOperationsInvokedImmediatelyIfNoBlock() throws ExecutionException, InterruptedException { PlainActionFuture future = new PlainActionFuture<>(); - permits.acquire(future, ThreadPool.Names.GENERIC, true, ""); + permits.acquire(future, ThreadPool.Names.GENERIC, true); assertTrue(future.isDone()); future.get().close(); } @@ -174,7 +169,7 @@ public void testOperationsInvokedImmediatelyIfNoBlock() throws ExecutionExceptio public void testOperationsIfClosed() { PlainActionFuture future = new PlainActionFuture<>(); permits.close(); - permits.acquire(future, ThreadPool.Names.GENERIC, true, ""); + permits.acquire(future, ThreadPool.Names.GENERIC, true); ExecutionException exception = expectThrows(ExecutionException.class, future::get); assertThat(exception.getCause(), instanceOf(IndexShardClosedException.class)); } @@ -195,7 +190,7 @@ public void testBlockIfClosed() { public void testOperationsDelayedIfBlock() throws ExecutionException, InterruptedException, TimeoutException { PlainActionFuture future = new PlainActionFuture<>(); try (Releasable ignored = blockAndWait()) { - permits.acquire(future, ThreadPool.Names.GENERIC, true, ""); + permits.acquire(future, ThreadPool.Names.GENERIC, true); assertFalse(future.isDone()); } future.get(1, TimeUnit.HOURS).close(); @@ -207,7 +202,7 @@ public void testGetBlockWhenBlocked() throws ExecutionException, InterruptedExce final CountDownLatch releaseBlock = new CountDownLatch(1); final AtomicBoolean blocked = new AtomicBoolean(); try (Releasable ignored = blockAndWait()) { - permits.acquire(future, ThreadPool.Names.GENERIC, true, ""); + permits.acquire(future, ThreadPool.Names.GENERIC, true); permits.blockOperations(wrap(() -> { blocked.set(true); @@ -272,8 +267,8 @@ public void onResponse(Releasable releasable) { context.putHeader("foo", "bar"); context.putTransient("bar", "baz"); // test both with and without a executor name - permits.acquire(future, ThreadPool.Names.GENERIC, true, ""); - permits.acquire(future2, null, true, ""); + permits.acquire(future, ThreadPool.Names.GENERIC, true); + permits.acquire(future2, null, true); } assertFalse(future.isDone()); } @@ -346,7 +341,7 @@ public void onResponse(Releasable releasable) { public void onFailure(Exception e) { } - }, ThreadPool.Names.GENERIC, false, "")); + }, ThreadPool.Names.GENERIC, false)); thread.start(); assertFalse(delayed.get()); releaseBlock.countDown(); @@ -396,7 +391,7 @@ public void onResponse(Releasable releasable) { public void onFailure(Exception e) { throw new RuntimeException(e); } - }, ThreadPool.Names.GENERIC, false, ""); + }, ThreadPool.Names.GENERIC, false); }); secondOperationThread.start(); @@ -442,7 +437,7 @@ public void onResponse(Releasable releasable) { public void onFailure(Exception e) { } - }, ThreadPool.Names.GENERIC, false, ""); + }, ThreadPool.Names.GENERIC, false); }); thread.start(); threads.add(thread); @@ -492,12 +487,12 @@ public void onFailure(Exception e) { public void testActiveOperationsCount() throws ExecutionException, InterruptedException { PlainActionFuture future1 = new PlainActionFuture<>(); - permits.acquire(future1, ThreadPool.Names.GENERIC, true, ""); + permits.acquire(future1, ThreadPool.Names.GENERIC, true); assertTrue(future1.isDone()); assertThat(permits.getActiveOperationsCount(), equalTo(1)); PlainActionFuture future2 = new PlainActionFuture<>(); - permits.acquire(future2, ThreadPool.Names.GENERIC, true, ""); + permits.acquire(future2, ThreadPool.Names.GENERIC, true); assertTrue(future2.isDone()); assertThat(permits.getActiveOperationsCount(), equalTo(2)); @@ -513,7 +508,7 @@ public void testActiveOperationsCount() throws ExecutionException, InterruptedEx } PlainActionFuture future3 = new PlainActionFuture<>(); - permits.acquire(future3, ThreadPool.Names.GENERIC, true, ""); + permits.acquire(future3, ThreadPool.Names.GENERIC, true); assertTrue(future3.isDone()); assertThat(permits.getActiveOperationsCount(), equalTo(1)); future3.get().close(); @@ -523,11 +518,7 @@ public void testActiveOperationsCount() throws ExecutionException, InterruptedEx public void testAsyncBlockOperationsOnRejection() { final PlainActionFuture threadBlock = new PlainActionFuture<>(); try ( - Releasable firstPermit = PlainActionFuture.get( - f -> permits.acquire(f, ThreadPool.Names.GENERIC, false, ""), - 0, - TimeUnit.SECONDS - ) + Releasable firstPermit = PlainActionFuture.get(f -> permits.acquire(f, ThreadPool.Names.GENERIC, false), 0, TimeUnit.SECONDS) ) { assertNotNull(firstPermit); @@ -542,7 +533,7 @@ public void testAsyncBlockOperationsOnRejection() { // ensure that the exception means no block was put in place try ( Releasable secondPermit = PlainActionFuture.get( - f -> permits.acquire(f, ThreadPool.Names.GENERIC, false, ""), + f -> permits.acquire(f, ThreadPool.Names.GENERIC, false), 0, TimeUnit.SECONDS ) @@ -562,11 +553,7 @@ public void testAsyncBlockOperationsOnRejection() { public void testAsyncBlockOperationsOnTimeout() { final PlainActionFuture threadBlock = new PlainActionFuture<>(); try ( - Releasable firstPermit = PlainActionFuture.get( - f -> permits.acquire(f, ThreadPool.Names.GENERIC, false, ""), - 0, - TimeUnit.SECONDS - ) + Releasable firstPermit = PlainActionFuture.get(f -> permits.acquire(f, ThreadPool.Names.GENERIC, false), 0, TimeUnit.SECONDS) ) { assertNotNull(firstPermit); @@ -583,7 +570,7 @@ public void testAsyncBlockOperationsOnTimeout() { // ensure that the exception means no block was put in place try ( Releasable secondPermit = PlainActionFuture.get( - f -> permits.acquire(f, ThreadPool.Names.GENERIC, false, ""), + f -> permits.acquire(f, ThreadPool.Names.GENERIC, false), 0, TimeUnit.SECONDS ) @@ -652,7 +639,7 @@ public void onResponse(Releasable releasable) { public void onFailure(Exception e) { assert false; } - }, ThreadPool.Names.GENERIC, false, "") + }, ThreadPool.Names.GENERIC, false) ); assertThat(e, hasToString(containsString("failed to obtain permit but operations are not delayed"))); permits.semaphore.release(IndexShardOperationPermits.TOTAL_PERMITS); @@ -702,38 +689,28 @@ public void onResponse(Releasable releasable) { public void onFailure(Exception e) { throw new RuntimeException(e); } - }, ThreadPool.Names.GENERIC, false, ""); + }, ThreadPool.Names.GENERIC, false); }; } public void testPermitTraceCapturing() throws ExecutionException, InterruptedException { final PlainActionFuture listener1 = new PlainActionFuture<>(); - permits.acquire(listener1, null, false, "listener1"); + permits.acquire(listener1, null, false); final PlainActionFuture listener2 = new PlainActionFuture<>(); - permits.acquire(listener2, null, false, "listener2"); + permits.acquire(listener2, null, false); assertThat(permits.getActiveOperationsCount(), equalTo(2)); - List messages = permits.getActiveOperations().stream().toList(); - assertThat(messages, hasSize(2)); - assertThat(messages, containsInAnyOrder(Arrays.asList(containsString("listener1"), containsString("listener2")))); if (randomBoolean()) { listener1.get().close(); assertThat(permits.getActiveOperationsCount(), equalTo(1)); - messages = permits.getActiveOperations().stream().toList(); - assertThat(messages, hasSize(1)); - assertThat(messages, contains(containsString("listener2"))); listener2.get().close(); } else { listener2.get().close(); assertThat(permits.getActiveOperationsCount(), equalTo(1)); - messages = permits.getActiveOperations().stream().toList(); - assertThat(messages, hasSize(1)); - assertThat(messages, contains(containsString("listener1"))); listener1.get().close(); } assertThat(permits.getActiveOperationsCount(), equalTo(0)); - assertThat(permits.getActiveOperations(), emptyIterable()); } private static ActionListener wrap(final CheckedRunnable onResponse) { diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index becf95a828332..592dc23efe52d 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -327,7 +327,7 @@ public void testClosesPreventsNewOperations() throws Exception { IndexShard indexShard = newStartedShard(); closeShards(indexShard); assertThat(indexShard.getActiveOperationsCount(), equalTo(0)); - expectThrows(IndexShardClosedException.class, () -> indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.WRITE, "")); + expectThrows(IndexShardClosedException.class, () -> indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.WRITE)); expectThrows( IndexShardClosedException.class, () -> indexShard.acquireAllPrimaryOperationsPermits(null, TimeValue.timeValueSeconds(30L)) @@ -339,8 +339,7 @@ public void testClosesPreventsNewOperations() throws Exception { UNASSIGNED_SEQ_NO, randomNonNegativeLong(), null, - ThreadPool.Names.WRITE, - "" + ThreadPool.Names.WRITE ) ); expectThrows( @@ -362,8 +361,7 @@ public void testRunUnderPrimaryPermitRunsUnderPrimaryPermit() throws IOException indexShard.runUnderPrimaryPermit( () -> assertThat(indexShard.getActiveOperationsCount(), equalTo(1)), e -> fail(e.toString()), - ThreadPool.Names.SAME, - "test" + ThreadPool.Names.SAME ); assertThat(indexShard.getActiveOperationsCount(), equalTo(0)); } finally { @@ -379,7 +377,7 @@ public void testRunUnderPrimaryPermitOnFailure() throws IOException { assertThat(e, instanceOf(RuntimeException.class)); assertThat(e.getMessage(), equalTo("failure")); invoked.set(true); - }, ThreadPool.Names.SAME, "test"); + }, ThreadPool.Names.SAME); assertTrue(invoked.get()); } finally { closeShards(indexShard); @@ -405,7 +403,7 @@ public void testRunUnderPrimaryPermitDelaysToExecutorWhenBlocked() throws Except : executorOnDelay.toLowerCase(Locale.ROOT); assertThat(Thread.currentThread().getName(), containsString(expectedThreadPoolName)); latch.countDown(); - }, e -> fail(e.toString()), executorOnDelay, "test"); + }, e -> fail(e.toString()), executorOnDelay); permit.close(); latch.await(); // we could race and assert on the count before the permit is returned @@ -424,8 +422,7 @@ public void testRejectOperationPermitWithHigherTermWhenNotStarted() throws IOExc indexShard.getPendingPrimaryTerm() + randomIntBetween(1, 100), UNASSIGNED_SEQ_NO, randomNonNegativeLong(), - PlainActionFuture.newFuture(), - "" + PlainActionFuture.newFuture() ) ); closeShards(indexShard); @@ -469,8 +466,7 @@ public void onFailure(Exception e) { throw new RuntimeException(e); } }, - ThreadPool.Names.WRITE, - id + ThreadPool.Names.WRITE ); }); thread.start(); @@ -514,7 +510,7 @@ public void onResponse(Releasable releasable) { public void onFailure(Exception e) { throw new RuntimeException(e); } - }, ThreadPool.Names.WRITE, id); + }, ThreadPool.Names.WRITE); }); thread.start(); delayedThreads.add(thread); @@ -560,7 +556,7 @@ public void testPublishingOrderOnPromotion() throws IOException, InterruptedExce if (indexShard.routingEntry().primary()) { assertThat(indexShard.getPendingPrimaryTerm(), equalTo(promotedTerm)); final PlainActionFuture permitAcquiredFuture = new PlainActionFuture<>(); - indexShard.acquirePrimaryOperationPermit(permitAcquiredFuture, ThreadPool.Names.SAME, "bla"); + indexShard.acquirePrimaryOperationPermit(permitAcquiredFuture, ThreadPool.Names.SAME); try (Releasable ignored = permitAcquiredFuture.actionGet()) { assertThat(indexShard.getReplicationGroup(), notNullValue()); } @@ -620,7 +616,7 @@ public void onResponse(Releasable releasable) { public void onFailure(Exception e) { throw new AssertionError(e); } - }, ThreadPool.Names.GENERIC, ""); + }, ThreadPool.Names.GENERIC); latch.await(); assertThat(indexShard.getLocalCheckpoint(), equalTo((long) maxSeqNo)); @@ -668,7 +664,7 @@ public void onResponse(Releasable releasable) { public void onFailure(Exception e) { throw new RuntimeException(e); } - }, ThreadPool.Names.GENERIC, ""); + }, ThreadPool.Names.GENERIC); latch.await(); assertThat(getTranslog(indexShard).getGeneration().translogFileGeneration, equalTo(currentTranslogGeneration + 1)); @@ -751,7 +747,7 @@ public void onFailure(final Exception e) { assertThat(e, instanceOf(ShardNotInPrimaryModeException.class)); assertThat(e, hasToString(containsString("shard is not in primary mode"))); } - }, ThreadPool.Names.SAME, "test"); + }, ThreadPool.Names.SAME); final CountDownLatch latch = new CountDownLatch(1); indexShard.acquireAllPrimaryOperationsPermits(new ActionListener<>() { @@ -789,8 +785,7 @@ public void onFailure(Exception e) { fail(); } }, - ThreadPool.Names.WRITE, - "" + ThreadPool.Names.WRITE ) ).getMessage(), containsString("in primary mode cannot be a replication target") @@ -844,7 +839,7 @@ public void onFailure(final Exception e) { throw new RuntimeException(e); } if (singlePermit) { - indexShard.acquirePrimaryOperationPermit(future, ThreadPool.Names.WRITE, ""); + indexShard.acquirePrimaryOperationPermit(future, ThreadPool.Names.WRITE); } else { indexShard.acquireAllPrimaryOperationsPermits(future, TimeValue.timeValueHours(1L)); } @@ -901,7 +896,7 @@ public void onResponse(final Releasable releasable) { private Releasable acquirePrimaryOperationPermitBlockingly(IndexShard indexShard) throws ExecutionException, InterruptedException { PlainActionFuture fut = new PlainActionFuture<>(); - indexShard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.WRITE, ""); + indexShard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.WRITE); return fut.get(); } @@ -913,8 +908,7 @@ private Releasable acquireReplicaOperationPermitBlockingly(IndexShard indexShard indexShard.getLastKnownGlobalCheckpoint(), randomNonNegativeLong(), fut, - ThreadPool.Names.WRITE, - "" + ThreadPool.Names.WRITE ); return fut.get(); } @@ -968,7 +962,7 @@ public void testOperationPermitOnReplicaShards() throws Exception { if (shardRouting.primary() == false && Assertions.ENABLED) { AssertionError e = expectThrows( AssertionError.class, - () -> indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.WRITE, "") + () -> indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.WRITE) ); assertThat(e, hasToString(containsString("acquirePrimaryOperationPermit should only be called on primary shard"))); @@ -1060,8 +1054,7 @@ private void finish() { newPrimaryTerm, newGlobalCheckPoint, randomNonNegativeLong(), - listener, - "" + listener ); } catch (Exception e) { listener.onFailure(e); @@ -1140,8 +1133,7 @@ public void onFailure(Exception e) { oldPrimaryTerm, indexShard.getLastKnownGlobalCheckpoint(), randomNonNegativeLong(), - onLockAcquired, - "" + onLockAcquired ); latch.await(); assertFalse(onResponse.get()); @@ -1166,8 +1158,7 @@ public void testAcquireReplicaPermitAdvanceMaxSeqNoOfUpdates() throws Exception replica.getOperationPrimaryTerm(), replica.getLastKnownGlobalCheckpoint(), newMaxSeqNoOfUpdates, - fut, - "" + fut ); try (Releasable ignored = fut.actionGet()) { assertThat(replica.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(Math.max(currentMaxSeqNoOfUpdates, newMaxSeqNoOfUpdates))); @@ -1320,8 +1311,7 @@ public void onResponse(Releasable releasable) { public void onFailure(Exception e) { } - }, - "" + } ); latch.await(); @@ -1382,8 +1372,7 @@ public void onResponse(final Releasable releasable) { public void onFailure(final Exception e) { } - }, - "" + } ); latch.await(); @@ -1441,8 +1430,7 @@ public void onFailure(Exception e) { latch.countDown(); } }, - ThreadPool.Names.WRITE, - "" + ThreadPool.Names.WRITE ); }; @@ -1947,7 +1935,7 @@ public void onFailure(Exception e) { }); } - shard.acquirePrimaryOperationPermit(onLockAcquired, ThreadPool.Names.WRITE, "i_" + i); + shard.acquirePrimaryOperationPermit(onLockAcquired, ThreadPool.Names.WRITE); } for (final Runnable assertion : assertions) { @@ -4425,7 +4413,7 @@ public void testTypelessGet() throws IOException { } /** - * Randomizes the usage of {@link IndexShard#acquireReplicaOperationPermit(long, long, long, ActionListener, String, Object)} and + * Randomizes the usage of {@link IndexShard#acquireReplicaOperationPermit(long, long, long, ActionListener, String)} and * {@link IndexShard#acquireAllReplicaOperationsPermits(long, long, long, ActionListener, TimeValue)} in order to acquire a permit. */ private void randomReplicaOperationPermitAcquisition( @@ -4433,12 +4421,11 @@ private void randomReplicaOperationPermitAcquisition( final long opPrimaryTerm, final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes, - final ActionListener listener, - final String info + final ActionListener listener ) { if (randomBoolean()) { final String executor = ThreadPool.Names.WRITE; - indexShard.acquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, listener, executor, info); + indexShard.acquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, listener, executor); } else { final TimeValue timeout = TimeValue.timeValueSeconds(30L); indexShard.acquireAllReplicaOperationsPermits(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, listener, timeout); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 438db5f31ef71..a84dc56cc0511 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -788,7 +788,7 @@ void phase2( public void testCancellationsDoesNotLeakPrimaryPermits() throws Exception { runPrimaryPermitsLeakTest((shard, cancellableThreads) -> { - RecoverySourceHandler.runUnderPrimaryPermit(() -> {}, "test", shard, cancellableThreads); + RecoverySourceHandler.runUnderPrimaryPermit(() -> {}, shard, cancellableThreads); }); } @@ -797,7 +797,6 @@ public void testCancellationsDoesNotLeakPrimaryPermitsAsync() throws Exception { PlainActionFuture.get( future -> RecoverySourceHandler.runUnderPrimaryPermit( listener -> listener.onResponse(null), - "test", shard, cancellableThreads, future diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index e4c13e6b2938d..b4d0ed73c4c17 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -619,8 +619,7 @@ public void executeRetentionLeasesSyncRequestOnReplica(RetentionLeaseSyncAction. getPrimary().getLastKnownGlobalCheckpoint(), getPrimary().getMaxSeqNoOfUpdatesOrDeletes(), acquirePermitFuture, - ThreadPool.Names.SAME, - request + ThreadPool.Names.SAME ); try (Releasable ignored = acquirePermitFuture.actionGet()) { replica.updateRetentionLeasesOnReplica(request.getRetentionLeases()); @@ -793,8 +792,7 @@ public void performOn( delegatedListener.onFailure(e); } }), - ThreadPool.Names.WRITE, - replicaRequest + ThreadPool.Names.WRITE ); } @@ -885,7 +883,7 @@ private void executeShardBulkOnPrimary( } } final PlainActionFuture permitAcquiredFuture = new PlainActionFuture<>(); - primary.acquirePrimaryOperationPermit(permitAcquiredFuture, ThreadPool.Names.SAME, request); + primary.acquirePrimaryOperationPermit(permitAcquiredFuture, ThreadPool.Names.SAME); try (Releasable ignored = permitAcquiredFuture.actionGet()) { MappingUpdatePerformer noopMappingUpdater = (_update, _shardId, _listener1) -> {}; TransportShardBulkAction.performOnPrimary( @@ -940,8 +938,7 @@ private void executeShardBulkOnReplica( globalCheckpointOnPrimary, maxSeqNoOfUpdatesOrDeletes, permitAcquiredFuture, - ThreadPool.Names.SAME, - request + ThreadPool.Names.SAME ); final Translog.Location location; try (Releasable ignored = permitAcquiredFuture.actionGet()) { @@ -1086,8 +1083,7 @@ private void executeResyncOnReplica( globalCheckpointOnPrimary, maxSeqNoOfUpdatesOrDeletes, acquirePermitFuture, - ThreadPool.Names.SAME, - request + ThreadPool.Names.SAME ); try (Releasable ignored = acquirePermitFuture.actionGet()) { location = TransportResyncReplicationAction.performOnReplica(request, replica); diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 447559137b5ba..8caa2680f5e2a 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -1312,17 +1312,7 @@ private void assertNoPendingIndexOperations() throws Exception { IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name); for (IndexService indexService : indexServices) { for (IndexShard indexShard : indexService) { - List operations = indexShard.getActiveOperations(); - if (operations.size() > 0) { - throw new AssertionError( - "shard " - + indexShard.shardId() - + " on node [" - + nodeAndClient.name - + "] has pending operations:\n --> " - + String.join("\n --> ", operations) - ); - } + assertEquals(0, indexShard.getActiveOperationsCount()); } } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportForgetFollowerAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportForgetFollowerAction.java index 4e2626a2dc6de..01168e447eeb9 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportForgetFollowerAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportForgetFollowerAction.java @@ -133,7 +133,7 @@ public void onFailure(Exception e) { onFailure(e); } } - }, ThreadPool.Names.SAME, request); + }, ThreadPool.Names.SAME); } @Override diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index 9f853edc52513..75aec302d6923 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -353,8 +353,7 @@ public void testRetryBulkShardOperations() throws Exception { followingPrimary.getLastKnownGlobalCheckpoint(), followingPrimary.getMaxSeqNoOfUpdatesOrDeletes(), permitFuture, - ThreadPool.Names.SAME, - primaryResult + ThreadPool.Names.SAME ); try (Releasable ignored = permitFuture.get()) { TransportBulkShardOperationsAction.shardOperationOnReplica(primaryResult.replicaRequest(), replica, logger); @@ -797,7 +796,7 @@ class CcrAction extends ReplicationAction listener) { final PlainActionFuture permitFuture = new PlainActionFuture<>(); - primary.acquirePrimaryOperationPermit(permitFuture, ThreadPool.Names.SAME, request); + primary.acquirePrimaryOperationPermit(permitFuture, ThreadPool.Names.SAME); final TransportWriteAction.WritePrimaryResult ccrResult; try (Releasable ignored = permitFuture.get()) { ccrResult = TransportBulkShardOperationsAction.shardOperationOnPrimary( @@ -830,8 +829,7 @@ protected void performOnReplica(BulkShardOperationsRequest request, IndexShard r getPrimaryShard().getLastKnownGlobalCheckpoint(), getPrimaryShard().getMaxSeqNoOfUpdatesOrDeletes(), f, - ThreadPool.Names.SAME, - request + ThreadPool.Names.SAME ) ) ) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowEngineIndexShardTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowEngineIndexShardTests.java index 00c88d9721acc..851d45a9a1f30 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowEngineIndexShardTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowEngineIndexShardTests.java @@ -102,7 +102,7 @@ public void testDoNotFillGaps() throws Exception { releasable.close(); latch.countDown(); }, e -> { assert false : "expected no exception, but got [" + e.getMessage() + "]"; }); - indexShard.acquirePrimaryOperationPermit(actionListener, ThreadPool.Names.GENERIC, ""); + indexShard.acquirePrimaryOperationPermit(actionListener, ThreadPool.Names.GENERIC); latch.await(); assertThat(indexShard.getLocalCheckpoint(), equalTo(seqNoBeforeGap)); indexShard.refresh("test");