diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java index de5e372dd1625..1d283cbe004d0 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java @@ -21,6 +21,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.action.support.replication.TransportReplicationAction; @@ -39,6 +40,8 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.util.function.Consumer; + public class TransportVerifyShardBeforeCloseAction extends TransportReplicationAction< TransportVerifyShardBeforeCloseAction.ShardRequest, TransportVerifyShardBeforeCloseAction.ShardRequest, ReplicationResponse> { @@ -109,6 +112,30 @@ private void executeShardOperation(final IndexShard indexShard) { logger.debug("{} shard is ready for closing", shardId); } + @Override + protected ReplicationOperation.Replicas newReplicasProxy(final long primaryTerm) { + return new VerifyShardBeforeCloseActionReplicasProxy(primaryTerm); + } + + /** + * A {@link ReplicasProxy} that marks as stale the shards that are unavailable during the verification + * and the flush of the shard. This is done to ensure that such shards won't be later promoted as primary + * or reopened in an unverified state with potential non flushed translog operations. + */ + class VerifyShardBeforeCloseActionReplicasProxy extends ReplicasProxy { + + VerifyShardBeforeCloseActionReplicasProxy(final long primaryTerm) { + super(primaryTerm); + } + + @Override + public void markShardCopyAsStaleIfNeeded(final ShardId shardId, final String allocationId, final Runnable onSuccess, + final Consumer onPrimaryDemoted, final Consumer onIgnoredFailure) { + shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null, + createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure)); + } + } + public static class ShardRequest extends ReplicationRequest { ShardRequest(){ diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java index de0cc5dfd5a37..2764eee798e6b 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java @@ -18,27 +18,55 @@ */ package org.elasticsearch.action.admin.indices.close; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.replication.ReplicationOperation; +import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.action.support.replication.TransportReplicationAction; +import org.elasticsearch.action.support.replication.TransportReplicationAction.ConcreteShardRequest; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ReplicationGroup; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.transport.CapturingTransport; +import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; +import org.junit.After; +import org.junit.AfterClass; import org.junit.Before; +import org.junit.BeforeClass; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state; import static org.elasticsearch.cluster.metadata.MetaDataIndexStateService.INDEX_CLOSED_BLOCK; +import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; +import static org.elasticsearch.test.ClusterServiceUtils.setState; +import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.instanceOf; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -47,9 +75,17 @@ public class TransportVerifyShardBeforeCloseActionTests extends ESTestCase { + private static ThreadPool threadPool; + private IndexShard indexShard; private TransportVerifyShardBeforeCloseAction action; private ClusterService clusterService; + private CapturingTransport transport; + + @BeforeClass + public static void beforeClass() { + threadPool = new TestThreadPool(getTestClass().getName()); + } @Override @Before @@ -64,13 +100,32 @@ public void setUp() throws Exception { final ShardId shardId = new ShardId("index", "_na_", randomIntBetween(0, 3)); when(indexShard.shardId()).thenReturn(shardId); - clusterService = mock(ClusterService.class); - when(clusterService.state()).thenReturn(new ClusterState.Builder(new ClusterName("test")) + clusterService = createClusterService(threadPool); + setState(clusterService, new ClusterState.Builder(clusterService.state()) .blocks(ClusterBlocks.builder().addIndexBlock("index", INDEX_CLOSED_BLOCK).build()).build()); - action = new TransportVerifyShardBeforeCloseAction(Settings.EMPTY, mock(TransportService.class), clusterService, - mock(IndicesService.class), mock(ThreadPool.class), mock(ShardStateAction.class), mock(ActionFilters.class), - mock(IndexNameExpressionResolver.class)); + transport = new CapturingTransport(); + TransportService transportService = transport.createTransportService(Settings.EMPTY, threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet()); + transportService.start(); + transportService.acceptIncomingRequests(); + + ShardStateAction shardStateAction = new ShardStateAction(clusterService, transportService, null, null, threadPool); + action = new TransportVerifyShardBeforeCloseAction(Settings.EMPTY, transportService, clusterService, mock(IndicesService.class), + mock(ThreadPool.class), shardStateAction, mock(ActionFilters.class), mock(IndexNameExpressionResolver.class)); + } + + @Override + @After + public void tearDown() throws Exception { + super.tearDown(); + clusterService.close(); + } + + @AfterClass + public static void afterClass() { + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); + threadPool = null; } private void executeOnPrimaryOrReplica() throws Exception { @@ -98,7 +153,7 @@ public void testOperationFailsWithOnGoingOps() { } public void testOperationFailsWithNoBlock() { - when(clusterService.state()).thenReturn(new ClusterState.Builder(new ClusterName("test")).build()); + setState(clusterService, new ClusterState.Builder(new ClusterName("test")).build()); IllegalStateException exception = expectThrows(IllegalStateException.class, this::executeOnPrimaryOrReplica); assertThat(exception.getMessage(), @@ -119,4 +174,149 @@ public void testOperationFailsWithGlobalCheckpointNotCaughtUp() { + maxSeqNo + "] on index shard " + indexShard.shardId())); verify(indexShard, times(0)).flush(any(FlushRequest.class)); } + + public void testUnavailableShardsMarkedAsStale() throws Exception { + final String index = "test"; + final ShardId shardId = new ShardId(index, "_na_", 0); + + final int nbReplicas = randomIntBetween(1, 10); + final ShardRoutingState[] replicaStates = new ShardRoutingState[nbReplicas]; + for (int i = 0; i < replicaStates.length; i++) { + replicaStates[i] = ShardRoutingState.STARTED; + } + final ClusterState clusterState = state(index, true, ShardRoutingState.STARTED, replicaStates); + setState(clusterService, clusterState); + + IndexShardRoutingTable shardRoutingTable = clusterState.routingTable().index(index).shard(shardId.id()); + final IndexMetaData indexMetaData = clusterState.getMetaData().index(index); + final ShardRouting primaryRouting = shardRoutingTable.primaryShard(); + final long primaryTerm = indexMetaData.primaryTerm(0); + + final Set inSyncAllocationIds = indexMetaData.inSyncAllocationIds(0); + final Set trackedShards = shardRoutingTable.getAllAllocationIds(); + + List unavailableShards = randomSubsetOf(randomIntBetween(1, nbReplicas), shardRoutingTable.replicaShards()); + IndexShardRoutingTable.Builder shardRoutingTableBuilder = new IndexShardRoutingTable.Builder(shardRoutingTable); + unavailableShards.forEach(shardRoutingTableBuilder::removeShard); + shardRoutingTable = shardRoutingTableBuilder.build(); + + final ReplicationGroup replicationGroup = new ReplicationGroup(shardRoutingTable, inSyncAllocationIds, trackedShards); + assertThat(replicationGroup.getUnavailableInSyncShards().size(), greaterThan(0)); + + final PlainActionFuture listener = new PlainActionFuture<>(); + TransportVerifyShardBeforeCloseAction.ShardRequest request = new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId); + ReplicationOperation.Replicas proxy = action.newReplicasProxy(primaryTerm); + ReplicationOperation operation = + new ReplicationOperation<>(request, createPrimary(primaryRouting, replicationGroup), listener, proxy, logger, "test"); + operation.execute(); + + final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); + assertThat(capturedRequests.length, equalTo(nbReplicas)); + + for (CapturingTransport.CapturedRequest capturedRequest : capturedRequests) { + final String actionName = capturedRequest.action; + if (actionName.startsWith(ShardStateAction.SHARD_FAILED_ACTION_NAME)) { + assertThat(capturedRequest.request, instanceOf(ShardStateAction.FailedShardEntry.class)); + String allocationId = ((ShardStateAction.FailedShardEntry) capturedRequest.request).getAllocationId(); + assertTrue(unavailableShards.stream().anyMatch(shardRouting -> shardRouting.allocationId().getId().equals(allocationId))); + transport.handleResponse(capturedRequest.requestId, TransportResponse.Empty.INSTANCE); + + } else if (actionName.startsWith(TransportVerifyShardBeforeCloseAction.NAME)) { + assertThat(capturedRequest.request, instanceOf(ConcreteShardRequest.class)); + String allocationId = ((ConcreteShardRequest) capturedRequest.request).getTargetAllocationID(); + assertFalse(unavailableShards.stream().anyMatch(shardRouting -> shardRouting.allocationId().getId().equals(allocationId))); + assertTrue(inSyncAllocationIds.stream().anyMatch(inSyncAllocationId -> inSyncAllocationId.equals(allocationId))); + transport.handleResponse(capturedRequest.requestId, new TransportReplicationAction.ReplicaResponse(0L, 0L)); + + } else { + fail("Test does not support action " + capturedRequest.action); + } + } + + final ReplicationResponse.ShardInfo shardInfo = listener.get().getShardInfo(); + assertThat(shardInfo.getFailed(), equalTo(0)); + assertThat(shardInfo.getFailures(), arrayWithSize(0)); + assertThat(shardInfo.getSuccessful(), equalTo(1 + nbReplicas - unavailableShards.size())); + } + + private static ReplicationOperation.Primary< + TransportVerifyShardBeforeCloseAction.ShardRequest, + TransportVerifyShardBeforeCloseAction.ShardRequest, + PrimaryResult> + createPrimary(final ShardRouting primary, final ReplicationGroup replicationGroup) { + return new ReplicationOperation.Primary< + TransportVerifyShardBeforeCloseAction.ShardRequest, + TransportVerifyShardBeforeCloseAction.ShardRequest, + PrimaryResult>() { + @Override + public ShardRouting routingEntry() { + return primary; + } + + @Override + public ReplicationGroup getReplicationGroup() { + return replicationGroup; + } + + @Override + public PrimaryResult perform(TransportVerifyShardBeforeCloseAction.ShardRequest request) throws Exception { + return new PrimaryResult(request); + } + + @Override + public void failShard(String message, Exception exception) { + + } + + @Override + public void updateLocalCheckpointForShard(String allocationId, long checkpoint) { + } + + @Override + public void updateGlobalCheckpointForShard(String allocationId, long globalCheckpoint) { + } + + @Override + public long localCheckpoint() { + return 0; + } + + @Override + public long globalCheckpoint() { + return 0; + } + + @Override + public long maxSeqNoOfUpdatesOrDeletes() { + return 0; + } + }; + } + + private static class PrimaryResult implements ReplicationOperation.PrimaryResult { + + private final TransportVerifyShardBeforeCloseAction.ShardRequest replicaRequest; + private final SetOnce shardInfo; + + private PrimaryResult(final TransportVerifyShardBeforeCloseAction.ShardRequest replicaRequest) { + this.replicaRequest = replicaRequest; + this.shardInfo = new SetOnce<>(); + } + + @Override + public TransportVerifyShardBeforeCloseAction.ShardRequest replicaRequest() { + return replicaRequest; + } + + @Override + public void setShardInfo(ReplicationResponse.ShardInfo shardInfo) { + this.shardInfo.set(shardInfo); + } + + public ReplicationResponse.ShardInfo getShardInfo() { + return shardInfo.get(); + } + } + }