Skip to content

Commit

Permalink
Omit writing index metadata for non-replicated closed indices on data…
Browse files Browse the repository at this point in the history
…-only node (#47285)

Fixes a bug related to how "closed replicated indices" (introduced in 7.2) interact with the index
metadata storage mechanism, which has special handling for closed indices (but incorrectly
handles replicated closed indices). On non-master-eligible data nodes, it's possible for the
node's manifest file (which tracks the relevant metadata state that the node should persist) to
become out of sync with what's actually stored on disk, leading to an inconsistency that is then
detected at startup, refusing for the node to start up.

Closes #47276
  • Loading branch information
ywelsch committed Sep 30, 2019
1 parent d9a7bce commit 4675968
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ private void writeManifest(AtomicClusterStateWriter writer, Manifest manifest) t
private Map<Index, Long> writeIndicesMetadata(AtomicClusterStateWriter writer, ClusterState newState, ClusterState previousState)
throws WriteStateException {
Map<Index, Long> previouslyWrittenIndices = previousManifest.getIndexGenerations();
Set<Index> relevantIndices = getRelevantIndices(newState, previousState, previouslyWrittenIndices.keySet());
Set<Index> relevantIndices = getRelevantIndices(newState);

Map<Index, Long> newIndices = new HashMap<>();

Expand Down Expand Up @@ -207,8 +207,7 @@ static List<IndexMetaDataAction> resolveIndexMetaDataActions(Map<Index, Long> pr
return actions;
}

private static Set<Index> getRelevantIndicesOnDataOnlyNode(ClusterState state, ClusterState previousState, Set<Index>
previouslyWrittenIndices) {
private static Set<Index> getRelevantIndicesOnDataOnlyNode(ClusterState state) {
RoutingNode newRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId());
if (newRoutingNode == null) {
throw new IllegalStateException("cluster state does not contain this node - cannot write index meta state");
Expand All @@ -217,20 +216,6 @@ private static Set<Index> getRelevantIndicesOnDataOnlyNode(ClusterState state, C
for (ShardRouting routing : newRoutingNode) {
indices.add(routing.index());
}
// we have to check the meta data also: closed indices will not appear in the routing table, but we must still write the state if
// we have it written on disk previously
for (IndexMetaData indexMetaData : state.metaData()) {
boolean isOrWasClosed = indexMetaData.getState().equals(IndexMetaData.State.CLOSE);
// if the index is open we might still have to write the state if it just transitioned from closed to open
// so we have to check for that as well.
IndexMetaData previousMetaData = previousState.metaData().index(indexMetaData.getIndex());
if (previousMetaData != null) {
isOrWasClosed = isOrWasClosed || previousMetaData.getState().equals(IndexMetaData.State.CLOSE);
}
if (previouslyWrittenIndices.contains(indexMetaData.getIndex()) && isOrWasClosed) {
indices.add(indexMetaData.getIndex());
}
}
return indices;
}

Expand All @@ -244,20 +229,14 @@ private static Set<Index> getRelevantIndicesForMasterEligibleNode(ClusterState s
}

// exposed for tests
static Set<Index> getRelevantIndices(ClusterState state, ClusterState previousState, Set<Index> previouslyWrittenIndices) {
Set<Index> relevantIndices;
if (isDataOnlyNode(state)) {
relevantIndices = getRelevantIndicesOnDataOnlyNode(state, previousState, previouslyWrittenIndices);
} else if (state.nodes().getLocalNode().isMasterNode()) {
relevantIndices = getRelevantIndicesForMasterEligibleNode(state);
static Set<Index> getRelevantIndices(ClusterState state) {
if (state.nodes().getLocalNode().isMasterNode()) {
return getRelevantIndicesForMasterEligibleNode(state);
} else if (state.nodes().getLocalNode().isDataNode()) {
return getRelevantIndicesOnDataOnlyNode(state);
} else {
relevantIndices = Collections.emptySet();
return Collections.emptySet();
}
return relevantIndices;
}

private static boolean isDataOnlyNode(ClusterState state) {
return state.nodes().getLocalNode().isMasterNode() == false && state.nodes().getLocalNode().isDataNode();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.Manifest;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodes;
Expand Down Expand Up @@ -73,17 +74,6 @@

public class IncrementalClusterStateWriterTests extends ESAllocationTestCase {

private ClusterState noIndexClusterState(boolean masterEligible) {
MetaData metaData = MetaData.builder().build();
RoutingTable routingTable = RoutingTable.builder().build();

return ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metaData(metaData)
.routingTable(routingTable)
.nodes(generateDiscoveryNodes(masterEligible))
.build();
}

private ClusterState clusterStateWithUnassignedIndex(IndexMetaData indexMetaData, boolean masterEligible) {
MetaData metaData = MetaData.builder()
.put(indexMetaData, false)
Expand Down Expand Up @@ -119,7 +109,7 @@ private ClusterState clusterStateWithAssignedIndex(IndexMetaData indexMetaData,
.metaData(metaDataNewClusterState).version(oldClusterState.getVersion() + 1).build();
}

private ClusterState clusterStateWithClosedIndex(IndexMetaData indexMetaData, boolean masterEligible) {
private ClusterState clusterStateWithNonReplicatedClosedIndex(IndexMetaData indexMetaData, boolean masterEligible) {
ClusterState oldClusterState = clusterStateWithAssignedIndex(indexMetaData, masterEligible);

MetaData metaDataNewClusterState = MetaData.builder()
Expand All @@ -128,23 +118,41 @@ private ClusterState clusterStateWithClosedIndex(IndexMetaData indexMetaData, bo
.version(oldClusterState.metaData().version() + 1)
.build();
RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metaDataNewClusterState.index("test"))
.addAsRecovery(metaDataNewClusterState.index("test"))
.build();

return ClusterState.builder(oldClusterState).routingTable(routingTable)
.metaData(metaDataNewClusterState).version(oldClusterState.getVersion() + 1).build();
}

private ClusterState clusterStateWithJustOpenedIndex(IndexMetaData indexMetaData, boolean masterEligible) {
ClusterState oldClusterState = clusterStateWithClosedIndex(indexMetaData, masterEligible);
private ClusterState clusterStateWithReplicatedClosedIndex(IndexMetaData indexMetaData, boolean masterEligible, boolean assigned) {
ClusterState oldClusterState = clusterStateWithAssignedIndex(indexMetaData, masterEligible);

MetaData metaDataNewClusterState = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).state(IndexMetaData.State.OPEN)
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)
.put(MetaDataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING.getKey(), true))
.state(IndexMetaData.State.CLOSE)
.numberOfShards(5).numberOfReplicas(2))
.version(oldClusterState.metaData().version() + 1)
.build();
RoutingTable routingTable = RoutingTable.builder()
.addAsRecovery(metaDataNewClusterState.index("test"))
.build();

oldClusterState = ClusterState.builder(oldClusterState).routingTable(routingTable)
.metaData(metaDataNewClusterState).build();
if (assigned) {
AllocationService strategy = createAllocationService(Settings.builder()
.put("cluster.routing.allocation.node_concurrent_recoveries", 100)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", 100)
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 100)
.build());

routingTable = strategy.reroute(oldClusterState, "reroute").routingTable();
}

return ClusterState.builder(oldClusterState)
return ClusterState.builder(oldClusterState).routingTable(routingTable)
.metaData(metaDataNewClusterState).version(oldClusterState.getVersion() + 1).build();
}

Expand All @@ -154,14 +162,6 @@ private DiscoveryNodes.Builder generateDiscoveryNodes(boolean masterEligible) {
.add(newNode("master_node", MASTER_DATA_ROLES)).localNodeId("node1").masterNodeId(masterEligible ? "node1" : "master_node");
}

private Set<Index> randomPrevWrittenIndices(IndexMetaData indexMetaData) {
if (randomBoolean()) {
return Collections.singleton(indexMetaData.getIndex());
} else {
return Collections.emptySet();
}
}

private IndexMetaData createIndexMetaData(String name) {
return IndexMetaData.builder(name).
settings(settings(Version.CURRENT)).
Expand All @@ -172,56 +172,41 @@ private IndexMetaData createIndexMetaData(String name) {

public void testGetRelevantIndicesWithUnassignedShardsOnMasterEligibleNode() {
IndexMetaData indexMetaData = createIndexMetaData("test");
Set<Index> indices = IncrementalClusterStateWriter.getRelevantIndices(
clusterStateWithUnassignedIndex(indexMetaData, true),
noIndexClusterState(true),
randomPrevWrittenIndices(indexMetaData));
Set<Index> indices = IncrementalClusterStateWriter.getRelevantIndices(clusterStateWithUnassignedIndex(indexMetaData, true));
assertThat(indices.size(), equalTo(1));
}

public void testGetRelevantIndicesWithUnassignedShardsOnDataOnlyNode() {
IndexMetaData indexMetaData = createIndexMetaData("test");
Set<Index> indices = IncrementalClusterStateWriter.getRelevantIndices(
clusterStateWithUnassignedIndex(indexMetaData, false),
noIndexClusterState(false),
randomPrevWrittenIndices(indexMetaData));
Set<Index> indices = IncrementalClusterStateWriter.getRelevantIndices(clusterStateWithUnassignedIndex(indexMetaData, false));
assertThat(indices.size(), equalTo(0));
}

public void testGetRelevantIndicesWithAssignedShards() {
IndexMetaData indexMetaData = createIndexMetaData("test");
boolean masterEligible = randomBoolean();
Set<Index> indices = IncrementalClusterStateWriter.getRelevantIndices(
clusterStateWithAssignedIndex(indexMetaData, masterEligible),
clusterStateWithUnassignedIndex(indexMetaData, masterEligible),
randomPrevWrittenIndices(indexMetaData));
Set<Index> indices = IncrementalClusterStateWriter.getRelevantIndices(clusterStateWithAssignedIndex(indexMetaData, masterEligible));
assertThat(indices.size(), equalTo(1));
}

public void testGetRelevantIndicesForClosedPrevWrittenIndexOnDataOnlyNode() {
public void testGetRelevantIndicesForNonReplicatedClosedIndexOnDataOnlyNode() {
IndexMetaData indexMetaData = createIndexMetaData("test");
Set<Index> indices = IncrementalClusterStateWriter.getRelevantIndices(
clusterStateWithClosedIndex(indexMetaData, false),
clusterStateWithAssignedIndex(indexMetaData, false),
Collections.singleton(indexMetaData.getIndex()));
assertThat(indices.size(), equalTo(1));
clusterStateWithNonReplicatedClosedIndex(indexMetaData, false));
assertThat(indices.size(), equalTo(0));
}

public void testGetRelevantIndicesForClosedPrevNotWrittenIndexOnDataOnlyNode() {
public void testGetRelevantIndicesForReplicatedClosedButUnassignedIndexOnDataOnlyNode() {
IndexMetaData indexMetaData = createIndexMetaData("test");
Set<Index> indices = IncrementalClusterStateWriter.getRelevantIndices(
clusterStateWithJustOpenedIndex(indexMetaData, false),
clusterStateWithClosedIndex(indexMetaData, false),
Collections.emptySet());
clusterStateWithReplicatedClosedIndex(indexMetaData, false, false));
assertThat(indices.size(), equalTo(0));
}

public void testGetRelevantIndicesForWasClosedPrevWrittenIndexOnDataOnlyNode() {
public void testGetRelevantIndicesForReplicatedClosedAndAssignedIndexOnDataOnlyNode() {
IndexMetaData indexMetaData = createIndexMetaData("test");
Set<Index> indices = IncrementalClusterStateWriter.getRelevantIndices(
clusterStateWithJustOpenedIndex(indexMetaData, false),
clusterStateWithClosedIndex(indexMetaData, false),
Collections.singleton(indexMetaData.getIndex()));
clusterStateWithReplicatedClosedIndex(indexMetaData, false, true));
assertThat(indices.size(), equalTo(1));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,31 @@ public Settings onNodeStopped(String nodeName) throws Exception {
}
}

/**
* Test for https://github.com/elastic/elasticsearch/issues/47276 which checks that the persisted metadata on a data node does not
* become inconsistent when using replicated closed indices.
*/
public void testRelocatedClosedIndexIssue() throws Exception {
final String indexName = "closed-index";
final List<String> dataNodes = internalCluster().startDataOnlyNodes(2);
// allocate shard to first data node
createIndex(indexName, Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put("index.routing.allocation.include._name", dataNodes.get(0))
.build());
indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, randomIntBetween(0, 50))
.mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList()));
assertAcked(client().admin().indices().prepareClose(indexName).setWaitForActiveShards(ActiveShardCount.ALL));
// move single shard to second node
client().admin().indices().prepareUpdateSettings(indexName).setSettings(Settings.builder()
.put("index.routing.allocation.include._name", dataNodes.get(1))).get();
ensureGreen(indexName);
internalCluster().fullRestart();
assertIndexIsClosed(indexName);
ensureGreen(indexName);
}

public void testResyncPropagatePrimaryTerm() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(3);
final String indexName = "closed_indices_promotion";
Expand Down

0 comments on commit 4675968

Please sign in to comment.