diff --git a/docs/changelog/100610.yaml b/docs/changelog/100610.yaml new file mode 100644 index 0000000000000..7423ce9225868 --- /dev/null +++ b/docs/changelog/100610.yaml @@ -0,0 +1,7 @@ +pr: 100610 +summary: Fix interruption of `markAllocationIdAsInSync` +area: Recovery +type: bug +issues: + - 96578 + - 100589 diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index d3aed4a3e2bf2..f556486795c2a 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -35,11 +35,15 @@ import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.bulk.BulkAction; import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; @@ -70,6 +74,8 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.gateway.ReplicaShardAllocatorIT; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexService; @@ -85,6 +91,7 @@ import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.shard.GlobalCheckpointListeners; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; @@ -122,7 +129,9 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -132,6 +141,7 @@ import static java.util.stream.Collectors.toList; import static org.elasticsearch.action.DocWriteResponse.Result.CREATED; import static org.elasticsearch.action.DocWriteResponse.Result.UPDATED; +import static org.elasticsearch.action.support.ActionTestUtils.assertNoFailureListener; import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING; import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; import static org.elasticsearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING; @@ -1688,6 +1698,104 @@ public void testWaitForClusterStateToBeAppliedOnSourceNode() throws Exception { } } + public void testDeleteIndexDuringFinalization() throws Exception { + internalCluster().startMasterOnlyNode(); + final var primaryNode = internalCluster().startDataOnlyNode(); + String indexName = "test-index"; + createIndex(indexName, indexSettings(1, 0).build()); + ensureGreen(indexName); + final List indexRequests = IntStream.range(0, between(10, 500)) + .mapToObj(n -> client().prepareIndex(indexName).setSource("foo", "bar")) + .toList(); + indexRandom(randomBoolean(), true, true, indexRequests); + assertThat(indicesAdmin().prepareFlush(indexName).get().getFailedShards(), equalTo(0)); + + final var replicaNode = internalCluster().startDataOnlyNode(); + + final SubscribableListener recoveryCompleteListener = new SubscribableListener<>(); + final PlainActionFuture deleteListener = new PlainActionFuture<>(); + + final var threadPool = internalCluster().clusterService().threadPool(); + + final var indexId = internalCluster().clusterService().state().routingTable().index(indexName).getIndex(); + final var primaryIndexShard = internalCluster().getInstance(IndicesService.class, primaryNode) + .indexServiceSafe(indexId) + .getShard(0); + final var globalCheckpointBeforeRecovery = primaryIndexShard.getLastSyncedGlobalCheckpoint(); + + final var replicaNodeTransportService = asInstanceOf( + MockTransportService.class, + internalCluster().getInstance(TransportService.class, replicaNode) + ); + replicaNodeTransportService.addRequestHandlingBehavior( + PeerRecoveryTargetService.Actions.TRANSLOG_OPS, + (handler, request, channel, task) -> handler.messageReceived( + request, + new TestTransportChannel(ActionTestUtils.assertNoFailureListener(response -> { + // Process the TRANSLOG_OPS response on the replica (avoiding failing it due to a concurrent delete) but + // before sending the response back send another document to the primary, advancing the GCP to prevent the replica + // being marked as in-sync (NB below we delay the replica write until after the index is deleted) + client().prepareIndex(indexName).setSource("foo", "baz").execute(ActionListener.noop()); + + primaryIndexShard.addGlobalCheckpointListener( + globalCheckpointBeforeRecovery + 1, + new GlobalCheckpointListeners.GlobalCheckpointListener() { + @Override + public Executor executor() { + return EsExecutors.DIRECT_EXECUTOR_SERVICE; + } + + @Override + public void accept(long globalCheckpoint, Exception e) { + assertNull(e); + + // Now the GCP has advanced the replica won't be marked in-sync so respond to the TRANSLOG_OPS request + // to start recovery finalization + try { + channel.sendResponse(response); + } catch (IOException ex) { + fail(ex); + } + + // Wait a short while for finalization to block on advancing the replica's GCP and then delete the index + threadPool.schedule( + () -> client().admin().indices().prepareDelete(indexName).execute(deleteListener), + TimeValue.timeValueMillis(100), + EsExecutors.DIRECT_EXECUTOR_SERVICE + ); + } + }, + TimeValue.timeValueSeconds(10) + ); + })), + task + ) + ); + + // delay the delivery of the replica write until the end of the test so the replica never becomes in-sync + replicaNodeTransportService.addRequestHandlingBehavior( + BulkAction.NAME + "[s][r]", + (handler, request, channel, task) -> recoveryCompleteListener.addListener( + assertNoFailureListener(ignored -> handler.messageReceived(request, channel, task)) + ) + ); + + // Create the replica to trigger the whole process + assertAcked( + client().admin() + .indices() + .prepareUpdateSettings(indexName) + .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)) + ); + + // Wait for the index to be deleted + assertTrue(deleteListener.get(20, TimeUnit.SECONDS).isAcknowledged()); + + final var peerRecoverySourceService = internalCluster().getInstance(PeerRecoverySourceService.class, primaryNode); + assertBusy(() -> assertEquals(0, peerRecoverySourceService.numberOfOngoingRecoveries())); + recoveryCompleteListener.onResponse(null); + } + private void assertGlobalCheckpointIsStableAndSyncedInAllNodes(String indexName, List nodes, int shard) throws Exception { assertThat(nodes, is(not(empty()))); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index fc5df1a4aa282..81bc226102f62 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -33,7 +33,6 @@ import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.common.util.set.Sets; -import org.elasticsearch.core.CheckedRunnable; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; @@ -426,7 +425,7 @@ public void onFailure(Exception e) { } static void runUnderPrimaryPermit( - CheckedRunnable action, + Runnable action, IndexShard primary, CancellableThreads cancellableThreads, ActionListener listener @@ -1260,7 +1259,7 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis */ final SubscribableListener markInSyncStep = new SubscribableListener<>(); runUnderPrimaryPermit( - () -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint), + () -> cancellableThreads.execute(() -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint)), shard, cancellableThreads, markInSyncStep diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java index 540ef4cf1027b..9ccfbd2e25ca6 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java @@ -114,6 +114,7 @@ import org.elasticsearch.xcontent.XContentParser.Token; import org.elasticsearch.xcontent.XContentParserConfiguration; import org.elasticsearch.xcontent.XContentType; +import org.hamcrest.Matchers; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -2036,7 +2037,17 @@ protected static boolean isTurkishLocale() { || Locale.getDefault().getLanguage().equals(new Locale("az").getLanguage()); } - public static void fail(Throwable t, String msg, Object... args) { + public static T fail(Throwable t, String msg, Object... args) { throw new AssertionError(org.elasticsearch.common.Strings.format(msg, args), t); } + + public static T fail(Throwable t) { + return fail(t, "unexpected"); + } + + @SuppressWarnings("unchecked") + public static T asInstanceOf(Class clazz, Object o) { + assertThat(o, Matchers.instanceOf(clazz)); + return (T) o; + } }