From d5b68cb9841cdd80bba24660693c2bfa40a9c7b4 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 25 Jul 2018 21:09:22 +0300 Subject: [PATCH] IndicesClusterStateService should replace an init. replica with an init. primary with the same aId In rare cases it is possible that a node gets an instruction to replace a replica shard that's in POST_RECOVERY with a new initializing primary with the same allocation id. This can happen by batching cluster states that include the starting of the replica, with closing of the index, opening it up again and allocating the primary shard to the node in question. The node should then clean up its initializing replica and replace it with a new initializing primary. Closes #32308 --- .../cluster/IndicesClusterStateService.java | 6 ++ .../ClusterStateCreationUtils.java | 27 ++++++--- ...actIndicesClusterStateServiceTestCase.java | 4 ++ ...ClusterStateServiceRandomUpdatesTests.java | 56 ++++++++++++++++++- 4 files changed, 84 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 472cb04936d64..e6a86d47f55c0 100644 --- a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -420,6 +420,12 @@ private void removeShards(final ClusterState state) { // state may result in a new shard being initialized while having the same allocation id as the currently started shard. 
logger.debug("{} removing shard (not active, current {}, new {})", shardId, currentRoutingEntry, newShardRouting); indexService.removeShard(shardId.id(), "removing shard (stale copy)"); + } else if (newShardRouting.primary() && currentRoutingEntry.primary() == false && newShardRouting.initializing()) { + assert currentRoutingEntry.initializing() : currentRoutingEntry; // see above if clause + // this can happen when cluster state batching batches activation of the shard, closing an index, reopening it + // and assigning an initializing primary to this node + logger.debug("{} removing shard (not active, current {}, new {})", shardId, currentRoutingEntry, newShardRouting); + indexService.removeShard(shardId.id(), "removing shard (stale copy)"); } } } diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java b/server/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java index 59ede535c2f39..60053748d68c9 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java @@ -27,10 +27,12 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.AllocationId; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.RoutingTable.Builder; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; @@ -44,6 +46,7 @@ import java.util.HashSet; import java.util.List; import 
java.util.Set; +import java.util.stream.Collectors; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; @@ -93,7 +96,8 @@ public static ClusterState state(String index, boolean activePrimaryLocal, Shard IndexMetaData indexMetaData = IndexMetaData.builder(index).settings(Settings.builder() .put(SETTING_VERSION_CREATED, Version.CURRENT) .put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas) - .put(SETTING_CREATION_DATE, System.currentTimeMillis())).primaryTerm(0, primaryTerm).build(); + .put(SETTING_CREATION_DATE, System.currentTimeMillis())).primaryTerm(0, primaryTerm) + .build(); RoutingTable.Builder routing = new RoutingTable.Builder(); routing.addAsNew(indexMetaData); @@ -138,12 +142,19 @@ public static ClusterState state(String index, boolean activePrimaryLocal, Shard TestShardRouting.newShardRouting(index, shardId.id(), replicaNode, relocatingNode, false, replicaState, unassignedInfo)); } + final IndexShardRoutingTable indexShardRoutingTable = indexShardRoutingBuilder.build(); + + IndexMetaData.Builder indexMetaDataBuilder = new IndexMetaData.Builder(indexMetaData); + indexMetaDataBuilder.putInSyncAllocationIds(0, + indexShardRoutingTable.activeShards().stream().map(ShardRouting::allocationId).map(AllocationId::getId) + .collect(Collectors.toSet()) + ); ClusterState.Builder state = ClusterState.builder(new ClusterName("test")); state.nodes(discoBuilder); - state.metaData(MetaData.builder().put(indexMetaData, false).generateClusterUuidIfNeeded()); + state.metaData(MetaData.builder().put(indexMetaDataBuilder.build(), false).generateClusterUuidIfNeeded()); state.routingTable(RoutingTable.builder().add(IndexRoutingTable.builder(indexMetaData.getIndex()) - .addIndexShard(indexShardRoutingBuilder.build())).build()); + .addIndexShard(indexShardRoutingTable)).build()); return state.build(); } @@ -272,21 +283,21 @@ 
public static ClusterState stateWithAssignedPrimariesAndOneReplica(String index, state.routingTable(RoutingTable.builder().add(indexRoutingTableBuilder.build()).build()); return state.build(); } - - + + /** * Creates cluster state with several indexes, shards and replicas and all shards STARTED. */ public static ClusterState stateWithAssignedPrimariesAndReplicas(String[] indices, int numberOfShards, int numberOfReplicas) { - int numberOfDataNodes = numberOfReplicas + 1; + int numberOfDataNodes = numberOfReplicas + 1; DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder(); for (int i = 0; i < numberOfDataNodes + 1; i++) { final DiscoveryNode node = newNode(i); discoBuilder = discoBuilder.add(node); } discoBuilder.localNodeId(newNode(0).getId()); - discoBuilder.masterNodeId(newNode(numberOfDataNodes + 1).getId()); + discoBuilder.masterNodeId(newNode(numberOfDataNodes + 1).getId()); ClusterState.Builder state = ClusterState.builder(new ClusterName("test")); state.nodes(discoBuilder); Builder routingTableBuilder = RoutingTable.builder(); @@ -316,7 +327,7 @@ public static ClusterState stateWithAssignedPrimariesAndReplicas(String[] indice state.metaData(metadataBuilder); state.routingTable(routingTableBuilder.build()); return state.build(); - } + } /** * Creates cluster state with and index that has one shard and as many replicas as numberOfReplicas. 
diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java index 5c6b000f7e519..5e9ba653d40e6 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java @@ -74,6 +74,10 @@ public void injectRandomFailures() { enableRandomFailures = randomBoolean(); } + protected void disalbeRandomFailures() { + enableRandomFailures = false; + } + protected void failRandomly() { if (enableRandomFailures && rarely()) { throw new RuntimeException("dummy test failure"); diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java index 5611421594aa1..ae9b7ea06a418 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java @@ -49,6 +49,7 @@ import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.PrimaryReplicaSyncer; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.recovery.PeerRecoveryTargetService; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.threadpool.TestThreadPool; @@ -75,6 +76,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; +import static org.hamcrest.Matchers.equalTo; import 
static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -97,7 +99,6 @@ public void tearDown() throws Exception { terminate(threadPool); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/32308") public void testRandomClusterStateUpdates() { // we have an IndicesClusterStateService per node in the cluster final Map clusterStateServiceMap = new HashMap<>(); @@ -199,6 +200,59 @@ public void testJoiningNewClusterOnlyRemovesInMemoryIndexStructures() { } } + /** + * In rare cases it is possible that a node gets an instruction to replace a replica + * shard that's in POST_RECOVERY with a new initializing primary with the same allocation id. + * This can happen by batching cluster states that include the starting of the replica, with + * closing of the index, opening it up again and allocating the primary shard to the node in + * question. The node should then clean up its initializing replica and replace it with a new + * initializing primary. + */ + public void testInitializingPrimaryRemovesInitializingReplicaWithSameAID() { + disalbeRandomFailures(); + String index = "index_" + randomAlphaOfLength(8).toLowerCase(Locale.ROOT); + ClusterState state = ClusterStateCreationUtils.state(index, randomBoolean(), + ShardRoutingState.STARTED, ShardRoutingState.INITIALIZING); + + // the initial state which is derived from the newly created cluster state but doesn't contain the index + ClusterState previousState = ClusterState.builder(state) + .metaData(MetaData.builder(state.metaData()).remove(index)) + .routingTable(RoutingTable.builder().build()) + .build(); + + // pick a data node to simulate the adding an index cluster state change event on, that has shards assigned to it + final ShardRouting shardRouting = state.routingTable().index(index).shard(0).replicaShards().get(0); + final ShardId shardId = shardRouting.shardId(); + DiscoveryNode node = state.nodes().get(shardRouting.currentNodeId()); + + // simulate the cluster state change on 
the node + ClusterState localState = adaptClusterStateToLocalNode(state, node); + ClusterState previousLocalState = adaptClusterStateToLocalNode(previousState, node); + IndicesClusterStateService indicesCSSvc = createIndicesClusterStateService(node, RecordingIndicesService::new); + indicesCSSvc.start(); + indicesCSSvc.applyClusterState(new ClusterChangedEvent("cluster state change that adds the index", localState, previousLocalState)); + previousState = state; + + // start the replica + state = cluster.applyStartedShards(state, state.routingTable().index(index).shard(0).replicaShards()); + + // close the index and open it up again (this will sometimes swap roles between primary and replica) + CloseIndexRequest closeIndexRequest = new CloseIndexRequest(state.metaData().index(index).getIndex().getName()); + state = cluster.closeIndices(state, closeIndexRequest); + OpenIndexRequest openIndexRequest = new OpenIndexRequest(state.metaData().index(index).getIndex().getName()); + state = cluster.openIndices(state, openIndexRequest); + + localState = adaptClusterStateToLocalNode(state, node); + previousLocalState = adaptClusterStateToLocalNode(previousState, node); + + indicesCSSvc.applyClusterState(new ClusterChangedEvent("new cluster state", localState, previousLocalState)); + + final MockIndexShard shardOrNull = ((RecordingIndicesService) indicesCSSvc.indicesService).getShardOrNull(shardId); + assertThat(shardOrNull == null ? null : shardOrNull.routingEntry(), + equalTo(state.getRoutingNodes().node(node.getId()).getByShardId(shardId))); + + } + public ClusterState randomInitialClusterState(Map clusterStateServiceMap, Supplier indicesServiceSupplier) { List allNodes = new ArrayList<>();