From 3b45d0ab53bf861cbbd6a85ae345d599a4bdb999 Mon Sep 17 00:00:00 2001
From: Lee Hinman
Date: Tue, 12 Oct 2021 13:12:06 -0600
Subject: [PATCH] Do not remove flood block from indices on nodes undergoing
 replacement (#78942) (#79008)

This commit enhances `DiskThresholdMonitor` so that indices that have a
flood-stage block will not have the block removed while they reside on a
node that is part of a "REPLACE"-type node shutdown. This prevents a
situation where an index is blocked because of disk usage on a node, the
block is then removed during the replacement while shards are relocating to
the target node, indexing occurs, and the target runs out of space due to
the additional documents.

Relates to #70338 and #76247

# Conflicts:
#	server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java
---
 .../allocation/DiskThresholdMonitor.java      |  27 ++-
 .../allocation/DiskThresholdMonitorTests.java | 158 +++++++++++++++++-
 2 files changed, 181 insertions(+), 4 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java
index 1b3f7d126b14f..6d51e518cc967 100644
--- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java
@@ -23,6 +23,7 @@
 import org.elasticsearch.cluster.DiskUsage;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.RerouteService;
 import org.elasticsearch.cluster.routing.RoutingNode;
@@ -38,6 +39,7 @@
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.set.Sets;
+import org.elasticsearch.index.Index;
 
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -49,6 +51,8 @@
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 /**
  * Listens for a node to go over the high watermark and kicks off an empty
@@ -314,10 +318,29 @@ public void onNewInfo(ClusterInfo info) {
             logger.trace("no reroute required");
             listener.onResponse(null);
         }
-        final Set<String> indicesToAutoRelease = state.routingTable().indicesRouting().stream()
-            .map(Map.Entry::getKey)
+
+        // Generate a map of node name to ID so we can use it to look up node replacement targets
+        final Map<String, String> nodeNameToId = StreamSupport.stream(state.getRoutingNodes().spliterator(), false)
+            .collect(Collectors.toMap(rn -> rn.node().getName(), RoutingNode::nodeId, (s1, s2) -> s2));
+
+        // Calculate both the source node id and the target node id of a "replace" type shutdown
+        final Set<String> nodesIdsPartOfReplacement = state.metadata().nodeShutdowns().values().stream()
+            .filter(meta -> meta.getType() == SingleNodeShutdownMetadata.Type.REPLACE)
+            .flatMap(meta -> Stream.of(meta.getNodeId(), nodeNameToId.get(meta.getTargetNodeName())))
+            .collect(Collectors.toSet());
+
+        // Generate a set of all the indices that exist on either the target or source of a node replacement
+        final Set<String> indicesOnReplaceSourceOrTarget = nodesIdsPartOfReplacement.stream()
+            .flatMap(nodeId -> state.getRoutingNodes().node(nodeId).copyShards().stream()
+                .map(ShardRouting::index)
+                .map(Index::getName))
+            .collect(Collectors.toSet());
+
+        final Set<String> indicesToAutoRelease = state.routingTable().indicesRouting().keySet().stream()
             .filter(index -> indicesNotToAutoRelease.contains(index) == false)
             .filter(index -> state.getBlocks().hasIndexBlock(index, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK))
+            // Do not auto release indices that are on either the source or the target of a node replacement
+            .filter(index -> indicesOnReplaceSourceOrTarget.contains(index) == false)
             .collect(Collectors.toSet());
 
         if (indicesToAutoRelease.isEmpty() == false) {
diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java
index e0dedd9c1b986..b726867e063c3 100644
--- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java
@@ -21,6 +21,8 @@
 import org.elasticsearch.cluster.block.ClusterBlocks;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
+import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeRole;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -411,6 +413,154 @@ protected void updateIndicesReadOnly(Set<String> indicesToUpdate, ActionListener
         assertNull(indicesToRelease.get());
     }
 
+    public void testNoAutoReleaseOfIndicesOnReplacementNodes() {
+        AtomicReference<Set<String>> indicesToMarkReadOnly = new AtomicReference<>();
+        AtomicReference<Set<String>> indicesToRelease = new AtomicReference<>();
+        AtomicReference<ClusterState> currentClusterState = new AtomicReference<>();
+        AllocationService allocation = createAllocationService(Settings.builder()
+            .put("cluster.routing.allocation.node_concurrent_recoveries", 10).build());
+        Metadata metadata = Metadata.builder()
+            .put(IndexMetadata.builder("test_1").settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(1))
+            .put(IndexMetadata.builder("test_2").settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(1))
+            .build();
+        RoutingTable routingTable = RoutingTable.builder()
+            .addAsNew(metadata.index("test_1"))
+            .addAsNew(metadata.index("test_2"))
+            .build();
+        final ClusterState clusterState = applyStartedShardsUntilNoChange(
+            ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
+                .metadata(metadata).routingTable(routingTable)
+                .nodes(DiscoveryNodes.builder().add(newNormalNode("node1", "my-node1"))
+                    .add(newNormalNode("node2", "my-node2"))).build(), allocation);
+        assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(8));
+
+        final ImmutableOpenMap.Builder<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> reservedSpacesBuilder
+            = ImmutableOpenMap.builder();
+        final int reservedSpaceNode1 = between(0, 10);
+        reservedSpacesBuilder.put(new ClusterInfo.NodeAndPath("node1", "/foo/bar"),
+            new ClusterInfo.ReservedSpace.Builder().add(new ShardId("", "", 0), reservedSpaceNode1).build());
+        final int reservedSpaceNode2 = between(0, 10);
+        reservedSpacesBuilder.put(new ClusterInfo.NodeAndPath("node2", "/foo/bar"),
+            new ClusterInfo.ReservedSpace.Builder().add(new ShardId("", "", 0), reservedSpaceNode2).build());
+        ImmutableOpenMap<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> reservedSpaces = reservedSpacesBuilder.build();
+
+        currentClusterState.set(clusterState);
+
+        final DiskThresholdMonitor monitor = new DiskThresholdMonitor(Settings.EMPTY, currentClusterState::get,
+            new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, () -> 0L,
+            (reason, priority, listener) -> {
+                assertNotNull(listener);
+                assertThat(priority, equalTo(Priority.HIGH));
+                listener.onResponse(currentClusterState.get());
+            }) {
+            @Override
+            protected void updateIndicesReadOnly(Set<String> indicesToUpdate, ActionListener<Void> listener, boolean readOnly) {
+                if (readOnly) {
+                    assertTrue(indicesToMarkReadOnly.compareAndSet(null, indicesToUpdate));
+                } else {
+                    assertTrue(indicesToRelease.compareAndSet(null, indicesToUpdate));
+                }
+                listener.onResponse(null);
+            }
+        };
+        indicesToMarkReadOnly.set(null);
+        indicesToRelease.set(null);
+        ImmutableOpenMap.Builder<String, DiskUsage> builder = ImmutableOpenMap.builder();
+        builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(0, 4)));
+        builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(0, 4)));
+        monitor.onNewInfo(clusterInfo(builder.build(), reservedSpaces));
+        assertEquals(new HashSet<>(Arrays.asList("test_1", "test_2")), indicesToMarkReadOnly.get());
+        assertNull(indicesToRelease.get());
+
+        // Reserved space is ignored when applying block
+        indicesToMarkReadOnly.set(null);
+        indicesToRelease.set(null);
+        builder = ImmutableOpenMap.builder();
+        builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(5, 90)));
+        builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(5, 90)));
+        monitor.onNewInfo(clusterInfo(builder.build(), reservedSpaces));
+        assertNull(indicesToMarkReadOnly.get());
+        assertNull(indicesToRelease.get());
+
+        // Change cluster state so that "test_2" index is blocked (read only)
+        IndexMetadata indexMetadata = IndexMetadata.builder(clusterState.metadata().index("test_2")).settings(Settings.builder()
+            .put(clusterState.metadata()
+                .index("test_2").getSettings())
+            .put(IndexMetadata.INDEX_BLOCKS_READ_ONLY_ALLOW_DELETE_SETTING.getKey(), true)).build();
+
+        final String sourceNode;
+        final String targetNode;
+        if (randomBoolean()) {
+            sourceNode = "node1";
+            targetNode = "my-node2";
+        } else {
+            sourceNode = "node2";
+            targetNode = "my-node1";
+        }
+
+        final ClusterState clusterStateWithBlocks = ClusterState.builder(clusterState)
+            .metadata(Metadata.builder(clusterState.metadata())
+                .put(indexMetadata, true)
+                .putCustom(NodesShutdownMetadata.TYPE,
+                    new NodesShutdownMetadata(Collections.singletonMap(sourceNode,
+                        SingleNodeShutdownMetadata.builder()
+                            .setNodeId(sourceNode)
+                            .setReason("testing")
+                            .setType(SingleNodeShutdownMetadata.Type.REPLACE)
+                            .setTargetNodeName(targetNode)
+                            .setStartedAtMillis(randomNonNegativeLong())
+                            .build())))
+                .build())
+            .blocks(ClusterBlocks.builder().addBlocks(indexMetadata).build())
+            .build();
+
+        assertTrue(clusterStateWithBlocks.blocks().indexBlocked(ClusterBlockLevel.WRITE, "test_2"));
+
+        currentClusterState.set(clusterStateWithBlocks);
+
+        // When free disk on any of node1 or node2 goes below 5% flood watermark, then apply index block on indices not having the block
+        indicesToMarkReadOnly.set(null);
+        indicesToRelease.set(null);
+        builder = ImmutableOpenMap.builder();
+        builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(0, 100)));
+        builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(0, 4)));
+        monitor.onNewInfo(clusterInfo(builder.build(), reservedSpaces));
+        assertThat(indicesToMarkReadOnly.get(), contains("test_1"));
+        assertNull(indicesToRelease.get());
+
+        // While the REPLACE is ongoing the lock will not be removed from the index
+        indicesToMarkReadOnly.set(null);
+        indicesToRelease.set(null);
+        builder = ImmutableOpenMap.builder();
+        builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(10, 100)));
+        builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(10, 100)));
+        monitor.onNewInfo(clusterInfo(builder.build(), reservedSpaces));
+        assertNull(indicesToMarkReadOnly.get());
+        assertNull(indicesToRelease.get());
+
+        final ClusterState clusterStateNoShutdown = ClusterState.builder(clusterState)
+            .metadata(Metadata.builder(clusterState.metadata())
+                .put(indexMetadata, true)
+                .removeCustom(NodesShutdownMetadata.TYPE)
+                .build())
+            .blocks(ClusterBlocks.builder().addBlocks(indexMetadata).build())
+            .build();
+
+        assertTrue(clusterStateNoShutdown.blocks().indexBlocked(ClusterBlockLevel.WRITE, "test_2"));
+
+        currentClusterState.set(clusterStateNoShutdown);
+
+        // Now that the REPLACE is gone, auto-releasing can occur for the index
+        indicesToMarkReadOnly.set(null);
+        indicesToRelease.set(null);
+        builder = ImmutableOpenMap.builder();
+        builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(10, 100)));
+        builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(10, 100)));
+        monitor.onNewInfo(clusterInfo(builder.build(), reservedSpaces));
+        assertNull(indicesToMarkReadOnly.get());
+        assertThat(indicesToRelease.get(), contains("test_2"));
+    }
+
     @TestLogging(value="org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor:INFO", reason="testing INFO/WARN logging")
     public void testDiskMonitorLogging() throws IllegalAccessException {
         final ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java
@@ -657,14 +807,18 @@ private static DiscoveryNode newFrozenOnlyNode(String nodeId) {
             Sets.union(org.elasticsearch.core.Set.of(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE), irrelevantRoles));
     }
 
-    private static DiscoveryNode newNormalNode(String nodeId) {
+    private static DiscoveryNode newNormalNode(String nodeId, String nodeName) {
         Set<DiscoveryNodeRole> randomRoles = new HashSet<>(randomSubsetOf(DiscoveryNodeRole.BUILT_IN_ROLES));
         Set<DiscoveryNodeRole> roles = Sets.union(randomRoles,
            org.elasticsearch.core.Set.of(randomFrom(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.DATA_CONTENT_NODE_ROLE,
                DiscoveryNodeRole.DATA_HOT_NODE_ROLE, DiscoveryNodeRole.DATA_WARM_NODE_ROLE,
                DiscoveryNodeRole.DATA_COLD_NODE_ROLE)));
-        return newNode(nodeId, roles);
+        return newNode(nodeName, nodeId, roles);
+    }
+
+    private static DiscoveryNode newNormalNode(String nodeId) {
+        return newNormalNode(nodeId, "");
     }
 
     // java 11 forward compatibility
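
For reviewers who want the new behaviour in one place, the sketch below is a condensed, standalone paraphrase of the three stream pipelines the first hunk adds to onNewInfo. The wrapper class and helper method name are illustrative only (the production change inlines this logic directly in DiskThresholdMonitor), and it assumes the same 7.x cluster-state APIs that the diff itself uses.

// Condensed paraphrase of the logic added above; class and method names are
// illustrative, the real change lives inside DiskThresholdMonitor#onNewInfo.
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.index.Index;

final class ReplaceShutdownIndicesSketch {

    /** Index names that must keep their flood-stage block because they have a shard
     *  on the source or the target node of a REPLACE-type shutdown. */
    static Set<String> indicesOnReplaceSourceOrTarget(ClusterState state) {
        // Shutdown metadata records the *name* of the target node, so build a name -> id map first.
        final Map<String, String> nodeNameToId = StreamSupport.stream(state.getRoutingNodes().spliterator(), false)
            .collect(Collectors.toMap(rn -> rn.node().getName(), RoutingNode::nodeId, (s1, s2) -> s2));

        // Collect both ends of every REPLACE shutdown: the node being vacated and its replacement.
        final Set<String> nodesIdsPartOfReplacement = state.metadata().nodeShutdowns().values().stream()
            .filter(meta -> meta.getType() == SingleNodeShutdownMetadata.Type.REPLACE)
            .flatMap(meta -> Stream.of(meta.getNodeId(), nodeNameToId.get(meta.getTargetNodeName())))
            .collect(Collectors.toSet());

        // Any index with at least one shard on those nodes is excluded from auto-release.
        return nodesIdsPartOfReplacement.stream()
            .flatMap(nodeId -> state.getRoutingNodes().node(nodeId).copyShards().stream()
                .map(ShardRouting::index)
                .map(Index::getName))
            .collect(Collectors.toSet());
    }
}

The auto-release computation then only gains one extra step: filtering out these index names with `.filter(index -> indicesOnReplaceSourceOrTarget.contains(index) == false)` before collecting, which is exactly the additional filter visible in the DiskThresholdMonitor.java hunk above.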