Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IndicesClusterStateService should replace an init. replica with an init. primary with the same aId #32374

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,12 @@ private void removeShards(final ClusterState state) {
// state may result in a new shard being initialized while having the same allocation id as the currently started shard.
logger.debug("{} removing shard (not active, current {}, new {})", shardId, currentRoutingEntry, newShardRouting);
indexService.removeShard(shardId.id(), "removing shard (stale copy)");
} else if (newShardRouting.primary() && currentRoutingEntry.primary() == false && newShardRouting.initializing()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we're over-special-casing this, while also possibly missing some more murky cases. May I suggest the following alternative instead, which handles both this case as well as the previous (odd) one:

diff --git a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
index e6a86d47f55..42a88383937 100644
--- a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
+++ b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
@@ -219,10 +219,10 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
 
         removeUnallocatedIndices(event); // also removes shards of removed indices
 
-        failMissingShards(state);
-
         removeShards(state);   // removes any local shards that doesn't match what the master expects
 
+        failMissingShards(state);
+
         updateIndices(event); // can also fail shards, but these are then guaranteed to be in failedShardsCache
 
         createIndices(state);
@@ -414,17 +414,12 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
                     logger.debug("{} removing shard (stale allocation id, stale {}, new {})", shardId,
                         currentRoutingEntry, newShardRouting);
                     indexService.removeShard(shardId.id(), "removing shard (stale copy)");
-                } else if (newShardRouting.initializing() && currentRoutingEntry.active()) {
+                } else if (newShardRouting.primary() &&
+                    state.metaData().index(shardId.getIndex()).primaryTerm(shardId.id()) != shard.getPrimaryTerm()) {
                     // this can happen if the node was isolated/gc-ed, rejoins the cluster and a new shard with the same allocation id
                     // is assigned to it. Batch cluster state processing or if shard fetching completes before the node gets a new cluster
                     // state may result in a new shard being initialized while having the same allocation id as the currently started shard.
-                    logger.debug("{} removing shard (not active, current {}, new {})", shardId, currentRoutingEntry, newShardRouting);
-                    indexService.removeShard(shardId.id(), "removing shard (stale copy)");
-                } else if (newShardRouting.primary() && currentRoutingEntry.primary() == false && newShardRouting.initializing()) {
-                    assert currentRoutingEntry.initializing() : currentRoutingEntry; // see above if clause
-                    // this can happen when cluster state batching batches activation of the shard, closing an index, reopening it
-                    // and assigning an initializing primary to this node
-                    logger.debug("{} removing shard (not active, current {}, new {})", shardId, currentRoutingEntry, newShardRouting);
+                    logger.debug("{} removing shard (stale primary term)", shardId);
                     indexService.removeShard(shardId.id(), "removing shard (stale copy)");
                 }
             }
@@ -728,6 +723,11 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
          */
         ShardRouting routingEntry();
 
+        /**
+         * Returns the current primary term of this shard
+         */
+        long getPrimaryTerm();
+
         /**
          * Returns the latest internal shard state.
          */
diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java
index 5e9ba653d40..bfcca1946a6 100644
--- a/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java
+++ b/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java
@@ -383,6 +383,11 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC
             return shardRouting;
         }
 
+        @Override
+        public long getPrimaryTerm() {
+            return term;
+        }
+
         @Override
         public IndexShardState state() {
             return null;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After discussion, we agreed this is not quite right. We decided to go with the initial condition proposed here.

assert currentRoutingEntry.initializing() : currentRoutingEntry; // see above if clause
// this can happen when cluster state batching batches activation of the shard, closing an index, reopening it
// and assigning an initializing primary to this node
logger.debug("{} removing shard (not active, current {}, new {})", shardId, currentRoutingEntry, newShardRouting);
indexService.removeShard(shardId.id(), "removing shard (stale copy)");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable.Builder;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
Expand All @@ -44,6 +46,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
Expand Down Expand Up @@ -93,7 +96,8 @@ public static ClusterState state(String index, boolean activePrimaryLocal, Shard
IndexMetaData indexMetaData = IndexMetaData.builder(index).settings(Settings.builder()
.put(SETTING_VERSION_CREATED, Version.CURRENT)
.put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)
.put(SETTING_CREATION_DATE, System.currentTimeMillis())).primaryTerm(0, primaryTerm).build();
.put(SETTING_CREATION_DATE, System.currentTimeMillis())).primaryTerm(0, primaryTerm)
.build();

RoutingTable.Builder routing = new RoutingTable.Builder();
routing.addAsNew(indexMetaData);
Expand Down Expand Up @@ -138,12 +142,19 @@ public static ClusterState state(String index, boolean activePrimaryLocal, Shard
TestShardRouting.newShardRouting(index, shardId.id(), replicaNode, relocatingNode, false, replicaState,
unassignedInfo));
}
final IndexShardRoutingTable indexShardRoutingTable = indexShardRoutingBuilder.build();

IndexMetaData.Builder indexMetaDataBuilder = new IndexMetaData.Builder(indexMetaData);
indexMetaDataBuilder.putInSyncAllocationIds(0,
indexShardRoutingTable.activeShards().stream().map(ShardRouting::allocationId).map(AllocationId::getId)
.collect(Collectors.toSet())
);

ClusterState.Builder state = ClusterState.builder(new ClusterName("test"));
state.nodes(discoBuilder);
state.metaData(MetaData.builder().put(indexMetaData, false).generateClusterUuidIfNeeded());
state.metaData(MetaData.builder().put(indexMetaDataBuilder.build(), false).generateClusterUuidIfNeeded());
state.routingTable(RoutingTable.builder().add(IndexRoutingTable.builder(indexMetaData.getIndex())
.addIndexShard(indexShardRoutingBuilder.build())).build());
.addIndexShard(indexShardRoutingTable)).build());
return state.build();
}

Expand Down Expand Up @@ -272,21 +283,21 @@ public static ClusterState stateWithAssignedPrimariesAndOneReplica(String index,
state.routingTable(RoutingTable.builder().add(indexRoutingTableBuilder.build()).build());
return state.build();
}


/**
* Creates cluster state with several indexes, shards and replicas and all shards STARTED.
*/
public static ClusterState stateWithAssignedPrimariesAndReplicas(String[] indices, int numberOfShards, int numberOfReplicas) {

int numberOfDataNodes = numberOfReplicas + 1;
int numberOfDataNodes = numberOfReplicas + 1;
DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder();
for (int i = 0; i < numberOfDataNodes + 1; i++) {
final DiscoveryNode node = newNode(i);
discoBuilder = discoBuilder.add(node);
}
discoBuilder.localNodeId(newNode(0).getId());
discoBuilder.masterNodeId(newNode(numberOfDataNodes + 1).getId());
discoBuilder.masterNodeId(newNode(numberOfDataNodes + 1).getId());
ClusterState.Builder state = ClusterState.builder(new ClusterName("test"));
state.nodes(discoBuilder);
Builder routingTableBuilder = RoutingTable.builder();
Expand Down Expand Up @@ -316,7 +327,7 @@ public static ClusterState stateWithAssignedPrimariesAndReplicas(String[] indice
state.metaData(metadataBuilder);
state.routingTable(routingTableBuilder.build());
return state.build();
}
}

/**
* Creates cluster state with an index that has one shard and as many replicas as numberOfReplicas.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ public void injectRandomFailures() {
enableRandomFailures = randomBoolean();
}

/**
 * Switches off random failure injection by clearing the {@code enableRandomFailures} flag,
 * so tests that need deterministic behaviour can opt out.
 */
protected void disableRandomFailures() {
    this.enableRandomFailures = false;
}

protected void failRandomly() {
if (enableRandomFailures && rarely()) {
throw new RuntimeException("dummy test failure");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.threadpool.TestThreadPool;
Expand All @@ -75,6 +76,7 @@
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand All @@ -97,7 +99,6 @@ public void tearDown() throws Exception {
terminate(threadPool);
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/32308")
public void testRandomClusterStateUpdates() {
// we have an IndicesClusterStateService per node in the cluster
final Map<DiscoveryNode, IndicesClusterStateService> clusterStateServiceMap = new HashMap<>();
Expand Down Expand Up @@ -199,6 +200,59 @@ public void testJoiningNewClusterOnlyRemovesInMemoryIndexStructures() {
}
}

/**
 * In rare cases a node can be instructed to replace a replica shard that is in POST_RECOVERY
 * with a new initializing primary that has the same allocation id. This can happen when
 * cluster state batching combines the starting of the replica with closing the index,
 * opening it up again and allocating the primary shard to the node in question. The node
 * should then clean up its initializing replica and replace it with a new initializing primary.
 */
public void testInitializingPrimaryRemovesInitializingReplicaWithSameAID() {
    disableRandomFailures();
    final String index = "index_" + randomAlphaOfLength(8).toLowerCase(Locale.ROOT);
    final ClusterState stateWithIndex = ClusterStateCreationUtils.state(index, randomBoolean(),
        ShardRoutingState.STARTED, ShardRoutingState.INITIALIZING);

    // starting point: derived from the freshly created cluster state, but without the index
    final ClusterState stateWithoutIndex = ClusterState.builder(stateWithIndex)
        .metaData(MetaData.builder(stateWithIndex.metaData()).remove(index))
        .routingTable(RoutingTable.builder().build())
        .build();

    // the node holding the replica is the one on which the cluster state changes are simulated
    final ShardRouting replicaRouting = stateWithIndex.routingTable().index(index).shard(0).replicaShards().get(0);
    final ShardId shardId = replicaRouting.shardId();
    final DiscoveryNode replicaNode = stateWithIndex.nodes().get(replicaRouting.currentNodeId());

    // first event: the index appears on the node
    final ClusterState localWithIndex = adaptClusterStateToLocalNode(stateWithIndex, replicaNode);
    final ClusterState localWithoutIndex = adaptClusterStateToLocalNode(stateWithoutIndex, replicaNode);
    final IndicesClusterStateService service = createIndicesClusterStateService(replicaNode, RecordingIndicesService::new);
    service.start();
    final ClusterChangedEvent indexAdded =
        new ClusterChangedEvent("cluster state change that adds the index", localWithIndex, localWithoutIndex);
    service.applyClusterState(indexAdded);

    // start the replica
    ClusterState latest = cluster.applyStartedShards(stateWithIndex,
        stateWithIndex.routingTable().index(index).shard(0).replicaShards());

    // close the index and open it up again (this will sometimes swap roles between primary and replica)
    latest = cluster.closeIndices(latest, new CloseIndexRequest(latest.metaData().index(index).getIndex().getName()));
    latest = cluster.openIndices(latest, new OpenIndexRequest(latest.metaData().index(index).getIndex().getName()));

    // second event: the node sees started replica + close + reopen as one batched update
    final ClusterState localLatest = adaptClusterStateToLocalNode(latest, replicaNode);
    final ClusterState localPrevious = adaptClusterStateToLocalNode(stateWithIndex, replicaNode);
    final ClusterChangedEvent batchedUpdate = new ClusterChangedEvent("new cluster state", localLatest, localPrevious);
    service.applyClusterState(batchedUpdate);

    // the routing entry of the local shard (or null if there is none) must match what the routing nodes hold for this node
    final MockIndexShard localShard = ((RecordingIndicesService) service.indicesService).getShardOrNull(shardId);
    final ShardRouting actualRouting;
    if (localShard != null) {
        actualRouting = localShard.routingEntry();
    } else {
        actualRouting = null;
    }
    assertThat(actualRouting, equalTo(latest.getRoutingNodes().node(replicaNode.getId()).getByShardId(shardId)));
}

public ClusterState randomInitialClusterState(Map<DiscoveryNode, IndicesClusterStateService> clusterStateServiceMap,
Supplier<MockIndicesService> indicesServiceSupplier) {
List<DiscoveryNode> allNodes = new ArrayList<>();
Expand Down