diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java
index 839d7dbe80c84..961a3b6032177 100644
--- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java
@@ -19,8 +19,6 @@
 
 package org.elasticsearch.cluster.routing.allocation.decider;
 
-import java.util.Set;
-
 import com.carrotsearch.hppc.cursors.ObjectCursor;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -42,6 +40,8 @@
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.shard.ShardId;
 
+import java.util.Set;
+
 import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING;
 import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING;
@@ -138,12 +138,24 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
         // subtractLeavingShards is passed as false here, because they still use disk space, and therefore we should be extra careful
         // and take the size into account
-        DiskUsage usage = getDiskUsage(node, allocation, usages, false);
+        final DiskUsageWithRelocations usage = getDiskUsage(node, allocation, usages, false);
         // First, check that the node is currently over the low watermark
         double freeDiskPercentage = usage.getFreeDiskAsPercentage();
         // Cache the used disk percentage for displaying disk percentages consistent with documentation
         double usedDiskPercentage = usage.getUsedDiskAsPercentage();
         long freeBytes = usage.getFreeBytes();
+        if (freeBytes < 0L) {
+            final long sizeOfRelocatingShards = sizeOfRelocatingShards(node, allocation, false, usage.getPath());
+            logger.debug("fewer free bytes remaining than the size of all incoming shards: " +
+                    "usage {} on node {} including {} bytes of relocations, preventing allocation",
+                usage, node.nodeId(), sizeOfRelocatingShards);
+
+            return allocation.decision(Decision.NO, NAME,
+                "the node has fewer free bytes remaining than the total size of all incoming shards: " +
+                    "free space [%sB], relocating shards [%sB]",
+                freeBytes + sizeOfRelocatingShards, sizeOfRelocatingShards);
+        }
+
         ByteSizeValue freeBytesValue = new ByteSizeValue(freeBytes);
         if (logger.isTraceEnabled()) {
             logger.trace("node [{}] has {}% used disk", node.nodeId(), usedDiskPercentage);
         }
@@ -240,6 +252,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
 
         // Secondly, check that allocating the shard to this node doesn't put it above the high watermark
         final long shardSize = getExpectedShardSize(shardRouting, allocation, 0);
+        assert shardSize >= 0 : shardSize;
         double freeSpaceAfterShard = freeDiskPercentageAfterShardAssigned(usage, shardSize);
         long freeBytesAfterShard = freeBytes - shardSize;
         if (freeBytesAfterShard < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes()) {
@@ -266,6 +279,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
                 diskThresholdSettings.getHighWatermarkRaw(), usedDiskThresholdHigh, freeSpaceAfterShard);
         }
+        assert freeBytesAfterShard >= 0 : freeBytesAfterShard;
         return allocation.decision(Decision.YES, NAME,
             "enough disk for shard on node, free: [%s], shard size: [%s], free after allocating shard: [%s]",
             freeBytesValue,
@@ -287,7 +301,7 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl
         // subtractLeavingShards is passed as true here, since this is only for shards remaining, we will *eventually* have enough disk
         // since shards are moving away. No new shards will be incoming since in canAllocate we pass false for this check.
-        final DiskUsage usage = getDiskUsage(node, allocation, usages, true);
+        final DiskUsageWithRelocations usage = getDiskUsage(node, allocation, usages, true);
         final String dataPath = clusterInfo.getDataPath(shardRouting);
         // If this node is already above the high threshold, the shard cannot remain (get it off!)
         final double freeDiskPercentage = usage.getFreeDiskAsPercentage();
@@ -299,6 +313,16 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl
             return allocation.decision(Decision.YES, NAME, "this shard is not allocated on the most utilized disk and can remain");
         }
+        if (freeBytes < 0L) {
+            final long sizeOfRelocatingShards = sizeOfRelocatingShards(node, allocation, true, usage.getPath());
+            logger.debug("fewer free bytes remaining than the size of all incoming shards: " +
+                    "usage {} on node {} including {} bytes of relocations, shard cannot remain",
+                usage, node.nodeId(), sizeOfRelocatingShards);
+            return allocation.decision(Decision.NO, NAME,
+                "the shard cannot remain on this node because the node has fewer free bytes remaining than the total size of all " +
+                    "incoming shards: free space [%s], relocating shards [%s]",
+                freeBytes + sizeOfRelocatingShards, sizeOfRelocatingShards);
+        }
         if (freeBytes < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes()) {
             if (logger.isDebugEnabled()) {
                 logger.debug("less than the required {} free bytes threshold ({} bytes free) on node {}, shard cannot remain",
@@ -328,8 +352,8 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl
             "there is enough disk on this node for the shard to remain, free: [%s]", new ByteSizeValue(freeBytes));
     }
 
-    private DiskUsage getDiskUsage(RoutingNode node, RoutingAllocation allocation,
-                                   ImmutableOpenMap<String, DiskUsage> usages, boolean subtractLeavingShards) {
+    private DiskUsageWithRelocations getDiskUsage(RoutingNode node, RoutingAllocation allocation,
+                                                  ImmutableOpenMap<String, DiskUsage> usages, boolean subtractLeavingShards) {
         DiskUsage usage = usages.get(node.nodeId());
         if (usage == null) {
             // If there is no usage, and we have other nodes in the cluster,
@@ -340,18 +364,14 @@ private DiskUsage getDiskUsage(RoutingNode node, RoutingAllocation allocation,
                     node.nodeId(), usage.getTotalBytes(), usage.getFreeBytes(), usage.getFreeDiskAsPercentage());
             }
         }
-
-        if (diskThresholdSettings.includeRelocations()) {
-            long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, subtractLeavingShards, usage.getPath());
-            DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), node.node().getName(), usage.getPath(),
-                usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize);
-            if (logger.isTraceEnabled()) {
-                logger.trace("usage without relocations: {}", usage);
-                logger.trace("usage with relocations: [{} bytes] {}", relocatingShardsSize, usageIncludingRelocations);
-            }
-            usage = usageIncludingRelocations;
+        final DiskUsageWithRelocations diskUsageWithRelocations = new DiskUsageWithRelocations(usage,
+            diskThresholdSettings.includeRelocations()
+                ? sizeOfRelocatingShards(node, allocation, subtractLeavingShards, usage.getPath()) : 0);
+        if (logger.isTraceEnabled()) {
+            logger.trace("getDiskUsage(subtractLeavingShards={}) returning {}", subtractLeavingShards, diskUsageWithRelocations);
         }
-        return usage;
+
+        return diskUsageWithRelocations;
     }
 
     /**
@@ -381,7 +401,7 @@ DiskUsage averageUsage(RoutingNode node, ImmutableOpenMap usa
      * @param shardSize Size in bytes of the shard
     * @return Percentage of free space after the shard is assigned to the node
      */
-    double freeDiskPercentageAfterShardAssigned(DiskUsage usage, Long shardSize) {
+    double freeDiskPercentageAfterShardAssigned(DiskUsageWithRelocations usage, Long shardSize) {
         shardSize = (shardSize == null) ? 0 : shardSize;
         DiskUsage newUsage = new DiskUsage(usage.getNodeId(), usage.getNodeName(), usage.getPath(),
             usage.getTotalBytes(), usage.getFreeBytes() - shardSize);
@@ -450,4 +470,59 @@ public static long getExpectedShardSize(ShardRouting shard, RoutingAllocation al
         }
     }
+
+    static class DiskUsageWithRelocations {
+
+        private final DiskUsage diskUsage;
+        private final long relocatingShardSize;
+
+        DiskUsageWithRelocations(DiskUsage diskUsage, long relocatingShardSize) {
+            this.diskUsage = diskUsage;
+            this.relocatingShardSize = relocatingShardSize;
+        }
+
+        @Override
+        public String toString() {
+            return "DiskUsageWithRelocations{" +
+                "diskUsage=" + diskUsage +
+                ", relocatingShardSize=" + relocatingShardSize +
+                '}';
+        }
+
+        double getFreeDiskAsPercentage() {
+            if (getTotalBytes() == 0L) {
+                return 100.0;
+            }
+            return 100.0 * ((double)getFreeBytes() / getTotalBytes());
+        }
+
+        double getUsedDiskAsPercentage() {
+            return 100.0 - getFreeDiskAsPercentage();
+        }
+
+        long getFreeBytes() {
+            try {
+                return Math.subtractExact(diskUsage.getFreeBytes(), relocatingShardSize);
+            } catch (ArithmeticException e) {
+                return Long.MAX_VALUE;
+            }
+        }
+
+        String getPath() {
+            return diskUsage.getPath();
+        }
+
+        String getNodeId() {
+            return diskUsage.getNodeId();
+        }
+
+        String getNodeName() {
+            return diskUsage.getNodeName();
+        }
+
+        long getTotalBytes() {
+            return diskUsage.getTotalBytes();
+        }
+    }
+
 }
diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java
index eac201b4f33e8..2c86f26d7104f 100644
--- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java
@@ -55,6 +55,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singleton;
@@ -62,6 +63,7 @@
 import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
 import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
 import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
+import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.nullValue;
@@ -645,7 +647,8 @@ public void testFreeDiskPercentageAfterShardAssigned() {
         usages.put("node2", new DiskUsage("node2", "n2", "/dev/null", 100, 50)); // 50% used
usages.put("node3", new DiskUsage("node3", "n3", "/dev/null", 100, 0)); // 100% used - Double after = decider.freeDiskPercentageAfterShardAssigned(new DiskUsage("node2", "n2", "/dev/null", 100, 30), 11L); + Double after = decider.freeDiskPercentageAfterShardAssigned( + new DiskThresholdDecider.DiskUsageWithRelocations(new DiskUsage("node2", "n2", "/dev/null", 100, 30), 0L), 11L); assertThat(after, equalTo(19.0)); } @@ -671,21 +674,25 @@ public void testShardRelocationsTakenIntoAccount() { final ClusterInfo clusterInfo = new DevNullClusterInfo(usages, usages, shardSizes); DiskThresholdDecider decider = makeDecider(diskSettings); + final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); AllocationDeciders deciders = new AllocationDeciders( - new HashSet<>(Arrays.asList(new SameShardAllocationDecider( - Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) - ), decider))); + new HashSet<>(Arrays.asList( + new SameShardAllocationDecider(Settings.EMPTY, clusterSettings), + new EnableAllocationDecider( + Settings.builder().put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none").build(), clusterSettings), + decider))); + final AtomicReference clusterInfoReference = new AtomicReference<>(clusterInfo); ClusterInfoService cis = new ClusterInfoService() { @Override public ClusterInfo getClusterInfo() { logger.info("--> calling fake getClusterInfo"); - return clusterInfo; + return clusterInfoReference.get(); } }; AllocationService strategy = new AllocationService(deciders, new TestGatewayAllocator(), - new BalancedShardsAllocator(Settings.EMPTY), cis); + new BalancedShardsAllocator(Settings.EMPTY), cis); MetaData metaData = MetaData.builder() .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) @@ -723,30 +730,66 @@ public ClusterInfo getClusterInfo() { .add(newNode("node3")) ).build(); - AllocationCommand relocate1 = new MoveAllocationCommand("test", 0, "node2", "node3"); - AllocationCommands cmds = new AllocationCommands(relocate1); + { + AllocationCommand moveAllocationCommand = new MoveAllocationCommand("test", 0, "node2", "node3"); + AllocationCommands cmds = new AllocationCommands(moveAllocationCommand); - clusterState = strategy.reroute(clusterState, cmds, false, false).getClusterState(); - logShardStates(clusterState); + clusterState = strategy.reroute(clusterState, cmds, false, false).getClusterState(); + logShardStates(clusterState); + } + + final ImmutableOpenMap.Builder overfullUsagesBuilder = ImmutableOpenMap.builder(); + overfullUsagesBuilder.put("node1", new DiskUsage("node1", "n1", "/dev/null", 100, 40)); // 60% used + overfullUsagesBuilder.put("node2", new DiskUsage("node2", "n2", "/dev/null", 100, 40)); // 60% used + overfullUsagesBuilder.put("node3", new DiskUsage("node3", "n3", "/dev/null", 100, 0)); // 100% used + final ImmutableOpenMap overfullUsages = overfullUsagesBuilder.build(); + + final ImmutableOpenMap.Builder largerShardSizesBuilder = ImmutableOpenMap.builder(); + largerShardSizesBuilder.put("[test][0][p]", 14L); + largerShardSizesBuilder.put("[test][0][r]", 14L); + largerShardSizesBuilder.put("[test2][0][p]", 2L); + largerShardSizesBuilder.put("[test2][0][r]", 2L); + final ImmutableOpenMap largerShardSizes = largerShardSizesBuilder.build(); + + final ClusterInfo overfullClusterInfo = new DevNullClusterInfo(overfullUsages, overfullUsages, largerShardSizes); - AllocationCommand relocate2 = new 
MoveAllocationCommand("test2", 0, "node2", "node3"); - cmds = new AllocationCommands(relocate2); - - try { - // The shard for the "test" index is already being relocated to - // node3, which will put it over the low watermark when it - // completes, with shard relocations taken into account this should - // throw an exception about not being able to complete - strategy.reroute(clusterState, cmds, false, false); - fail("should not have been able to reroute the shard"); - } catch (IllegalArgumentException e) { - assertThat("can't be allocated because there isn't enough room: " + e.getMessage(), - e.getMessage(), - containsString("the node is above the low watermark cluster setting " + - "[cluster.routing.allocation.disk.watermark.low=0.7], using more disk space than the maximum " + - "allowed [70.0%], actual free: [26.0%]")); + { + AllocationCommand moveAllocationCommand = new MoveAllocationCommand("test2", 0, "node2", "node3"); + AllocationCommands cmds = new AllocationCommands(moveAllocationCommand); + + final ClusterState clusterStateThatRejectsCommands = clusterState; + + assertThat(expectThrows(IllegalArgumentException.class, + () -> strategy.reroute(clusterStateThatRejectsCommands, cmds, false, false)).getMessage(), + containsString("the node is above the low watermark cluster setting " + + "[cluster.routing.allocation.disk.watermark.low=0.7], using more disk space than the maximum " + + "allowed [70.0%], actual free: [26.0%]")); + + clusterInfoReference.set(overfullClusterInfo); + + assertThat(expectThrows(IllegalArgumentException.class, + () -> strategy.reroute(clusterStateThatRejectsCommands, cmds, false, false)).getMessage(), + containsString("the node has fewer free bytes remaining than the total size of all incoming shards")); + + clusterInfoReference.set(clusterInfo); } + { + AllocationCommand moveAllocationCommand = new MoveAllocationCommand("test2", 0, "node2", "node3"); + AllocationCommands cmds = new AllocationCommands(moveAllocationCommand); + + logger.info("--> before starting: {}", clusterState); + clusterState = strategy.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)); + logger.info("--> after starting: {}", clusterState); + clusterState = strategy.reroute(clusterState, cmds, false, false).getClusterState(); + logger.info("--> after running another command: {}", clusterState); + logShardStates(clusterState); + + clusterInfoReference.set(overfullClusterInfo); + + clusterState = strategy.reroute(clusterState, "foo"); + logger.info("--> after another reroute: {}", clusterState); + } } public void testCanRemainWithShardRelocatingAway() { diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java index 10f3d1f2e0d67..ad28204fe7502 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java @@ -54,6 +54,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; /** * Unit tests for the DiskThresholdDecider @@ -434,4 +435,33 @@ public void testSizeShrinkIndex() { assertEquals(42L, DiskThresholdDecider.getExpectedShardSize(target2, 
allocationWithMissingSourceIndex, 42L)); } + public void testDiskUsageWithRelocations() { + assertThat(new DiskThresholdDecider.DiskUsageWithRelocations(new DiskUsage("n", "n", "/dev/null", 1000L, 1000L), 0).getFreeBytes(), + equalTo(1000L)); + assertThat(new DiskThresholdDecider.DiskUsageWithRelocations(new DiskUsage("n", "n", "/dev/null", 1000L, 1000L), 9).getFreeBytes(), + equalTo(991L)); + assertThat(new DiskThresholdDecider.DiskUsageWithRelocations(new DiskUsage("n", "n", "/dev/null", 1000L, 1000L), -9).getFreeBytes(), + equalTo(1009L)); + + assertThat(new DiskThresholdDecider.DiskUsageWithRelocations(new DiskUsage("n", "n", "/dev/null", 1000L, 1000L), 0) + .getFreeDiskAsPercentage(), equalTo(100.0)); + assertThat(new DiskThresholdDecider.DiskUsageWithRelocations(new DiskUsage("n", "n", "/dev/null", 1000L, 500L), 0) + .getFreeDiskAsPercentage(), equalTo(50.0)); + assertThat(new DiskThresholdDecider.DiskUsageWithRelocations(new DiskUsage("n", "n", "/dev/null", 1000L, 500L), 100) + .getFreeDiskAsPercentage(), equalTo(40.0)); + + assertThat(new DiskThresholdDecider.DiskUsageWithRelocations(new DiskUsage("n", "n", "/dev/null", 1000L, 1000L), 0) + .getUsedDiskAsPercentage(), equalTo(0.0)); + assertThat(new DiskThresholdDecider.DiskUsageWithRelocations(new DiskUsage("n", "n", "/dev/null", 1000L, 500L), 0) + .getUsedDiskAsPercentage(), equalTo(50.0)); + assertThat(new DiskThresholdDecider.DiskUsageWithRelocations(new DiskUsage("n", "n", "/dev/null", 1000L, 500L), 100) + .getUsedDiskAsPercentage(), equalTo(60.0)); + + assertThat(new DiskThresholdDecider.DiskUsageWithRelocations( + new DiskUsage("n", "n", "/dev/null", Long.MAX_VALUE, Long.MAX_VALUE), 0).getFreeBytes(), equalTo(Long.MAX_VALUE)); + assertThat(new DiskThresholdDecider.DiskUsageWithRelocations( + new DiskUsage("n", "n", "/dev/null", Long.MAX_VALUE, Long.MAX_VALUE), 10).getFreeBytes(), equalTo(Long.MAX_VALUE - 10)); + assertThat(new DiskThresholdDecider.DiskUsageWithRelocations( + new DiskUsage("n", "n", "/dev/null", Long.MAX_VALUE, Long.MAX_VALUE), -10).getFreeBytes(), equalTo(Long.MAX_VALUE)); + } }
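
Reviewer note: below is a minimal standalone sketch of the accounting idea behind DiskUsageWithRelocations, not the Elasticsearch class itself; the names RelocationAwareUsageSketch, freeBytesAfterRelocations and hasRoomForIncomingShards are illustrative only. Free space is reduced by the size of incoming relocations, the subtraction saturates at Long.MAX_VALUE instead of wrapping, and a negative result means more data is incoming than the node can hold.

// Standalone illustration only; mirrors the arithmetic of DiskUsageWithRelocations but is not the ES class.
public class RelocationAwareUsageSketch {

    /** Free bytes once incoming relocations are accounted for; may be negative. */
    static long freeBytesAfterRelocations(long reportedFreeBytes, long relocatingShardSize) {
        try {
            // A negative relocatingShardSize (more data leaving than arriving) adds space back.
            return Math.subtractExact(reportedFreeBytes, relocatingShardSize);
        } catch (ArithmeticException e) {
            // With a non-negative reportedFreeBytes the only possible overflow is past Long.MAX_VALUE,
            // so saturate rather than wrap around.
            return Long.MAX_VALUE;
        }
    }

    /** A node whose effective free space is negative must not accept the shard. */
    static boolean hasRoomForIncomingShards(long reportedFreeBytes, long relocatingShardSize) {
        return freeBytesAfterRelocations(reportedFreeBytes, relocatingShardSize) >= 0L;
    }

    public static void main(String[] args) {
        System.out.println(freeBytesAfterRelocations(100, 40));             // 60 bytes effectively free
        System.out.println(hasRoomForIncomingShards(100, 140));             // false: 40 bytes short
        System.out.println(freeBytesAfterRelocations(Long.MAX_VALUE, -10)); // saturates at Long.MAX_VALUE
    }
}

Saturating rather than clamping at zero keeps the negative value visible to the caller, which is what the new freeBytes < 0L branches in canAllocate and canRemain rely on to return a NO decision.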