diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index ed4a744d0a025..fc4875db09a93 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -63,6 +63,7 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AsyncIOProcessor; +import org.elasticsearch.common.util.concurrent.RunOnce; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.Index; @@ -548,7 +549,7 @@ public void onFailure(Exception e) { } catch (final AlreadyClosedException e) { // okay, the index was deleted } - }); + }, null); } } // set this last, once we finished updating all internal state. @@ -2316,14 +2317,26 @@ public void acquireAllPrimaryOperationsPermits(final ActionListener indexShardOperationPermits.asyncBlockOperations(onPermitAcquired, timeout.duration(), timeout.timeUnit()); } - private void bumpPrimaryTerm(final long newPrimaryTerm, final CheckedRunnable onBlocked) { + private void bumpPrimaryTerm(final long newPrimaryTerm, + final CheckedRunnable onBlocked, + @Nullable ActionListener combineWithAction) { assert Thread.holdsLock(mutex); - assert newPrimaryTerm > pendingPrimaryTerm; + assert newPrimaryTerm > pendingPrimaryTerm || (newPrimaryTerm >= pendingPrimaryTerm && combineWithAction != null); assert operationPrimaryTerm <= pendingPrimaryTerm; final CountDownLatch termUpdated = new CountDownLatch(1); indexShardOperationPermits.asyncBlockOperations(new ActionListener() { @Override public void onFailure(final Exception e) { + try { + innerFail(e); + } finally { + if (combineWithAction != null) { + combineWithAction.onFailure(e); + } + } + } + + private void innerFail(final Exception e) { try { 
failShard("exception during primary term transition", e); } catch (AlreadyClosedException ace) { @@ -2333,7 +2346,8 @@ public void onFailure(final Exception e) { @Override public void onResponse(final Releasable releasable) { - try (Releasable ignored = releasable) { + final RunOnce releaseOnce = new RunOnce(releasable::close); + try { assert operationPrimaryTerm <= pendingPrimaryTerm; termUpdated.await(); // indexShardOperationPermits doesn't guarantee that async submissions are executed @@ -2343,7 +2357,17 @@ public void onResponse(final Releasable releasable) { onBlocked.run(); } } catch (final Exception e) { - onFailure(e); + if (combineWithAction == null) { + // otherwise leave it to combineWithAction to release the permit + releaseOnce.run(); + } + innerFail(e); + } finally { + if (combineWithAction != null) { + combineWithAction.onResponse(releasable); + } else { + releaseOnce.run(); + } } } }, 30, TimeUnit.MINUTES); @@ -2371,7 +2395,7 @@ public void onResponse(final Releasable releasable) { public void acquireReplicaOperationPermit(final long opPrimaryTerm, final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes, final ActionListener onPermitAcquired, final String executorOnDelay, final Object debugInfo) { - innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onPermitAcquired, + innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onPermitAcquired, false, (listener) -> indexShardOperationPermits.acquire(listener, executorOnDelay, true, debugInfo)); } @@ -2393,7 +2417,7 @@ public void acquireAllReplicaOperationsPermits(final long opPrimaryTerm, final long maxSeqNoOfUpdatesOrDeletes, final ActionListener onPermitAcquired, final TimeValue timeout) { - innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onPermitAcquired, + innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, 
onPermitAcquired, true, (listener) -> indexShardOperationPermits.asyncBlockOperations(listener, timeout.duration(), timeout.timeUnit())); } @@ -2401,41 +2425,16 @@ private void innerAcquireReplicaOperationPermit(final long opPrimaryTerm, final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes, final ActionListener onPermitAcquired, - final Consumer> consumer) { + final boolean allowCombineOperationWithPrimaryTermUpdate, + final Consumer> operationExecutor) { verifyNotClosed(); - if (opPrimaryTerm > pendingPrimaryTerm) { - synchronized (mutex) { - if (opPrimaryTerm > pendingPrimaryTerm) { - final IndexShardState shardState = state(); - // only roll translog and update primary term if shard has made it past recovery - // Having a new primary term here means that the old primary failed and that there is a new primary, which again - // means that the master will fail this shard as all initializing shards are failed when a primary is selected - // We abort early here to prevent an ongoing recovery from the failed primary to mess with the global / local checkpoint - if (shardState != IndexShardState.POST_RECOVERY && - shardState != IndexShardState.STARTED) { - throw new IndexShardNotStartedException(shardId, shardState); - } - if (opPrimaryTerm > pendingPrimaryTerm) { - bumpPrimaryTerm(opPrimaryTerm, () -> { - updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition"); - final long currentGlobalCheckpoint = getGlobalCheckpoint(); - final long maxSeqNo = seqNoStats().getMaxSeqNo(); - logger.info("detected new primary with primary term [{}], global checkpoint [{}], max_seq_no [{}]", - opPrimaryTerm, currentGlobalCheckpoint, maxSeqNo); - if (currentGlobalCheckpoint < maxSeqNo) { - resetEngineToGlobalCheckpoint(); - } else { - getEngine().rollTranslogGeneration(); - } - }); - } - } - } - } - assert opPrimaryTerm <= pendingPrimaryTerm - : "operation primary term [" + opPrimaryTerm + "] should be at most [" + pendingPrimaryTerm + "]"; - 
consumer.accept(new ActionListener() { + // This listener is used for the execution of the operation. If the operation requires all the permits for its + // execution and the primary term must be updated first, we can combine the operation execution with the + // primary term update. Since indexShardOperationPermits doesn't guarantee that async submissions are executed + // in the order submitted, combining both operations ensures that the term is updated before the operation is + // executed. It also has the side effect of acquiring all the permits once instead of twice. + final ActionListener operationListener = new ActionListener() { @Override public void onResponse(final Releasable releasable) { if (opPrimaryTerm < operationPrimaryTerm) { @@ -2465,7 +2464,48 @@ public void onResponse(final Releasable releasable) { public void onFailure(final Exception e) { onPermitAcquired.onFailure(e); } - }); + }; + + if (requirePrimaryTermUpdate(opPrimaryTerm, allowCombineOperationWithPrimaryTermUpdate)) { + synchronized (mutex) { + if (requirePrimaryTermUpdate(opPrimaryTerm, allowCombineOperationWithPrimaryTermUpdate)) { + final IndexShardState shardState = state(); + // only roll translog and update primary term if shard has made it past recovery + // Having a new primary term here means that the old primary failed and that there is a new primary, which again + // means that the master will fail this shard as all initializing shards are failed when a primary is selected + // We abort early here to prevent an ongoing recovery from the failed primary to mess with the global / local checkpoint + if (shardState != IndexShardState.POST_RECOVERY && + shardState != IndexShardState.STARTED) { + throw new IndexShardNotStartedException(shardId, shardState); + } + + bumpPrimaryTerm(opPrimaryTerm, () -> { + updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition"); + final long currentGlobalCheckpoint = getGlobalCheckpoint(); + final long maxSeqNo = 
seqNoStats().getMaxSeqNo(); + logger.info("detected new primary with primary term [{}], global checkpoint [{}], max_seq_no [{}]", + opPrimaryTerm, currentGlobalCheckpoint, maxSeqNo); + if (currentGlobalCheckpoint < maxSeqNo) { + resetEngineToGlobalCheckpoint(); + } else { + getEngine().rollTranslogGeneration(); + } + }, allowCombineOperationWithPrimaryTermUpdate ? operationListener : null); + + if (allowCombineOperationWithPrimaryTermUpdate) { + logger.debug("operation execution has been combined with primary term update"); + return; + } + } + } + } + assert opPrimaryTerm <= pendingPrimaryTerm + : "operation primary term [" + opPrimaryTerm + "] should be at most [" + pendingPrimaryTerm + "]"; + operationExecutor.accept(operationListener); + } + + private boolean requirePrimaryTermUpdate(final long opPrimaryTerm, final boolean allPermits) { + return (opPrimaryTerm > pendingPrimaryTerm) || (allPermits && opPrimaryTerm > operationPrimaryTerm); } public int getActiveOperationsCount() { diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 0791ead6608a3..030646e0f4b9e 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -733,7 +733,6 @@ private Releasable acquireReplicaOperationPermitBlockingly(IndexShard indexShard return fut.get(); } - @AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/35850") public void testOperationPermitOnReplicaShards() throws Exception { final ShardId shardId = new ShardId("test", "_na_", 0); final IndexShard indexShard; @@ -1024,7 +1023,6 @@ public void testGlobalCheckpointSync() throws IOException { closeShards(replicaShard, primaryShard); } - @AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/35850") public void testRestoreLocalHistoryFromTranslogOnPromotion() throws IOException, 
InterruptedException { final IndexShard indexShard = newStartedShard(false); final int operations = 1024 - scaledRandomIntBetween(0, 1024); @@ -1089,7 +1087,6 @@ public void onFailure(Exception e) { closeShard(indexShard, false); } - @AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/35850") public void testRollbackReplicaEngineOnPromotion() throws IOException, InterruptedException { final IndexShard indexShard = newStartedShard(false); @@ -3600,6 +3597,67 @@ public void testResetEngine() throws Exception { closeShard(shard, false); } + public void testConcurrentAcquireAllReplicaOperationsPermitsWithPrimaryTermUpdate() throws Exception { + final IndexShard replica = newStartedShard(false); + indexOnReplicaWithGaps(replica, between(0, 1000), Math.toIntExact(replica.getLocalCheckpoint())); + + final int nbTermUpdates = randomIntBetween(1, 5); + + for (int i = 0; i < nbTermUpdates; i++) { + long opPrimaryTerm = replica.getOperationPrimaryTerm() + 1; + final long globalCheckpoint = replica.getGlobalCheckpoint(); + final long maxSeqNoOfUpdatesOrDeletes = replica.getMaxSeqNoOfUpdatesOrDeletes(); + + final int operations = scaledRandomIntBetween(5, 32); + final CyclicBarrier barrier = new CyclicBarrier(1 + operations); + final CountDownLatch latch = new CountDownLatch(operations); + + final Thread[] threads = new Thread[operations]; + for (int j = 0; j < operations; j++) { + threads[j] = new Thread(() -> { + try { + barrier.await(); + } catch (final BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + replica.acquireAllReplicaOperationsPermits( + opPrimaryTerm, + globalCheckpoint, + maxSeqNoOfUpdatesOrDeletes, + new ActionListener() { + @Override + public void onResponse(final Releasable releasable) { + try (Releasable ignored = releasable) { + assertThat(replica.getPendingPrimaryTerm(), greaterThanOrEqualTo(opPrimaryTerm)); + assertThat(replica.getOperationPrimaryTerm(), equalTo(opPrimaryTerm)); + } finally { + 
latch.countDown(); + } + } + + @Override + public void onFailure(final Exception e) { + try { + throw new RuntimeException(e); + } finally { + latch.countDown(); + } + } + }, TimeValue.timeValueMinutes(30L)); + }); + threads[j].start(); + } + barrier.await(); + latch.await(); + + for (Thread thread : threads) { + thread.join(); + } + } + + closeShard(replica, false); + } + @Override public Settings threadPoolSettings() { return Settings.builder().put(super.threadPoolSettings()).put("thread_pool.estimated_time_interval", "5ms").build();